Commit a225177f authored by xiao-hesheng's avatar xiao-hesheng
Browse files

修改case目录 添加data文件

parent 0c277885
......@@ -4,4 +4,5 @@
.project
target
*.pyc
log
\ No newline at end of file
log
.pyc
\ No newline at end of file
# -*- encoding=utf8 -*-
__author__ = "xiaohesheng"
"""
case_tag:mdm-web,100086
主数据平台:统一登录接口,第一步获取验证码,第二步登录系统,第三步平台登录验证token
"""
from common.common_func import commonFuc
import time
module = "spd3"
#获取验证码接口
import os
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "VerifyCode_url")
print("aaaaaaaaaaaaaaaaaa"+url)
# token = commonFuc().get_token(module, "cancel_enc_user_id")
# timestamp = str(int(time.time()))
# payload = commonFuc().get_business_data(module, "payload")
# headers = commonFuc().get_business_data(module, "post_headers", token, timestamp,
# commonFuc().get_business_data(module, "cancel_contentType"))
# result = commonFuc().http_post(url, payload, headers)
# check_dict = commonFuc().get_business_data(module, "cancel_checkDict")
# commonFuc().check_result(check_dict, result)
\ No newline at end of file
......@@ -21,52 +21,78 @@ def get_message(case_path, case_name):
return tags
def get_case_tag_list(case_path, tagList):
def get_case_tag_list(case_path, cases):
"""
遍历case目录下的文件,从文件中获取case tag,加入列表中
再加个判断,判断是跑线上还是线下
"""
tagList = cases.split('#')
env = os.environ['ENV']
case_list = []
for f in os.listdir(case_path):
script = os.path.join(case_path, f)
air_name = f.replace('.air', '')
case_tags = get_message(script, air_name)
comLevels = list(set(case_tags)&set(tagList))
if comLevels:
case_list.append(air_name)
nf_path = os.path.join(case_path, f)
if os.path.isdir(nf_path) and nf_path.endswith(".air") == False:
for nf in os.listdir(nf_path):
script = os.path.join(nf_path, nf)
if script.endswith(".air"):
air_name = nf.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag and (env in case_tags or env in ('sit', 'sita')):
case_hash = {}
case_hash["air_path"] = str(script)
case_hash["air_name"] = str(air_name)
case_hash["module"] = str(script.split(os.sep)[-2])
case_list.append(case_hash)
else:
script = os.path.join(case_path, f)
if f.endswith(".air"):
air_name = f.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag and (env in case_tags or env in ('sit', 'sita')):
case_hash = {}
case_hash["air_path"] = str(script)
case_hash["air_name"] = str(air_name)
case_hash["module"] = str(script.split(os.sep)[-2])
case_list.append(case_hash)
return case_list
def get_author(case_path, case_name):
"""
获取case是谁写的
:return:
"""
name = 'Unknown'
pyfile = open(case_path + os.sep + case_name + '.air' + os.sep + case_name + '.py', 'r', encoding='utf-8')
lines = pyfile.readlines()
for line in lines:
if '__author__' in line:
line = line.split('__author__ = ')[-1]
name = line.replace('"', '')
name = name.strip().strip('\n')
pyfile.close()
return name
def get_case_by_author(case_path, user):
def get_case_tag_list_2(case_path, cases):
"""
根据作者执行用例
:param case_path:
:param user:
:return:
遍历case目录下的文件,从文件中获取case tag,加入列表中
"""
tagList = cases.split('#')
case_list = []
for f in os.listdir(case_path):
script = os.path.join(case_path, f)
air_name = f.replace('.air', '')
author = get_author(case_path, air_name)
case_tags = get_message(script, air_name)
tagList = ['core']
comLevels = list(set(case_tags)&set(tagList))
if user == author and comLevels:
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag:
case_list.append(air_name)
return case_list
......@@ -112,7 +112,6 @@ def setup_by_args(args):
devices = [args.device]
else:
devices = []
print("do not connect device")
# set base dir to find tpl
dirpath, _ = script_dir_name(args.script)
......
import yaml
import os
from common.rw import Rw
import requests
import base64
import json
import requests
from airtest.core.api import *
from common.confop import confOP
from common.dubboUtils import GetDubboService2, InvokeDubboApi
from common.rw import Rw
workspace = os.path.abspath(".")
business_path = workspace + os.sep + "data" + os.sep
class commonFuc(object):
def headers(self, token):
headers = {
'clientInfo': '{"birthday":"2018-11-18","screenwidth":"375","clientVersion":"2.4.2","screenheight":"667","partner":"meitunmama","clientip":"10.180.81.127","traderName":"iPhone 6S","clientAppVersion":"2.4.2","clientYunyuVersion":"7.9.6","clientSystem":"ios","nettype":"wifi","deviceCode":"1f4b3860acfa303bca0407f5128bc5ea0f529fec"}',
'platform': "1",
'signature': "144c6b3c78fc20ad57da1ebdb879615b",
'token': token,
'timestamp': str(int(time.time()))
}
return headers
def find_path(self, module=""):
if module == "":
return business_path
else:
return business_path + module + os.sep
def get_business_data(self, module, key, *args):
env = os.environ['ENV']
data_list = confOP().getBusiYamlValue(self.find_path(module), "data")
if args is not None and len(args) > 0:
if isinstance(data_list[key], dict):
result = json.loads(json.dumps(data_list[key]) % args)
else:
if str(data_list[key]) == "":
result = data_list[key]
else:
result = data_list[key] % args
else:
result = data_list[key]
if "_sit_" in str(result) and "_on_" in str(result):
if env == "on":
return result["_on_"]
else:
return result["_sit_"]
else:
return result
def get_message(self, module, key):
message_list = confOP().getBusiYamlValue(self.find_path(module), "message")
return message_list[key]
def get_openauth_url(self):
return "http://openauth.meitun.com/tools/sign/create"
def get_api_url(self):
"""
......@@ -33,9 +62,15 @@ class commonFuc(object):
elif env == 'sita':
url = 'http://sita-m.meitun.com'
else:
url = 'http://sit-m.meitun.com'
# url = 'http://sit-m.meitun.com'
url = 'http://spddev.cmic.com.cn'
return url
def get_token(self, module, enc_user_id="enc_user_id"):
enc_user_id = self.get_business_data(module, enc_user_id)
token = self.check_token(enc_user_id)
return token
def check_token(self, enc_user_id='u779700044448'):
"""
多个case同一个token不用一直查数据库
......@@ -44,21 +79,27 @@ class commonFuc(object):
"""
env = os.environ['ENV']
result = Rw().r_token(enc_user_id, env)
print(result)
if result == None:
if result is None:
return Rw().w_token(enc_user_id, env)
else:
return result["token"]
def http_get(self, url, token, params=""):
def http_get(self, url, headers="", params=""):
"""
一个get请求,返回json
"""
result = requests.get(url, headers=self.headers(token), params=params)
print(result.text)
result = requests.request("GET", url, headers=headers, params=params)
result = json.loads(result.text)
return result
def http_post(self, url, postdata=None, header=None):
"""
一个post请求,返回json
"""
result = requests.post(url, data=postdata, headers=header)
result = json.loads(result.content)
return result
def check_result(self, check_dict, result):
"""
结果检查,要检查的字段及值放在字典里, 结果是完全匹配
......@@ -80,12 +121,12 @@ class commonFuc(object):
else:
for k, v in result.items():
res = self.analysis_json(key, v)
if res != None:
if res is not None:
break
elif self.typeof(result) == 'list':
for value in result:
res = self.analysis_json(key, value)
if res != None:
if res is not None:
break
else:
pass
......@@ -114,10 +155,302 @@ class commonFuc(object):
else:
return None
def get_data(self, path, module):
return confOP().getBusiYamlValue(path, module)
def get_openApi_url(self):
"""
获取openapi的接口
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://openapi.meitun.com'
elif env == 'pre':
url = 'http://pre-openapi.meitun.com'
elif env == 'sita':
url = 'http://sita-openapi.meitun.com'
else:
url = 'http://sit-openapi.meitun.com'
return url
def get_ip_by_pool(self, poolName, type='sit'):
"""
根据hostname获取ip地址
:param poolName: ip:端口/应用名:端口
:param type: local/sit
:return:
"""
if str(type).lower() == 'local':
return poolName
else:
pool = str(poolName).split(":")[0]
print("pool: ", pool)
port = str(poolName).split(":")[1]
result = self.http_get(
"http://apollo.baobaoshu.com/apiv1/united_devices/?stage=%s&application=%s&usage=SERVER" % (type, pool))
ip = result["results"][0]["ip"]
return str(ip) + ":" + port
def run_local_dubbo(self, content, dubbo_service, dubbo_method, *args):
"""
运行本地dubbo接口
:param dubbo_service: dubbo中 服务名 如:com.zl.mall.api.IItemService
:param dubbo_method: 服务中的方法 如:updateItem
:param args: 方法请求需要的参数
:return:
"""
dubbo_info = GetDubboService2().get_dubbo_info2(content)
invokeDubboApi = InvokeDubboApi(server_host=dubbo_info.get("server_host"),
server_post=dubbo_info.get("server_post"))
return invokeDubboApi.invoke_dubbo_api(dubbo_service, dubbo_method, *args)
def get_open_url(self):
"""
获取openapi的接口
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://openapi.meitun.com'
elif env == 'pre':
url = 'http://pre-openapi.meitun.com'
elif env == 'sita':
url = 'http://sita-openapi.meitun.com'
else:
url = 'http://sit-openapi.meitun.com'
return url
def get_openapi_signature(self, module, params):
"""
获取openapi的验签
:param module:
:param params:
:return:
"""
private_key = self.get_business_data(module, "private_key")
app_secret = self.get_business_data(module, "app_secret")
openauth_url = self.get_openauth_url()
data = self.get_business_data(module, "openapi_data", private_key, app_secret, params)
result = commonFuc().http_post(openauth_url, data)
signature = result['signature']
return signature
def get_url(self, pool=None):
"""
根据环境或者url
:return:
"""
env = os.environ['ENV']
print(env)
if pool =="bid":
if env == 'on':
url = 'http://bid.babytree.com'
else:
url = 'https://bid.babytree-test.com'
elif pool =="advertise-go-web":
url = 'http://g.kexin001.com'
elif pool =="ad_Delivery":
url = 'http://go.kexin001.com'
elif pool =="search-platform-index":
if env == 'on':
url = 'http://search-index.babytree.com/index/build'
else:
url = 'http://search-index.babytree-test.com/index/build'
elif pool =="search-platform-web":
if env == 'on':
url = 'http://search-query.babytree.com/search/query'
else:
url = 'http://search-query.babytree-test.com/search/query'
elif pool =="search-merger":
if env == 'on':
url = 'http://merger.babytree.com/search'
else:
url = 'http://merger.babytree-test.com/search'
elif pool =="search-suggest":
if env == 'on':
url = 'http://suggest.babytree.com'
else:
url = 'http://suggest.babytree-test.com'
else:
if env == 'on':
url = 'https://backend.meitunmama.com/'
elif env == 'pre':
url = 'http://pre-backend.meitunmama.com'
elif env == 'sita':
url = 'http://sita-backend.meitunmama.com'
else:
url = 'http://sit-backend.meitunmama.com'
return url
def login_backend(self, driver):
driver.get(self.get_url())
sleep(3)
#driver.assert_template(Template(r"tpl1580989830894.png", record_pos=(3.27, 2.99), resolution=(100, 100)),
#"请填写测试点")
driver.set_window_size(1366, 868)
if os.environ['ENV'] == 'on' or os.environ['ENV'] == 'pre':
es = 'aG9uZ2xp'
driver.find_element_by_id("loginName").send_keys(base64.b64decode(es.encode('utf-8')).decode("utf-8"))
sleep(1)
es = 'aGxiYjEyMTA5Mg=='
driver.find_element_by_xpath("//input[@type='password']").send_keys(
base64.b64decode(es.encode('utf-8')).decode("utf-8"))
sleep(1)
driver.find_element_by_id("smsCode").send_keys("111111")
sleep(1)
else:
driver.find_element_by_id("loginName").send_keys('autotest')
driver.find_element_by_xpath("//input[@type='password']").send_keys('123@qwe')
driver.find_element_by_id("smsCode").send_keys("111111")
driver.find_element_by_id("sub_btn").click()
sleep(2)
#driver.assert_template(Template(r"tpl1579258499558.png", record_pos=(0.47, 0.975), resolution=(100, 100)),
#"验证登录成功了")
# 选择打开页面的路径,如: 大健康-课程包-课程包管理
def enter_channel_manage(self, driver, title, classfy, content):
driver.find_element_by_xpath("//a[@title='%s']" % title).click()
driver.find_element_by_id("east").find_element_by_xpath("//*[text()='%s']" % classfy).click()
driver.find_element_by_xpath("//a[@title='%s']" % content).click()
def enter_album_check(self, driver):
"""
进入专辑审核页
"""
driver.get(
self.get_url() + "/bighealth-service/outcourse/list.htm?source=1&linkId=big_health_audit_sync_btn1_link&tabId=big_health_audit_sync_btn1")
def remove_readonly(self, driver):
"""
去除input的只读属性
"""
inputs = driver.find_elements_by_tag_name("input")
for input in inputs:
driver.execute_script('arguments[0].removeAttribute(\"readonly\")', input)
driver.execute_script('arguments[0].removeAttribute(\"autocomplete\")', input)
def check_variable_exist(self, check_list, result):
"""
结果检查,检查字段值存在
"""
Flag = False
for variable in check_list:
if variable in result.keys():
Flag = True
assert_equal(Flag, True, '验证参数' + variable + "存在")
def enter_h5_page(self, driver, params):
"""
:param driver:
:param params: 请求的URL
:return:
"""
url = self.get_api_url()
driver.get(url + params)
def get_start(self, pageno, pagesize):
"""
获取limit的start数
:param pageno:
:param pagesize:
:return:
"""
return (pageno - 1) * pagesize
def click_iterm(self, driver, el_list, name):
"""
点击对应元素
"""
for el in el_list:
print(el.text)
if el.text == name:
driver.execute_script("arguments[0].scrollIntoView();", el)
el.click()
break
def get_mapi_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://mapiweb.babytree.com'
elif env == 'pre':
url = 'http://pre-mapiweb.babytree.com'
else:
url = 'http://mapiweb.babytree-test.com'
return url
def get_localhome_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://localhome.babytree.com'
elif env == 'pre':
url = 'http://pre-localhome.babytree.com'
else:
url = 'http://localhome.babytree-test.com'
return url
def get_go_babytree_url(self):
"""
接口go_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://go.babytree.com'
elif env == 'pre':
url = 'http://go.babytree.com'
else:
url = 'http://go-1.babytree-test.com'
return url
def get_inno_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://apilocal.babytree.com'
elif env == 'pre':
url = 'http://pre-apilocal.babytree.com'
else:
url = 'http://apilocal.babytree-test.com'
return url
def get_localfront_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://localfront.babytree.com'
elif env == 'pre':
url = 'http://pre-localfront.babytree.com'
else:
url = 'http://localfront.babytree-test.com'
return url
def get_search_platform_hz_index_url(self):
"""
接口类型的请求
:return:
"""
env = os.environ['ENV']
if env == 'on':
url = 'http://search-index.babytree.com'
else:
url = 'http://sit-search-index.babytree.com'
return url
\ No newline at end of file
......@@ -6,21 +6,41 @@ cf = configparser.ConfigParser()
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = rootPath = os.path.split(curPath)[0]
def getConfValue(filename, section, option, type='string'):
cf.read(filename)
if (type == 'string'):
value = cf.get(section, option)
else:
value = cf.getint(section, option)
return value
def getYamlValue(filename):
file = os.path.join(rootPath, "data" + os.sep + filename)
f = open(file, 'r', encoding='utf-8')
cont = f.read()
value = yaml.load(cont)
f.close()
return value
class confOP:
def getConfValue(self, filename, section, option, type='string'):
cf.read(filename)
if (type == 'string'):
value = cf.get(section, option)
else:
value = cf.getint(section, option)
return value
def getYamlValue(self, filename):
"""
获取公共sql中的sql
:param filename:
:return:
"""
file = os.path.join(rootPath, "data" + os.sep + filename)
f = open(file, 'r', encoding='utf-8')
cont = f.read()
value = yaml.load(cont, Loader=yaml.FullLoader)
f.close()
return value
def getBusiYamlValue(self, path, filename):
"""
获取业务中的yaml文件数据
:param path:
:param filename:
:return:
"""
file = os.path.join(path, filename)
f = open(file, 'r', encoding='utf-8')
cont = f.read()
value = yaml.load(cont, Loader=yaml.FullLoader)
f.close()
return value
......
u779700044448_sit:
time: 2021-03-02 14:48:30.914746
token: u779700044448_5c36630e639120c4471f216c928ffa2e_1614305054
time: 2021-12-27 17:11:01.269465
token: u779700044448_562a6850c0ed622ce15bb087140e27e3_1640592012
enc_user_id: u779700044448
[meitun_db]
host = 192.168.24.42
host = rm-bp18as20m3solyi69.mysql.rds.aliyuncs.com
port = 3306
user = developer
password = DJ@TdX73
user = test_admin
password = ahdp0b.cr76_Pje1
[bj_db]
host = 172.25.1.6
port = 3320
user = local_rw
password = baidugoogleyahoo
[promotion]
livedb = 19
......@@ -21,3 +26,4 @@ port = 27017
redis_host=192.168.24.31
import pymysql
import os
import sys
from common.confop import confOP
from common.db.sql.sqlByIdb import sqlByIdb
import pymongo
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
import sys
sys.path.append(rootPath)
from common import confop
import pandas as pd
import pymongo
import redis
class dbOP:
def selectSql(self, schema, param=[], db="meitun_db"):
class dbOP():
def __init__(self, path=rootPath):
self.path = path
def selectSql(self, schema, param=[], db="bj_db"):
host = mySql().getConf(db)[0]
port = mySql().getConf(db)[1]
user = mySql().getConf(db)[2]
pwd = mySql().getConf(db)[3]
value = confop.getYamlValue("sql")
mySql().selectSql(host, port, user, pwd, value[schema][0], value[schema][1], param)
value = confOP().getYamlValue("sql")
if schema not in value:
value = confOP().getBusiYamlValue(self.path, "sql")
env = os.environ["ENV"]
if env == "on" or env == "pre":
database = value[schema][0]
sql = value[schema][1]
param = param
result = sqlByIdb().selectSql(database, sql, param, env)
else:
result = mySql().selectSql(host, port, user, pwd, value[schema][0], value[schema][1], param)
return result
def ddlSql(self, schema, param=[], db="meitun_db"):
def ddlSql(self, schema, param=[], db="bj_db"):
host = mySql().getConf(db)[0]
port = mySql().getConf(db)[1]
user = mySql().getConf(db)[2]
pwd = mySql().getConf(db)[3]
value = confop.getYamlValue("sql")
mySql().executeUpdate(host,port,user,pwd,value[schema][0],value[schema][1], param)
value = confOP().getYamlValue("sql")
if schema not in value:
value = confOP().getBusiYamlValue(self.path, "sql")
result = mySql().executeUpdate(host, port, user, pwd, value[schema][0], value[schema][1], param)
return result
class mySql:
def __init__(self):
pass
def getConf(self,db="meitun_db"):
host = confop.getConfValue("conf.ini", db, "host")
port = confop.getConfValue("conf.ini", db, "port", "int")
user = confop.getConfValue("conf.ini", db, "user")
pwd = confop.getConfValue("conf.ini", db, "password")
def getConf(self, db="bj_db"):
host = confOP().getConfValue(curPath + "/conf.ini", db, "host")
port = confOP().getConfValue(curPath + "/conf.ini", db, "port", "int")
user = confOP().getConfValue(curPath + "/conf.ini", db, "user")
pwd = confOP().getConfValue(curPath + "/conf.ini", db, "password")
return host,port,user,pwd
def connection(self, host, port, user, pwd, database):
......@@ -52,21 +67,20 @@ class mySql:
for i in cols:
col.append(i[0])
data = list(map(list, data))
result = pd.DataFrame(data, columns=col)
return result
return data
except Exception as e:
print(e)
conn.rollback()
cursor.close()
conn.close()
def executeUpdate(self, host, port,user,pwd,database, sql, param=[]):
conn = self.connection(host, port,user,pwd,database)
def executeUpdate(self, host, port, user, pwd, database, sql, param=[]):
conn = self.connection(host, port, user, pwd, database)
cursor = conn.cursor()
try:
cursor.execute(sql, param)
ret = cursor.execute(sql, param)
conn.commit()
return cursor
return ret
except Exception as e:
print(e)
conn.rollback()
......@@ -77,8 +91,8 @@ class mySql:
class mongodb:
def connection(self,db,collect):
ip = confop.getConfValue("conf.ini", "mongo", "ip")
port = confop.getConfValue("conf.ini", "mongo", "port")
ip = confOP().getConfValue(curPath + "/conf.ini", "mongo", "ip")
port = confOP().getConfValue(curPath + "/conf.ini", "mongo", "port")
path = "mongodb://" + ip + ":" + port + "/"
myclient = pymongo.MongoClient(path)
mydb = myclient[db]
......@@ -89,7 +103,7 @@ class mongodb:
def findALl(self, db, collect):
result = []
mycol = self.connection(db, collect)
for x in mycol.find():
for x in mycol.find():
result.append(x)
return result
......@@ -102,25 +116,20 @@ class mongodb:
return result
class redisClass:
def connect(self):
redis_host = confop.getConfValue("conf.ini", "redis", "redis_host")
pool = redis.ConnectionPool(host=redis_host)
r = redis.Redis(connection_pool=pool)
return r
def getValue(self,key):
r = redisClass().connect()
return r.get(key)
if __name__ == '__main__':
env = 'sit'
os.environ['ENV'] = env.lower()
path = "D:\\myCode\\autotest-airtest-local\\data\\月嫂"
#mysql的例子
dbOP().selectSql("prc_prom_price", [13])
result = dbOP(path).selectSql("local_worker_info")
print(result[0])
#monggdb的例子
mongodb().findByCon("yapi","user", {"role": "admin"})
#mongodb().findByCon("yapi","user", {"role": "admin"})
#redis的例子
redisClass().connect()
#redisClass().connect()
......@@ -57,12 +57,12 @@ class InvokeDubboApi(object):
def invoke_dubbo_api(self, dubbo_service, dubbor_method, args):
api_name = dubbo_service + "." + dubbor_method + "({})"
cmd = "invoke " + api_name.format(args)
print("调用命令是:{}".format(cmd))
# print("调用命令是:{}".format(cmd))
resp0 = None
try:
if self.login_flag:
resp0 = self.telnet_client.execute_some_command(cmd)
print("接口响应是,resp={}".format(resp0))
# print("接口响应是,resp={}".format(resp0))
# dubbo接口返回的数据中有 elapsed: 4 ms. 耗时,需要使用elapsed 进行切割
return str(re.compile(".+").findall(resp0).pop(0)).split("elapsed").pop(0).strip()
else:
......
......@@ -2,18 +2,21 @@
from common.cli.runner import AirtestCase, run_script
from argparse import *
import jinja2
import json
import shutil
import os
import io
import time
import multiprocessing as mp
from multiprocessing import Process, JoinableQueue, Pool
import re
import math
import report.report.report as report
import logging
from airtest.utils.logger import get_logger
from multiprocessing import Pool
import math
import requests
import json
import random
from common.confop import confOP
class CustomAirTestCase(AirtestCase):
# @classMethod
......@@ -36,54 +39,96 @@ class CustomAirTestCase(AirtestCase):
:param log_path:
:param runtime_log:
"""
self.manager = mp.Manager
self.workspace = work_space
self.case_path = case_path
self.log_path = log_path
self.runtime_log = runtime_log
self.device_dict = self.manager().dict()
level = self.get_log_level(log_level)
logger = get_logger("airtest")
logger.setLevel(level)
def get_log_level(self, log_level):
if log_level == 'WARN':
return logging.WARN
elif log_level == 'DEBUG':
return logging.DEBUG
elif log_level == 'INFO':
return logging.INFO
def get_phone_by_user(self, user):
url = "http://cloud.baobaoshu.com/cloud/groupUser/findPhoneByUser?name=%s" % user
payload = {}
headers = {
'Content-Type': 'application/json'
}
response = requests.request("GET", url, headers=headers, data=payload)
result = json.loads(response.text)
return result["data"]
def dingding_send_markdown(self, env, title, content, job_url, author, token):
at_phones = "@"
at = {
"atMobiles": [
# 需要填写自己的手机号,钉钉通过手机号@对应人
],
"isAtAll": True # 是否@所有人,默认否
}
if author in ["zhangaizhen", "caiyubing", "hongli", "lihong", "wuchenlong"]:
at_phones = "@ALL"
else:
print("输入日志信息不对")
return logging.DEBUG
phones = self.get_phone_by_user(author)
if phones.isdigit():
at_phones = at_phones + phones
at["isAtAll"] = False
else:
at_phones = "@ALL"
at["isAtAll"] = True
phoneArr = []
phoneArr.append(str(phones))
at["atMobiles"] = phoneArr
def dingding_send(self, content):
"""
报错的用例发钉钉群
:param content:
报错的用例以markdown的格式发钉钉群
:param title: 消息主题
:param content: 消息内容
:return:
"""
url = 'https://oapi.dingtalk.com/robot/send?access_token=d9e673ded1ab61ee6a631333a3d80b4291684e3adf61169ab6c1bb83d375e26d'
url = 'https://oapi.dingtalk.com/robot/send?access_token=%s' % token
pagrem = {
"msgtype": "text",
"text": {
"content": content
"msgtype": "markdown",
"markdown": {
"title": "自动化执行结果",
"text": "#### **[" + str(env) + "环境]" + str(title) + " 用例报错** \n> ##### 执行链接: " + str(
job_url) + "\n> ##### 接口返回值: " + str(content) + " " + str(at_phones)
},
"at": {
"atMobiles": [
],
"isAtAll": True # 是否@所有人,默认否
}
"at": at
}
headers = {
'Content-Type': 'application/json'
}
response = requests.post(url, data=json.dumps(pagrem), headers=headers)
print(response.text)
return response.text
def get_error_log_info(self, log):
"""
获取报错的日志
:param log:
:return:
"""
txt_log = os.path.join(log, 'log.txt')
f = open(txt_log, 'r', encoding='utf-8')
lines = f.readlines()
f.close()
for line in lines:
if "assert_not_equal" in line:
expect_val = json.loads(line)["data"]["call_args"]["first"]
return expect_val
else:
return ""
def get_log_level(self, log_level):
if log_level == 'warn':
return logging.WARN
elif log_level == 'debug':
return logging.DEBUG
elif log_level == 'info':
return logging.INFO
else:
print("输入日志信息不对")
return logging.DEBUG
def setUp(self):
print("custom setup")
......@@ -93,23 +138,60 @@ class CustomAirTestCase(AirtestCase):
print("custom tearDown")
super(CustomAirTestCase, self).tearDown()
def get_author(self, case_name):
def get_author(self, case_name, filepath):
"""
获取case是谁写的
:return:
"""
name = 'Unknown'
pyfile = open(self.case_path + os.sep + case_name + '.air' + os.sep + case_name + '.py', 'r', encoding='utf-8')
pyfile = open(filepath + os.sep + case_name + '.air' + os.sep + case_name + '.py', 'r', encoding='utf-8')
lines = pyfile.readlines()
for line in lines:
line = line.replace(" ", "")
if '__author__' in line:
line = line.split('__author__ = ')[-1]
line = line.split('__author__=')[-1]
name = line.replace('"', '')
name = name.strip().strip('\n')
pyfile.close()
print(name)
return name
def create_log(self, log_path, air_name):
"""
创建对应case的log 文件
:return:
"""
if os.path.isdir(log_path):
pass
else:
os.makedirs(log_path)
log = os.path.join(log_path, air_name)
if os.path.isdir(log):
shutil.rmtree(log)
else:
os.makedirs(log)
return log
def get_case_runtime(self, log):
"""
读取case运行日志,获取case执行开始时间,结束时间
:param log:
:return:
"""
txt_log = os.path.join(log, 'log.txt')
f = open(txt_log, 'r', encoding='utf-8')
lines = f.readlines()
f.close()
start_time = float(re.findall('.*"start_time": (.*), "ret".*', lines[0])[0])
start_time = int(start_time * 1000)
try:
end_time = float(re.findall('.*"end_time": (.*)}}.*', lines[-1])[0])
except:
# case执行失败的话没打end_time
end_time = float(re.findall('.*"time": (.*), "data".*', lines[-2])[0])
end_time = int(end_time * 1000)
return start_time, end_time
def assign_tasks(self, task_list, num):
"""
将任务平均分配给num个设备跑
......@@ -135,69 +217,46 @@ class CustomAirTestCase(AirtestCase):
if script in case_dict[k]:
return k
def get_all_dir(self, case_name):
def get_cases_and_log(self, case_name, device_list):
"""
:return: 获得需要执行的所有air的文件夹,同时创建单个log目录
:return: 获得需要执行的所有air的文件夹,同时对每个设备创建log目录
"""
dirs_ls = []
for dirs in os.listdir(self.case_path):
isdir = os.path.join(self.case_path, dirs)
if os.path.isdir(isdir) and isdir.endswith(".air"):
air_name = dirs.replace('.air', '')
if 'all' in case_name or air_name in case_name:
if case_name == 'all' or air_name in case_name:
script = os.path.join(self.case_path, isdir)
dirs_ls.append(script)
log = self.create_log(self.log_path, air_name)
CustomAirTestCase.log_list.append(log)
for device in device_list:
self.create_log(os.path.join(self.log_path, device), air_name)
return dirs_ls
def get_cases_and_log(self, case_name, device_list):
def get_all_dir(self, case_name):
"""
:return: 获得需要执行的所有air的文件夹,同时对每个设备创建log目录
:return: 获得需要执行的所有air的文件夹,同时创建单个log目录
"""
dirs_ls = []
for dirs in os.listdir(self.case_path):
isdir = os.path.join(self.case_path, dirs)
if os.path.isdir(isdir) and isdir.endswith(".air"):
air_name = dirs.replace('.air', '')
if case_name == 'all' or air_name in case_name:
if 'all' in case_name or air_name in case_name:
script = os.path.join(self.case_path, isdir)
dirs_ls.append(script)
for device in device_list:
self.create_log(os.path.join(self.log_path, device), air_name)
log = self.create_log(self.log_path, air_name)
CustomAirTestCase.log_list.append(log)
return dirs_ls
def create_log(self, log_path, air_name):
"""
创建对应case的log 文件
:return:
"""
if os.path.isdir(log_path):
pass
else:
print("创建目录:" + log_path)
os.makedirs(log_path)
log = os.path.join(log_path, air_name)
if os.path.isdir(log):
pass
else:
os.makedirs(log)
print(str(log) + 'is created')
return log
def do_run_by_pool(self, run_device, cases, log_path, log_level):
def do_run_by_pool(self, run_device, cases, log_path):
"""
在指定设备上跑指定脚本
:param run_device:
:param cases:
:param log_path:
:param log_level:
:param script:
:param log:
:return:
"""
level = self.get_log_level(log_level)
logger = get_logger("airtest")
logger.setLevel(level)
run_device = 'Android:///' + run_device
for script in cases:
air_name = script.split(os.sep)[-1].replace('.air', '')
......@@ -210,34 +269,36 @@ class CustomAirTestCase(AirtestCase):
pass
print(run_device + ' run ' + script + ' ---->end')
def get_case_runtime(self, log):
def doReport(self, results, total, report_name):
"""
读取case运行日志,获取case执行开始时间,结束时间
:param log:
输出具体报告
:param case_results:
:param summary_message:
:param device:
:return:
"""
txt_log = os.path.join(log, 'log.txt')
f = open(txt_log, 'r')
lines = f.readlines()
f.close()
if len(lines) > 0:
start_time_mes = re.findall('.*"start_time": (.*), "ret".*', lines[0])
# 有可能手机没装app 或者其他问题导致日志里没有start_time, 所以要做个判断。
if len(start_time_mes) > 0:
start_time = float(start_time_mes[0])
start_time = int(start_time * 1000)
try:
end_time = float(re.findall('.*"end_time": (.*)}}.*', lines[-1])[0])
except:
# case执行失败的话没打end_time
end_time = float(re.findall('.*"time": (.*), "data".*', lines[-2])[0])
end_time = int(end_time * 1000)
else:
return 0, 1
report_path = os.path.join(self.workspace, 'report')
if len(results) == 0:
print("没有执行case")
return
else:
return 0, 1
return start_time, end_time
env = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.join(self.workspace, 'templates')),
extensions=(),
autoescape=True
)
total["passRate"] = round(total["passNum"]*100/total["caseNum"], 2)
template = env.get_template("summary_template.html")
# log_path必须传相对路径,要不在jenkins上展示报告会报错
log_path = '../log'
# report_name 是根据设备命名的,如果report_name里有设备名的话,那log_path得加一层设备名
if '_' in report_name:
log_path = '../log/' + report_name.split('_')[-1].replace('.html', '')
html = template.render({"case_results": results, "summary_message": total, "log_path": log_path})
output_file = os.path.join(report_path, report_name)
with io.open(output_file, 'w', encoding="utf-8") as f:
f.write(html)
print("报告文件: " + output_file)
def delWithLogToReportByDevice(self, script_list, device):
"""
......@@ -268,21 +329,10 @@ class CustomAirTestCase(AirtestCase):
result["result"] = rpt.test_result
if result["result"]:
summary_message["passNum"] += 1
else:
if air_name in ('initialized', 'initialize_oppo') and device != 'B2T7N16A28002135':
self.dingding_send("设备检查: " + device + "解锁及检查失败")
elif air_name == 'install_app':
self.dingding_send("设备检查: " + device + "安装app失败")
else:
pass
start, end = self.get_case_runtime(log)
result["time"] = (end - start)/1000 if end > start else 0
author = self.get_author(air_name)
result["author"] = author
case_results.append(result)
summary_message["caseNum"] = count
summary_message["passRate"] = round(summary_message["passNum"]*100/count, 2)
summary_message["time"] = CustomAirTestCase.summary_message["time"]
report_name = 'summary_' + device + '.html'
self.doReport(case_results, summary_message, report_name)
......@@ -314,117 +364,102 @@ class CustomAirTestCase(AirtestCase):
except:
start = 1
end = 60
result["time"] = (end - start)/1000 if end > start else 0
author = self.get_author(air_name)
result["time"] = (end - start) / 1000 if end > start else 0
author = self.get_author(air_name, self.case_path)
result["author"] = author
result['device'] = self.device_dict[air_name]
CustomAirTestCase.case_results.append(result)
CustomAirTestCase.summary_message["caseNum"] = count
CustomAirTestCase.summary_message["passRate"] = round(CustomAirTestCase.summary_message["passNum"]\
* 100/CustomAirTestCase.summary_message["caseNum"], 2)
print("summary_message:")
print(CustomAirTestCase.summary_message)
CustomAirTestCase.summary_message["passRate"] = round(CustomAirTestCase.summary_message["passNum"] \
* 100 / CustomAirTestCase.summary_message["caseNum"], 2)
self.doReport(CustomAirTestCase.case_results, CustomAirTestCase.summary_message, 'summary.html')
def doReport(self, results, total, report_name):
"""
输出具体报告
:param results:
:param total:
:param report_name:
:return:
def run_case(self, run_case, case_name, job_url, case_scenario_name):
"""
report_path = os.path.join(self.workspace, 'report')
if len(results) == 0:
print("没有执行case")
return
else:
env = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.join(self.workspace, 'templates')),
extensions=(),
autoescape=True
)
total["passRate"] = round(total["passNum"]*100/total["caseNum"], 2)
template = env.get_template("summary_template.html")
# log_path必须传相对路径,要不在jenkins上展示报告会报错
log_path = '../log'
# report_name 是根据设备命名的,如果report_name里有设备名的话,那log_path得加一层设备名
if '_' in report_name:
log_path = '../log/' + report_name.split('_')[-1].replace('.html', '')
html = template.render({"case_results": results, "summary_message": total, "log_path": log_path})
output_file = os.path.join(report_path, report_name)
with io.open(output_file, 'w', encoding="utf-8") as f:
f.write(html)
print("报告文件: " + output_file)
def producer(self, script_list, q):
for script in script_list:
q.put(script)
q.join()
def consumer(self, device, q, log_level):
level = self.get_log_level(log_level)
logger = get_logger("airtest")
logger.setLevel(level)
while True:
script = q.get()
run_device = 'Android:///' + device
# 执行脚本
air_name = script.split(os.sep)[-1].replace('.air', '')
log = os.path.join(self.log_path, air_name)
print(device + ' run ' + script + ' ---->start')
args = Namespace(device=run_device, log=log, recording=None, script=script)
try:
run_script(args, AirtestCase)
except:
print("噢? 失败啦")
pass
finally:
self.device_dict[air_name] = device
print(device + ' run ' + script + ' ---->end')
print('\033[45m%s 执行了 %s\033[0m' % (os.getpid(), script))
q.task_done() # 向q.join()发送一次信号,证明一个数据已经被取走了
def run_case_by_distri(self, device_list, case_name, log_level):
"""
多进程跑case,将任务平均分给不同的设备,多进程跑,都跑完后,读取各case的log.txt 文件,获取case的开始结束时间信息
:param device_list
运行单个case
:param case_name:
:param log_level:
:return:
:return: 返回case的运行结果集合,包括case名,case执行起止时间,供校验埋点的时候用
"""
workers = []
summary_message = {}
summary_message["caseNum"] = 0
summary_message["passNum"] = 0
all_start = int(time.time() * 1000)
# 获取所有需要执行的用例集
script_list = self.get_all_dir(case_name)
#random.shuffle(script_list)
device_count = len(device_list)
q = JoinableQueue()
# 生产者们:收集要执行的用例
p1 = Process(target=self.producer, args=(script_list, q))
workers.append(p1)
print("我们总共需要执行" + str(len(workers)) + "个用例")
print("奥力给!")
# 消费者们:各设备执行用例
for i in range(device_count):
c1 = Process(target=self.consumer, args=(device_list[i], q, log_level))
c1.daemon = True
workers.append(c1)
# 开始
for p in workers:
time.sleep(2)
p.start()
p1.join()
print("all SubProcesses done")
fp = open(self.runtime_log, 'w+', encoding='utf-8')
report_dir = '../../report/report/'
static_root = report_dir
case_image_dir = '../../air_case'
lang = 'zh'
plugin = ['airtest_selenium.report']
# 聚合结果
case_results = []
# d获取所有用例集
for filepath, dirnames, filenames in os.walk(self.case_path):
for f in dirnames:
if f.endswith(".air"):
# f为.air用例名称:demo.air
air_name = f.replace('.air', '')
if 'all' in case_name or air_name in case_name:
script = os.path.join(filepath, f)
# 创建日志
log = self.create_log(self.log_path, air_name)
output_file = os.path.join(log, 'log.html')
# global args
args = Namespace(device=None, log=log, recording=None, script=script)
start = int(time.time() * 1000)
try:
flag = False
i = 0
if "AutoTest_" in log:
while flag == False and i < 2:
try:
run_script(args, AirtestCase)
flag = True
i += 1
print("yes,SUCCESS")
except:
i += 1
pass
else:
run_script(args, AirtestCase)
flag = True
print("yes,SUCCESS")
except:
pass
print("oh, FAILED")
finally:
end = int(time.time() * 1000)
author = self.get_author(air_name, filepath)
fp.write(air_name + '\t' + str(start) + '\t' + str(end) + '\t' + author + '\n')
log_root = 'log/' + air_name
report.main(log_root, static_root, script, lang, plugin, report_dir, case_image_dir, output_file)
result = {}
result["name"] = air_name
result["result"] = flag
result["author"] = author
result["time"] = (end - start) / 1000 if end > start else 0
case_results.append(result)
if flag == False:
msg = confOP().getYamlValue("msg")
token = msg["dingding_msg"][0]
if "AutoTest_" in log and token != None:
title = air_name
if air_name in case_scenario_name:
title = case_scenario_name[air_name] + "---" + air_name
content = self.get_error_log_info(log)
env = os.environ['ENV']
if len(str(content)) > 20000:
content = str(content)[1:300]
self.dingding_send_markdown(env, title, content, job_url, author, token)
print("case 执行完毕,开始生成报告")
fp.close()
seconds = (int(time.time() * 1000) - all_start) / 1000
m, s = divmod(seconds, 60)
CustomAirTestCase.summary_message["time"] = "%d分%d秒" % (m, s)
self.delWithLogToReport(script_list)
summary_message["time"] = "%d分%d秒" % (m, s)
new_case_result, summary_message = self.merge_run_result(run_case, case_results, summary_message)
self.doReport(new_case_result, summary_message, 'summary.html')
self.merge_log_txt(run_case, static_root, lang, plugin, report_dir, case_image_dir)
print("报告已生成")
def run_case(self, device, case_name):
def run_case_single(self, device, case_name):
"""
运行单个case
:param device:
......@@ -435,7 +470,7 @@ class CustomAirTestCase(AirtestCase):
summary_message["caseNum"] = 0
summary_message["passNum"] = 0
all_start = int(time.time() * 1000)
fp = open(self.runtime_log, 'w+')
fp = open(self.runtime_log, 'w+', encoding='utf-8')
# 聚合结果
case_results = []
# d获取所有用例集
......@@ -454,12 +489,12 @@ class CustomAirTestCase(AirtestCase):
start = int(time.time() * 1000)
try:
run_script(args, AirtestCase)
# summary_message["passNum"] += 1
summary_message["passNum"] += 1
except:
pass
finally:
end = int(time.time() * 1000)
author = self.get_author(air_name)
author = self.get_author(air_name, self.case_path)
fp.write(device + '\t' + air_name + '\t' + str(start) + '\t' + str(end) + '\t' + author + '\n')
rpt = report.LogToHtml(script, log)
report_dir = '../../report/report/'
......@@ -468,11 +503,7 @@ class CustomAirTestCase(AirtestCase):
result = {}
result["name"] = air_name
result["result"] = rpt.test_result
if result["result"]:
summary_message["passNum"] += 1
result["time"] = (end - start) / 1000 if end > start else 0
author = self.get_author(air_name)
result["author"] = author
case_results.append(result)
summary_message["caseNum"] += 1
print("case 执行完毕,开始生成报告")
......@@ -484,26 +515,23 @@ class CustomAirTestCase(AirtestCase):
print("报告已生成")
return True
def run_case_by_Multi(self, device_list, case_name, log_level):
def run_case_by_Multi(self, device_list, case_name):
"""
兼容性测试,case在每个机器上跑一遍
:param device_list:
:param case_name:
:param log_level:
:return:
"""
all_start = int(time.time() * 1000)
p = Pool(len(device_list))
# 获取所有需要执行的用例集
script_list = self.get_cases_and_log(case_name, device_list)
#random.shuffle(script_list)
print("任务列表:")
print(script_list)
for device in device_list:
# 设置进程数和设备数一样
try:
p.apply_async(self.do_run_by_pool, args=(device, script_list, os.path.join(self.log_path, device), log_level))
time.sleep(3)
p.apply_async(self.do_run_by_pool, args=(device, script_list, os.path.join(self.log_path, device)))
except:
pass
p.close()
......@@ -514,3 +542,324 @@ class CustomAirTestCase(AirtestCase):
CustomAirTestCase.summary_message["time"] = "%d分%d秒" % (m, s)
for device in device_list:
self.delWithLogToReportByDevice(script_list, device)
def run_case_by_distri(self, device_list, case_name):
"""
多进程跑case,将任务平均分给不同的设备,多进程跑,都跑完后,读取各case的log.txt 文件,获取case的开始结束时间信息
:param device_list
:param case_name:
:return:
"""
all_start = int(time.time() * 1000)
# 获取所有需要执行的用例集
script_list = self.get_all_dir(case_name)
device_count = len(device_list)
#把任务分配给不同的设备,分布式跑就是不同机器跑不同任务最终完成所有任务
case_list = self.assign_tasks(script_list, device_count)
print("任务列表:")
print(case_list)
# 记录case 及脚本对应关系
case_dict = {}
#设置进程数和设备数一样
p = Pool(device_count)
for i in range(device_count):
run_device = device_list[i]
cases = case_list[i]
case_dict[run_device] = cases
try:
p.apply_async(self.do_run_by_pool, args=(run_device, cases, self.log_path))
except:
pass
p.close()
p.join()
print("all SubProcesses done")
seconds = (int(time.time() * 1000) - all_start) /1000
m, s = divmod(seconds, 60)
CustomAirTestCase.summary_message["time"] = "%d分%d秒" % (m, s)
self.delWithLogToReport(script_list, case_dict)
def analysis_air_file(self, case_name, case_path, new_air_name):
"""
解析需要执行的air文件,将场景和场景之间隔开
:param case_name: 需要分开场景的air文件
:return:
"""
case_name_list = []
file_name = case_path + os.sep + case_name + '.py'
pyfile = open(file_name, 'r', encoding='utf-8')
lines = pyfile.readlines()
for line in lines:
line = line.replace(" ", "").replace("\n", "")
if re.match("^场景", line):
line = line.replace("\\", "").replace("/", "").replace(":", "").replace("*", "").replace("?", "").replace("\"", "").replace("<", "").replace(">", "").replace("|", "")
if line in new_air_name:
line = line + str(random.randint(1, 500)) + str(random.choice("abcdefghijklmnopqrstuvwxyz"))
case_name_list.append(line)
pyfile.close()
"""
如果含有多条(>1)场景,则对场景进行代码块划分
"""
if len(case_name_list) > 1:
i = 0
header_file = open(case_path + os.sep + 'header.py', 'w', encoding='utf-8')
for line in lines:
if re.match("^场景", line.replace(" ", "").replace("\n", "")):
new_file = open(case_path + os.sep + case_name_list[i] + '.py', 'w', encoding='utf-8')
new_file.write(line)
new_file.close()
i += 1
else:
if i == 0:
header_file.write(line)
else:
new_file = open(case_path + os.sep + case_name_list[i-1] + '.py', 'a+', encoding='utf-8')
new_file.write(line)
new_file.close()
header_file.close()
new_air_name.extend(case_name_list)
return case_name_list, new_air_name
else:
case_name_list = []
if case_name in new_air_name:
case_name = case_name + str(random.randint(1, 500)) + str(random.choice("abcdefghijklmnopqrstuvwxyz"))
case_name_list.append(case_name)
new_air_name.extend(case_name_list)
return case_name_list, new_air_name
def merge_new_file(self, case_list):
"""
将公共文件和场景文件合并成一个新的air文件
:param case_list:
:return:
"""
new_case_list = []
new_air_name = []
case_scenario_name = {}
for i in range(len(case_list)):
case_hash = {}
air_name = case_list[i]["air_name"]
air_path = case_list[i]["air_path"]
module_name = case_list[i]["module"]
case_hash["run_case"] = air_name
case_hash["module"] = module_name
case_name_list, new_air_name = self.analysis_air_file(air_name, air_path, new_air_name)
if len(case_name_list) > 1:
case_hash["scene_run_cases"] = case_name_list
for j in range(len(case_name_list)):
case_scenario_name[case_name_list[j]] = air_name
if not os.path.exists(air_path):
os.mkdir(air_path)
header_file = open(air_path + os.sep + 'header.py', 'r', encoding='utf-8')
header_lines = header_file.readlines()
header_file.close()
new_file = open(air_path + os.sep + case_name_list[j] + '.py', 'r', encoding='utf-8')
new_file_lines = new_file.readlines()
last_line = new_file_lines[-1]
if re.match("^[\"]+$", last_line):
new_file_lines = new_file_lines[:-1]
new_file.close()
dir_name = self.case_path + os.sep + module_name + os.sep + case_name_list[j] + ".air"
if not os.path.exists(dir_name):
os.mkdir(dir_name)
new_air = open(dir_name + os.sep + case_name_list[j] + '.py', 'w', encoding='utf-8')
new_air.writelines(header_lines)
for line in new_file_lines:
new_air.write(line)
new_air.close()
os.remove(air_path + os.sep + case_name_list[j] + '.py')
os.remove(air_path + os.sep + 'header.py')
new_case_list.append(case_hash)
return new_case_list, new_air_name, case_scenario_name
def merge_run_result(self, run_case_list, case_result, summary_message):
"""
将场景执行结果合并,确定最后场景执行的最后状态,成功or失败
:return:
"""
new_case_result = []
"""
遍历跑的所有用例,包括拆分出来的所有场景
判断每一个list中是否存在scene_run_cases,
如果存在,则一个接口中有多个场景,遍历scene_run_cases,去case_result中找执行的结果,进行合并
如果不存在,则将执行结果赋值到新的结果中
"""
count = 0
summary_message["result"] = True
for i in range(len(run_case_list)):
new_dict_result = {}
json_case = run_case_list[i]
if 'scene_run_cases' in json_case:
time = 0
new_dict_result["result"] = True
scene_names = json_case["scene_run_cases"]
new_dict_result["name"] = json_case["run_case"]
# 遍历scene_run_cases
break_flag = False
for m in range(len(scene_names)):
air_name = scene_names[m]
for j in range(len(case_result)):
dict_case = case_result[j]
if air_name == dict_case["name"]:
new_dict_result["author"] = dict_case["author"]
time = float(time) + float(dict_case["time"])
if dict_case["result"] == False:
new_dict_result["result"] = False
count += 1
break_flag = True
summary_message["result"] = False
break
if break_flag:
break
new_dict_result["time"] = time
new_case_result.append(new_dict_result)
else:
air_name = json_case["run_case"]
for j in range(len(case_result)):
dict_case = case_result[j]
if air_name == dict_case["name"]:
if dict_case["result"] == False:
count += 1
new_case_result.append(dict_case)
break
summary_message["caseNum"] = len(run_case_list)
summary_message["passNum"] = len(run_case_list) - count
return new_case_result, summary_message
def merge_log_txt(self, run_case_list, static_root, lang, plugin, report_dir, case_image_dir):
"""
将log.txt内容合并
:param run_case_list:
:return:
"""
for i in range(len(run_case_list)):
json_case = run_case_list[i]
module_name = json_case["module"]
if 'scene_run_cases' in json_case:
air_name = json_case["run_case"]
script = self.case_path + os.sep + module_name + os.sep + air_name + ".air"
log_root = "log" + os.sep + air_name
air_log_path = self.log_path + os.sep + air_name
if not os.path.exists(air_log_path):
os.mkdir(air_log_path)
new_log = open(self.log_path + os.sep + air_name + os.sep + 'log.txt', 'w', encoding='utf-8')
scene_run_cases = json_case["scene_run_cases"]
for j in range(len(scene_run_cases)):
sub_air_name = scene_run_cases[j]
try:
sub_log = open(self.log_path + os.sep + sub_air_name + os.sep + 'log.txt', 'r', encoding='utf-8')
lines = sub_log.readlines()
if len(lines) > 0:
json_line = json.loads(lines[0])
total_time = str(json_line["time"])
if "start_time" in json_line["data"]:
start_time = json_line["data"]["start_time"]
end_time = json_line["data"]["end_time"]
else:
start_time = "1603779572.984517"
end_time = "1603779572.984517"
else:
total_time = "1603779572.984517"
start_time = "1603779572.984517"
end_time = "1603779572.984517"
new_log.write('{"tag": "function", "depth": 1, "time": %s, "data": {"name": "assert_equal", "call_args": {"first": "autotest", "second": "autotest", "msg": ""}, "start_time": %s, "ret": null, "end_time": %s}, "scene_name": "%s"}\n' % (total_time, start_time, end_time, str(sub_air_name)))
new_log.writelines(lines)
sub_log.close()
new_air_path = self.case_path + os.sep + module_name
os.remove(new_air_path + os.sep + sub_air_name + ".air" + os.sep + sub_air_name + ".py")
shutil.rmtree(new_air_path + os.sep + sub_air_name + ".air")
except Exception as e:
print("mergeLogTxt: ", e)
new_air_path = self.case_path + os.sep + module_name
os.remove(new_air_path + os.sep + sub_air_name + ".air" + os.sep + sub_air_name + ".py")
shutil.rmtree(new_air_path + os.sep + sub_air_name + ".air")
new_log.close()
output_file = self.log_path + os.sep + air_name + os.sep + "log.html"
report.main(log_root, static_root, script, lang, plugin, report_dir, case_image_dir, output_file)
def run_by_scenario(self, module_name, case_name, scenario_name):
case_results = []
"""
按照一条用例中的场景执行
:return:
"""
case_path = self.case_path + os.sep + module_name + os.sep + case_name + ".air"
case_name_list = self.analysis_air_file(case_name, case_path, [])
for i in range(len(case_name_list)):
if scenario_name in case_name_list[i]:
scenario_name = case_name_list[i]
script = self.case_path + os.sep + module_name + os.sep + scenario_name + ".air"
air_path = self.case_path + os.sep + module_name + os.sep + scenario_name + ".air"
if not os.path.exists(air_path):
os.mkdir(air_path)
header_file = open(case_path + os.sep + 'header.py', 'r', encoding='utf-8')
header_lines = header_file.readlines()
header_file.close()
new_file = open(case_path + os.sep + scenario_name + '.py', 'r', encoding='utf-8')
new_file_lines = new_file.readlines()
last_line = new_file_lines[-1]
if re.match("^[\"]+$", last_line):
new_file_lines = new_file_lines[:-1]
new_file.close()
dir_name = self.case_path + os.sep + module_name + os.sep + scenario_name + ".air"
if os.path.isdir(dir_name):
new_air = open(dir_name + os.sep + scenario_name + '.py', 'w', encoding='utf-8')
new_air.writelines(header_lines)
for line in new_file_lines:
new_air.write(line)
new_air.close()
for j in range(len(case_name_list)):
os.remove(case_path + os.sep + case_name_list[j] + '.py')
os.remove(case_path + os.sep + 'header.py')
summary_message = {}
summary_message["caseNum"] = 0
summary_message["passNum"] = 0
fp = open(self.runtime_log, 'w+', encoding='utf-8')
# 创建日志
log = self.create_log(self.log_path, scenario_name)
output_file = os.path.join(log, 'log.html')
# global args
args = Namespace(device=None, log=log, recording=None, script=script)
start = int(time.time() * 1000)
try:
run_script(args, AirtestCase)
summary_message["passNum"] += 1
except:
pass
finally:
end = int(time.time() * 1000)
author = self.get_author(scenario_name, self.case_path + os.sep + module_name)
fp.write(scenario_name + '\t' + str(start) + '\t' + str(end) + '\t' + author + '\n')
rpt = report.LogToHtml(script, log)
report_dir = '../../report/report/'
case_image_dir = '../../air_case'
rpt.report("log_template.html", report_dir, case_image_dir, output_file=output_file)
result = {}
result["name"] = scenario_name
result["result"] = rpt.test_result
result["time"] = (end - start) / 1000 if end > start else 0
case_results.append(result)
summary_message["caseNum"] += 1
self.doReport(case_results, summary_message, 'summary.html')
os.remove(dir_name + os.sep + scenario_name + ".py")
shutil.rmtree(dir_name)
print("报告已生成")
return True
def insert_api_path_to_cloud(self, case_list):
for i in range(len(case_list)):
every_case = case_list[i]
......@@ -8,7 +8,7 @@ curpath = os.path.dirname(os.path.realpath(__file__))
# 数据读入和写入文件
class Rw:
def w_token(self, enc_user_id='u779700044448', env='on'):
result = dbOP().selectSql('select_user_token', [enc_user_id])
result = dbOP().selectSql('select_user_token', [enc_user_id], "meitun_db")
token = result[0][0]
dict = {}
token_value = {}
......@@ -23,12 +23,11 @@ class Rw:
yaml.dump(dict, f, Dumper=yaml.RoundTripDumper)
return token
def r_token(self, enc_user_id='u779700044448', env='on'):
yamlpath = os.path.join(curpath, "data")
file_value = open(yamlpath, 'r')
result = yaml.load(file_value.read(), Loader=yaml.Loader)
if result != None:
if result is not None:
key = enc_user_id + "_" + env
if key in result:
return result[key]
......
# dev
# dev
172.16.10.203 api.babytree-dev.com
172.16.10.203 api-test11.babytree-test.com
172.16.10.203 api.test11.babytree-fpm.com
172.16.10.203 api.babytree.com
10.88.9.88 config.meitun.com copy.meitun.com
10.54.17.151 sita-h5.meitun.com
10.54.17.151 sita-cu.meitun.com
10.54.11.80 jenkins.baobaoshu.com atms.baobaoshu.com cloud.baobaoshu.com
10.88.7.8 fastdfs1.mt.com img11.meitun.com img01.meituncdn.com
10.88.7.8 fastdfs2.mt.com img12.meitun.com img02.meituncdn.com
# online
10.54.11.80 idb.baobaoshu.com
10.54.11.80 grafana.baobaoshu.com
10.54.11.80 zabbix.baobaoshu.com
10.54.11.80 cacti.baobaoshu.com
......@@ -21,58 +14,12 @@
10.54.11.80 cmdb.baobaoshu.com
10.54.11.80 apollo.baobaoshu.com muse.baobaoshu.com
10.54.11.80 splunk.baobaoshu.com
192.168.24.30 dbm.mt.com
192.168.24.30 dbs1.mt.com
192.168.24.30 redisdb1.mt.com redisdb2.mt.com redis.mobile.db01.mt.com rediscache1.mt.com rediscache2.mt.com rediscache3.mt.com
192.168.24.46 mqmaster.mt.com
192.168.24.47 mqslave01.mt.com
10.88.9.166 zk.mt.com
10.54.11.80 ucmweb.baobaoshu.com
172.16.9.154 webview.test9.dev.babytree-inc.com
172.16.9.154 test11.dev.babytree-inc.com
172.16.10.203 test11.babytree-dev.com test11.babytree-fpm.com m.test11.babytree-test.com test11.babytree-test.com test19.babytree-test.com webview.test9.dev.babytree-inc.com
10.54.17.151 sita-live.meitun.com
10.88.9.118 kafka.mt.com
10.88.7.11 pkg.baobaoshu.com
172.16.10.203 test100.babytree-dev.com g.babytree-dev.com
122.9.41.244 gerrit.babytree-inc.com
10.54.17.153 localproxy.baobaoshu.com
192.168.24.43 gerrit.mtmm.com
192.168.24.43 gerrit.baobaoshu.com
0.0.0.0 account.jetbrains.com
10.54.11.80 idb.baobaoshu.com
10.54.17.153 sit-backend.meitunmama.com
10.54.17.153 sit-openapi.meitun.com
180.168.13.174 backend.meitunmama.com
10.54.17.153 sit-m.meitun.com
10.54.17.153 sit-static.meitun.com
10.54.17.153 sit-static.meituncdn.com
10.54.17.153 static.meitun.com
10.54.17.153 static.meituncdn.com
47.97.216.230 m.meitun.com
10.88.17.43 pre-backend.meitunmama.com
172.16.9.154 pack.babytree-inc.com
# 线上
117.121.137.17 jumper.meitunmama.com
192.168.60.26 ops.baobaoshu.com
192.168.60.26 falcon.baobaoshu.com
115.159.253.118 ppt.baobaoshu.com
10.50.80.66 oneops.baobaoshu.com
10.50.80.66 ssv.baobaoshu.com
10.88.9.204 im.meitun.com
10.50.253.100 workflow.meitunmama.com
172.16.9.163 space.babytree-inc.com
10.88.17.43 pre-m.meitun.com
10.88.17.43 pre-openapi.meitun.com
10.54.11.80 mock.baobaoshu.com
10.54.11.80 mantis.baobaoshu.com
10.54.11.80 testportal.baobaoshu.com
\ No newline at end of file
10.54.50.90 sso.baobaoshu.com
8.131.247.52 docp.plt.babytree-inc.com
\ No newline at end of file
user:
live:
- 9
- 4
- 310
pre:
- 9
- 2
- 307
live: 6ea032c10ef24b10aa7fcfe5424446afin01
pre: 6ea032c10ef24b10aa7fcfe5424446afin01
health:
live:
- 35
- 4
- 165
pre :
- 35
- 2
- 163
live: 6013c13418a245b9866e9cdec92ff9b9in01
pre : 6013c13418a245b9866e9cdec92ff9b9in01
sit: health
alimall:
live:
- 736
- 4
- 828
pre :
- 736
- 2
- 826
live: f4d539e7b8204a8890929320f2052d68in01
pre : f4d539e7b8204a8890929320f2052d68in01
sit: alimall
promotion:
live:
- 19
- 4
- 129
pre:
- 19
- 2
- 58
live: 9e33dd3e26a54c8caeaf871e873edb38in01
pre: 9e33dd3e26a54c8caeaf871e873edb38in01
sit: promotion
salesorder:
live:
- 6
- 4
- 134
pre :
- 6
- 2
- 6
live: 7f9c968b57084c0782aeb620e2a064b1in01
pre : 7f9c968b57084c0782aeb620e2a064b1in01
sit: salesorder
seller:
live:
- 7
- 4
- 144
pre:
- 7
- 2
- 7
live: 2888c1775daf4c60897073068b6c219ain01
pre: 2888c1775daf4c60897073068b6c219ain01
sit: seller
cms:
live:
- 22
- 4
- 142
pre:
- 22
- 2
- 41
live: 790afd094355416895ca39891c09c0d3in01
pre: 790afd094355416895ca39891c09c0d3in01
sit: cms
community:
live:
- 25
- 4
- 147
pre:
- 25
- 2
- 37
live: 790afd094355416895ca39891c09c0d3in01
pre: 790afd094355416895ca39891c09c0d3in01
sit: community
price:
live:
- 14
- 4
- 138
pre:
- 14
- 2
- 51
live: 6ea032c10ef24b10aa7fcfe5424446afin01
pre: 6ea032c10ef24b10aa7fcfe5424446afin01
sit: price
medical:
live:
- 766
- 4
- 1131
pre:
- 766
- 2
- 1128
live: 89a813f93c454db0a23c25f95c0d2e2cin01
pre: 89a813f93c454db0a23c25f95c0d2e2cin01
sit: medical
bcoin:
live:
- 717
- 4
- 750
pre:
- 717
- 2
- 747
live: f4d539e7b8204a8890929320f2052d68in01
pre: f4d539e7b8204a8890929320f2052d68in01
sit: bcoin
edu:
live:
- 727
- 4
- 794
pre:
- 727
- 2
- 792
live: 8e7f2572f92e49f4b0061f75682470eain01
pre: 8e7f2572f92e49f4b0061f75682470eain01
sit: edu
account:
live:
- 62
- 4
- 293
pre:
- 62
- 2
- 291
live: 2888c1775daf4c60897073068b6c219ain01
pre: 2888c1775daf4c60897073068b6c219ain01
sit: account
finance:
live:
- 10
- 4
- 136
pre:
- 10
- 2
- 139
live: 6013c13418a245b9866e9cdec92ff9b9in01
pre: 6013c13418a245b9866e9cdec92ff9b9in01
sit: finance
socialec:
live:
- 762
- 4
- 1079
pre:
- 762
- 2
- 1077
live: 89a813f93c454db0a23c25f95c0d2e2cin01
pre: 89a813f93c454db0a23c25f95c0d2e2cin01
sit: socialec
item:
live:
- 8
- 4
- 143
pre:
- 8
- 2
- 8
live: 2888c1775daf4c60897073068b6c219ain01
pre: 2888c1775daf4c60897073068b6c219ain01
sit: item
new_wms:
live:
- 763
- 4
- 1089
pre:
- 763
- 2
- 1086
live: 89a813f93c454db0a23c25f95c0d2e2cin01
pre: 89a813f93c454db0a23c25f95c0d2e2cin01
sit: new_wms
distribution:
live:
- 67
- 4
- 312
pre:
- 67
- 2
- 315
sit: new_wms
live: 8e7f2572f92e49f4b0061f75682470eain01
pre: 8e7f2572f92e49f4b0061f75682470eain01
sit: distribution
supplier:
live: 790afd094355416895ca39891c09c0d3in01
sit: supplier
local:
live: pc-2zerh8cqy748e33i8
sit : b9e5fbff-08e9-11ed-866f-00163e324a72
"dingding_msg":
-
- 59d455293168352cecfaa4fb67f2568f3fa54453a2fdb87ce9817c316dc41b57
# 登录外网(主数据平台)系统
"VerifyCode_url": "/test2api/oauthService/auth/getVerifyCode"
login_data: {"orderCode": "10200512100243412"}
login_contentType: "application/json"
login_checkDict: {"rtn_code": "0000001"}
"login_enc_user_id": "国药集团湖北分公司mQDwR7"
"login_url": "/test2api/oauthService/auth/getVerifyCode"
\ No newline at end of file
"course":
- health
- select * from health.course where status=2 and type=0 ORDER BY create_time DESC LIMIT 10;
"select_user_token":
- user
- select token from user.uc_user_token where enc_user_id=%s;
"prc_prom_price":
- cloud
- select * from access_log where id = %s;
\ No newline at end of file
"exp_expert":
- medical
- SELECT id,expert_name,base_service_count,service_count from exp_expert where deleted=%s and status=%s;
"select_memberPrivilegeId":
- user
- SELECT id ,enc_user_id,privilege_type FROM uc_member_privilege WHERE left_times>0 and status=1 AND privilege_type IN (5,12,9,2,10,11) AND start_time <NOW() AND end_time >NOW() AND enc_user_id IN (SELECT enc_user_id FROM uc_user_member WHERE final_end_time >NOW() AND level_type_code=001) order by id desc limit 1
\ No newline at end of file
......@@ -9,6 +9,7 @@ from bs4 import BeautifulSoup
from random import shuffle
import datetime
import pypinyin
import hashlib
def exchange(word):
......@@ -23,6 +24,63 @@ def exchange(word):
return s
def get_message(case_path, case_name):
# 从注释中获取某个文件的 case tag 信息
tags = []
pyfile = open(case_path + os.sep + case_name + '.py', 'r', encoding='utf-8')
lines = pyfile.readlines()
for line in lines:
if 'case_tag' in line:
line = line.split('case_tag')[-1]
# 对不规范的写法做一些处理,如中文分号, 中文逗号,多空格等。。
line = re.sub(' ', '', line)
line = line.replace(',', ',').replace(':', '').replace(':', '')
line = line.strip().strip('\n')
tags = line.split(',')
pyfile.close()
return tags
def get_all_dir(case_name, case_path):
"""
:return: 获得需要执行的所有air的文件夹,同时创建单个log目录
"""
case_list = []
for dirs in os.listdir(case_path):
isdir = os.path.join(case_path, dirs)
if os.path.isdir(isdir) and isdir.endswith(".air"):
air_name = dirs.replace('.air', '')
if 'all' in case_name or air_name in case_name:
# script = os.path.join(case_path, isdir)
case_list.append(air_name)
return case_list
def get_case_tag_list(case_path, cases):
"""
遍历case目录下的文件,从文件中获取case tag,加入列表中
如果想同时满足两个条件,用#分割
"""
tagList = cases.split('#')
env = os.environ['ENV']
case_list = []
for f in os.listdir(case_path):
script = os.path.join(case_path, f)
air_name = f.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag and (env in case_tags or env in ('sit', 'sita')):
case_list.append(air_name)
return case_list
def get_dir_exist(path, div_num):
"""
判断报告文件夹是否存在
......@@ -33,7 +91,6 @@ def get_dir_exist(path, div_num):
return False
return True
def merge_report(div_num, report_path):
"""
合并报告
......@@ -81,17 +138,17 @@ def merge_report(div_num, report_path):
if status == '成功':
html = '''
<tr width="600">
<td class="details-col-elapsed">%s</td>
<td class="details-col-elapsed"><a href='../log/%s/log.html' target='_blank'>%s</a></td>
<td class="success">%s</td>
<td class="details-col-elapsed">%s</td>
</tr>''' % (case_name, status, run_time)
</tr>''' % (case_name, case_name, status, run_time)
else:
html = '''
<tr width="600">
<td class="details-col-elapsed">%s</td>
<td class="details-col-elapsed"><a href='../log/%s/log.html' target='_blank'>%s</a></td>
<td class="fail">%s</td>
<td class="details-col-elapsed">%s</td>
</tr>''' % (case_name, status, run_time)
</tr>''' % (case_name, case_name, status, run_time)
lines.append(html)
#print(html)
pass_rate = round(succ_cases*100/total_cases, 2)
......@@ -159,7 +216,10 @@ def get_mess_from_report(report_path):
succ_cases = int(tds[1].getText())
time_str = tds[2].getText()
time_str = time_str.replace('分', 'm').replace('秒', 's')
pass_rate = round(succ_cases * 100 / total_cases, 2)
if int(total_cases) != 0:
pass_rate = round(succ_cases * 100 / total_cases, 2)
else:
pass_rate = 0.00
htmlfile.close()
return total_cases, succ_cases, time_str, pass_rate
......@@ -207,7 +267,7 @@ def call_back_cloud(runenv, pool_name, build_num, start, end, job_url, build_use
status = 'success'
else:
status = 'fail'
job_url = job_url + str(build_num) + '/artifact/report/summary.html'
url = "http://cloud.baobaoshu.com/cloud/buildautotest?type=" + \
runenv + "&buildid=" + str(build_num) + "&pool=" + pool_name + \
"&codeversion=" + str(newrev) + "&lastcversion=" + str(oldrev) + \
......@@ -238,49 +298,134 @@ def call_back_muse(deployid, cbtoken, build_user, pass_rate, job_url, build_num)
print('非远程触发')
pass
def call_back_gitool(deployid, cbtoken, build_user, pass_rate, job_url, build_num,env_t):
"""
回调gitool
:return:
"""
print("call back gitool")
if deployid != 'deployid' and cbtoken == 'cbtoken' and build_user == 'SYSTEM':
timestamp = str(int(time.time()))
data = "gitool_autotest_api_token(- - #)" + timestamp
sign_md5 = hashlib.md5()
sign_md5.update(data.encode(encoding='utf-8'))
data = sign_md5.hexdigest()
verify_str = data + "_" + timestamp
if pass_rate == 100.0:
status = 'pass'
else:
status = 'failed'
if env_t == 'on':
url = 'http://gitool.plt.babytree-inc.com/gitool/update_online_autotest_result?'
elif env_t == 'pre':
url = 'http://gitool.plt.babytree-inc.com/gitool/update_pre_autotest_result?'
elif env_t == 'sit':
url = 'http://gitool.plt.babytree-inc.com/gitool/update_test_autotest_result?'
url = url + "branch_id="+str(deployid)+"&result="+str(status)+"&result_url="+str(job_url)+"&detail_result=&"+"verify_str="+str(verify_str)
print(url)
response = get(url)
print("回调gitool返回结果")
print(response.content.decode())
else:
print('非远程触发')
pass
if __name__ == '__main__':
start = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
print('start')
case_path = os.path.abspath(".") + os.sep + 'air_case'
report_path = os.path.abspath(".") + os.sep + 'report'
# 第一个参数,case执行类型
tag = sys.argv[1]
# 想运行多个tag 用逗号分割。 想运行同时满足两种tag的用&号连接
cases = sys.argv[2]
# 第3个参数,日志数据等级
log_level = sys.argv[3]
# 第4个参数,子任务个数,或者 是 app运行模式
num = sys.argv[4]
# 第5个参数,子任务名或者 设备uid
JobOrDevice = sys.argv[5]
# pool名
pool_name = sys.argv[6]
runenv = sys.argv[7]
job_name = sys.argv[8]
build_num = sys.argv[9]
job_url = sys.argv[10]
build_user = exchange(sys.argv[11])
pool_name = sys.argv[1]
runenv = sys.argv[2]
job_name = sys.argv[3]
build_num = sys.argv[4]
job_url = sys.argv[5]
build_user = exchange(sys.argv[6])
oldrev = sys.argv[12]
newrev = sys.argv[13]
oldrev = sys.argv[7]
newrev = sys.argv[8]
deployid = sys.argv[14]
cbtoken = sys.argv[15]
if runenv == 'autotestpreid':
env_t = 'pre'
elif runenv == 'autotestonlineid':
env_t = 'on'
elif runenv == 'autotestsitaid':
env_t = 'sita'
else:
env_t = 'sit'
os.environ['ENV'] = env_t
Jobs = ['AutoTest_Pregnancy_First', 'AutoTest_Pregnancy_Personal', 'AutoTest_Pregnancy_Community', \
'AutoTest_Pregnancy_Contents', 'AutoTest_Pregnancy_Tools/']
for i in range(5):
cmd = 'rd /s /q report\\report_' + str(i+1)
# 如果num是 s,m,d 表示是运行app 用例
if num in ('s', 'm', 'd'):
cmd = 'python runner_app.py ' + JobOrDevice + ' ' + num + ' ' + tag + ' ' + cases + ' ' + log_level
os.system(cmd)
url = 'http://192.168.24.100:8080/view/Tools/job/' + Jobs[i] + '/buildWithParameters?token=123456'
print(url)
requests.get(url)
# 等待1分钟
time.sleep(2400)
# 判断报告传输是否完成
while True:
is_done = get_dir_exist(report_path, 5)
if is_done:
total_cases, succ_cases, run_time, pass_rate = merge_report(5, report_path)
break
total_cases, succ_cases, run_time, pass_rate = get_mess_from_report(report_path)
else:
num = int(num)
# 如果将任务数设置为1并且子任务名等于job名,就不分任务到子job,直接本机跑
if num != 1 or JobOrDevice != job_name:
case_list = cases.split(',')
if tag == 'tag':
case_list = get_case_tag_list(case_path, cases)
if tag == 'all':
case_list = get_all_dir(case_list, case_path)
shuffle(case_list)
length = len(case_list)
if length >= num:
div_num = num
else:
div_num = length
print(div_num)
for i in range(div_num):
cmd = 'rd /s /q report\\report_' + str(i+1)
os.system(cmd)
one_list = case_list[math.floor(i / div_num * length):math.floor((i + 1) / div_num * length)]
one_list_str = ",".join(one_list)
print(one_list_str)
url = 'http://jenkins.baobaoshu.com/view/Tools/job/' + JobOrDevice + '_' + str(i+1) + '/buildWithParameters?token=123456&tag=choose&cases=' + one_list_str + "&log_level=" + log_level + "&env=" + env_t + "&triger_job_name=" + job_name
print(url)
requests.get(url)
# 等待1分钟
time.sleep(60)
# 判断报告传输是否完成
while True:
is_done = get_dir_exist(report_path, div_num)
if is_done:
total_cases, succ_cases, run_time, pass_rate = merge_report(div_num, report_path)
break
else:
time.sleep(5)
else:
time.sleep(5)
run_url = str(job_url) + str(build_num) + os.sep + "artifact" + os.sep + "report" + os.sep + "summary.html"
cmd = 'python runner.py ' + tag + ' ' + cases + ' ' + log_level + ' ' + env_t + ' ' + run_url
os.system(cmd)
total_cases, succ_cases, run_time, pass_rate = get_mess_from_report(report_path)
end = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
# 回传数据到gitool
call_back_gitool(deployid, cbtoken, build_user, pass_rate, job_url, build_num, env_t)
# 回传数据到muse
#call_back_muse(deployid, cbtoken, build_user, pass_rate, job_url, build_num)
call_back_muse(deployid, cbtoken, build_user, pass_rate, job_url, build_num)
# 回传数据到atms
call_back_atms(runenv, job_name, build_num, total_cases, pass_rate, run_time, pool_name, build_user)
......@@ -290,3 +435,4 @@ if __name__ == '__main__':
if pass_rate != 100.0:
sys.exit(1)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment