import base64 import json import requests from airtest.core.api import * from common.confop import confOP from common.dubboUtils import GetDubboService2, InvokeDubboApi from common.rw import Rw workspace = os.path.abspath(".") business_path = workspace + os.sep + "data" + os.sep class commonFuc(object): def find_path(self, module=""): if module == "": return business_path else: return business_path + module + os.sep def get_business_data(self, module, key, *args): env = os.environ['ENV'] data_list = confOP().getBusiYamlValue(self.find_path(module), "data") if args is not None and len(args) > 0: if isinstance(data_list[key], dict): result = json.loads(json.dumps(data_list[key]) % args) else: if str(data_list[key]) == "": result = data_list[key] else: result = data_list[key] % args else: result = data_list[key] if "_sit_" in str(result) and "_on_" in str(result): if env == "on": return result["_on_"] else: return result["_sit_"] else: return result def get_message(self, module, key): message_list = confOP().getBusiYamlValue(self.find_path(module), "message") return message_list[key] def get_openauth_url(self): return "http://openauth.meitun.com/tools/sign/create" def get_api_url(self): """ 接口类型的请求 :return: """ env = os.environ['ENV'] print(env) if env == 'on': url = 'https://m.meitun.com' elif env == 'pre': url = 'http://pre-m.meitun.com' elif env == 'sita': url = 'http://sita-m.meitun.com' elif env == "spd3": # spd3.0 # url = 'http://sit-m.meitun.com' url = 'http://spddev.cmic.com.cn' elif env == "spd2": # spd2.0 url = "" elif env == "od": # 骨科 url = "" elif env == "bs": # 电商 url = "https://service-slb.cmic.com.cn" else: url = 'http://spdtest.cmic.com.cn:8080' return url def get_token(self, module, enc_user_id="enc_user_id"): enc_user_id = self.get_business_data(module, enc_user_id) token = self.check_token(enc_user_id) return token def check_token(self, enc_user_id='u779700044448'): """ 多个case同一个token不用一直查数据库 :param enc_user_id: :return: """ env = os.environ['ENV'] result = Rw().r_token(enc_user_id, env) if result is None: return Rw().w_token(enc_user_id, env) else: return result["token"] def http_get(self, url, headers="", params=""): """ 一个get请求,返回json """ result = requests.request("GET", url, headers=headers, params=params) result = json.loads(result.text) return result def http_post(self, url, postdata=None, header=None): """ 一个post请求,返回json """ # result = requests.post(url, data=postdata, headers=header) result = requests.post(url, data=json.dumps(postdata), headers=header) result = json.loads(result.content) return result #删除请求 def http_delte(self, url,header): """ 一个post请求,返回json """ # result = requests.post(url, data=postdata, headers=header) result = requests.delete(url,headers=header) result = json.loads(result.content) return result def check_result(self, check_dict, result): """ 结果检查,要检查的字段及值放在字典里, 结果是完全匹配 result 是json """ assert_not_equal(result, [], "只想看下result的值") for k, v in check_dict.items(): actual_value = self.analysis_json(k, result) assert_equal(str(v).replace(" ", ""), str(actual_value).replace(" ", "")) def analysis_json(self, key, result): """ 解析json """ res = None if self.typeof(result) == 'dict': if key in result.keys(): return result[key] else: for k, v in result.items(): res = self.analysis_json(key, v) if res is not None: break elif self.typeof(result) == 'list': for value in result: res = self.analysis_json(key, value) if res is not None: break else: pass return res def typeof(self, variate): """ 变量类型 :param variate: :return: """ if isinstance(variate, int): return "int" elif isinstance(variate, str): return "str" elif isinstance(variate, float): return "float" elif isinstance(variate, list): return "list" elif isinstance(variate, tuple): return "tuple" elif isinstance(variate, dict): return "dict" elif isinstance(variate, set): return "set" else: return None def get_openApi_url(self): """ 获取openapi的接口 """ env = os.environ['ENV'] print(env) if env == 'on': url = 'https://openapi.meitun.com' elif env == 'pre': url = 'http://pre-openapi.meitun.com' elif env == 'sita': url = 'http://sita-openapi.meitun.com' else: url = 'http://sit-openapi.meitun.com' return url def get_ip_by_pool(self, poolName, type='sit'): """ 根据hostname获取ip地址 :param poolName: ip:端口/应用名:端口 :param type: local/sit :return: """ if str(type).lower() == 'local': return poolName else: pool = str(poolName).split(":")[0] print("pool: ", pool) port = str(poolName).split(":")[1] result = self.http_get( "http://apollo.baobaoshu.com/apiv1/united_devices/?stage=%s&application=%s&usage=SERVER" % (type, pool)) ip = result["results"][0]["ip"] return str(ip) + ":" + port def run_local_dubbo(self, content, dubbo_service, dubbo_method, *args): """ 运行本地dubbo接口 :param dubbo_service: dubbo中 服务名 如:com.zl.mall.api.IItemService :param dubbo_method: 服务中的方法 如:updateItem :param args: 方法请求需要的参数 :return: """ dubbo_info = GetDubboService2().get_dubbo_info2(content) invokeDubboApi = InvokeDubboApi(server_host=dubbo_info.get("server_host"), server_post=dubbo_info.get("server_post")) return invokeDubboApi.invoke_dubbo_api(dubbo_service, dubbo_method, *args) def get_open_url(self): """ 获取openapi的接口 """ env = os.environ['ENV'] print(env) if env == 'on': url = 'https://openapi.meitun.com' elif env == 'pre': url = 'http://pre-openapi.meitun.com' elif env == 'sita': url = 'http://sita-openapi.meitun.com' else: url = 'http://sit-openapi.meitun.com' return url def get_openapi_signature(self, module, params): """ 获取openapi的验签 :param module: :param params: :return: """ private_key = self.get_business_data(module, "private_key") app_secret = self.get_business_data(module, "app_secret") openauth_url = self.get_openauth_url() data = self.get_business_data(module, "openapi_data", private_key, app_secret, params) result = commonFuc().http_post(openauth_url, data) signature = result['signature'] return signature def get_url(self, pool=None): """ 根据环境或者url :return: """ env = os.environ['ENV'] print(env) if pool == "bid": if env == 'on': url = 'http://bid.babytree.com' else: url = 'https://bid.babytree-test.com' elif pool == "advertise-go-web": url = 'http://g.kexin001.com' elif pool == "ad_Delivery": url = 'http://go.kexin001.com' elif pool == "search-platform-index": if env == 'on': url = 'http://search-index.babytree.com/index/build' else: url = 'http://search-index.babytree-test.com/index/build' elif pool == "search-platform-web": if env == 'on': url = 'http://search-query.babytree.com/search/query' else: url = 'http://search-query.babytree-test.com/search/query' elif pool == "search-merger": if env == 'on': url = 'http://merger.babytree.com/search' else: url = 'http://merger.babytree-test.com/search' elif pool == "search-suggest": if env == 'on': url = 'http://suggest.babytree.com' else: url = 'http://suggest.babytree-test.com' else: if env == 'on': url = 'https://backend.meitunmama.com/' elif env == 'pre': url = 'http://pre-backend.meitunmama.com' elif env == 'sita': url = 'http://sita-backend.meitunmama.com' else: url = 'http://sit-backend.meitunmama.com' return url def login_backend(self, driver): driver.get(self.get_url()) sleep(3) # driver.assert_template(Template(r"tpl1580989830894.png", record_pos=(3.27, 2.99), resolution=(100, 100)), # "请填写测试点") driver.set_window_size(1366, 868) if os.environ['ENV'] == 'on' or os.environ['ENV'] == 'pre': es = 'aG9uZ2xp' driver.find_element_by_id("loginName").send_keys(base64.b64decode(es.encode('utf-8')).decode("utf-8")) sleep(1) es = 'aGxiYjEyMTA5Mg==' driver.find_element_by_xpath("//input[@type='password']").send_keys( base64.b64decode(es.encode('utf-8')).decode("utf-8")) sleep(1) driver.find_element_by_id("smsCode").send_keys("111111") sleep(1) else: driver.find_element_by_id("loginName").send_keys('autotest') driver.find_element_by_xpath("//input[@type='password']").send_keys('123@qwe') driver.find_element_by_id("smsCode").send_keys("111111") driver.find_element_by_id("sub_btn").click() sleep(2) # driver.assert_template(Template(r"tpl1579258499558.png", record_pos=(0.47, 0.975), resolution=(100, 100)), # "验证登录成功了") # 选择打开页面的路径,如: 大健康-课程包-课程包管理 def enter_channel_manage(self, driver, title, classfy, content): driver.find_element_by_xpath("//a[@title='%s']" % title).click() driver.find_element_by_id("east").find_element_by_xpath("//*[text()='%s']" % classfy).click() driver.find_element_by_xpath("//a[@title='%s']" % content).click() def enter_album_check(self, driver): """ 进入专辑审核页 """ driver.get( self.get_url() + "/bighealth-service/outcourse/list.htm?source=1&linkId=big_health_audit_sync_btn1_link&tabId=big_health_audit_sync_btn1") def remove_readonly(self, driver): """ 去除input的只读属性 """ inputs = driver.find_elements_by_tag_name("input") for input in inputs: driver.execute_script('arguments[0].removeAttribute(\"readonly\")', input) driver.execute_script('arguments[0].removeAttribute(\"autocomplete\")', input) def check_variable_exist(self, check_list, result): """ 结果检查,检查字段值存在 """ Flag = False for variable in check_list: if variable in result.keys(): Flag = True assert_equal(Flag, True, '验证参数' + variable + "存在") def check_text_exist(self, check_text, result): """ 结果检查,检查文本内容是否存在于返回结果中 """ Flag = False check_text=str(check_text) # print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values())) if check_text in str(result.values()): Flag = True # print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag) assert_equal(Flag, True, '验证文本' + check_text + "存在") def check_text_no_exist(self, check_text, result): """ 结果检查,检查文本内容是否存在于返回结果中 """ Flag = False # print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values())) if check_text not in str(result.values()): Flag = True # print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag) assert_equal(Flag, True, '验证文本' + check_text + "不存在") def check_text_exist_result_text(self, check_text, result_text): """ 结果检查,检查文本内容是否存在于返回结果中,返回结果也是文本 """ Flag = False check_text=str(check_text) # print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values())) if check_text in result_text: Flag = True # print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag) assert_equal(Flag, True, '验证文本' + check_text + "存在") def enter_h5_page(self, driver, params): """ :param driver: :param params: 请求的URL :return: """ url = self.get_api_url() driver.get(url + params) def get_start(self, pageno, pagesize): """ 获取limit的start数 :param pageno: :param pagesize: :return: """ return (pageno - 1) * pagesize def click_iterm(self, driver, el_list, name): """ 点击对应元素 """ for el in el_list: print(el.text) if el.text == name: driver.execute_script("arguments[0].scrollIntoView();", el) el.click() break def get_mapi_babytree_url(self): """ 接口mapi_babytree类型的请求 :return: """ env = os.environ['ENV'] print(env) if env == 'on': url = 'http://mapiweb.babytree.com' elif env == 'pre': url = 'http://pre-mapiweb.babytree.com' else: url = 'http://mapiweb.babytree-test.com' return url def get_localhome_babytree_url(self): """ 接口mapi_babytree类型的请求 :return: """ env = os.environ['ENV'] print(env) if env == 'on': url = 'http://localhome.babytree.com' elif env == 'pre': url = 'http://pre-localhome.babytree.com' else: url = 'http://localhome.babytree-test.com' return url def get_go_babytree_url(self): """ 接口go_babytree类型的请求 :return: """ env = os.environ['ENV'] print(env) if env == 'on': url = 'http://go.babytree.com' elif env == 'pre': url = 'http://go.babytree.com' else: url = 'http://go-1.babytree-test.com' return url def get_inno_babytree_url(self): """ 接口mapi_babytree类型的请求 :return: """ env = os.environ['ENV'] print(env) if env == 'on': url = 'http://apilocal.babytree.com' elif env == 'pre': url = 'http://pre-apilocal.babytree.com' else: url = 'http://apilocal.babytree-test.com' return url def get_localfront_babytree_url(self): """ 接口mapi_babytree类型的请求 :return: """ env = os.environ['ENV'] print(env) if env == 'on': url = 'http://localfront.babytree.com' elif env == 'pre': url = 'http://pre-localfront.babytree.com' else: url = 'http://localfront.babytree-test.com' return url def get_search_platform_hz_index_url(self): """ 接口类型的请求 :return: """ env = os.environ['ENV'] if env == 'on': url = 'http://search-index.babytree.com' else: url = 'http://sit-search-index.babytree.com' return url # 随机n位字符串,返回n位字符串 def randomString(self, n): import string import random s = "".join(random.sample(string.ascii_letters + string.digits + "!@#$%^&*()", n)) return s def random_int(self): import random rand_num = random.randint(1, 999999) return rand_num # 打开网页 def openBrowser(self, url, chrome): chrome.implicitly_wait(10) # 隐式等待 10秒 chrome.get(url) return chrome def logIn(self, username, password, login_button,chrome): chrome.find_element_by_xpath(username).clear() chrome.find_element_by_xpath(username).send_keys('xhs') chrome.find_element_by_xpath(password).clear() chrome.find_element_by_xpath(password).send_keys('a123456!') chrome.find_element_by_xpath(login_button).click() def check_login_result(self,username,chrome): for i in range(10000): sleep(1) try: if (chrome.find_element_by_xpath('//div[text()=" %s "]' % username)).is_displayed(): print('看看这里显示了没有',username) break except: continue return chrome.find_element_by_xpath('//div[text()=" %s "]' % username).text def quit_chrome(self,chrome): chrome.quit() # 获取当前项目的根目录的路径 def get_pro_path(self): import os curPath = os.path.abspath(os.path.dirname(__file__)) # 获取当前文件的所在目录的绝对路径 # print(os.path.split(curPath)) rootPath = os.path.split(curPath)[0] return rootPath # 截图 def getPhoto(self,chrome): import time # imgName = time.strftime("%Y%m%d_%H%M%S", time.localtime()) + ".png" imgName = "info.png" pro_path = self.get_pro_path() # 获取项目的根目录的路径 image_path = pro_path + "//log//" + imgName chrome.get_screenshot_as_file(image_path) return imgName, image_path