Commit 13859465 authored by liguangyu06's avatar liguangyu06
Browse files

提交电商项目用例

parent 13b3da0e
Pipeline #6911 failed with stages
in 3 seconds
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:api,cmdc-maiilogin,多彩商城登录,sit,bs
"""
from common.common_func import commonFuc
import requests
import uuid
from common.verification_code_ocr import VerificationCodeOcr
import json
import os
module = "cmdc_login"
class CmdcMaiiLogin():
"""多彩商城登录"""
def __init__(self, username, password):
self.username = username
self.password = password
def get_token(self):
# 获取唯一识别码
uuid_handle = uuid.uuid4()
# 获取验证码报文
param = {"uuid": uuid_handle}
# 获取多彩商城登录页面获取验证码地址
code_url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "getVerifyCode_url")
# code_url = "https://service-slb.cmic.com.cn/sso/getVerifyCode"
# 发送请求获取验证码
result = requests.get(code_url, params=param)
# print(result.content)
# 获取当前文件路径
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
code_path = BASE_DIR + "/多彩商城登录/verifycode.png"
# 获取到验证码存入本地文件
with open(code_path, 'wb') as f:
f.write(result.content)
# 识别并获取验证码
code = VerificationCodeOcr(code_path, "rainbow123", "rainbow123").base64_api()
# 获取多彩商城登录地址
cmdc_url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "maii_login_url")
# cmdc_url = "https://service-slb.cmic.com.cn/sso/mallLogin"
# 组装请求报文
request_body = {"userName": self.username,"password": self.password,"validateCode": code,"uuid": uuid_handle}
print(request_body)
# 发送请求
result = requests.post(cmdc_url, params=request_body)
# 获取预期结果
# check_dict = commonFuc().get_business_data(module, "checkDict")
# print(check_dict)
# # 断言实际结果中是否包含预期结果的内容
# commonFuc().check_result(check_dict, result)
result = json.loads(result.content)
token = commonFuc().analysis_json('accessToken', result)
return token
if __name__ == "__main__":
print(CmdcMaiiLogin("Test001", "Aa123456").get_token())
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:api,cmdc采购单查询,2250,2250-1,sit,bs
主数据平台:运营后台管理系统采购单查询接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import json
module = "cmdc_purchase_list"
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url")
print(url)
# 获取登录所需账号密码
username = commonFuc().get_business_data(module, "username")
password = commonFuc().get_business_data(module, "password")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
print(cmdc_access_token)
headers = commonFuc().get_business_data(module, "json_headers1",cmdc_access_token)
print(headers)
request_body = commonFuc().get_business_data(module, "payload1")
print(request_body)
"""
场景:传入正确参数,获取到采购单列表
用例名称:获取采购单列表
输出:{"success":true,"code":"200","message":null}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
print(result)
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict2")
print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:api,cmdc客户查询,2250,2250-2,sit,bs
主数据平台:运营后台管理系统客户接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import json
module = "cmdc_query_customer"
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url")
print(url)
# 获取登录所需账号密码
username = commonFuc().get_business_data(module, "username")
password = commonFuc().get_business_data(module, "password")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
print(cmdc_access_token)
headers = commonFuc().get_business_data(module, "json_headers1",cmdc_access_token)
print(headers)
request_body = commonFuc().get_business_data(module, "payload1")
print(request_body)
"""
场景:传入正确参数,获取到客户列表
用例名称:获取客户列表
输出:{"success":true,"code":"200","message":"OK"}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
print(result)
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict2")
print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "xiaohesheng"
"""
case_tag:spd3-web,10087
spd登录的ui自动化例子1
"""
from common.common_func import commonFuc
from airtest.core.api import *
from airtest_selenium.proxy import *
module = "demo-ui"
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "spd3_login_url")
print(url)
chrome=WebChrome()
chrome=commonFuc().openBrowser(url,chrome)
chrome.maximize_window()
chrome.implicitly_wait(20)
username=commonFuc().get_business_data(module, "username")
password=commonFuc().get_business_data(module, "password")
login_button=commonFuc().get_business_data(module, "login_button")
print(username,password)
commonFuc().logIn(username,password,login_button,chrome)
name1=commonFuc().get_business_data(module,"name1")
print(name1)
result=commonFuc().check_login_result(name1,chrome)
print(result)
#断言实际结果中是否包含预期的文本
commonFuc().check_text_exist_result_text(name1,result)
chrome.airtest_touch(Template(r"tpl1691374901526.png", record_pos=(12.3, 2.8), resolution=(100, 100)))
commonFuc().quit_chrome(chrome)
# -*- encoding=utf8 -*-
__author__ = "Administrator"
"""
case_tag:api,baidu,sit,on
"""
from airtest.core.api import *
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from airtest_selenium.proxy import *
driver = WebChrome()
driver.implicitly_wait(20)
driver.get("https://www.baidu.com")
sleep(4)
driver.find_element_by_id("kw").send_keys("airtest")
sleep(4)
driver.airtest_touch(Template(r"tpl1691374901526.png", record_pos=(12.3, 2.8), resolution=(100, 100)))
# -*- encoding=utf8 -*-
from common.common_func import commonFuc
__author__ = "meimengting"
"""
case_tag:api,medical-web,sit,on
"""
from airtest.core.api import *
import sys
sys.path.append('..')
from common.db.db import dbOP
import time
# using("common_steps.air")
# import common_steps
# result_db = dbOP().selectSql('exp_expert', [0, 20])
"""
enc_user_id可替换成接口实际所需要id,默认是u779700044448
获取token和时间戳
"""
enc_user_id = 'u97969333801'
# token = common_steps.check_token(enc_user_id)
timestamp = str(int(time.time()))
# "按照环境将url管理起来,更改api url后的接口名即可"
# url = common_steps.get_api_url()
# url = url + "/newapi/router/medical/expert/info"
"""
querystring: 业务参数
headers: header参数
"""
# exId = result_db[0][0]
# exName = result_db[0][1]
# querystring = {"expertIds": exId}
# headers = {
# 'clientInfo': '{"birthday":"2018-11-18","screenwidth":"375","clientVersion":"2.4.2","screenheight":"667","partner":"meitunmama","clientip":"10.180.81.127","traderName":"iPhone 6S","clientAppVersion":"2.4.2","clientYunyuVersion":"7.9.6","clientSystem":"ios","nettype":"wifi","deviceCode":"1f4b3860acfa303bca0407f5128bc5ea0f529fec"}',
# 'platform': "1",
# 'signature': "144c6b3c78fc20ad57da1ebdb879615b",
# 'token': token,
# 'timestamp': timestamp,
# }
"""
get方法
"""
# result = common_steps.get(url, headers, querystring)
"""
出参校验
"""
# check_dict = {"rtn_code": 0, "rtn_msg": "请求成功", "expertName": str(exName)}
# commonFuc().check_result(check_dict, result)
\ No newline at end of file
# -*- encoding=utf8 -*-
__author__ = "xiaohesheng"
"""
case_tag:api,mdm-web,经销商正常注册,id2232,id2232-8,sit,on
主数据平台:经销商_企业注册提交_正常注册
"""
from common.common_func import commonFuc
import time
module = "mdm3_es"
#企业注册接口
import os
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "CompanyRegist_url")
print(url)
# #获取请求头信息
headers = commonFuc().get_business_data(module, "json_headers", commonFuc().get_business_data(module,"json_contentType"))
print(headers)
name=commonFuc().get_business_data(module,"name1")#获取前缀name
cname=name+commonFuc().randomString(5)#随机数生成拼接企业名称
request_body = commonFuc().get_business_data(module, "payload6",cname,cname,cname,cname,cname,cname)
print(request_body)
"""
场景:经销商_企业注册提交_正常注册
用例名称:经销商_企业注册提交_正常注册
输入:cname
输出:"rtn_msg": "返回注册企业信息"
"""
#发送请求
result = commonFuc().http_post(url, request_body, headers)
print(result)
#获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict")
print(check_dict)
#断言实际结果中是否不包含预期的文本
commonFuc().check_text_exist(name,result)
import os
from jpype import *
import jpype
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = rootPath = os.path.split(curPath)[0]
startJVM(jpype.getDefaultJVMPath(), "-ea", "-Djava.class.path=%s" % rootPath + os.sep + "jar" + os.sep + "utils.jar")
jclass = JClass("com.babytree.AesUtils")
class aesUtils:
def encrypt(self, content):
"""
aes_ecb模式加密
:param content:
:return:
"""
result = jclass.encrypt(content)
return result
def decrypt(self, content):
"""
aes_ecb模式解密
:param content:
:return:
"""
result = jclass.decrypt(content)
return result
def shutdown(self):
"""
关闭虚拟机
:return:
"""
shutdownJVM()
if __name__ == '__main__':
sign = aesUtils().encrypt("helloword")
print("加密:", sign)
sign2 = aesUtils().decrypt(sign)
print("解密: ", sign2)
aesUtils().shutdown()
import re
import os
def get_message(case_path, case_name):
"""
从注释中获取某个文件的 case tag 信息
"""
tags = []
pyfile = open(case_path + os.sep + case_name + '.py', 'r', encoding='utf-8')
lines = pyfile.readlines()
for line in lines:
if 'case_tag' in line:
line = line.split('case_tag')[-1]
#对不规范的写法做一些处理,如中文分号, 中文逗号,多空格等。。
line = re.sub(' ', '', line)
line = line.replace(',', ',').replace(':', '').replace(':', '')
line = line.strip().strip('\n')
tags = line.split(',')
pyfile.close()
return tags
def get_case_tag_list(case_path, cases):
"""
遍历case目录下的文件,从文件中获取case tag,加入列表中
再加个判断,判断是跑线上还是线下
"""
tagList = cases.split('#')
env = os.environ['ENV']
case_list = []
for f in os.listdir(case_path):
nf_path = os.path.join(case_path, f)
if os.path.isdir(nf_path) and nf_path.endswith(".air") == False:
for nf in os.listdir(nf_path):
script = os.path.join(nf_path, nf)
if script.endswith(".air"):
air_name = nf.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag and (env in case_tags or env in ('sit', 'sita')):
case_hash = {}
case_hash["air_path"] = str(script)
case_hash["air_name"] = str(air_name)
case_hash["module"] = str(script.split(os.sep)[-2])
case_list.append(case_hash)
else:
script = os.path.join(case_path, f)
if f.endswith(".air"):
air_name = f.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag and (env in case_tags or env in ('sit', 'sita')):
case_hash = {}
case_hash["air_path"] = str(script)
case_hash["air_name"] = str(air_name)
case_hash["module"] = str(script.split(os.sep)[-2])
case_list.append(case_hash)
return case_list
def get_case_tag_list_2(case_path, cases):
"""
遍历case目录下的文件,从文件中获取case tag,加入列表中
"""
tagList = cases.split('#')
case_list = []
for f in os.listdir(case_path):
script = os.path.join(case_path, f)
air_name = f.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag:
case_list.append(air_name)
return case_list
# -*- coding: utf-8 -*-
import unittest
import os
import sys
import six
import re
import shutil
import traceback
import warnings
from io import open
from airtest.core.api import G, auto_setup, log
from airtest.core.settings import Settings as ST
from airtest.utils.compat import decode_path, script_dir_name, script_log_dir
from copy import copy
class AirtestCase(unittest.TestCase):
PROJECT_ROOT = "."
SCRIPTEXT = ".air"
TPLEXT = ".png"
@classmethod
def setUpClass(cls):
cls.args = args
setup_by_args(args)
# setup script exec scope
cls.scope = copy(globals())
cls.scope["exec_script"] = cls.exec_other_script
def setUp(self):
if self.args.log and self.args.recording:
for dev in G.DEVICE_LIST:
try:
dev.start_recording()
except:
traceback.print_exc()
def tearDown(self):
if self.args.log and self.args.recording:
for k, dev in enumerate(G.DEVICE_LIST):
try:
output = os.path.join(self.args.log, "recording_%d.mp4" % k)
dev.stop_recording(output)
except:
traceback.print_exc()
def runTest(self):
scriptpath, pyfilename = script_dir_name(self.args.script)
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
self.scope["__file__"] = pyfilepath
with open(pyfilepath, 'r', encoding="utf8") as f:
code = f.read()
pyfilepath = pyfilepath.encode(sys.getfilesystemencoding())
try:
exec(compile(code.encode("utf-8"), pyfilepath, 'exec'), self.scope)
except Exception as err:
tb = traceback.format_exc()
log("Final Error", tb)
six.reraise(*sys.exc_info())
@classmethod
def exec_other_script(cls, scriptpath):
"""run other script in test script"""
warnings.simplefilter("always")
warnings.warn("please use using() api instead.", PendingDeprecationWarning)
def _sub_dir_name(scriptname):
dirname = os.path.splitdrive(os.path.normpath(scriptname))[-1]
dirname = dirname.strip(os.path.sep).replace(os.path.sep, "_").replace(cls.SCRIPTEXT, "_sub")
return dirname
def _copy_script(src, dst):
if os.path.isdir(dst):
shutil.rmtree(dst, ignore_errors=True)
os.mkdir(dst)
for f in os.listdir(src):
srcfile = os.path.join(src, f)
if not (os.path.isfile(srcfile) and f.endswith(cls.TPLEXT)):
continue
dstfile = os.path.join(dst, f)
shutil.copy(srcfile, dstfile)
# find script in PROJECT_ROOT
scriptpath = os.path.join(ST.PROJECT_ROOT, scriptpath)
# copy submodule's images into sub_dir
sub_dir = _sub_dir_name(scriptpath)
sub_dirpath = os.path.join(cls.args.script, sub_dir)
_copy_script(scriptpath, sub_dirpath)
# read code
pyfilename = os.path.basename(scriptpath).replace(cls.SCRIPTEXT, ".py")
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
with open(pyfilepath, 'r', encoding='utf8') as f:
code = f.read()
# replace tpl filepath with filepath in sub_dir
code = re.sub("[\'\"](\w+.png)[\'\"]", "\"%s/\g<1>\"" % sub_dir, code)
exec(compile(code.encode("utf8"), pyfilepath, 'exec'), cls.scope)
def setup_by_args(args):
# init devices
if isinstance(args.device, list):
devices = args.device
elif args.device:
devices = [args.device]
else:
devices = []
# set base dir to find tpl
dirpath, _ = script_dir_name(args.script)
# set log dir
if args.log:
args.log = script_log_dir(dirpath, args.log)
print("save log in '%s'" % args.log)
else:
print("do not save log")
# guess project_root to be basedir of current .air path
project_root = os.path.dirname(args.script) if not ST.PROJECT_ROOT else None
try:
auto_setup(dirpath, devices, args.log, project_root)
except:
os.system('adb devices')
auto_setup(dirpath, devices, args.log, project_root)
def run_script(parsed_args, testcase_cls=AirtestCase):
global args # make it global deliberately to be used in AirtestCase & test scripts
args = parsed_args
suite = unittest.TestSuite()
suite.addTest(testcase_cls())
result = unittest.TextTestRunner(verbosity=0).run(suite)
if not result.wasSuccessful():
sys.exit(-1)
import base64
import json
import requests
from airtest.core.api import *
from common.confop import confOP
from common.dubboUtils import GetDubboService2, InvokeDubboApi
from common.rw import Rw
workspace = os.path.abspath(".")
business_path = workspace + os.sep + "data" + os.sep
class commonFuc(object):
def find_path(self, module=""):
if module == "":
return business_path
else:
return business_path + module + os.sep
def get_business_data(self, module, key, *args):
env = os.environ['ENV']
data_list = confOP().getBusiYamlValue(self.find_path(module), "data")
if args is not None and len(args) > 0:
if isinstance(data_list[key], dict):
result = json.loads(json.dumps(data_list[key]) % args)
else:
if str(data_list[key]) == "":
result = data_list[key]
else:
result = data_list[key] % args
else:
result = data_list[key]
if "_sit_" in str(result) and "_on_" in str(result):
if env == "on":
return result["_on_"]
else:
return result["_sit_"]
else:
return result
def get_message(self, module, key):
message_list = confOP().getBusiYamlValue(self.find_path(module), "message")
return message_list[key]
def get_openauth_url(self):
return "http://openauth.meitun.com/tools/sign/create"
def get_api_url(self):
"""
接口类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://m.meitun.com'
elif env == 'pre':
url = 'http://pre-m.meitun.com'
elif env == 'sita':
url = 'http://sita-m.meitun.com'
elif env == "spd3": # spd3.0
# url = 'http://sit-m.meitun.com'
url = 'http://spddev.cmic.com.cn'
elif env == "spd2": # spd2.0
url = ""
elif env == "od": # 骨科
url = ""
elif env == "bs": # 电商
url = "https://service-slb.cmic.com.cn"
else:
url = 'http://spddev.cmic.com.cn'
return url
def get_token(self, module, enc_user_id="enc_user_id"):
enc_user_id = self.get_business_data(module, enc_user_id)
token = self.check_token(enc_user_id)
return token
def check_token(self, enc_user_id='u779700044448'):
"""
多个case同一个token不用一直查数据库
:param enc_user_id:
:return:
"""
env = os.environ['ENV']
result = Rw().r_token(enc_user_id, env)
if result is None:
return Rw().w_token(enc_user_id, env)
else:
return result["token"]
def http_get(self, url, headers="", params=""):
"""
一个get请求,返回json
"""
result = requests.request("GET", url, headers=headers, params=params)
result = json.loads(result.text)
return result
def http_post(self, url, postdata=None, header=None):
"""
一个post请求,返回json
"""
# result = requests.post(url, data=postdata, headers=header)
result = requests.post(url, data=json.dumps(postdata), headers=header)
result = json.loads(result.content)
return result
def check_result(self, check_dict, result):
"""
结果检查,要检查的字段及值放在字典里, 结果是完全匹配
result 是json
"""
assert_not_equal(result, [], "只想看下result的值")
for k, v in check_dict.items():
actual_value = self.analysis_json(k, result)
assert_equal(str(v).replace(" ", ""), str(actual_value).replace(" ", ""))
def analysis_json(self, key, result):
"""
解析json
"""
res = None
if self.typeof(result) == 'dict':
if key in result.keys():
return result[key]
else:
for k, v in result.items():
res = self.analysis_json(key, v)
if res is not None:
break
elif self.typeof(result) == 'list':
for value in result:
res = self.analysis_json(key, value)
if res is not None:
break
else:
pass
return res
def typeof(self, variate):
"""
变量类型
:param variate:
:return:
"""
if isinstance(variate, int):
return "int"
elif isinstance(variate, str):
return "str"
elif isinstance(variate, float):
return "float"
elif isinstance(variate, list):
return "list"
elif isinstance(variate, tuple):
return "tuple"
elif isinstance(variate, dict):
return "dict"
elif isinstance(variate, set):
return "set"
else:
return None
def get_openApi_url(self):
"""
获取openapi的接口
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://openapi.meitun.com'
elif env == 'pre':
url = 'http://pre-openapi.meitun.com'
elif env == 'sita':
url = 'http://sita-openapi.meitun.com'
else:
url = 'http://sit-openapi.meitun.com'
return url
def get_ip_by_pool(self, poolName, type='sit'):
"""
根据hostname获取ip地址
:param poolName: ip:端口/应用名:端口
:param type: local/sit
:return:
"""
if str(type).lower() == 'local':
return poolName
else:
pool = str(poolName).split(":")[0]
print("pool: ", pool)
port = str(poolName).split(":")[1]
result = self.http_get(
"http://apollo.baobaoshu.com/apiv1/united_devices/?stage=%s&application=%s&usage=SERVER" % (type, pool))
ip = result["results"][0]["ip"]
return str(ip) + ":" + port
def run_local_dubbo(self, content, dubbo_service, dubbo_method, *args):
"""
运行本地dubbo接口
:param dubbo_service: dubbo中 服务名 如:com.zl.mall.api.IItemService
:param dubbo_method: 服务中的方法 如:updateItem
:param args: 方法请求需要的参数
:return:
"""
dubbo_info = GetDubboService2().get_dubbo_info2(content)
invokeDubboApi = InvokeDubboApi(server_host=dubbo_info.get("server_host"),
server_post=dubbo_info.get("server_post"))
return invokeDubboApi.invoke_dubbo_api(dubbo_service, dubbo_method, *args)
def get_open_url(self):
"""
获取openapi的接口
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://openapi.meitun.com'
elif env == 'pre':
url = 'http://pre-openapi.meitun.com'
elif env == 'sita':
url = 'http://sita-openapi.meitun.com'
else:
url = 'http://sit-openapi.meitun.com'
return url
def get_openapi_signature(self, module, params):
"""
获取openapi的验签
:param module:
:param params:
:return:
"""
private_key = self.get_business_data(module, "private_key")
app_secret = self.get_business_data(module, "app_secret")
openauth_url = self.get_openauth_url()
data = self.get_business_data(module, "openapi_data", private_key, app_secret, params)
result = commonFuc().http_post(openauth_url, data)
signature = result['signature']
return signature
def get_url(self, pool=None):
"""
根据环境或者url
:return:
"""
env = os.environ['ENV']
print(env)
if pool == "bid":
if env == 'on':
url = 'http://bid.babytree.com'
else:
url = 'https://bid.babytree-test.com'
elif pool == "advertise-go-web":
url = 'http://g.kexin001.com'
elif pool == "ad_Delivery":
url = 'http://go.kexin001.com'
elif pool == "search-platform-index":
if env == 'on':
url = 'http://search-index.babytree.com/index/build'
else:
url = 'http://search-index.babytree-test.com/index/build'
elif pool == "search-platform-web":
if env == 'on':
url = 'http://search-query.babytree.com/search/query'
else:
url = 'http://search-query.babytree-test.com/search/query'
elif pool == "search-merger":
if env == 'on':
url = 'http://merger.babytree.com/search'
else:
url = 'http://merger.babytree-test.com/search'
elif pool == "search-suggest":
if env == 'on':
url = 'http://suggest.babytree.com'
else:
url = 'http://suggest.babytree-test.com'
else:
if env == 'on':
url = 'https://backend.meitunmama.com/'
elif env == 'pre':
url = 'http://pre-backend.meitunmama.com'
elif env == 'sita':
url = 'http://sita-backend.meitunmama.com'
else:
url = 'http://sit-backend.meitunmama.com'
return url
def login_backend(self, driver):
driver.get(self.get_url())
sleep(3)
# driver.assert_template(Template(r"tpl1580989830894.png", record_pos=(3.27, 2.99), resolution=(100, 100)),
# "请填写测试点")
driver.set_window_size(1366, 868)
if os.environ['ENV'] == 'on' or os.environ['ENV'] == 'pre':
es = 'aG9uZ2xp'
driver.find_element_by_id("loginName").send_keys(base64.b64decode(es.encode('utf-8')).decode("utf-8"))
sleep(1)
es = 'aGxiYjEyMTA5Mg=='
driver.find_element_by_xpath("//input[@type='password']").send_keys(
base64.b64decode(es.encode('utf-8')).decode("utf-8"))
sleep(1)
driver.find_element_by_id("smsCode").send_keys("111111")
sleep(1)
else:
driver.find_element_by_id("loginName").send_keys('autotest')
driver.find_element_by_xpath("//input[@type='password']").send_keys('123@qwe')
driver.find_element_by_id("smsCode").send_keys("111111")
driver.find_element_by_id("sub_btn").click()
sleep(2)
# driver.assert_template(Template(r"tpl1579258499558.png", record_pos=(0.47, 0.975), resolution=(100, 100)),
# "验证登录成功了")
# 选择打开页面的路径,如: 大健康-课程包-课程包管理
def enter_channel_manage(self, driver, title, classfy, content):
driver.find_element_by_xpath("//a[@title='%s']" % title).click()
driver.find_element_by_id("east").find_element_by_xpath("//*[text()='%s']" % classfy).click()
driver.find_element_by_xpath("//a[@title='%s']" % content).click()
def enter_album_check(self, driver):
"""
进入专辑审核页
"""
driver.get(
self.get_url() + "/bighealth-service/outcourse/list.htm?source=1&linkId=big_health_audit_sync_btn1_link&tabId=big_health_audit_sync_btn1")
def remove_readonly(self, driver):
"""
去除input的只读属性
"""
inputs = driver.find_elements_by_tag_name("input")
for input in inputs:
driver.execute_script('arguments[0].removeAttribute(\"readonly\")', input)
driver.execute_script('arguments[0].removeAttribute(\"autocomplete\")', input)
def check_variable_exist(self, check_list, result):
"""
结果检查,检查字段值存在
"""
Flag = False
for variable in check_list:
if variable in result.keys():
Flag = True
assert_equal(Flag, True, '验证参数' + variable + "存在")
def check_text_exist(self, check_text, result):
"""
结果检查,检查文本内容是否存在于返回结果中
"""
Flag = False
check_text=str(check_text)
# print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values()))
if check_text in str(result.values()):
Flag = True
# print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag)
assert_equal(Flag, True, '验证文本' + check_text + "存在")
def check_text_no_exist(self, check_text, result):
"""
结果检查,检查文本内容是否存在于返回结果中
"""
Flag = False
# print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values()))
if check_text not in str(result.values()):
Flag = True
# print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag)
assert_equal(Flag, True, '验证文本' + check_text + "不存在")
def check_text_exist_result_text(self, check_text, result_text):
"""
结果检查,检查文本内容是否存在于返回结果中,返回结果也是文本
"""
Flag = False
check_text=str(check_text)
# print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values()))
if check_text in result_text:
Flag = True
# print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag)
assert_equal(Flag, True, '验证文本' + check_text + "存在")
def enter_h5_page(self, driver, params):
"""
:param driver:
:param params: 请求的URL
:return:
"""
url = self.get_api_url()
driver.get(url + params)
def get_start(self, pageno, pagesize):
"""
获取limit的start数
:param pageno:
:param pagesize:
:return:
"""
return (pageno - 1) * pagesize
def click_iterm(self, driver, el_list, name):
"""
点击对应元素
"""
for el in el_list:
print(el.text)
if el.text == name:
driver.execute_script("arguments[0].scrollIntoView();", el)
el.click()
break
def get_mapi_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://mapiweb.babytree.com'
elif env == 'pre':
url = 'http://pre-mapiweb.babytree.com'
else:
url = 'http://mapiweb.babytree-test.com'
return url
def get_localhome_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://localhome.babytree.com'
elif env == 'pre':
url = 'http://pre-localhome.babytree.com'
else:
url = 'http://localhome.babytree-test.com'
return url
def get_go_babytree_url(self):
"""
接口go_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://go.babytree.com'
elif env == 'pre':
url = 'http://go.babytree.com'
else:
url = 'http://go-1.babytree-test.com'
return url
def get_inno_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://apilocal.babytree.com'
elif env == 'pre':
url = 'http://pre-apilocal.babytree.com'
else:
url = 'http://apilocal.babytree-test.com'
return url
def get_localfront_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://localfront.babytree.com'
elif env == 'pre':
url = 'http://pre-localfront.babytree.com'
else:
url = 'http://localfront.babytree-test.com'
return url
def get_search_platform_hz_index_url(self):
"""
接口类型的请求
:return:
"""
env = os.environ['ENV']
if env == 'on':
url = 'http://search-index.babytree.com'
else:
url = 'http://sit-search-index.babytree.com'
return url
# 随机n位字符串,返回n位字符串
def randomString(self, n):
import string
import random
s = "".join(random.sample(string.ascii_letters + string.digits + "!@#$%^&*()", n))
return s
# 打开网页
def openBrowser(self, url, chrome):
chrome.implicitly_wait(10) # 隐式等待 10秒
chrome.get(url)
return chrome
def logIn(self, username, password, login_button,chrome):
chrome.find_element_by_xpath(username).clear()
chrome.find_element_by_xpath(username).send_keys('xhs')
chrome.find_element_by_xpath(password).clear()
chrome.find_element_by_xpath(password).send_keys('a123456!')
chrome.find_element_by_xpath(login_button).click()
def check_login_result(self,username,chrome):
for i in range(10000):
sleep(1)
try:
if (chrome.find_element_by_xpath('//div[text()=" %s "]' % username)).is_displayed():
print('看看这里显示了没有',username)
break
except:
continue
return chrome.find_element_by_xpath('//div[text()=" %s "]' % username).text
def quit_chrome(self,chrome):
chrome.quit()
# 获取当前项目的根目录的路径
def get_pro_path(self):
import os
curPath = os.path.abspath(os.path.dirname(__file__)) # 获取当前文件的所在目录的绝对路径
# print(os.path.split(curPath))
rootPath = os.path.split(curPath)[0]
return rootPath
\ No newline at end of file
class ModuleName(object):
SEARCH_MERGER = "search-merger"
SEARCH_SUGGEST = "search-suggest"
SEARCH_PLATFORM_INDEX = "search-platform-index"
SEARCH_PLATFORM_WEB = "search-platform-web"
import configparser
import os
import yaml
cf = configparser.ConfigParser()
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = rootPath = os.path.split(curPath)[0]
class confOP:
def getConfValue(self, filename, section, option, type='string'):
cf.read(filename)
if (type == 'string'):
value = cf.get(section, option)
else:
value = cf.getint(section, option)
return value
def getYamlValue(self, filename):
"""
获取公共sql中的sql
:param filename:
:return:
"""
file = os.path.join(rootPath, "data" + os.sep + filename)
f = open(file, 'r', encoding='utf-8')
cont = f.read()
value = yaml.load(cont, Loader=yaml.FullLoader)
f.close()
return value
def getBusiYamlValue(self, path, filename):
"""
获取业务中的yaml文件数据
:param path:
:param filename:
:return:
"""
file = os.path.join(path, filename)
f = open(file, 'r', encoding='utf-8')
cont = f.read()
value = yaml.load(cont, Loader=yaml.FullLoader)
f.close()
return value
loginusername1:
loginusername: 国药集团公司!n0d(
goodsname: 一次性注射器NbfHr
import datetime
import calendar
class dateUtils:
def get_current_start_end(self):
"""
获取当日的开始时间和结束时间
:return:
"""
start, end = datetime.datetime.now().replace(hour=0, minute=0, second=0).strftime(
"%Y-%m-%d %H:%M:%S"), datetime.datetime.now().replace(hour=23, minute=59, second=59).strftime(
"%Y-%m-%d %H:%M:%S")
return start, end
def get_week_start_end(self):
"""
获取本周的第一天和最后一天
:return:
"""
monday, sunday = datetime.date.today(), datetime.date.today()
one_day = datetime.timedelta(days=1)
while monday.weekday() != 0:
monday -= one_day
while sunday.weekday() != 6:
sunday += one_day
# 返回当前的星期一和星期天的日期
monday, sunday = monday.strftime("%Y-%m-%d %H:%M:%S"), sunday.strftime("%Y-%m-%d %H:%M:%S").replace("00:00:00", "23:59:59")
return monday, sunday
def get_current_time(self):
"""
获取当前时间,返回格式是yyyy-mm-dd hh:mm:ss
:return:
"""
curr_time = datetime.datetime.now()
time = datetime.datetime.strftime(curr_time, '%Y-%m-%d %H:%M:%S')
return time
def get_current_date(self):
"""
获取当前日期,返回格式是yyyy-mm-dd
:return:
"""
curr_time = datetime.datetime.now()
date = datetime.datetime.strftime(curr_time, '%Y-%m-%d')
return date
def get_week_of_date(self, date):
"""
获取传入的日期是星期几
:param date:
:return:
"""
week_day_dict = {
0: '星期一',
1: '星期二',
2: '星期三',
3: '星期四',
4: '星期五',
5: '星期六',
6: '星期日',
}
day = datetime.datetime.strptime(date, "%Y-%m-%d").weekday()
return week_day_dict[day]
def add_days(self, date, days):
"""
获取新的日期
:param date: 日期
:param days: 天数(可正数,可负数)
:return:
"""
dt = datetime.datetime.strptime(date, "%Y-%m-%d")
out_date = (dt + datetime.timedelta(days=days)).strftime("%Y-%m-%d")
return out_date
def add_hours(self, date, hours):
"""
获取新的时间
:param date:
:param hours:
:return:
"""
dt = datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
out_date = (dt + datetime.timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")
return out_date
def get_month_start_end(self, year, month):
"""
获取本月的第一天和最后一天
:return:
"""
weekDay, monthCountDay = calendar.monthrange(year, month)
firstDay = datetime.date(year, month, day=1)
lastDay = datetime.date(year, month, day=monthCountDay)
return firstDay, lastDay
def get_every_month_start_end(self, year):
"""
获取某年的每个月的第一天和最后一天
:param year:
:return:
"""
start = []
end = []
for x in range(1, 13):
dt_start = (datetime.datetime(int(year), x, 1)).strftime("%Y%m%d")
if 12 == x:
dt_end = (datetime.datetime(int(year), 12, 31)).strftime("%Y%m%d")
else:
dt_end = (datetime.datetime(int(year), x + 1, 1) - datetime.timedelta(days=1)).strftime("%Y%m%d")
start.append(dt_start)
end.append(dt_end)
return start, end
def get_year_month(self, time):
"""
获取某个时间的年、月份
:param time:
:return:
"""
dt = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
return dt.year, dt.month
if __name__ == '__main__':
print(dateUtils().get_current_date())
print(" \n ")
print(dateUtils().get_current_start_end())
print(" \n ")
print(dateUtils().add_days("2020-10-18", 10))
print(" \n ")
print(dateUtils().add_hours("2020-10-18 00:00:00", 3))
print(" \n ")
print(dateUtils().get_current_time())
print(" \n ")
print(dateUtils().get_every_month_start_end("2020"))
print(" \n ")
print(dateUtils().get_week_of_date("2020-10-18"))
[meitun_db]
host = rm-bp18as20m3solyi69.mysql.rds.aliyuncs.com
port = 3306
user = test_admin
password = ahdp0b.cr76_Pje1
[bj_db]
host = 172.25.1.6
port = 3320
user = local_rw
password = baidugoogleyahoo
[promotion]
livedb = 19
liveenv = 4
liveserver = 129
predb = 19
preenv = 2
preserver = 58
[mongo]
ip = 127.0.0.1
port = 27017
[redis]
redis_host=192.168.24.31
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment