Commit 0c277885 authored by xiao-hesheng's avatar xiao-hesheng
Browse files

Signed-off-by: xiao-hesheng <xhs89@sina.com>

parent 26c6f206
Pipeline #3481 failed with stages
in 25 seconds
.idea
*.iml
.DS_Store
.project
target
*.pyc
log
\ No newline at end of file
import os
from jpype import *
import jpype
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = rootPath = os.path.split(curPath)[0]
startJVM(jpype.getDefaultJVMPath(), "-ea", "-Djava.class.path=%s" % rootPath + os.sep + "jar" + os.sep + "utils.jar")
jclass = JClass("com.babytree.AesUtils")
class aesUtils:
def encrypt(self, content):
"""
aes_ecb模式加密
:param content:
:return:
"""
result = jclass.encrypt(content)
return result
def decrypt(self, content):
"""
aes_ecb模式解密
:param content:
:return:
"""
result = jclass.decrypt(content)
return result
def shutdown(self):
"""
关闭虚拟机
:return:
"""
shutdownJVM()
if __name__ == '__main__':
sign = aesUtils().encrypt("helloword")
print("加密:", sign)
sign2 = aesUtils().decrypt(sign)
print("解密: ", sign2)
aesUtils().shutdown()
import re
import os
def get_message(case_path, case_name):
"""
从注释中获取某个文件的 case tag 信息
"""
tags = []
pyfile = open(case_path + os.sep + case_name + '.py', 'r', encoding='utf-8')
lines = pyfile.readlines()
for line in lines:
if 'case_tag' in line:
line = line.split('case_tag')[-1]
#对不规范的写法做一些处理,如中文分号, 中文逗号,多空格等。。
line = re.sub(' ', '', line)
line = line.replace(',', ',').replace(':', '').replace(':', '')
line = line.strip().strip('\n')
tags = line.split(',')
pyfile.close()
return tags
def get_case_tag_list(case_path, tagList):
"""
遍历case目录下的文件,从文件中获取case tag,加入列表中
"""
case_list = []
for f in os.listdir(case_path):
script = os.path.join(case_path, f)
air_name = f.replace('.air', '')
case_tags = get_message(script, air_name)
comLevels = list(set(case_tags)&set(tagList))
if comLevels:
case_list.append(air_name)
return case_list
def get_author(case_path, case_name):
"""
获取case是谁写的
:return:
"""
name = 'Unknown'
pyfile = open(case_path + os.sep + case_name + '.air' + os.sep + case_name + '.py', 'r', encoding='utf-8')
lines = pyfile.readlines()
for line in lines:
if '__author__' in line:
line = line.split('__author__ = ')[-1]
name = line.replace('"', '')
name = name.strip().strip('\n')
pyfile.close()
return name
def get_case_by_author(case_path, user):
"""
根据作者执行用例
:param case_path:
:param user:
:return:
"""
case_list = []
for f in os.listdir(case_path):
script = os.path.join(case_path, f)
air_name = f.replace('.air', '')
author = get_author(case_path, air_name)
case_tags = get_message(script, air_name)
tagList = ['core']
comLevels = list(set(case_tags)&set(tagList))
if user == author and comLevels:
case_list.append(air_name)
return case_list
# -*- coding: utf-8 -*-
import unittest
import os
import sys
import six
import re
import shutil
import traceback
import warnings
from io import open
from airtest.core.api import G, auto_setup, log
from airtest.core.settings import Settings as ST
from airtest.utils.compat import decode_path, script_dir_name, script_log_dir
from copy import copy
class AirtestCase(unittest.TestCase):
PROJECT_ROOT = "."
SCRIPTEXT = ".air"
TPLEXT = ".png"
@classmethod
def setUpClass(cls):
cls.args = args
setup_by_args(args)
# setup script exec scope
cls.scope = copy(globals())
cls.scope["exec_script"] = cls.exec_other_script
def setUp(self):
if self.args.log and self.args.recording:
for dev in G.DEVICE_LIST:
try:
dev.start_recording()
except:
traceback.print_exc()
def tearDown(self):
if self.args.log and self.args.recording:
for k, dev in enumerate(G.DEVICE_LIST):
try:
output = os.path.join(self.args.log, "recording_%d.mp4" % k)
dev.stop_recording(output)
except:
traceback.print_exc()
def runTest(self):
scriptpath, pyfilename = script_dir_name(self.args.script)
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
self.scope["__file__"] = pyfilepath
with open(pyfilepath, 'r', encoding="utf8") as f:
code = f.read()
pyfilepath = pyfilepath.encode(sys.getfilesystemencoding())
try:
exec(compile(code.encode("utf-8"), pyfilepath, 'exec'), self.scope)
except Exception as err:
tb = traceback.format_exc()
log("Final Error", tb)
six.reraise(*sys.exc_info())
@classmethod
def exec_other_script(cls, scriptpath):
"""run other script in test script"""
warnings.simplefilter("always")
warnings.warn("please use using() api instead.", PendingDeprecationWarning)
def _sub_dir_name(scriptname):
dirname = os.path.splitdrive(os.path.normpath(scriptname))[-1]
dirname = dirname.strip(os.path.sep).replace(os.path.sep, "_").replace(cls.SCRIPTEXT, "_sub")
return dirname
def _copy_script(src, dst):
if os.path.isdir(dst):
shutil.rmtree(dst, ignore_errors=True)
os.mkdir(dst)
for f in os.listdir(src):
srcfile = os.path.join(src, f)
if not (os.path.isfile(srcfile) and f.endswith(cls.TPLEXT)):
continue
dstfile = os.path.join(dst, f)
shutil.copy(srcfile, dstfile)
# find script in PROJECT_ROOT
scriptpath = os.path.join(ST.PROJECT_ROOT, scriptpath)
# copy submodule's images into sub_dir
sub_dir = _sub_dir_name(scriptpath)
sub_dirpath = os.path.join(cls.args.script, sub_dir)
_copy_script(scriptpath, sub_dirpath)
# read code
pyfilename = os.path.basename(scriptpath).replace(cls.SCRIPTEXT, ".py")
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
with open(pyfilepath, 'r', encoding='utf8') as f:
code = f.read()
# replace tpl filepath with filepath in sub_dir
code = re.sub("[\'\"](\w+.png)[\'\"]", "\"%s/\g<1>\"" % sub_dir, code)
exec(compile(code.encode("utf8"), pyfilepath, 'exec'), cls.scope)
def setup_by_args(args):
# init devices
if isinstance(args.device, list):
devices = args.device
elif args.device:
devices = [args.device]
else:
devices = []
print("do not connect device")
# set base dir to find tpl
dirpath, _ = script_dir_name(args.script)
# set log dir
if args.log:
args.log = script_log_dir(dirpath, args.log)
print("save log in '%s'" % args.log)
else:
print("do not save log")
# guess project_root to be basedir of current .air path
project_root = os.path.dirname(args.script) if not ST.PROJECT_ROOT else None
try:
auto_setup(dirpath, devices, args.log, project_root)
except:
os.system('adb devices')
auto_setup(dirpath, devices, args.log, project_root)
def run_script(parsed_args, testcase_cls=AirtestCase):
global args # make it global deliberately to be used in AirtestCase & test scripts
args = parsed_args
suite = unittest.TestSuite()
suite.addTest(testcase_cls())
result = unittest.TextTestRunner(verbosity=0).run(suite)
if not result.wasSuccessful():
sys.exit(-1)
import yaml
import os
from common.rw import Rw
import requests
import json
from airtest.core.api import *
from common.confop import confOP
class commonFuc(object):
def headers(self, token):
headers = {
'clientInfo': '{"birthday":"2018-11-18","screenwidth":"375","clientVersion":"2.4.2","screenheight":"667","partner":"meitunmama","clientip":"10.180.81.127","traderName":"iPhone 6S","clientAppVersion":"2.4.2","clientYunyuVersion":"7.9.6","clientSystem":"ios","nettype":"wifi","deviceCode":"1f4b3860acfa303bca0407f5128bc5ea0f529fec"}',
'platform': "1",
'signature': "144c6b3c78fc20ad57da1ebdb879615b",
'token': token,
'timestamp': str(int(time.time()))
}
return headers
def get_api_url(self):
"""
接口类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://m.meitun.com'
elif env == 'pre':
url = 'http://pre-m.meitun.com'
elif env == 'sita':
url = 'http://sita-m.meitun.com'
else:
url = 'http://sit-m.meitun.com'
return url
def check_token(self, enc_user_id='u779700044448'):
"""
多个case同一个token不用一直查数据库
:param enc_user_id:
:return:
"""
env = os.environ['ENV']
result = Rw().r_token(enc_user_id, env)
print(result)
if result == None:
return Rw().w_token(enc_user_id, env)
else:
return result["token"]
def http_get(self, url, token, params=""):
"""
一个get请求,返回json
"""
result = requests.get(url, headers=self.headers(token), params=params)
print(result.text)
result = json.loads(result.text)
return result
def check_result(self, check_dict, result):
"""
结果检查,要检查的字段及值放在字典里, 结果是完全匹配
result 是json
"""
assert_not_equal(result, [], "只想看下result的值")
for k, v in check_dict.items():
actual_value = self.analysis_json(k, result)
assert_equal(str(v).replace(" ", ""), str(actual_value).replace(" ", ""))
def analysis_json(self, key, result):
"""
解析json
"""
res = None
if self.typeof(result) == 'dict':
if key in result.keys():
return result[key]
else:
for k, v in result.items():
res = self.analysis_json(key, v)
if res != None:
break
elif self.typeof(result) == 'list':
for value in result:
res = self.analysis_json(key, value)
if res != None:
break
else:
pass
return res
def typeof(self, variate):
"""
变量类型
:param variate:
:return:
"""
if isinstance(variate, int):
return "int"
elif isinstance(variate, str):
return "str"
elif isinstance(variate, float):
return "float"
elif isinstance(variate, list):
return "list"
elif isinstance(variate, tuple):
return "tuple"
elif isinstance(variate, dict):
return "dict"
elif isinstance(variate, set):
return "set"
else:
return None
def get_data(self, path, module):
return confOP().getBusiYamlValue(path, module)
import configparser
import os
import yaml
cf = configparser.ConfigParser()
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = rootPath = os.path.split(curPath)[0]
def getConfValue(filename, section, option, type='string'):
cf.read(filename)
if (type == 'string'):
value = cf.get(section, option)
else:
value = cf.getint(section, option)
return value
def getYamlValue(filename):
file = os.path.join(rootPath, "data" + os.sep + filename)
f = open(file, 'r', encoding='utf-8')
cont = f.read()
value = yaml.load(cont)
f.close()
return value
u779700044448_sit:
time: 2021-03-02 14:48:30.914746
token: u779700044448_5c36630e639120c4471f216c928ffa2e_1614305054
enc_user_id: u779700044448
import datetime
import calendar
class dateUtils:
def get_current_start_end(self):
"""
获取当日的开始时间和结束时间
:return:
"""
start, end = datetime.datetime.now().replace(hour=0, minute=0, second=0).strftime(
"%Y-%m-%d %H:%M:%S"), datetime.datetime.now().replace(hour=23, minute=59, second=59).strftime(
"%Y-%m-%d %H:%M:%S")
return start, end
def get_week_start_end(self):
"""
获取本周的第一天和最后一天
:return:
"""
monday, sunday = datetime.date.today(), datetime.date.today()
one_day = datetime.timedelta(days=1)
while monday.weekday() != 0:
monday -= one_day
while sunday.weekday() != 6:
sunday += one_day
# 返回当前的星期一和星期天的日期
monday, sunday = monday.strftime("%Y-%m-%d %H:%M:%S"), sunday.strftime("%Y-%m-%d %H:%M:%S").replace("00:00:00", "23:59:59")
return monday, sunday
def get_current_time(self):
"""
获取当前时间,返回格式是yyyy-mm-dd hh:mm:ss
:return:
"""
curr_time = datetime.datetime.now()
time = datetime.datetime.strftime(curr_time, '%Y-%m-%d %H:%M:%S')
return time
def get_current_date(self):
"""
获取当前日期,返回格式是yyyy-mm-dd
:return:
"""
curr_time = datetime.datetime.now()
date = datetime.datetime.strftime(curr_time, '%Y-%m-%d')
return date
def get_week_of_date(self, date):
"""
获取传入的日期是星期几
:param date:
:return:
"""
week_day_dict = {
0: '星期一',
1: '星期二',
2: '星期三',
3: '星期四',
4: '星期五',
5: '星期六',
6: '星期日',
}
day = datetime.datetime.strptime(date, "%Y-%m-%d").weekday()
return week_day_dict[day]
def add_days(self, date, days):
"""
获取新的日期
:param date: 日期
:param days: 天数(可正数,可负数)
:return:
"""
dt = datetime.datetime.strptime(date, "%Y-%m-%d")
out_date = (dt + datetime.timedelta(days=days)).strftime("%Y-%m-%d")
return out_date
def add_hours(self, date, hours):
"""
获取新的时间
:param date:
:param hours:
:return:
"""
dt = datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
out_date = (dt + datetime.timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")
return out_date
def get_month_start_end(self, year, month):
"""
获取本月的第一天和最后一天
:return:
"""
weekDay, monthCountDay = calendar.monthrange(year, month)
firstDay = datetime.date(year, month, day=1)
lastDay = datetime.date(year, month, day=monthCountDay)
return firstDay, lastDay
def get_every_month_start_end(self, year):
"""
获取某年的每个月的第一天和最后一天
:param year:
:return:
"""
start = []
end = []
for x in range(1, 13):
dt_start = (datetime.datetime(int(year), x, 1)).strftime("%Y%m%d")
if 12 == x:
dt_end = (datetime.datetime(int(year), 12, 31)).strftime("%Y%m%d")
else:
dt_end = (datetime.datetime(int(year), x + 1, 1) - datetime.timedelta(days=1)).strftime("%Y%m%d")
start.append(dt_start)
end.append(dt_end)
return start, end
def get_year_month(self, time):
"""
获取某个时间的年、月份
:param time:
:return:
"""
dt = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
return dt.year, dt.month
if __name__ == '__main__':
print(dateUtils().get_current_date())
print(" \n ")
print(dateUtils().get_current_start_end())
print(" \n ")
print(dateUtils().add_days("2020-10-18", 10))
print(" \n ")
print(dateUtils().add_hours("2020-10-18 00:00:00", 3))
print(" \n ")
print(dateUtils().get_current_time())
print(" \n ")
print(dateUtils().get_every_month_start_end("2020"))
print(" \n ")
print(dateUtils().get_week_of_date("2020-10-18"))
[meitun_db]
host = 192.168.24.42
port = 3306
user = developer
password = DJ@TdX73
[promotion]
livedb = 19
liveenv = 4
liveserver = 129
predb = 19
preenv = 2
preserver = 58
[mongo]
ip = 127.0.0.1
port = 27017
[redis]
redis_host=192.168.24.31
import pymysql
import os
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
import sys
sys.path.append(rootPath)
from common import confop
import pandas as pd
import pymongo
import redis
class dbOP:
def selectSql(self, schema, param=[], db="meitun_db"):
host = mySql().getConf(db)[0]
port = mySql().getConf(db)[1]
user = mySql().getConf(db)[2]
pwd = mySql().getConf(db)[3]
value = confop.getYamlValue("sql")
mySql().selectSql(host, port, user, pwd, value[schema][0], value[schema][1], param)
def ddlSql(self, schema, param=[], db="meitun_db"):
host = mySql().getConf(db)[0]
port = mySql().getConf(db)[1]
user = mySql().getConf(db)[2]
pwd = mySql().getConf(db)[3]
value = confop.getYamlValue("sql")
mySql().executeUpdate(host,port,user,pwd,value[schema][0],value[schema][1], param)
class mySql:
def __init__(self):
pass
def getConf(self,db="meitun_db"):
host = confop.getConfValue("conf.ini", db, "host")
port = confop.getConfValue("conf.ini", db, "port", "int")
user = confop.getConfValue("conf.ini", db, "user")
pwd = confop.getConfValue("conf.ini", db, "password")
return host,port,user,pwd
def connection(self, host, port, user, pwd, database):
return pymysql.connect(host=host, port=port,user=user,passwd=pwd,db=database)
def selectSql(self, host, port, user, pwd, database, sql, param=[]):
conn = self.connection(host, port, user, pwd, database)
cursor = conn.cursor()
try:
cursor.execute(sql, param)
data = cursor.fetchall()
cols = cursor.description
col = []
for i in cols:
col.append(i[0])
data = list(map(list, data))
result = pd.DataFrame(data, columns=col)
return result
except Exception as e:
print(e)
conn.rollback()
cursor.close()
conn.close()
def executeUpdate(self, host, port,user,pwd,database, sql, param=[]):
conn = self.connection(host, port,user,pwd,database)
cursor = conn.cursor()
try:
cursor.execute(sql, param)
conn.commit()
return cursor
except Exception as e:
print(e)
conn.rollback()
cursor.close()
conn.close()
class mongodb:
def connection(self,db,collect):
ip = confop.getConfValue("conf.ini", "mongo", "ip")
port = confop.getConfValue("conf.ini", "mongo", "port")
path = "mongodb://" + ip + ":" + port + "/"
myclient = pymongo.MongoClient(path)
mydb = myclient[db]
mycol = mydb[collect]
return mycol
#查询所有的值
def findALl(self, db, collect):
result = []
mycol = self.connection(db, collect)
for x in mycol.find():
result.append(x)
return result
#按照条件查询:条件为{}类型
def findByCon(self, db, collect, condition):
result = []
mycol = self.connection(db, collect)
for x in mycol.find(condition):
result.append(x)
return result
class redisClass:
def connect(self):
redis_host = confop.getConfValue("conf.ini", "redis", "redis_host")
pool = redis.ConnectionPool(host=redis_host)
r = redis.Redis(connection_pool=pool)
return r
def getValue(self,key):
r = redisClass().connect()
return r.get(key)
if __name__ == '__main__':
#mysql的例子
dbOP().selectSql("prc_prom_price", [13])
#monggdb的例子
mongodb().findByCon("yapi","user", {"role": "admin"})
#redis的例子
redisClass().connect()
import redis
import os
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
import sys
sys.path.append(rootPath)
from common.confop import confOP
class redisClass:
def connect(self):
redis_host = confOP.getConfValue(curPath + "/conf.ini", "redis", "redis_host")
pool = redis.ConnectionPool(host=redis_host)
r = redis.Redis(connection_pool=pool)
return r
def randomkey(self):
"""
获取随机的一个键
:return: b'name'
"""
r = redisClass().connect()
return r.randomkey()
def exists(self, name):
"""
判断一个键是否存在
:param name:键名
:return: True
例子: .exists('name')
是否存在name这个键
"""
r = redisClass().connect()
return r.exists(name)
def delete(self, name):
"""
删除一个键
:param name:键名
:return: 1
例子: .delete('name')
删除name这个键
"""
r = redisClass().connect()
return r.delete(name)
def type(self, name):
"""
判断键类型
:param name:
:return:b'string'
例子:.type('name')
判断name这个键类型
"""
r = redisClass().connect()
return r.type(name)
def keys(self, pattern):
"""
获取所有符合规则的键
:param pattern:匹配规则
:return:[b'name']
例子:.keys('n*')
获取所有以n开头的键
"""
r = redisClass().connect()
return r.keys(pattern)
def rename(self, src, dst):
"""
重命名键
:param src: 原键名;
:param dst: 新键名
:return:True
例子:.rename('name', 'nickname')
将name重命名为nickname
"""
r = redisClass().connect()
return r.rename(src, dst)
def dbsize(self):
"""
获取当前数据库中键的数目
:return: 100
"""
r = redisClass().connect()
return r.dbsize()
def expire(self, name, time):
"""
设定键的过期时间,单位为秒
:param name:键名;
:param time:秒数
:return:True
例子:.expire('name', 2)
将name键的过期时间设置为2秒
"""
r = redisClass().connect()
return r.expire(name, time)
def ttl(self, name):
"""
获取键的过期时间,单位为秒,-1表示永久不过期
:param name: 键名
:return: -1
例子:.ttl('name')
获取name这个键的过期时间
"""
r = redisClass().connect()
return r.ttl(name)
def move(self, name, db):
"""
将键移动到其他数据库
:param name: 键名;
:param db: 数据库代号
:return: True
例子: .move('name', 2)
将name移动到2号数据库
"""
r = redisClass().connect()
return r.move(name, db)
def flushdb(self):
"""
删除当前选择数据库中的所有键
:return: True
"""
r = redisClass().connect()
return r.flushdb()
def flushall(self):
"""
删除所有数据库中的所有键
:return: True
"""
r = redisClass().connect()
return r.flushall()
def set(self, name, value):
"""
给数据库中键为name的string赋予值value
:param name: 键名;
:param value:值
:return: True
例子: .set('name', 'Bob')
给name这个键的value赋值为Bob
"""
r = redisClass().connect()
return r.set(name, value)
def get(self, name):
"""
返回数据库中键为name的string的value
:param name: 键名
:return: b'Bob'
例子: .get('name')
返回name这个键的value
"""
r = redisClass().connect()
return r.get(name)
def getset(self, name, value):
"""
给数据库中键为name的string赋予值value并返回上次的value
:param name: 键名;
:param value: 新值
:return: b'Bob'
例子: .getset('name', 'Mike')
赋值name为Mike并得到上次的value
"""
r = redisClass().connect()
return r.getset(name, value)
def mget(self, keys, *args):
"""
返回多个键对应的value
:param keys: 键的列表
:param args:
:return: [b'Mike', b'Miker']
例子: .mget(['name', 'nickname'])
返回name和nickname的value
"""
r = redisClass().connect()
return r.mget(keys, *args)
def setnx(self, name, value):
"""
如果不存在这个键值对,则更新value,否则不变
:param name: 键名
:param value:
:return: 第一次运行结果是True,第二次运行结果是False
例子: .setnx('newname', 'James')
如果newname这个键不存在,则设置值为James
"""
r = redisClass().connect()
return r.setnx(name, value)
def setex(self, name, time, value):
"""
设置可以对应的值为string类型的value,并指定此键值对应的有效期
:param name: 键名;
:param time: 有效期;
:param value: 值
:return: True
例子: .setex('name', 1, 'James')
将name这个键的值设为James,有效期为1秒
"""
r = redisClass().connect()
return r.setex(name, time, value)
def setrange(self, name, offset, value):
"""
设置指定键的value值的子字符串
:param name: 键名;
:param offset: 偏移量;
:param value: 值
:return:xx,修改后的字符串长度
例子:
.set('name', 'Hello')
.setrange('name', 6, 'World')
设置name为Hello字符串,并在index为6的位置补World
"""
r = redisClass().connect()
return r.setrange(name, offset, value)
def mset(self, mapping):
"""
批量赋值
:param mapping: 字典
:return: True
例子: .mset({'name1': 'Durant', 'name2': 'James'})
将name1设为Durant,name2设为James
"""
r = redisClass().connect()
return r.mset(mapping)
def msetnx(self, mapping):
"""
键均不存在时才批量赋值
:param mapping: 字典
:return: True
例子: .msetnx({'name3': 'Smith', 'name4': 'Curry'})
在name3和name4均不存在的情况下才设置二者值
"""
r = redisClass().connect()
return r.msetnx(mapping)
def incr(self, name, amount=1):
"""
键为name的value增值操作,默认为1,键不存在则被创建并设为amount
:param name: 键名;
:param amount: 增长的值
:return: 1,即修改后的值
例子: .incr('age', 1)
age对应的值增1,若不存在,则会创建并设置为1
"""
r = redisClass().connect()
return r.incr(name, amount)
def decr(self, name, amount=1):
"""
键为name的value减值操作,默认为1,键不存在则被创建并将value设置为-amount
:param name: 键名;
:param amount: 减少的值
:return: -1,即修改后的值
例子: .decr('age', 1)
age对应的值减1,若不存在,则会创建并设置为-1
"""
r = redisClass().connect()
return r.decr(name, amount)
def append(self, key, value):
"""
键为name的string的值附加value
:param key: 键名
:param value:
:return: 13,即修改后的字符串长度
例子: .append('nickname', 'OK')
向键为nickname的值后追加OK
"""
r = redisClass().connect()
return r.append(key, value)
def substr(self, name, start, end=-1):
"""
返回键为name的string的子串
:param name: 键名;
:param start: 起始索引;
:param end: 终止索引,默认为-1,表示截取到末尾
:return: b'ello'
例子:.substr('name', 1, 4)
返回键为name的值的字符串,截取索引为1~4的字符
"""
r = redisClass().connect()
return r.substr(name, start, end)
def getrange(self, key, start, end):
"""
获取键的value值从start到end的子字符串
:param key: 键名;
:param start: 起始索引;
:param end: 终止索引
:return: b'ello'
例子: .getrange('name', 1, 4)
返回键为name的值的字符串,截取索引为1~4的字符
"""
r = redisClass().connect()
return r.getrange(key, start, end)
def rpush(self, name, *values):
"""
在键为name的列表末尾添加值为value的元素,可以传多个
:param name: 键名;
:param values: 值
:return: 3,列表大小
例子: .rpush('list', 1, 2, 3)
向键为list的列表尾添加1、2、3
"""
r = redisClass().connect()
return r.rpush(name, *values)
def lpush(self, name, *values):
"""
在键为name的列表头添加值为value的元素,可以传多个
:param name: 键名;
:param values: 值
:return: 4,列表大小
例子:.lpush('list', 0)
向键为list的列表头部添加0
"""
r = redisClass().connect()
return r.lpush(name, *values)
def llen(self, name):
"""
返回键为name的列表的长度
:param name: 键名
:return: 4
例子:.llen('list')
返回键为list的列表的长度
"""
r = redisClass().connect()
return r.llen(name)
def lrange(self, name, start, end):
"""
返回键为name的列表中start至end之间的元素
:param name: 键名;
:param start: 起始索引;
:param end: 终止索引
:return: [b'3', b'2', b'1']
例子:.lrange('list', 1, 3)
返回起始索引为1终止索引为3的索引范围对应的列表
"""
r = redisClass().connect()
return r.lrange(name, start, end)
def ltrim(self, name, start, end):
"""
截取键为name的列表,保留索引为start到end的内容
:param name: 键名;
:param start: 起始索引;
:param end: 终止索引
:return: True
例子:ltrim('list', 1, 3)
保留键为list的索引为1到3的元素
"""
r = redisClass().connect()
return r.ltrim(name, start, end)
def lindex(self, name, index):
"""
返回键为name的列表中index位置的元素
:param name: 键名;
:param index: 索引
:return: b’2’
例子:.lindex('list', 1)
返回键为list的列表索引为1的元素
"""
r = redisClass().connect()
return r.lindex(name, index)
def lset(self, name, index, value):
"""
给键为name的列表中index位置的元素赋值,越界则报错
:param name: 键名;
:param index: 索引位置;
:param value: 值
:return: True
例子:.lset('list', 1, 5)
将键为list的列表中索引为1的位置赋值为5
"""
r = redisClass().connect()
return r.lset(name, index, value)
def lrem(self, name, count, value):
"""
删除count个键的列表中值为value的元素
:param name: 键名;
:param count: 删除个数;
:param value: 值
:return: 1,即删除的个数
例子:.lrem('list', 2, 3)
将键为list的列表删除两个3
"""
r = redisClass().connect()
return r.lrem(name, count, value)
def lpop(self, name):
"""
返回并删除键为name的列表中的首元素
:param name: 键名
:return: b'5'
例子:.lpop('list')
返回并删除名为list的列表中的第一个元素
"""
r = redisClass().connect()
return r.lpop(name)
def rpop(self, name):
"""
返回并删除键为name的列表中的尾元素
:param name: 键名
:return: b'2'
例子:.rpop('list')
返回并删除名为list的列表中的最后一个元素
"""
r = redisClass().connect()
return r.rpop(name)
def blpop(self, keys, timeout=0):
"""
返回并删除名称在keys中的list中的首个元素,如果列表为空,则会一直阻塞等待
:param keys: 键列表;
:param timeout: 超时等待时间,0为一直等待
:return: [b'5']
例子:.blpop('list')
返回并删除键为list的列表中的第一个元素
"""
r = redisClass().connect()
return r.blpop(keys, timeout)
def brpop(self, keys, timeout=0):
"""
返回并删除键为name的列表中的尾元素,如果list为空,则会一直阻塞等待
:param keys: 键列表;
:param timeout: 超时等待时间,0为一直等待
:return: [b'2']
例子:.brpop('list')
返回并删除名为list的列表中的最后一个元素
"""
r = redisClass().connect()
return r.brpop(keys, timeout)
def rpoplpush(self, src, dst):
"""
返回并删除名称为src的列表的尾元素,并将该元素添加到名称为dst的列表头部
:param src: 源列表的键;
:param dst: 目标列表的key
:return: b'2'
例子:.rpoplpush('list', 'list2')
将键为list的列表尾元素删除并将其添加到键为list2的列表头部,然后返回
"""
r = redisClass().connect()
return r.rpoplpush(src, dst)
def sadd(self, name, *values):
"""
向键为name的集合中添加元素
:param name: 键名;
:param values: 值,可为多个
:return: 3,即插入的数据个数
例子: .sadd('tags', 'Book', 'Tea', 'Coffee')
向键为tags的集合中添加Book、Tea和Coffee这3个内容
"""
r = redisClass().connect()
return r.sadd(name, *values)
def srem(self, name, *values):
"""
从键为name的集合中删除元素
:param name: 键名;
:param values: 值,可为多个
:return: 1,即删除的数据个数
例子: .srem('tags', 'Book')
从键为tags的集合中删除Book
"""
r = redisClass().connect()
return r.srem(name, *values)
def spop(self, name):
"""
随机返回并删除键为name的集合中的一个元素
:param name: 键名
:return:b'Tea'
例子:.spop('tags')
从键为tags的集合中随机删除并返回该元素
"""
r = redisClass().connect()
return r.spop(name)
def smove(self, src, dst, value):
"""
从src对应的集合中移除元素并将其添加到dst对应的集合中
:param src: 源集合;
:param dst: 目标集合;
:param value: 元素值
:return: True
例子:.smove('tags', 'tags2', 'Coffee')
从键为tags的集合中删除元素Coffee并将其添加到键为tags2的集合
"""
r = redisClass().connect()
return r.smove(src, dst, value)
def scard(self, name):
"""
返回键为name的集合的元素个数
:param name:
:return: 3
例子:.scard('tags')
获取键为tags的集合中的元素个数
"""
r = redisClass().connect()
return r.scard(name)
def sismember(self, name, value):
"""
测试member是否是键为name的集合的元素
:param name: 键值
:param value:
:return: True
例子:.sismember('tags', 'Book')
判断Book是否是键为tags的集合元素
"""
r = redisClass().connect()
return r.sismember(name, value)
def sinter(self, keys, *args):
"""
返回所有给定键的集合的交集
:param keys: 键列表
:param args:
:return: {b'Coffee'}
例子:.sinter(['tags', 'tags2'])
返回键为tags的集合和键为tags2的集合的交集
"""
r = redisClass().connect()
return r.sinter(keys, *args)
def sinterstore(self, dest, keys, *args):
"""
求交集并将交集保存到dest的集合
:param dest: 结果集合;
:param keys: 键列表
:param args:
:return: 1
例子:.sinterstore('inttag', ['tags', 'tags2'])
求键为tags的集合和键为tags2的集合的交集并将其保存为inttag
"""
r = redisClass().connect()
return r.sinterstore(dest, keys, *args)
def sunion(self, keys, *args):
"""
返回所有给定键的集合的并集
:param keys: 键列表
:param args:
:return: {b'Coffee', b'Book', b'Pen'}
例子:.sunion(['tags', 'tags2'])
返回键为tags的集合和键为tags2的集合的并集
"""
r = redisClass().connect()
return r.sunion(keys, *args)
def sunionstore(self, dest, keys, *args):
"""
求并集并将并集保存到dest的集合
:param dest: 结果集合;
:param keys: 键列表
:param args:
:return: 3
例子:.sunionstore('inttag', ['tags', 'tags2'])
求键为tags的集合和键为tags2的集合的并集并将其保存为inttag
"""
r = redisClass().connect()
return r.sunionstore(dest, keys, *args)
def sdiff(self, keys, *args):
"""
返回所有给定键的集合的差集
:param keys: 键列表
:param args:
:return: {b'Book', b'Pen'}
例子:.sdiff(['tags', 'tags2'])
返回键为tags的集合和键为tags2的集合的差集
"""
r = redisClass().connect()
return r.sdiff(keys, *args)
def sdiffstore(self, dest, keys, *args):
"""
求差集并将差集保存到dest集合
:param dest: 结果集合;
:param keys: 键列表
:param args:
:return: 3
例子:.sdiffstore('inttag', ['tags', 'tags2'])
求键为tags的集合和键为tags2的集合的差集并将其保存为inttag
"""
r = redisClass().connect()
return r.sdiffstore(dest, keys, *args)
def smembers(self, name):
"""
返回键为name的集合的所有元素
:param name: 键名
:return: {b'Pen', b'Book', b'Coffee'}
例子:.smembers('tags')
返回键为tags的集合的所有元素
"""
r = redisClass().connect()
return r.smembers(name)
def srandmember(self, name):
"""
随机返回键为name的集合中的一个元素,但不删除元素
:param name: 键值
:return:
例子:.srandmember('tags')
随机返回键为tags的集合中的一个元素
"""
r = redisClass().connect()
return r.srandmember(name)
def zadd(self, name, *args, **kwargs):
"""
向键为name的zset中添加元素member,score用于排序。如果该元素存在,则更新其顺序
:param name: 键名;
:param args: 可变参数
:param kwargs:
:return: 2,即添加的元素个数
例子:.zadd('grade', 100, 'Bob', 98, 'Mike')
向键为grade的zset中添加Bob(其score为100),并添加Mike(其score为98)
"""
r = redisClass().connect()
return r.zadd(name, *args, **kwargs)
def zrem(self, name, *values):
"""
删除键为name的zset中的元素
:param name: 键名;
:param values: 元素
:return: 1,即删除的元素个数
例子:.zrem('grade', 'Mike')
从键为grade的zset中删除Mike
"""
r = redisClass().connect()
return r.zrem(name, *values)
def zincrby(self, name, value, amount=1):
"""
如果在键为name的zset中已经存在元素value,则将该元素的score增加amount;否则向该集合中添加该元素,其score的值为amount
:param name: key名;
:param value: 元素;
:param amount: 增长的score值
:return: 98.0,即修改后的值
例子:.zincrby('grade', 'Bob', -2)
键为grade的zset中Bob的score减2
"""
r = redisClass().connect()
return r.zincrby(name, value, amount)
def zrank(self, name, value):
"""
返回键为name的zset中元素的排名,按score从小到大排序,即名次
:param name: 键名;
:param value: 元素值
:return: 1
例子:.zrank('grade', 'Amy')
得到键为grade的zset中Amy的排名
"""
r = redisClass().connect()
return r.zrank(name, value)
def zrevrank(self, name, value):
"""
返回键为name的zset中元素的倒数排名(按score从大到小排序),即名次
:param name: 键名;
:param value: 元素值
:return: 2
例子:.zrevrank('grade', 'Amy')
得到键为grade的zset中Amy的倒数排名
"""
r = redisClass().connect()
return r.zrevrank(name, value)
def zrevrange(self, name, start, end, withscores=False):
"""
返回键为name的zset(按score从大到小排序)中index从start到end的所有元素
:param name: 键值;
:param start: 开始索引;
:param end: 结束索引;
:param withscores: 是否带score
:return: [b'Bob', b'Mike', b'Amy', b'James']
例子:.zrevrange('grade', 0, 3)
返回键为grade的zset中前四名元素
"""
r = redisClass().connect()
return r.zrevrange(name, start, end, withscores)
def zrangebyscore(self, name, min, max, start=None, num=None, withscores=False):
"""
返回键为name的zset中score在给定区间的元素
:param name: 键名;
:param min: 最低score;
:param max: 最高score;
:param start: 起始索引;
:param num: 个数;
:param withscores: 是否带score
:return: [b'Bob', b'Mike', b'Amy', b'James']
例子:.zrangebyscore('grade', 80, 95)
返回键为grade的zset中score在80和95之间的元素
"""
r = redisClass().connect()
return r.zrangebyscore(name, min, max, start, num, withscores)
def zcount(self, name, min, max):
"""
返回键为name的zset中score在给定区间的数量
:param name: 键名;
:param min: 最低score;
:param max: 最高score
:return: 2
例子:.zcount('grade', 80, 95)
返回键为grade的zset中score在80到95的元素个数
"""
r = redisClass().connect()
return r.zcount(name, min, max)
def zcard(self, name):
"""
返回键为name的zset的元素个数
:param name: 键名
:return: 3
例子:.zcard('grade')
获取键为grade的zset中元素的个数
"""
r = redisClass().connect()
return r.zcard(name)
def zremrangebyrank(self, name, min, max):
"""
删除键为name的zset中排名在给定区间的元素
:param name: 键名;
:param min: 最低位次;
:param max: 最高位次
:return: 1,即删除的元素个数
例子:.zremrangebyrank('grade', 0, 0)
删除键为grade的zset中排名第一的元素
"""
r = redisClass().connect()
return r.zremrangebyrank(name, min, max)
def zremrangebyscore(self, name, min, max):
"""
删除键为name的zset中score在给定区间的元素
:param name: 键名;
:param min: 最低score;
:param max: 最高score
:return: 1,即删除的元素个数
例子:.zremrangebyscore('grade', 80, 90)
删除score在80到90之间的元素
"""
r = redisClass().connect()
return r.zremrangebyscore(name, min, max)
def hset(self, name, key, value):
"""
向键为name的散列表中添加映射
:param name: 键名;
:param key: 映射键名;
:param value: 映射键值
:return: 1,即添加的映射个数
例子:.hset('price', 'cake', 5)
向键为price的散列表中添加映射关系,cake的值为5
"""
r = redisClass().connect()
return r.hset(name, key, value)
def hsetnx(self, name, key, value):
"""
如果映射键名不存在,则向键为name的散列表中添加映射
:param name: 键名;
:param key: 映射键名;
:param value: 映射键值
:return: 1,即添加的映射个数
例子:.hsetnx('price', 'book', 6)
向键为price的散列表中添加映射关系,book的值为6
"""
r = redisClass().connect()
return r.hsetnx(name, key, value)
def hget(self, name, key):
"""
返回键为name的散列表中key对应的值
:param name: 键名;
:param key: 映射键名;
:return: 5
例子:.hget('price', 'cake')
获取键为price的散列表中键名为cake的值
"""
r = redisClass().connect()
return r.hget(name, key)
def hmget(self, name, keys, *args):
"""
返回键为name的散列表中各个键对应的值
:param name: 键名;
:param keys: 映射键名列表
:param args:
:return: [b'3', b'7']
例子:.hmget('price', ['apple', 'orange'])
获取键为price的散列表中apple和orange的值
"""
r = redisClass().connect()
return r.hmget(name, keys, *args)
def hmset(self, name, mapping):
"""
向键为name的散列表中批量添加映射
:param name: 键名;
:param mapping: 映射字典
:return: True
例子:.hmset('price', {'banana': 2, 'pear': 6})
向键为price的散列表中批量添加映射
"""
r = redisClass().connect()
return r.hmset(name, mapping)
def hincrby(self, name, key, amount=1):
"""
将键为name的散列表中映射的值增加amount
:param name: 键名;
:param key: 映射键名;
:param amount: 增长量
:return: 6,修改后的值
例子:.hincrby('price', 'apple', 3)
key为price的散列表中apple的值增加3
"""
r = redisClass().connect()
return r.hincrby(name, key, amount)
def hexists(self, name, key):
"""
键为name的散列表中是否存在键名为键的映射
:param name: 键名;
:param key: 映射键名;
:return: True
例子:.hexists('price', 'banana')
键为price的散列表中banana的值是否存在
"""
r = redisClass().connect()
return r.hexists(name, key)
def hdel(self, name, *keys):
"""
在键为name的散列表中,删除键名为键的映射
:param name: 键名;
:param key: 映射键名;
:return: True
例子:.hdel('price', 'banana')
从键为price的散列表中删除键名为banana的映射
"""
r = redisClass().connect()
return r.hdel(name, *keys)
def hlen(self, name):
"""
从键为name的散列表中获取映射个数
:param name: 键名
:return: 6
例子:.hlen('price')
从键为price的散列表中获取映射个数
"""
r = redisClass().connect()
return r.hlen(name)
def hkeys(self, name):
"""
从键为name的散列表中获取所有映射键名
:param name: 键名
:return: [b'cake', b'book', b'banana', b'pear']
例子:.hkeys('price')
从键为price的散列表中获取所有映射键名
"""
r = redisClass().connect()
return r.hkeys(name)
def hvals(self, name):
"""
从键为name的散列表中获取所有映射键值
:param name: 键名
:return:[b'5', b'6', b'2', b'6']
例子:.hvals('price')
从键为price的散列表中获取所有映射键值
"""
r = redisClass().connect()
return r.hvals(name)
def hgetall(self, name):
"""
从键为name的散列表中获取所有映射键值对
:param name: 键名
:return:{b'cake': b'5', b'book': b'6', b'orange': b'7', b'pear': b'6'}
例子:.hgetall('price')
从键为price的散列表中获取所有映射键值对
"""
r = redisClass().connect()
return r.hgetall(name)
import time
import telnetlib
import re
class TelnetClient(object):
"""通过telnet连接dubbo服务, 执行shell命令, 可用来调用dubbo接口
"""
def __init__(self, server_host, server_post):
self.tn = telnetlib.Telnet()
self.server_host = server_host
self.server_port = server_post
# 此函数实现telnet登录主机
def connect_dubbo(self):
try:
print("telent连接dubbo服务端: telnet {} {} ……".format(self.server_host, self.server_port))
self.tn.open(self.server_host, port=self.server_port)
return True
except Exception as e:
print('连接失败, 原因是: {}'.format(str(e)))
return False
# 此函数实现执行传过来的命令,并输出其执行结果
def execute_some_command(self, command):
# 执行命令
cmd = (command + '\n').encode("gbk")
self.tn.write(cmd)
# 获取命令结果,字符串类型
retry_count = 0
# 如果响应未及时返回,则等待后重新读取,并记录重试次数
result = self.tn.read_very_eager().decode(encoding='gbk')
while result == '':
time.sleep(1)
result = self.tn.read_very_eager().decode(encoding='gbk')
retry_count += 1
return result
# 退出telnet
def logout_host(self):
self.tn.write(b"exit\n")
print("登出成功")
class InvokeDubboApi(object):
def __init__(self, server_host, server_post):
try:
self.telnet_client = TelnetClient(server_host, server_post)
self.login_flag = self.telnet_client.connect_dubbo()
except Exception as e:
print("invokedubboapi init error" + str(e))
def invoke_dubbo_api(self, dubbo_service, dubbor_method, args):
api_name = dubbo_service + "." + dubbor_method + "({})"
cmd = "invoke " + api_name.format(args)
print("调用命令是:{}".format(cmd))
resp0 = None
try:
if self.login_flag:
resp0 = self.telnet_client.execute_some_command(cmd)
print("接口响应是,resp={}".format(resp0))
# dubbo接口返回的数据中有 elapsed: 4 ms. 耗时,需要使用elapsed 进行切割
return str(re.compile(".+").findall(resp0).pop(0)).split("elapsed").pop(0).strip()
else:
print("登陆失败!")
except Exception as e:
raise Exception("调用接口异常, 接口响应是resp={}, 异常信息为:{}".format(resp0, str(e)))
self.logout()
def logout(self):
self.telnet_client.logout_host()
class GetDubboService2(object):
def __init__(self):
pass
def get_dubbo_info2(self,content):
try:
dubbore = re.compile(r"([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+)", re.I)
result = dubbore.search(str(content)).group()
print("获取到dubbo部署信息" + result)
return {"server_host": result.split(":")[0], "server_post": result.split(":")[1]}
except Exception as e:
raise Exception("获取dubbo部署信息失败:{}".format(str(e)))
# -*- encoding=utf8 -*-
from common.cli.runner import AirtestCase, run_script
from argparse import *
import jinja2
import json
import os
import io
import time
import multiprocessing as mp
from multiprocessing import Process, JoinableQueue, Pool
import re
import math
import report.report.report as report
import logging
from airtest.utils.logger import get_logger
import requests
class CustomAirTestCase(AirtestCase):
# @classMethod
# def setUpClass(cls):
# super(CustomAirTestCase,cls).setUpClass()
#
# 类变量,多进程跑任务的时候用到
summary_message = {}
summary_message["caseNum"] = 0
summary_message["passNum"] = 0
summary_message['passRate'] = 0.0
case_results = []
log_list = []
def __init__(self, work_space, case_path, log_path, runtime_log, log_level):
"""
初始化一些环境变量
:param work_space:
:param case_path:
:param log_path:
:param runtime_log:
"""
self.manager = mp.Manager
self.workspace = work_space
self.case_path = case_path
self.log_path = log_path
self.runtime_log = runtime_log
self.device_dict = self.manager().dict()
level = self.get_log_level(log_level)
logger = get_logger("airtest")
logger.setLevel(level)
def get_log_level(self, log_level):
if log_level == 'WARN':
return logging.WARN
elif log_level == 'DEBUG':
return logging.DEBUG
elif log_level == 'INFO':
return logging.INFO
else:
print("输入日志信息不对")
return logging.DEBUG
def dingding_send(self, content):
"""
报错的用例发钉钉群
:param content:
:return:
"""
url = 'https://oapi.dingtalk.com/robot/send?access_token=d9e673ded1ab61ee6a631333a3d80b4291684e3adf61169ab6c1bb83d375e26d'
pagrem = {
"msgtype": "text",
"text": {
"content": content
},
"at": {
"atMobiles": [
],
"isAtAll": True # 是否@所有人,默认否
}
}
headers = {
'Content-Type': 'application/json'
}
response = requests.post(url, data=json.dumps(pagrem), headers=headers)
print(response.text)
return response.text
def setUp(self):
print("custom setup")
super(CustomAirTestCase, self).setUp()
def tearDown(self):
print("custom tearDown")
super(CustomAirTestCase, self).tearDown()
def get_author(self, case_name):
"""
获取case是谁写的
:return:
"""
name = 'Unknown'
pyfile = open(self.case_path + os.sep + case_name + '.air' + os.sep + case_name + '.py', 'r', encoding='utf-8')
lines = pyfile.readlines()
for line in lines:
if '__author__' in line:
line = line.split('__author__ = ')[-1]
name = line.replace('"', '')
name = name.strip().strip('\n')
pyfile.close()
print(name)
return name
def assign_tasks(self, task_list, num):
"""
将任务平均分配给num个设备跑
:param task_list: 待分配任务列表
:param num: num个设备
:return:
"""
all_list = []
length = len(task_list)
for i in range(num):
one_list = task_list[math.floor(i / num * length):math.floor((i + 1) / num * length)]
all_list.append(one_list)
return all_list
def get_device(self, case_dict, script):
"""
根据之间跑case时候记录的设备对应执行的case,获取某个脚本是在哪个设备上执行的
:param case_dict:
:param script:
:return: devie
"""
for k in case_dict:
if script in case_dict[k]:
return k
def get_all_dir(self, case_name):
"""
:return: 获得需要执行的所有air的文件夹,同时创建单个log目录
"""
dirs_ls = []
for dirs in os.listdir(self.case_path):
isdir = os.path.join(self.case_path, dirs)
if os.path.isdir(isdir) and isdir.endswith(".air"):
air_name = dirs.replace('.air', '')
if 'all' in case_name or air_name in case_name:
script = os.path.join(self.case_path, isdir)
dirs_ls.append(script)
log = self.create_log(self.log_path, air_name)
CustomAirTestCase.log_list.append(log)
return dirs_ls
def get_cases_and_log(self, case_name, device_list):
"""
:return: 获得需要执行的所有air的文件夹,同时对每个设备创建log目录
"""
dirs_ls = []
for dirs in os.listdir(self.case_path):
isdir = os.path.join(self.case_path, dirs)
if os.path.isdir(isdir) and isdir.endswith(".air"):
air_name = dirs.replace('.air', '')
if case_name == 'all' or air_name in case_name:
script = os.path.join(self.case_path, isdir)
dirs_ls.append(script)
for device in device_list:
self.create_log(os.path.join(self.log_path, device), air_name)
return dirs_ls
def create_log(self, log_path, air_name):
"""
创建对应case的log 文件
:return:
"""
if os.path.isdir(log_path):
pass
else:
print("创建目录:" + log_path)
os.makedirs(log_path)
log = os.path.join(log_path, air_name)
if os.path.isdir(log):
pass
else:
os.makedirs(log)
print(str(log) + 'is created')
return log
def do_run_by_pool(self, run_device, cases, log_path, log_level):
"""
在指定设备上跑指定脚本
:param run_device:
:param cases:
:param log_path:
:param log_level:
:return:
"""
level = self.get_log_level(log_level)
logger = get_logger("airtest")
logger.setLevel(level)
run_device = 'Android:///' + run_device
for script in cases:
air_name = script.split(os.sep)[-1].replace('.air', '')
log = os.path.join(log_path, air_name)
print(run_device + ' run ' + script + ' ---->start')
args = Namespace(device=run_device, log=log, recording=None, script=script)
try:
run_script(args, AirtestCase)
except:
pass
print(run_device + ' run ' + script + ' ---->end')
def get_case_runtime(self, log):
"""
读取case运行日志,获取case执行开始时间,结束时间
:param log:
:return:
"""
txt_log = os.path.join(log, 'log.txt')
f = open(txt_log, 'r')
lines = f.readlines()
f.close()
if len(lines) > 0:
start_time_mes = re.findall('.*"start_time": (.*), "ret".*', lines[0])
# 有可能手机没装app 或者其他问题导致日志里没有start_time, 所以要做个判断。
if len(start_time_mes) > 0:
start_time = float(start_time_mes[0])
start_time = int(start_time * 1000)
try:
end_time = float(re.findall('.*"end_time": (.*)}}.*', lines[-1])[0])
except:
# case执行失败的话没打end_time
end_time = float(re.findall('.*"time": (.*), "data".*', lines[-2])[0])
end_time = int(end_time * 1000)
else:
return 0, 1
else:
return 0, 1
return start_time, end_time
def delWithLogToReportByDevice(self, script_list, device):
"""
根据日志信息产出报告,一个设备出一份日志
:param workspace:
:param script_list:
:param log_path:
:param device:
:return:
"""
summary_message = {}
summary_message["caseNum"] = 0
summary_message["passNum"] = 0
log_path = os.path.join(self.log_path, device)
case_results = []
count = len(script_list)
for i in range(count):
script = script_list[i]
air_name = script.split(os.sep)[-1].replace('.air', '')
log = os.path.join(log_path, air_name)
rpt = report.LogToHtml(script, log)
output_file = log + os.sep + 'log.html'
report_dir = '../../../report/report/'
case_image_dir = '../../../air_case'
rpt.report("log_template.html", report_dir, case_image_dir, output_file=output_file)
result = {}
result["name"] = air_name
result["result"] = rpt.test_result
if result["result"]:
summary_message["passNum"] += 1
else:
if air_name in ('initialized', 'initialize_oppo') and device != 'B2T7N16A28002135':
self.dingding_send("设备检查: " + device + "解锁及检查失败")
elif air_name == 'install_app':
self.dingding_send("设备检查: " + device + "安装app失败")
else:
pass
start, end = self.get_case_runtime(log)
result["time"] = (end - start)/1000 if end > start else 0
author = self.get_author(air_name)
result["author"] = author
case_results.append(result)
summary_message["caseNum"] = count
summary_message["passRate"] = round(summary_message["passNum"]*100/count, 2)
summary_message["time"] = CustomAirTestCase.summary_message["time"]
report_name = 'summary_' + device + '.html'
self.doReport(case_results, summary_message, report_name)
def delWithLogToReport(self, case_list):
"""
读取运行case的日志,获取case数,case运行成功数等信息,生成html 报告
:return:
"""
count = len(CustomAirTestCase.log_list)
for i in range(count):
script = case_list[i]
log = CustomAirTestCase.log_list[i]
air_name = script.split(os.sep)[-1].replace('.air', '')
rpt = report.LogToHtml(script, log)
output_file = os.path.join(log, 'log.html')
report_dir = '../../report/report/'
case_image_dir = '../../air_case'
try:
rpt.report("log_template.html", report_dir, case_image_dir, output_file=output_file)
except:
pass
result = {}
result["name"] = air_name
result["result"] = rpt.test_result
if result["result"]:
CustomAirTestCase.summary_message["passNum"] += 1
try:
start, end = self.get_case_runtime(log)
except:
start = 1
end = 60
result["time"] = (end - start)/1000 if end > start else 0
author = self.get_author(air_name)
result["author"] = author
result['device'] = self.device_dict[air_name]
CustomAirTestCase.case_results.append(result)
CustomAirTestCase.summary_message["caseNum"] = count
CustomAirTestCase.summary_message["passRate"] = round(CustomAirTestCase.summary_message["passNum"]\
* 100/CustomAirTestCase.summary_message["caseNum"], 2)
print("summary_message:")
print(CustomAirTestCase.summary_message)
self.doReport(CustomAirTestCase.case_results, CustomAirTestCase.summary_message, 'summary.html')
def doReport(self, results, total, report_name):
"""
输出具体报告
:param results:
:param total:
:param report_name:
:return:
"""
report_path = os.path.join(self.workspace, 'report')
if len(results) == 0:
print("没有执行case")
return
else:
env = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.join(self.workspace, 'templates')),
extensions=(),
autoescape=True
)
total["passRate"] = round(total["passNum"]*100/total["caseNum"], 2)
template = env.get_template("summary_template.html")
# log_path必须传相对路径,要不在jenkins上展示报告会报错
log_path = '../log'
# report_name 是根据设备命名的,如果report_name里有设备名的话,那log_path得加一层设备名
if '_' in report_name:
log_path = '../log/' + report_name.split('_')[-1].replace('.html', '')
html = template.render({"case_results": results, "summary_message": total, "log_path": log_path})
output_file = os.path.join(report_path, report_name)
with io.open(output_file, 'w', encoding="utf-8") as f:
f.write(html)
print("报告文件: " + output_file)
def producer(self, script_list, q):
for script in script_list:
q.put(script)
q.join()
def consumer(self, device, q, log_level):
level = self.get_log_level(log_level)
logger = get_logger("airtest")
logger.setLevel(level)
while True:
script = q.get()
run_device = 'Android:///' + device
# 执行脚本
air_name = script.split(os.sep)[-1].replace('.air', '')
log = os.path.join(self.log_path, air_name)
print(device + ' run ' + script + ' ---->start')
args = Namespace(device=run_device, log=log, recording=None, script=script)
try:
run_script(args, AirtestCase)
except:
print("噢? 失败啦")
pass
finally:
self.device_dict[air_name] = device
print(device + ' run ' + script + ' ---->end')
print('\033[45m%s 执行了 %s\033[0m' % (os.getpid(), script))
q.task_done() # 向q.join()发送一次信号,证明一个数据已经被取走了
def run_case_by_distri(self, device_list, case_name, log_level):
"""
多进程跑case,将任务平均分给不同的设备,多进程跑,都跑完后,读取各case的log.txt 文件,获取case的开始结束时间信息
:param device_list
:param case_name:
:param log_level:
:return:
"""
workers = []
all_start = int(time.time() * 1000)
# 获取所有需要执行的用例集
script_list = self.get_all_dir(case_name)
#random.shuffle(script_list)
device_count = len(device_list)
q = JoinableQueue()
# 生产者们:收集要执行的用例
p1 = Process(target=self.producer, args=(script_list, q))
workers.append(p1)
print("我们总共需要执行" + str(len(workers)) + "个用例")
print("奥力给!")
# 消费者们:各设备执行用例
for i in range(device_count):
c1 = Process(target=self.consumer, args=(device_list[i], q, log_level))
c1.daemon = True
workers.append(c1)
# 开始
for p in workers:
time.sleep(2)
p.start()
p1.join()
print("all SubProcesses done")
seconds = (int(time.time() * 1000) - all_start) / 1000
m, s = divmod(seconds, 60)
CustomAirTestCase.summary_message["time"] = "%d分%d秒" % (m, s)
self.delWithLogToReport(script_list)
def run_case(self, device, case_name):
"""
运行单个case
:param device:
:param case_name:
:return: 返回case的运行结果集合,包括case名,case执行起止时间,供校验埋点的时候用
"""
summary_message = {}
summary_message["caseNum"] = 0
summary_message["passNum"] = 0
all_start = int(time.time() * 1000)
fp = open(self.runtime_log, 'w+')
# 聚合结果
case_results = []
# d获取所有用例集
for f in os.listdir(self.case_path):
if f.endswith(".air"):
# f为.air用例名称:demo.air
air_name = f.replace('.air', '')
if 'all' in case_name or air_name in case_name:
script = os.path.join(self.case_path, f)
# 创建日志
log = self.create_log(self.log_path, air_name)
output_file = os.path.join(log, 'log.html')
# global args
dev = 'Android:///' + device
args = Namespace(device=dev, log=log, recording=None, script=script)
start = int(time.time() * 1000)
try:
run_script(args, AirtestCase)
# summary_message["passNum"] += 1
except:
pass
finally:
end = int(time.time() * 1000)
author = self.get_author(air_name)
fp.write(device + '\t' + air_name + '\t' + str(start) + '\t' + str(end) + '\t' + author + '\n')
rpt = report.LogToHtml(script, log)
report_dir = '../../report/report/'
case_image_dir = '../../air_case'
rpt.report("log_template.html", report_dir, case_image_dir, output_file=output_file)
result = {}
result["name"] = air_name
result["result"] = rpt.test_result
if result["result"]:
summary_message["passNum"] += 1
result["time"] = (end - start) / 1000 if end > start else 0
author = self.get_author(air_name)
result["author"] = author
case_results.append(result)
summary_message["caseNum"] += 1
print("case 执行完毕,开始生成报告")
fp.close()
seconds = (int(time.time() * 1000) - all_start) / 1000
m, s = divmod(seconds, 60)
summary_message["time"] = "%d分%d秒" % (m, s)
self.doReport(case_results, summary_message, 'summary.html')
print("报告已生成")
return True
def run_case_by_Multi(self, device_list, case_name, log_level):
"""
兼容性测试,case在每个机器上跑一遍
:param device_list:
:param case_name:
:param log_level:
:return:
"""
all_start = int(time.time() * 1000)
p = Pool(len(device_list))
# 获取所有需要执行的用例集
script_list = self.get_cases_and_log(case_name, device_list)
#random.shuffle(script_list)
print("任务列表:")
print(script_list)
for device in device_list:
# 设置进程数和设备数一样
try:
p.apply_async(self.do_run_by_pool, args=(device, script_list, os.path.join(self.log_path, device), log_level))
time.sleep(3)
except:
pass
p.close()
p.join()
print("all subprocesses done")
seconds = (int(time.time() * 1000) - all_start) / 1000
m, s = divmod(seconds, 60)
CustomAirTestCase.summary_message["time"] = "%d分%d秒" % (m, s)
for device in device_list:
self.delWithLogToReportByDevice(script_list, device)
import os
from ruamel import yaml
from common.db.db import dbOP
import datetime
curpath = os.path.dirname(os.path.realpath(__file__))
# 数据读入和写入文件
class Rw:
def w_token(self, enc_user_id='u779700044448', env='on'):
result = dbOP().selectSql('select_user_token', [enc_user_id])
token = result[0][0]
dict = {}
token_value = {}
token_value["time"] = datetime.datetime.now()
token_value["token"] = token
token_value["enc_user_id"] = enc_user_id
key = enc_user_id + "_" + env
dict[key] = token_value
yamlpath = os.path.join(curpath, "data")
# 写入到yaml文件
with open(yamlpath, "w", encoding="utf-8") as f:
yaml.dump(dict, f, Dumper=yaml.RoundTripDumper)
return token
def r_token(self, enc_user_id='u779700044448', env='on'):
yamlpath = os.path.join(curpath, "data")
file_value = open(yamlpath, 'r')
result = yaml.load(file_value.read(), Loader=yaml.Loader)
if result != None:
key = enc_user_id + "_" + env
if key in result:
return result[key]
else:
return None
else:
return None
def r_temp_file(self, key):
"""
读临时文件
:return:
"""
yamlPath = os.path.join(curpath, "temp.yaml")
file_value = open(yamlPath, 'r')
result = yaml.load(file_value.read(), Loader=yaml.Loader)
return result[key]
def w_temp_file(self, dict):
"""
写临时文件,没有会自动生成
:param content:
:return:
"""
yamlpath = os.path.join(curpath, "temp.yaml")
# 写入到yaml文件
with open(yamlpath, "w", encoding="utf-8") as f:
yaml.dump(dict, f, Dumper=yaml.RoundTripDumper)
# dev
# dev
172.16.10.203 api.babytree-dev.com
172.16.10.203 api-test11.babytree-test.com
172.16.10.203 api.test11.babytree-fpm.com
172.16.10.203 api.babytree.com
10.88.9.88 config.meitun.com copy.meitun.com
10.54.17.151 sita-h5.meitun.com
10.54.17.151 sita-cu.meitun.com
10.54.11.80 jenkins.baobaoshu.com atms.baobaoshu.com cloud.baobaoshu.com
10.88.7.8 fastdfs1.mt.com img11.meitun.com img01.meituncdn.com
10.88.7.8 fastdfs2.mt.com img12.meitun.com img02.meituncdn.com
10.54.11.80 grafana.baobaoshu.com
10.54.11.80 zabbix.baobaoshu.com
10.54.11.80 cacti.baobaoshu.com
10.54.11.80 ucm.baobaoshu.com
10.54.11.80 rundeck.baobaoshu.com dubbo.baobaoshu.com
10.54.11.80 cmdb.baobaoshu.com
10.54.11.80 apollo.baobaoshu.com muse.baobaoshu.com
10.54.11.80 splunk.baobaoshu.com
192.168.24.30 dbm.mt.com
192.168.24.30 dbs1.mt.com
192.168.24.30 redisdb1.mt.com redisdb2.mt.com redis.mobile.db01.mt.com rediscache1.mt.com rediscache2.mt.com rediscache3.mt.com
192.168.24.46 mqmaster.mt.com
192.168.24.47 mqslave01.mt.com
10.88.9.166 zk.mt.com
10.54.11.80 ucmweb.baobaoshu.com
172.16.9.154 webview.test9.dev.babytree-inc.com
172.16.9.154 test11.dev.babytree-inc.com
172.16.10.203 test11.babytree-dev.com test11.babytree-fpm.com m.test11.babytree-test.com test11.babytree-test.com test19.babytree-test.com webview.test9.dev.babytree-inc.com
10.54.17.151 sita-live.meitun.com
10.88.9.118 kafka.mt.com
10.88.7.11 pkg.baobaoshu.com
172.16.10.203 test100.babytree-dev.com g.babytree-dev.com
122.9.41.244 gerrit.babytree-inc.com
10.54.17.153 localproxy.baobaoshu.com
192.168.24.43 gerrit.mtmm.com
192.168.24.43 gerrit.baobaoshu.com
0.0.0.0 account.jetbrains.com
10.54.11.80 idb.baobaoshu.com
10.54.17.153 sit-backend.meitunmama.com
10.54.17.153 sit-openapi.meitun.com
180.168.13.174 backend.meitunmama.com
10.54.17.153 sit-m.meitun.com
10.54.17.153 sit-static.meitun.com
10.54.17.153 sit-static.meituncdn.com
10.54.17.153 static.meitun.com
10.54.17.153 static.meituncdn.com
47.97.216.230 m.meitun.com
10.88.17.43 pre-backend.meitunmama.com
172.16.9.154 pack.babytree-inc.com
# 线上
117.121.137.17 jumper.meitunmama.com
192.168.60.26 ops.baobaoshu.com
192.168.60.26 falcon.baobaoshu.com
115.159.253.118 ppt.baobaoshu.com
10.50.80.66 oneops.baobaoshu.com
10.50.80.66 ssv.baobaoshu.com
10.88.9.204 im.meitun.com
10.50.253.100 workflow.meitunmama.com
172.16.9.163 space.babytree-inc.com
10.88.17.43 pre-m.meitun.com
10.88.17.43 pre-openapi.meitun.com
10.54.11.80 mock.baobaoshu.com
10.54.11.80 mantis.baobaoshu.com
10.54.11.80 testportal.baobaoshu.com
\ No newline at end of file
user:
live:
- 9
- 4
- 310
pre:
- 9
- 2
- 307
health:
live:
- 35
- 4
- 165
pre :
- 35
- 2
- 163
sit: health
alimall:
live:
- 736
- 4
- 828
pre :
- 736
- 2
- 826
sit: alimall
promotion:
live:
- 19
- 4
- 129
pre:
- 19
- 2
- 58
sit: promotion
salesorder:
live:
- 6
- 4
- 134
pre :
- 6
- 2
- 6
sit: salesorder
seller:
live:
- 7
- 4
- 144
pre:
- 7
- 2
- 7
sit: seller
cms:
live:
- 22
- 4
- 142
pre:
- 22
- 2
- 41
sit: cms
community:
live:
- 25
- 4
- 147
pre:
- 25
- 2
- 37
sit: community
price:
live:
- 14
- 4
- 138
pre:
- 14
- 2
- 51
sit: price
medical:
live:
- 766
- 4
- 1131
pre:
- 766
- 2
- 1128
sit: medical
bcoin:
live:
- 717
- 4
- 750
pre:
- 717
- 2
- 747
sit: bcoin
edu:
live:
- 727
- 4
- 794
pre:
- 727
- 2
- 792
sit: edu
account:
live:
- 62
- 4
- 293
pre:
- 62
- 2
- 291
sit: account
finance:
live:
- 10
- 4
- 136
pre:
- 10
- 2
- 139
sit: finance
socialec:
live:
- 762
- 4
- 1079
pre:
- 762
- 2
- 1077
sit: socialec
item:
live:
- 8
- 4
- 143
pre:
- 8
- 2
- 8
sit: item
new_wms:
live:
- 763
- 4
- 1089
pre:
- 763
- 2
- 1086
sit: new_wms
distribution:
live:
- 67
- 4
- 312
pre:
- 67
- 2
- 315
sit: new_wms
"dingding_msg":
-
# 存储业务sql
"select_patient":
- medical
- SELECT relation_id,name,gender,id FROM `patient` where enc_user_id=%s and deleted=0;
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment