Commit 530c0f65 authored by 李光宇's avatar 李光宇
Browse files

Merge branch 'master' into 'main'

代码归档只main分支

See merge request !1
parents 13b3da0e 2b18a6d3
# -*- encoding=utf8 -*-
from common.common_func import commonFuc
__author__ = "meimengting"
"""
case_tag:api,medical-web,sit,on
"""
from airtest.core.api import *
import sys
sys.path.append('..')
from common.db.db import dbOP
import time
# using("common_steps.air")
# import common_steps
# result_db = dbOP().selectSql('exp_expert', [0, 20])
"""
enc_user_id可替换成接口实际所需要id,默认是u779700044448
获取token和时间戳
"""
enc_user_id = 'u97969333801'
# token = common_steps.check_token(enc_user_id)
timestamp = str(int(time.time()))
# "按照环境将url管理起来,更改api url后的接口名即可"
# url = common_steps.get_api_url()
# url = url + "/newapi/router/medical/expert/info"
"""
querystring: 业务参数
headers: header参数
"""
# exId = result_db[0][0]
# exName = result_db[0][1]
# querystring = {"expertIds": exId}
# headers = {
# 'clientInfo': '{"birthday":"2018-11-18","screenwidth":"375","clientVersion":"2.4.2","screenheight":"667","partner":"meitunmama","clientip":"10.180.81.127","traderName":"iPhone 6S","clientAppVersion":"2.4.2","clientYunyuVersion":"7.9.6","clientSystem":"ios","nettype":"wifi","deviceCode":"1f4b3860acfa303bca0407f5128bc5ea0f529fec"}',
# 'platform': "1",
# 'signature': "144c6b3c78fc20ad57da1ebdb879615b",
# 'token': token,
# 'timestamp': timestamp,
# }
"""
get方法
"""
# result = common_steps.get(url, headers, querystring)
"""
出参校验
"""
# check_dict = {"rtn_code": 0, "rtn_msg": "请求成功", "expertName": str(exName)}
# commonFuc().check_result(check_dict, result)
\ No newline at end of file
import os
from jpype import *
import jpype
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = rootPath = os.path.split(curPath)[0]
startJVM(jpype.getDefaultJVMPath(), "-ea", "-Djava.class.path=%s" % rootPath + os.sep + "jar" + os.sep + "utils.jar")
jclass = JClass("com.babytree.AesUtils")
class aesUtils:
def encrypt(self, content):
"""
aes_ecb模式加密
:param content:
:return:
"""
result = jclass.encrypt(content)
return result
def decrypt(self, content):
"""
aes_ecb模式解密
:param content:
:return:
"""
result = jclass.decrypt(content)
return result
def shutdown(self):
"""
关闭虚拟机
:return:
"""
shutdownJVM()
if __name__ == '__main__':
sign = aesUtils().encrypt("helloword")
print("加密:", sign)
sign2 = aesUtils().decrypt(sign)
print("解密: ", sign2)
aesUtils().shutdown()
import re
import os
def get_message(case_path, case_name):
"""
从注释中获取某个文件的 case tag 信息
"""
tags = []
pyfile = open(case_path + os.sep + case_name + '.py', 'r', encoding='utf-8')
lines = pyfile.readlines()
for line in lines:
if 'case_tag' in line:
line = line.split('case_tag')[-1]
#对不规范的写法做一些处理,如中文分号, 中文逗号,多空格等。。
line = re.sub(' ', '', line)
line = line.replace(',', ',').replace(':', '').replace(':', '')
line = line.strip().strip('\n')
tags = line.split(',')
pyfile.close()
return tags
def get_case_tag_list(case_path, cases):
"""
遍历case目录下的文件,从文件中获取case tag,加入列表中
再加个判断,判断是跑线上还是线下
"""
tagList = cases.split('#')
env = os.environ['ENV']
case_list = []
for f in os.listdir(case_path):
nf_path = os.path.join(case_path, f)
if os.path.isdir(nf_path) and nf_path.endswith(".air") == False:
for nf in os.listdir(nf_path):
script = os.path.join(nf_path, nf)
if script.endswith(".air"):
air_name = nf.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag and (env in case_tags or env in ('sit', 'sita')):
case_hash = {}
case_hash["air_path"] = str(script)
case_hash["air_name"] = str(air_name)
case_hash["module"] = str(script.split(os.sep)[-2])
case_list.append(case_hash)
else:
script = os.path.join(case_path, f)
if f.endswith(".air"):
air_name = f.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag and (env in case_tags or env in ('sit', 'sita')):
case_hash = {}
case_hash["air_path"] = str(script)
case_hash["air_name"] = str(air_name)
case_hash["module"] = str(script.split(os.sep)[-2])
case_list.append(case_hash)
return case_list
def get_case_tag_list_2(case_path, cases):
"""
遍历case目录下的文件,从文件中获取case tag,加入列表中
"""
tagList = cases.split('#')
case_list = []
for f in os.listdir(case_path):
script = os.path.join(case_path, f)
air_name = f.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag:
case_list.append(air_name)
return case_list
# -*- coding: utf-8 -*-
import unittest
import os
import sys
import six
import re
import shutil
import traceback
import warnings
from io import open
from airtest.core.api import G, auto_setup, log
from airtest.core.settings import Settings as ST
from airtest.utils.compat import decode_path, script_dir_name, script_log_dir
from copy import copy
class AirtestCase(unittest.TestCase):
PROJECT_ROOT = "."
SCRIPTEXT = ".air"
TPLEXT = ".png"
@classmethod
def setUpClass(cls):
cls.args = args
setup_by_args(args)
# setup script exec scope
cls.scope = copy(globals())
cls.scope["exec_script"] = cls.exec_other_script
def setUp(self):
if self.args.log and self.args.recording:
for dev in G.DEVICE_LIST:
try:
dev.start_recording()
except:
traceback.print_exc()
def tearDown(self):
if self.args.log and self.args.recording:
for k, dev in enumerate(G.DEVICE_LIST):
try:
output = os.path.join(self.args.log, "recording_%d.mp4" % k)
dev.stop_recording(output)
except:
traceback.print_exc()
def runTest(self):
scriptpath, pyfilename = script_dir_name(self.args.script)
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
self.scope["__file__"] = pyfilepath
with open(pyfilepath, 'r', encoding="utf8") as f:
code = f.read()
pyfilepath = pyfilepath.encode(sys.getfilesystemencoding())
try:
exec(compile(code.encode("utf-8"), pyfilepath, 'exec'), self.scope)
except Exception as err:
tb = traceback.format_exc()
log("Final Error", tb)
six.reraise(*sys.exc_info())
@classmethod
def exec_other_script(cls, scriptpath):
"""run other script in test script"""
warnings.simplefilter("always")
warnings.warn("please use using() api instead.", PendingDeprecationWarning)
def _sub_dir_name(scriptname):
dirname = os.path.splitdrive(os.path.normpath(scriptname))[-1]
dirname = dirname.strip(os.path.sep).replace(os.path.sep, "_").replace(cls.SCRIPTEXT, "_sub")
return dirname
def _copy_script(src, dst):
if os.path.isdir(dst):
shutil.rmtree(dst, ignore_errors=True)
os.mkdir(dst)
for f in os.listdir(src):
srcfile = os.path.join(src, f)
if not (os.path.isfile(srcfile) and f.endswith(cls.TPLEXT)):
continue
dstfile = os.path.join(dst, f)
shutil.copy(srcfile, dstfile)
# find script in PROJECT_ROOT
scriptpath = os.path.join(ST.PROJECT_ROOT, scriptpath)
# copy submodule's images into sub_dir
sub_dir = _sub_dir_name(scriptpath)
sub_dirpath = os.path.join(cls.args.script, sub_dir)
_copy_script(scriptpath, sub_dirpath)
# read code
pyfilename = os.path.basename(scriptpath).replace(cls.SCRIPTEXT, ".py")
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
with open(pyfilepath, 'r', encoding='utf8') as f:
code = f.read()
# replace tpl filepath with filepath in sub_dir
code = re.sub("[\'\"](\w+.png)[\'\"]", "\"%s/\g<1>\"" % sub_dir, code)
exec(compile(code.encode("utf8"), pyfilepath, 'exec'), cls.scope)
def setup_by_args(args):
# init devices
if isinstance(args.device, list):
devices = args.device
elif args.device:
devices = [args.device]
else:
devices = []
# set base dir to find tpl
dirpath, _ = script_dir_name(args.script)
# set log dir
if args.log:
args.log = script_log_dir(dirpath, args.log)
print("save log in '%s'" % args.log)
else:
print("do not save log")
# guess project_root to be basedir of current .air path
project_root = os.path.dirname(args.script) if not ST.PROJECT_ROOT else None
try:
auto_setup(dirpath, devices, args.log, project_root)
except:
os.system('adb devices')
auto_setup(dirpath, devices, args.log, project_root)
def run_script(parsed_args, testcase_cls=AirtestCase):
global args # make it global deliberately to be used in AirtestCase & test scripts
args = parsed_args
suite = unittest.TestSuite()
suite.addTest(testcase_cls())
result = unittest.TextTestRunner(verbosity=0).run(suite)
if not result.wasSuccessful():
sys.exit(-1)
import base64
import json
import requests
from airtest.core.api import *
from common.confop import confOP
from common.dubboUtils import GetDubboService2, InvokeDubboApi
from common.rw import Rw
workspace = os.path.abspath(".")
business_path = workspace + os.sep + "data" + os.sep
class commonFuc(object):
def find_path(self, module=""):
if module == "":
return business_path
else:
return business_path + module + os.sep
def get_business_data(self, module, key, *args):
env = os.environ['ENV']
data_list = confOP().getBusiYamlValue(self.find_path(module), "data")
if args is not None and len(args) > 0:
if isinstance(data_list[key], dict):
result = json.loads(json.dumps(data_list[key]) % args)
else:
if str(data_list[key]) == "":
result = data_list[key]
else:
result = data_list[key] % args
else:
result = data_list[key]
if "_sit_" in str(result) and "_on_" in str(result):
if env == "on":
return result["_on_"]
else:
return result["_sit_"]
else:
return result
def get_message(self, module, key):
message_list = confOP().getBusiYamlValue(self.find_path(module), "message")
return message_list[key]
def get_openauth_url(self):
return "http://openauth.meitun.com/tools/sign/create"
def get_api_url(self):
"""
接口类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://m.meitun.com'
elif env == 'pre':
url = 'http://pre-m.meitun.com'
elif env == 'sita':
url = 'http://sita-m.meitun.com'
elif env == "spd3": # spd3.0
# url = 'http://sit-m.meitun.com'
url = 'http://spddev.cmic.com.cn'
elif env == "spd2": # spd2.0
url = ""
elif env == "od": # 骨科
url = ""
elif env == "bs": # 电商
url = "https://service-uat.gyqxmall.com"
# url = "https://service-slb.cmic.com.cn"
else:
url = 'http://spddev.cmic.com.cn'
return url
def get_token(self, module, enc_user_id="enc_user_id"):
enc_user_id = self.get_business_data(module, enc_user_id)
token = self.check_token(enc_user_id)
return token
def check_token(self, enc_user_id='u779700044448'):
"""
多个case同一个token不用一直查数据库
:param enc_user_id:
:return:
"""
env = os.environ['ENV']
result = Rw().r_token(enc_user_id, env)
if result is None:
return Rw().w_token(enc_user_id, env)
else:
return result["token"]
def http_get(self, url, headers="", params=""):
"""
一个get请求,返回json
"""
result = requests.request("GET", url, headers=headers, params=params)
result = json.loads(result.text)
return result
def http_post(self, url, postdata=None, header=None):
"""
一个post请求,返回json
"""
# result = requests.post(url, data=postdata, headers=header)
result = requests.post(url, data=json.dumps(postdata), headers=header)
result = json.loads(result.content)
return result
def check_result(self, check_dict, result):
"""
结果检查,要检查的字段及值放在字典里, 结果是完全匹配
result 是json
"""
assert_not_equal(result, [], "只想看下result的值")
for k, v in check_dict.items():
actual_value = self.analysis_json(k, result)
assert_equal(str(v).replace(" ", ""), str(actual_value).replace(" ", ""))
def analysis_json(self, key, result):
"""
解析json
"""
res = None
if self.typeof(result) == 'dict':
if key in result.keys():
return result[key]
else:
for k, v in result.items():
res = self.analysis_json(key, v)
if res is not None:
break
elif self.typeof(result) == 'list':
for value in result:
res = self.analysis_json(key, value)
if res is not None:
break
else:
pass
return res
def typeof(self, variate):
"""
变量类型
:param variate:
:return:
"""
if isinstance(variate, int):
return "int"
elif isinstance(variate, str):
return "str"
elif isinstance(variate, float):
return "float"
elif isinstance(variate, list):
return "list"
elif isinstance(variate, tuple):
return "tuple"
elif isinstance(variate, dict):
return "dict"
elif isinstance(variate, set):
return "set"
else:
return None
def get_openApi_url(self):
"""
获取openapi的接口
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://openapi.meitun.com'
elif env == 'pre':
url = 'http://pre-openapi.meitun.com'
elif env == 'sita':
url = 'http://sita-openapi.meitun.com'
else:
url = 'http://sit-openapi.meitun.com'
return url
def get_ip_by_pool(self, poolName, type='sit'):
"""
根据hostname获取ip地址
:param poolName: ip:端口/应用名:端口
:param type: local/sit
:return:
"""
if str(type).lower() == 'local':
return poolName
else:
pool = str(poolName).split(":")[0]
print("pool: ", pool)
port = str(poolName).split(":")[1]
result = self.http_get(
"http://apollo.baobaoshu.com/apiv1/united_devices/?stage=%s&application=%s&usage=SERVER" % (type, pool))
ip = result["results"][0]["ip"]
return str(ip) + ":" + port
def run_local_dubbo(self, content, dubbo_service, dubbo_method, *args):
"""
运行本地dubbo接口
:param dubbo_service: dubbo中 服务名 如:com.zl.mall.api.IItemService
:param dubbo_method: 服务中的方法 如:updateItem
:param args: 方法请求需要的参数
:return:
"""
dubbo_info = GetDubboService2().get_dubbo_info2(content)
invokeDubboApi = InvokeDubboApi(server_host=dubbo_info.get("server_host"),
server_post=dubbo_info.get("server_post"))
return invokeDubboApi.invoke_dubbo_api(dubbo_service, dubbo_method, *args)
def get_open_url(self):
"""
获取openapi的接口
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://openapi.meitun.com'
elif env == 'pre':
url = 'http://pre-openapi.meitun.com'
elif env == 'sita':
url = 'http://sita-openapi.meitun.com'
else:
url = 'http://sit-openapi.meitun.com'
return url
def get_openapi_signature(self, module, params):
"""
获取openapi的验签
:param module:
:param params:
:return:
"""
private_key = self.get_business_data(module, "private_key")
app_secret = self.get_business_data(module, "app_secret")
openauth_url = self.get_openauth_url()
data = self.get_business_data(module, "openapi_data", private_key, app_secret, params)
result = commonFuc().http_post(openauth_url, data)
signature = result['signature']
return signature
def get_url(self, pool=None):
"""
根据环境或者url
:return:
"""
env = os.environ['ENV']
print(env)
if pool == "bid":
if env == 'on':
url = 'http://bid.babytree.com'
else:
url = 'https://bid.babytree-test.com'
elif pool == "advertise-go-web":
url = 'http://g.kexin001.com'
elif pool == "ad_Delivery":
url = 'http://go.kexin001.com'
elif pool == "search-platform-index":
if env == 'on':
url = 'http://search-index.babytree.com/index/build'
else:
url = 'http://search-index.babytree-test.com/index/build'
elif pool == "search-platform-web":
if env == 'on':
url = 'http://search-query.babytree.com/search/query'
else:
url = 'http://search-query.babytree-test.com/search/query'
elif pool == "search-merger":
if env == 'on':
url = 'http://merger.babytree.com/search'
else:
url = 'http://merger.babytree-test.com/search'
elif pool == "search-suggest":
if env == 'on':
url = 'http://suggest.babytree.com'
else:
url = 'http://suggest.babytree-test.com'
else:
if env == 'on':
url = 'https://backend.meitunmama.com/'
elif env == 'pre':
url = 'http://pre-backend.meitunmama.com'
elif env == 'sita':
url = 'http://sita-backend.meitunmama.com'
else:
url = 'http://sit-backend.meitunmama.com'
return url
def login_backend(self, driver):
driver.get(self.get_url())
sleep(3)
# driver.assert_template(Template(r"tpl1580989830894.png", record_pos=(3.27, 2.99), resolution=(100, 100)),
# "请填写测试点")
driver.set_window_size(1366, 868)
if os.environ['ENV'] == 'on' or os.environ['ENV'] == 'pre':
es = 'aG9uZ2xp'
driver.find_element_by_id("loginName").send_keys(base64.b64decode(es.encode('utf-8')).decode("utf-8"))
sleep(1)
es = 'aGxiYjEyMTA5Mg=='
driver.find_element_by_xpath("//input[@type='password']").send_keys(
base64.b64decode(es.encode('utf-8')).decode("utf-8"))
sleep(1)
driver.find_element_by_id("smsCode").send_keys("111111")
sleep(1)
else:
driver.find_element_by_id("loginName").send_keys('autotest')
driver.find_element_by_xpath("//input[@type='password']").send_keys('123@qwe')
driver.find_element_by_id("smsCode").send_keys("111111")
driver.find_element_by_id("sub_btn").click()
sleep(2)
# driver.assert_template(Template(r"tpl1579258499558.png", record_pos=(0.47, 0.975), resolution=(100, 100)),
# "验证登录成功了")
# 选择打开页面的路径,如: 大健康-课程包-课程包管理
def enter_channel_manage(self, driver, title, classfy, content):
driver.find_element_by_xpath("//a[@title='%s']" % title).click()
driver.find_element_by_id("east").find_element_by_xpath("//*[text()='%s']" % classfy).click()
driver.find_element_by_xpath("//a[@title='%s']" % content).click()
def enter_album_check(self, driver):
"""
进入专辑审核页
"""
driver.get(
self.get_url() + "/bighealth-service/outcourse/list.htm?source=1&linkId=big_health_audit_sync_btn1_link&tabId=big_health_audit_sync_btn1")
def remove_readonly(self, driver):
"""
去除input的只读属性
"""
inputs = driver.find_elements_by_tag_name("input")
for input in inputs:
driver.execute_script('arguments[0].removeAttribute(\"readonly\")', input)
driver.execute_script('arguments[0].removeAttribute(\"autocomplete\")', input)
def check_variable_exist(self, check_list, result):
"""
结果检查,检查字段值存在
"""
Flag = False
for variable in check_list:
if variable in result.keys():
Flag = True
assert_equal(Flag, True, '验证参数' + variable + "存在")
def check_text_exist(self, check_text, result):
"""
结果检查,检查文本内容是否存在于返回结果中
"""
Flag = False
check_text=str(check_text)
# print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values()))
if check_text in str(result.values()):
Flag = True
# print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag)
assert_equal(Flag, True, '验证文本' + check_text + "存在")
def check_text_no_exist(self, check_text, result):
"""
结果检查,检查文本内容是否存在于返回结果中
"""
Flag = False
# print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values()))
if check_text not in str(result.values()):
Flag = True
# print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag)
assert_equal(Flag, True, '验证文本' + check_text + "不存在")
def check_text_exist_result_text(self, check_text, result_text):
"""
结果检查,检查文本内容是否存在于返回结果中,返回结果也是文本
"""
Flag = False
check_text=str(check_text)
# print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values()))
if check_text in result_text:
Flag = True
# print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag)
assert_equal(Flag, True, '验证文本' + check_text + "存在")
def enter_h5_page(self, driver, params):
"""
:param driver:
:param params: 请求的URL
:return:
"""
url = self.get_api_url()
driver.get(url + params)
def get_start(self, pageno, pagesize):
"""
获取limit的start数
:param pageno:
:param pagesize:
:return:
"""
return (pageno - 1) * pagesize
def click_iterm(self, driver, el_list, name):
"""
点击对应元素
"""
for el in el_list:
print(el.text)
if el.text == name:
driver.execute_script("arguments[0].scrollIntoView();", el)
el.click()
break
def get_mapi_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://mapiweb.babytree.com'
elif env == 'pre':
url = 'http://pre-mapiweb.babytree.com'
else:
url = 'http://mapiweb.babytree-test.com'
return url
def get_localhome_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://localhome.babytree.com'
elif env == 'pre':
url = 'http://pre-localhome.babytree.com'
else:
url = 'http://localhome.babytree-test.com'
return url
def get_go_babytree_url(self):
"""
接口go_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://go.babytree.com'
elif env == 'pre':
url = 'http://go.babytree.com'
else:
url = 'http://go-1.babytree-test.com'
return url
def get_inno_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://apilocal.babytree.com'
elif env == 'pre':
url = 'http://pre-apilocal.babytree.com'
else:
url = 'http://apilocal.babytree-test.com'
return url
def get_localfront_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://localfront.babytree.com'
elif env == 'pre':
url = 'http://pre-localfront.babytree.com'
else:
url = 'http://localfront.babytree-test.com'
return url
def get_search_platform_hz_index_url(self):
"""
接口类型的请求
:return:
"""
env = os.environ['ENV']
if env == 'on':
url = 'http://search-index.babytree.com'
else:
url = 'http://sit-search-index.babytree.com'
return url
# 随机n位字符串,返回n位字符串
def randomString(self, n):
import string
import random
s = "".join(random.sample(string.ascii_letters + string.digits + "!@#$%^&*()", n))
return s
# 打开网页
def openBrowser(self, url, chrome):
chrome.implicitly_wait(10) # 隐式等待 10秒
chrome.get(url)
return chrome
def logIn(self, username, password, login_button,chrome):
chrome.find_element_by_xpath(username).clear()
chrome.find_element_by_xpath(username).send_keys('xhs')
chrome.find_element_by_xpath(password).clear()
chrome.find_element_by_xpath(password).send_keys('a123456!')
chrome.find_element_by_xpath(login_button).click()
def check_login_result(self,username,chrome):
for i in range(10000):
sleep(1)
try:
if (chrome.find_element_by_xpath('//div[text()=" %s "]' % username)).is_displayed():
print('看看这里显示了没有',username)
break
except:
continue
return chrome.find_element_by_xpath('//div[text()=" %s "]' % username).text
def quit_chrome(self,chrome):
chrome.quit()
# 获取当前项目的根目录的路径
def get_pro_path(self):
import os
curPath = os.path.abspath(os.path.dirname(__file__)) # 获取当前文件的所在目录的绝对路径
# print(os.path.split(curPath))
rootPath = os.path.split(curPath)[0]
return rootPath
\ No newline at end of file
class ModuleName(object):
SEARCH_MERGER = "search-merger"
SEARCH_SUGGEST = "search-suggest"
SEARCH_PLATFORM_INDEX = "search-platform-index"
SEARCH_PLATFORM_WEB = "search-platform-web"
import configparser
import os
import yaml
cf = configparser.ConfigParser()
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = rootPath = os.path.split(curPath)[0]
class confOP:
def getConfValue(self, filename, section, option, type='string'):
cf.read(filename)
if (type == 'string'):
value = cf.get(section, option)
else:
value = cf.getint(section, option)
return value
def getYamlValue(self, filename):
"""
获取公共sql中的sql
:param filename:
:return:
"""
file = os.path.join(rootPath, "data" + os.sep + filename)
f = open(file, 'r', encoding='utf-8')
cont = f.read()
value = yaml.load(cont, Loader=yaml.FullLoader)
f.close()
return value
def getBusiYamlValue(self, path, filename):
"""
获取业务中的yaml文件数据
:param path:
:param filename:
:return:
"""
file = os.path.join(path, filename)
f = open(file, 'r', encoding='utf-8')
cont = f.read()
value = yaml.load(cont, Loader=yaml.FullLoader)
f.close()
return value
loginusername1:
loginusername: 国药集团公司!n0d(
goodsname: 一次性注射器NbfHr
import datetime
import calendar
class dateUtils:
def get_current_start_end(self):
"""
获取当日的开始时间和结束时间
:return:
"""
start, end = datetime.datetime.now().replace(hour=0, minute=0, second=0).strftime(
"%Y-%m-%d %H:%M:%S"), datetime.datetime.now().replace(hour=23, minute=59, second=59).strftime(
"%Y-%m-%d %H:%M:%S")
return start, end
def get_week_start_end(self):
"""
获取本周的第一天和最后一天
:return:
"""
monday, sunday = datetime.date.today(), datetime.date.today()
one_day = datetime.timedelta(days=1)
while monday.weekday() != 0:
monday -= one_day
while sunday.weekday() != 6:
sunday += one_day
# 返回当前的星期一和星期天的日期
monday, sunday = monday.strftime("%Y-%m-%d %H:%M:%S"), sunday.strftime("%Y-%m-%d %H:%M:%S").replace("00:00:00", "23:59:59")
return monday, sunday
def get_current_time(self):
"""
获取当前时间,返回格式是yyyy-mm-dd hh:mm:ss
:return:
"""
curr_time = datetime.datetime.now()
time = datetime.datetime.strftime(curr_time, '%Y-%m-%d %H:%M:%S')
return time
def get_current_date(self):
"""
获取当前日期,返回格式是yyyy-mm-dd
:return:
"""
curr_time = datetime.datetime.now()
date = datetime.datetime.strftime(curr_time, '%Y-%m-%d')
return date
def get_week_of_date(self, date):
"""
获取传入的日期是星期几
:param date:
:return:
"""
week_day_dict = {
0: '星期一',
1: '星期二',
2: '星期三',
3: '星期四',
4: '星期五',
5: '星期六',
6: '星期日',
}
day = datetime.datetime.strptime(date, "%Y-%m-%d").weekday()
return week_day_dict[day]
def add_days(self, date, days):
"""
获取新的日期
:param date: 日期
:param days: 天数(可正数,可负数)
:return:
"""
dt = datetime.datetime.strptime(date, "%Y-%m-%d")
out_date = (dt + datetime.timedelta(days=days)).strftime("%Y-%m-%d")
return out_date
def add_hours(self, date, hours):
"""
获取新的时间
:param date:
:param hours:
:return:
"""
dt = datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
out_date = (dt + datetime.timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")
return out_date
def get_month_start_end(self, year, month):
"""
获取本月的第一天和最后一天
:return:
"""
weekDay, monthCountDay = calendar.monthrange(year, month)
firstDay = datetime.date(year, month, day=1)
lastDay = datetime.date(year, month, day=monthCountDay)
return firstDay, lastDay
def get_every_month_start_end(self, year):
"""
获取某年的每个月的第一天和最后一天
:param year:
:return:
"""
start = []
end = []
for x in range(1, 13):
dt_start = (datetime.datetime(int(year), x, 1)).strftime("%Y%m%d")
if 12 == x:
dt_end = (datetime.datetime(int(year), 12, 31)).strftime("%Y%m%d")
else:
dt_end = (datetime.datetime(int(year), x + 1, 1) - datetime.timedelta(days=1)).strftime("%Y%m%d")
start.append(dt_start)
end.append(dt_end)
return start, end
def get_year_month(self, time):
"""
获取某个时间的年、月份
:param time:
:return:
"""
dt = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
return dt.year, dt.month
if __name__ == '__main__':
print(dateUtils().get_current_date())
print(" \n ")
print(dateUtils().get_current_start_end())
print(" \n ")
print(dateUtils().add_days("2020-10-18", 10))
print(" \n ")
print(dateUtils().add_hours("2020-10-18 00:00:00", 3))
print(" \n ")
print(dateUtils().get_current_time())
print(" \n ")
print(dateUtils().get_every_month_start_end("2020"))
print(" \n ")
print(dateUtils().get_week_of_date("2020-10-18"))
[meitun_db]
host = rm-bp18as20m3solyi69.mysql.rds.aliyuncs.com
port = 3306
user = test_admin
password = ahdp0b.cr76_Pje1
[bj_db]
host = 172.25.1.6
port = 3320
user = local_rw
password = baidugoogleyahoo
[promotion]
livedb = 19
liveenv = 4
liveserver = 129
predb = 19
preenv = 2
preserver = 58
[mongo]
ip = 127.0.0.1
port = 27017
[redis]
redis_host=192.168.24.31
[cmdc_db]
host = 39.106.226.158
port = 3306
user = root
password = @^2DMfIYt1%%6OVT8
import pymysql
import os
import sys
from common.confop import confOP
from common.db.sql.sqlByIdb import sqlByIdb
import pymongo
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)
class dbOP():
def __init__(self, path=rootPath):
self.path = path
def selectSql(self, schema, param=[], db="bj_db"):
host = mySql().getConf(db)[0]
port = mySql().getConf(db)[1]
user = mySql().getConf(db)[2]
pwd = mySql().getConf(db)[3]
value = confOP().getYamlValue("sql")
if schema not in value:
value = confOP().getBusiYamlValue(self.path, "sql")
env = os.environ["ENV"]
if env == "on" or env == "pre":
database = value[schema][0]
sql = value[schema][1]
param = param
result = sqlByIdb().selectSql(database, sql, param, env)
else:
result = mySql().selectSql(host, port, user, pwd, value[schema][0], value[schema][1], param)
return result
def ddlSql(self, schema, param=[], db="bj_db"):
host = mySql().getConf(db)[0]
port = mySql().getConf(db)[1]
user = mySql().getConf(db)[2]
pwd = mySql().getConf(db)[3]
value = confOP().getYamlValue("sql")
if schema not in value:
value = confOP().getBusiYamlValue(self.path, "sql")
result = mySql().executeUpdate(host, port, user, pwd, value[schema][0], value[schema][1], param)
return result
class mySql:
def __init__(self):
pass
def getConf(self, db="bj_db"):
host = confOP().getConfValue(curPath + "/conf.ini", db, "host")
port = confOP().getConfValue(curPath + "/conf.ini", db, "port", "int")
user = confOP().getConfValue(curPath + "/conf.ini", db, "user")
pwd = confOP().getConfValue(curPath + "/conf.ini", db, "password")
return host,port,user,pwd
def connection(self, host, port, user, pwd, database):
return pymysql.connect(host=host, port=port,user=user,passwd=pwd,db=database)
def selectSql(self, host, port, user, pwd, database, sql, param=[]):
conn = self.connection(host, port, user, pwd, database)
cursor = conn.cursor()
try:
cursor.execute(sql, param)
data = cursor.fetchall()
cols = cursor.description
col = []
for i in cols:
col.append(i[0])
data = list(map(list, data))
return data
except Exception as e:
print(e)
conn.rollback()
cursor.close()
conn.close()
def executeUpdate(self, host, port, user, pwd, database, sql, param=[]):
conn = self.connection(host, port, user, pwd, database)
cursor = conn.cursor()
try:
ret = cursor.execute(sql, param)
conn.commit()
return ret
except Exception as e:
print(e)
conn.rollback()
cursor.close()
conn.close()
class mongodb:
def connection(self,db,collect):
ip = confOP().getConfValue(curPath + "/conf.ini", "mongo", "ip")
port = confOP().getConfValue(curPath + "/conf.ini", "mongo", "port")
path = "mongodb://" + ip + ":" + port + "/"
myclient = pymongo.MongoClient(path)
mydb = myclient[db]
mycol = mydb[collect]
return mycol
#查询所有的值
def findALl(self, db, collect):
result = []
mycol = self.connection(db, collect)
for x in mycol.find():
result.append(x)
return result
#按照条件查询:条件为{}类型
def findByCon(self, db, collect, condition):
result = []
mycol = self.connection(db, collect)
for x in mycol.find(condition):
result.append(x)
return result
if __name__ == '__main__':
env = 'sit'
os.environ['ENV'] = env.lower()
path = "D:\\myCode\\autotest-airtest-local\\data\\月嫂"
#mysql的例子
result = dbOP(path).selectSql("local_worker_info")
print(result[0])
#monggdb的例子
#mongodb().findByCon("yapi","user", {"role": "admin"})
#redis的例子
#redisClass().connect()
import redis
import os
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
import sys
sys.path.append(rootPath)
from common.confop import confOP
class redisClass:
def connect(self):
redis_host = confOP.getConfValue(curPath + "/conf.ini", "redis", "redis_host")
pool = redis.ConnectionPool(host=redis_host)
r = redis.Redis(connection_pool=pool)
return r
def randomkey(self):
"""
获取随机的一个键
:return: b'name'
"""
r = redisClass().connect()
return r.randomkey()
def exists(self, name):
"""
判断一个键是否存在
:param name:键名
:return: True
例子: .exists('name')
是否存在name这个键
"""
r = redisClass().connect()
return r.exists(name)
def delete(self, name):
"""
删除一个键
:param name:键名
:return: 1
例子: .delete('name')
删除name这个键
"""
r = redisClass().connect()
return r.delete(name)
def type(self, name):
"""
判断键类型
:param name:
:return:b'string'
例子:.type('name')
判断name这个键类型
"""
r = redisClass().connect()
return r.type(name)
def keys(self, pattern):
"""
获取所有符合规则的键
:param pattern:匹配规则
:return:[b'name']
例子:.keys('n*')
获取所有以n开头的键
"""
r = redisClass().connect()
return r.keys(pattern)
def rename(self, src, dst):
"""
重命名键
:param src: 原键名;
:param dst: 新键名
:return:True
例子:.rename('name', 'nickname')
将name重命名为nickname
"""
r = redisClass().connect()
return r.rename(src, dst)
def dbsize(self):
"""
获取当前数据库中键的数目
:return: 100
"""
r = redisClass().connect()
return r.dbsize()
def expire(self, name, time):
"""
设定键的过期时间,单位为秒
:param name:键名;
:param time:秒数
:return:True
例子:.expire('name', 2)
将name键的过期时间设置为2秒
"""
r = redisClass().connect()
return r.expire(name, time)
def ttl(self, name):
"""
获取键的过期时间,单位为秒,-1表示永久不过期
:param name: 键名
:return: -1
例子:.ttl('name')
获取name这个键的过期时间
"""
r = redisClass().connect()
return r.ttl(name)
def move(self, name, db):
"""
将键移动到其他数据库
:param name: 键名;
:param db: 数据库代号
:return: True
例子: .move('name', 2)
将name移动到2号数据库
"""
r = redisClass().connect()
return r.move(name, db)
def flushdb(self):
"""
删除当前选择数据库中的所有键
:return: True
"""
r = redisClass().connect()
return r.flushdb()
def flushall(self):
"""
删除所有数据库中的所有键
:return: True
"""
r = redisClass().connect()
return r.flushall()
def set(self, name, value):
"""
给数据库中键为name的string赋予值value
:param name: 键名;
:param value:值
:return: True
例子: .set('name', 'Bob')
给name这个键的value赋值为Bob
"""
r = redisClass().connect()
return r.set(name, value)
def get(self, name):
"""
返回数据库中键为name的string的value
:param name: 键名
:return: b'Bob'
例子: .get('name')
返回name这个键的value
"""
r = redisClass().connect()
return r.get(name)
def getset(self, name, value):
"""
给数据库中键为name的string赋予值value并返回上次的value
:param name: 键名;
:param value: 新值
:return: b'Bob'
例子: .getset('name', 'Mike')
赋值name为Mike并得到上次的value
"""
r = redisClass().connect()
return r.getset(name, value)
def mget(self, keys, *args):
"""
返回多个键对应的value
:param keys: 键的列表
:param args:
:return: [b'Mike', b'Miker']
例子: .mget(['name', 'nickname'])
返回name和nickname的value
"""
r = redisClass().connect()
return r.mget(keys, *args)
def setnx(self, name, value):
"""
如果不存在这个键值对,则更新value,否则不变
:param name: 键名
:param value:
:return: 第一次运行结果是True,第二次运行结果是False
例子: .setnx('newname', 'James')
如果newname这个键不存在,则设置值为James
"""
r = redisClass().connect()
return r.setnx(name, value)
def setex(self, name, time, value):
"""
设置可以对应的值为string类型的value,并指定此键值对应的有效期
:param name: 键名;
:param time: 有效期;
:param value: 值
:return: True
例子: .setex('name', 1, 'James')
将name这个键的值设为James,有效期为1秒
"""
r = redisClass().connect()
return r.setex(name, time, value)
def setrange(self, name, offset, value):
"""
设置指定键的value值的子字符串
:param name: 键名;
:param offset: 偏移量;
:param value: 值
:return:xx,修改后的字符串长度
例子:
.set('name', 'Hello')
.setrange('name', 6, 'World')
设置name为Hello字符串,并在index为6的位置补World
"""
r = redisClass().connect()
return r.setrange(name, offset, value)
def mset(self, mapping):
"""
批量赋值
:param mapping: 字典
:return: True
例子: .mset({'name1': 'Durant', 'name2': 'James'})
将name1设为Durant,name2设为James
"""
r = redisClass().connect()
return r.mset(mapping)
def msetnx(self, mapping):
"""
键均不存在时才批量赋值
:param mapping: 字典
:return: True
例子: .msetnx({'name3': 'Smith', 'name4': 'Curry'})
在name3和name4均不存在的情况下才设置二者值
"""
r = redisClass().connect()
return r.msetnx(mapping)
def incr(self, name, amount=1):
"""
键为name的value增值操作,默认为1,键不存在则被创建并设为amount
:param name: 键名;
:param amount: 增长的值
:return: 1,即修改后的值
例子: .incr('age', 1)
age对应的值增1,若不存在,则会创建并设置为1
"""
r = redisClass().connect()
return r.incr(name, amount)
def decr(self, name, amount=1):
"""
键为name的value减值操作,默认为1,键不存在则被创建并将value设置为-amount
:param name: 键名;
:param amount: 减少的值
:return: -1,即修改后的值
例子: .decr('age', 1)
age对应的值减1,若不存在,则会创建并设置为-1
"""
r = redisClass().connect()
return r.decr(name, amount)
def append(self, key, value):
"""
键为name的string的值附加value
:param key: 键名
:param value:
:return: 13,即修改后的字符串长度
例子: .append('nickname', 'OK')
向键为nickname的值后追加OK
"""
r = redisClass().connect()
return r.append(key, value)
def substr(self, name, start, end=-1):
"""
返回键为name的string的子串
:param name: 键名;
:param start: 起始索引;
:param end: 终止索引,默认为-1,表示截取到末尾
:return: b'ello'
例子:.substr('name', 1, 4)
返回键为name的值的字符串,截取索引为1~4的字符
"""
r = redisClass().connect()
return r.substr(name, start, end)
def getrange(self, key, start, end):
"""
获取键的value值从start到end的子字符串
:param key: 键名;
:param start: 起始索引;
:param end: 终止索引
:return: b'ello'
例子: .getrange('name', 1, 4)
返回键为name的值的字符串,截取索引为1~4的字符
"""
r = redisClass().connect()
return r.getrange(key, start, end)
def rpush(self, name, *values):
"""
在键为name的列表末尾添加值为value的元素,可以传多个
:param name: 键名;
:param values: 值
:return: 3,列表大小
例子: .rpush('list', 1, 2, 3)
向键为list的列表尾添加1、2、3
"""
r = redisClass().connect()
return r.rpush(name, *values)
def lpush(self, name, *values):
"""
在键为name的列表头添加值为value的元素,可以传多个
:param name: 键名;
:param values: 值
:return: 4,列表大小
例子:.lpush('list', 0)
向键为list的列表头部添加0
"""
r = redisClass().connect()
return r.lpush(name, *values)
def llen(self, name):
"""
返回键为name的列表的长度
:param name: 键名
:return: 4
例子:.llen('list')
返回键为list的列表的长度
"""
r = redisClass().connect()
return r.llen(name)
def lrange(self, name, start, end):
"""
返回键为name的列表中start至end之间的元素
:param name: 键名;
:param start: 起始索引;
:param end: 终止索引
:return: [b'3', b'2', b'1']
例子:.lrange('list', 1, 3)
返回起始索引为1终止索引为3的索引范围对应的列表
"""
r = redisClass().connect()
return r.lrange(name, start, end)
def ltrim(self, name, start, end):
"""
截取键为name的列表,保留索引为start到end的内容
:param name: 键名;
:param start: 起始索引;
:param end: 终止索引
:return: True
例子:ltrim('list', 1, 3)
保留键为list的索引为1到3的元素
"""
r = redisClass().connect()
return r.ltrim(name, start, end)
def lindex(self, name, index):
"""
返回键为name的列表中index位置的元素
:param name: 键名;
:param index: 索引
:return: b’2’
例子:.lindex('list', 1)
返回键为list的列表索引为1的元素
"""
r = redisClass().connect()
return r.lindex(name, index)
def lset(self, name, index, value):
"""
给键为name的列表中index位置的元素赋值,越界则报错
:param name: 键名;
:param index: 索引位置;
:param value: 值
:return: True
例子:.lset('list', 1, 5)
将键为list的列表中索引为1的位置赋值为5
"""
r = redisClass().connect()
return r.lset(name, index, value)
def lrem(self, name, count, value):
"""
删除count个键的列表中值为value的元素
:param name: 键名;
:param count: 删除个数;
:param value: 值
:return: 1,即删除的个数
例子:.lrem('list', 2, 3)
将键为list的列表删除两个3
"""
r = redisClass().connect()
return r.lrem(name, count, value)
def lpop(self, name):
"""
返回并删除键为name的列表中的首元素
:param name: 键名
:return: b'5'
例子:.lpop('list')
返回并删除名为list的列表中的第一个元素
"""
r = redisClass().connect()
return r.lpop(name)
def rpop(self, name):
"""
返回并删除键为name的列表中的尾元素
:param name: 键名
:return: b'2'
例子:.rpop('list')
返回并删除名为list的列表中的最后一个元素
"""
r = redisClass().connect()
return r.rpop(name)
def blpop(self, keys, timeout=0):
"""
返回并删除名称在keys中的list中的首个元素,如果列表为空,则会一直阻塞等待
:param keys: 键列表;
:param timeout: 超时等待时间,0为一直等待
:return: [b'5']
例子:.blpop('list')
返回并删除键为list的列表中的第一个元素
"""
r = redisClass().connect()
return r.blpop(keys, timeout)
def brpop(self, keys, timeout=0):
"""
返回并删除键为name的列表中的尾元素,如果list为空,则会一直阻塞等待
:param keys: 键列表;
:param timeout: 超时等待时间,0为一直等待
:return: [b'2']
例子:.brpop('list')
返回并删除名为list的列表中的最后一个元素
"""
r = redisClass().connect()
return r.brpop(keys, timeout)
def rpoplpush(self, src, dst):
"""
返回并删除名称为src的列表的尾元素,并将该元素添加到名称为dst的列表头部
:param src: 源列表的键;
:param dst: 目标列表的key
:return: b'2'
例子:.rpoplpush('list', 'list2')
将键为list的列表尾元素删除并将其添加到键为list2的列表头部,然后返回
"""
r = redisClass().connect()
return r.rpoplpush(src, dst)
def sadd(self, name, *values):
"""
向键为name的集合中添加元素
:param name: 键名;
:param values: 值,可为多个
:return: 3,即插入的数据个数
例子: .sadd('tags', 'Book', 'Tea', 'Coffee')
向键为tags的集合中添加Book、Tea和Coffee这3个内容
"""
r = redisClass().connect()
return r.sadd(name, *values)
def srem(self, name, *values):
"""
从键为name的集合中删除元素
:param name: 键名;
:param values: 值,可为多个
:return: 1,即删除的数据个数
例子: .srem('tags', 'Book')
从键为tags的集合中删除Book
"""
r = redisClass().connect()
return r.srem(name, *values)
def spop(self, name):
"""
随机返回并删除键为name的集合中的一个元素
:param name: 键名
:return:b'Tea'
例子:.spop('tags')
从键为tags的集合中随机删除并返回该元素
"""
r = redisClass().connect()
return r.spop(name)
def smove(self, src, dst, value):
"""
从src对应的集合中移除元素并将其添加到dst对应的集合中
:param src: 源集合;
:param dst: 目标集合;
:param value: 元素值
:return: True
例子:.smove('tags', 'tags2', 'Coffee')
从键为tags的集合中删除元素Coffee并将其添加到键为tags2的集合
"""
r = redisClass().connect()
return r.smove(src, dst, value)
def scard(self, name):
"""
返回键为name的集合的元素个数
:param name:
:return: 3
例子:.scard('tags')
获取键为tags的集合中的元素个数
"""
r = redisClass().connect()
return r.scard(name)
def sismember(self, name, value):
"""
测试member是否是键为name的集合的元素
:param name: 键值
:param value:
:return: True
例子:.sismember('tags', 'Book')
判断Book是否是键为tags的集合元素
"""
r = redisClass().connect()
return r.sismember(name, value)
def sinter(self, keys, *args):
"""
返回所有给定键的集合的交集
:param keys: 键列表
:param args:
:return: {b'Coffee'}
例子:.sinter(['tags', 'tags2'])
返回键为tags的集合和键为tags2的集合的交集
"""
r = redisClass().connect()
return r.sinter(keys, *args)
def sinterstore(self, dest, keys, *args):
"""
求交集并将交集保存到dest的集合
:param dest: 结果集合;
:param keys: 键列表
:param args:
:return: 1
例子:.sinterstore('inttag', ['tags', 'tags2'])
求键为tags的集合和键为tags2的集合的交集并将其保存为inttag
"""
r = redisClass().connect()
return r.sinterstore(dest, keys, *args)
def sunion(self, keys, *args):
"""
返回所有给定键的集合的并集
:param keys: 键列表
:param args:
:return: {b'Coffee', b'Book', b'Pen'}
例子:.sunion(['tags', 'tags2'])
返回键为tags的集合和键为tags2的集合的并集
"""
r = redisClass().connect()
return r.sunion(keys, *args)
def sunionstore(self, dest, keys, *args):
"""
求并集并将并集保存到dest的集合
:param dest: 结果集合;
:param keys: 键列表
:param args:
:return: 3
例子:.sunionstore('inttag', ['tags', 'tags2'])
求键为tags的集合和键为tags2的集合的并集并将其保存为inttag
"""
r = redisClass().connect()
return r.sunionstore(dest, keys, *args)
def sdiff(self, keys, *args):
"""
返回所有给定键的集合的差集
:param keys: 键列表
:param args:
:return: {b'Book', b'Pen'}
例子:.sdiff(['tags', 'tags2'])
返回键为tags的集合和键为tags2的集合的差集
"""
r = redisClass().connect()
return r.sdiff(keys, *args)
def sdiffstore(self, dest, keys, *args):
"""
求差集并将差集保存到dest集合
:param dest: 结果集合;
:param keys: 键列表
:param args:
:return: 3
例子:.sdiffstore('inttag', ['tags', 'tags2'])
求键为tags的集合和键为tags2的集合的差集并将其保存为inttag
"""
r = redisClass().connect()
return r.sdiffstore(dest, keys, *args)
def smembers(self, name):
"""
返回键为name的集合的所有元素
:param name: 键名
:return: {b'Pen', b'Book', b'Coffee'}
例子:.smembers('tags')
返回键为tags的集合的所有元素
"""
r = redisClass().connect()
return r.smembers(name)
def srandmember(self, name):
"""
随机返回键为name的集合中的一个元素,但不删除元素
:param name: 键值
:return:
例子:.srandmember('tags')
随机返回键为tags的集合中的一个元素
"""
r = redisClass().connect()
return r.srandmember(name)
def zadd(self, name, *args, **kwargs):
"""
向键为name的zset中添加元素member,score用于排序。如果该元素存在,则更新其顺序
:param name: 键名;
:param args: 可变参数
:param kwargs:
:return: 2,即添加的元素个数
例子:.zadd('grade', 100, 'Bob', 98, 'Mike')
向键为grade的zset中添加Bob(其score为100),并添加Mike(其score为98)
"""
r = redisClass().connect()
return r.zadd(name, *args, **kwargs)
def zrem(self, name, *values):
"""
删除键为name的zset中的元素
:param name: 键名;
:param values: 元素
:return: 1,即删除的元素个数
例子:.zrem('grade', 'Mike')
从键为grade的zset中删除Mike
"""
r = redisClass().connect()
return r.zrem(name, *values)
def zincrby(self, name, value, amount=1):
"""
如果在键为name的zset中已经存在元素value,则将该元素的score增加amount;否则向该集合中添加该元素,其score的值为amount
:param name: key名;
:param value: 元素;
:param amount: 增长的score值
:return: 98.0,即修改后的值
例子:.zincrby('grade', 'Bob', -2)
键为grade的zset中Bob的score减2
"""
r = redisClass().connect()
return r.zincrby(name, value, amount)
def zrank(self, name, value):
"""
返回键为name的zset中元素的排名,按score从小到大排序,即名次
:param name: 键名;
:param value: 元素值
:return: 1
例子:.zrank('grade', 'Amy')
得到键为grade的zset中Amy的排名
"""
r = redisClass().connect()
return r.zrank(name, value)
def zrevrank(self, name, value):
"""
返回键为name的zset中元素的倒数排名(按score从大到小排序),即名次
:param name: 键名;
:param value: 元素值
:return: 2
例子:.zrevrank('grade', 'Amy')
得到键为grade的zset中Amy的倒数排名
"""
r = redisClass().connect()
return r.zrevrank(name, value)
def zrevrange(self, name, start, end, withscores=False):
"""
返回键为name的zset(按score从大到小排序)中index从start到end的所有元素
:param name: 键值;
:param start: 开始索引;
:param end: 结束索引;
:param withscores: 是否带score
:return: [b'Bob', b'Mike', b'Amy', b'James']
例子:.zrevrange('grade', 0, 3)
返回键为grade的zset中前四名元素
"""
r = redisClass().connect()
return r.zrevrange(name, start, end, withscores)
def zrangebyscore(self, name, min, max, start=None, num=None, withscores=False):
"""
返回键为name的zset中score在给定区间的元素
:param name: 键名;
:param min: 最低score;
:param max: 最高score;
:param start: 起始索引;
:param num: 个数;
:param withscores: 是否带score
:return: [b'Bob', b'Mike', b'Amy', b'James']
例子:.zrangebyscore('grade', 80, 95)
返回键为grade的zset中score在80和95之间的元素
"""
r = redisClass().connect()
return r.zrangebyscore(name, min, max, start, num, withscores)
def zcount(self, name, min, max):
"""
返回键为name的zset中score在给定区间的数量
:param name: 键名;
:param min: 最低score;
:param max: 最高score
:return: 2
例子:.zcount('grade', 80, 95)
返回键为grade的zset中score在80到95的元素个数
"""
r = redisClass().connect()
return r.zcount(name, min, max)
def zcard(self, name):
"""
返回键为name的zset的元素个数
:param name: 键名
:return: 3
例子:.zcard('grade')
获取键为grade的zset中元素的个数
"""
r = redisClass().connect()
return r.zcard(name)
def zremrangebyrank(self, name, min, max):
"""
删除键为name的zset中排名在给定区间的元素
:param name: 键名;
:param min: 最低位次;
:param max: 最高位次
:return: 1,即删除的元素个数
例子:.zremrangebyrank('grade', 0, 0)
删除键为grade的zset中排名第一的元素
"""
r = redisClass().connect()
return r.zremrangebyrank(name, min, max)
def zremrangebyscore(self, name, min, max):
"""
删除键为name的zset中score在给定区间的元素
:param name: 键名;
:param min: 最低score;
:param max: 最高score
:return: 1,即删除的元素个数
例子:.zremrangebyscore('grade', 80, 90)
删除score在80到90之间的元素
"""
r = redisClass().connect()
return r.zremrangebyscore(name, min, max)
def hset(self, name, key, value):
"""
向键为name的散列表中添加映射
:param name: 键名;
:param key: 映射键名;
:param value: 映射键值
:return: 1,即添加的映射个数
例子:.hset('price', 'cake', 5)
向键为price的散列表中添加映射关系,cake的值为5
"""
r = redisClass().connect()
return r.hset(name, key, value)
def hsetnx(self, name, key, value):
"""
如果映射键名不存在,则向键为name的散列表中添加映射
:param name: 键名;
:param key: 映射键名;
:param value: 映射键值
:return: 1,即添加的映射个数
例子:.hsetnx('price', 'book', 6)
向键为price的散列表中添加映射关系,book的值为6
"""
r = redisClass().connect()
return r.hsetnx(name, key, value)
def hget(self, name, key):
"""
返回键为name的散列表中key对应的值
:param name: 键名;
:param key: 映射键名;
:return: 5
例子:.hget('price', 'cake')
获取键为price的散列表中键名为cake的值
"""
r = redisClass().connect()
return r.hget(name, key)
def hmget(self, name, keys, *args):
"""
返回键为name的散列表中各个键对应的值
:param name: 键名;
:param keys: 映射键名列表
:param args:
:return: [b'3', b'7']
例子:.hmget('price', ['apple', 'orange'])
获取键为price的散列表中apple和orange的值
"""
r = redisClass().connect()
return r.hmget(name, keys, *args)
def hmset(self, name, mapping):
"""
向键为name的散列表中批量添加映射
:param name: 键名;
:param mapping: 映射字典
:return: True
例子:.hmset('price', {'banana': 2, 'pear': 6})
向键为price的散列表中批量添加映射
"""
r = redisClass().connect()
return r.hmset(name, mapping)
def hincrby(self, name, key, amount=1):
"""
将键为name的散列表中映射的值增加amount
:param name: 键名;
:param key: 映射键名;
:param amount: 增长量
:return: 6,修改后的值
例子:.hincrby('price', 'apple', 3)
key为price的散列表中apple的值增加3
"""
r = redisClass().connect()
return r.hincrby(name, key, amount)
def hexists(self, name, key):
"""
键为name的散列表中是否存在键名为键的映射
:param name: 键名;
:param key: 映射键名;
:return: True
例子:.hexists('price', 'banana')
键为price的散列表中banana的值是否存在
"""
r = redisClass().connect()
return r.hexists(name, key)
def hdel(self, name, *keys):
"""
在键为name的散列表中,删除键名为键的映射
:param name: 键名;
:param key: 映射键名;
:return: True
例子:.hdel('price', 'banana')
从键为price的散列表中删除键名为banana的映射
"""
r = redisClass().connect()
return r.hdel(name, *keys)
def hlen(self, name):
"""
从键为name的散列表中获取映射个数
:param name: 键名
:return: 6
例子:.hlen('price')
从键为price的散列表中获取映射个数
"""
r = redisClass().connect()
return r.hlen(name)
def hkeys(self, name):
"""
从键为name的散列表中获取所有映射键名
:param name: 键名
:return: [b'cake', b'book', b'banana', b'pear']
例子:.hkeys('price')
从键为price的散列表中获取所有映射键名
"""
r = redisClass().connect()
return r.hkeys(name)
def hvals(self, name):
"""
从键为name的散列表中获取所有映射键值
:param name: 键名
:return:[b'5', b'6', b'2', b'6']
例子:.hvals('price')
从键为price的散列表中获取所有映射键值
"""
r = redisClass().connect()
return r.hvals(name)
def hgetall(self, name):
"""
从键为name的散列表中获取所有映射键值对
:param name: 键名
:return:{b'cake': b'5', b'book': b'6', b'orange': b'7', b'pear': b'6'}
例子:.hgetall('price')
从键为price的散列表中获取所有映射键值对
"""
r = redisClass().connect()
return r.hgetall(name)
# -*- encoding=utf8 -*-
import base64
import requests
import json
from common.confop import confOP
class sqlByIdb:
# 登录
def login(self):
url = "http://docp.plt.babytree-inc.com/api/v1/passport/login/"
header = {
'Content-Type': 'application/json'
}
username = "dGVzdF9jbG91ZA=="
password = "eDZpbkRITnJVRWRl"
postdata = json.dumps({
"username": base64.b64decode(username.encode('utf-8')).decode("utf-8"),
"password": base64.b64decode(password.encode('utf-8')).decode("utf-8")
})
response = requests.request("POST", url, headers=header, data=postdata)
result = json.loads(response.text.encode('utf8'))
return result
def selectSql(self, database, sql, param, env):
login_result = sqlByIdb().login()
sql = sql.replace('%d', '%s')
for pa in param:
pa = "'" + str(pa) + "'"
sql = sql.replace('%s', pa, 1)
print(sql)
value = confOP().getYamlValue("idbSet")
if env == "on":
env = "live"
instance_id = value[database][env]
url = "http://docp.plt.babytree-inc.com/api/v1/sql/mysql/query/query/"
payload = json.dumps({
"instance_id": instance_id,
"db_name": database,
"tb_name": database,
"limit_num": 3000,
"sqlContent": sql
})
headers = {
'Authorization': login_result["sid"],
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
result = json.loads(response.text.encode('utf8'))
return result["results"][0]["rows"]
import time
import telnetlib
import re
class TelnetClient(object):
"""通过telnet连接dubbo服务, 执行shell命令, 可用来调用dubbo接口
"""
def __init__(self, server_host, server_post):
self.tn = telnetlib.Telnet()
self.server_host = server_host
self.server_port = server_post
# 此函数实现telnet登录主机
def connect_dubbo(self):
try:
print("telent连接dubbo服务端: telnet {} {} ……".format(self.server_host, self.server_port))
self.tn.open(self.server_host, port=self.server_port)
return True
except Exception as e:
print('连接失败, 原因是: {}'.format(str(e)))
return False
# 此函数实现执行传过来的命令,并输出其执行结果
def execute_some_command(self, command):
# 执行命令
cmd = (command + '\n').encode("gbk")
self.tn.write(cmd)
# 获取命令结果,字符串类型
retry_count = 0
# 如果响应未及时返回,则等待后重新读取,并记录重试次数
result = self.tn.read_very_eager().decode(encoding='gbk')
while result == '':
time.sleep(1)
result = self.tn.read_very_eager().decode(encoding='gbk')
retry_count += 1
return result
# 退出telnet
def logout_host(self):
self.tn.write(b"exit\n")
print("登出成功")
class InvokeDubboApi(object):
def __init__(self, server_host, server_post):
try:
self.telnet_client = TelnetClient(server_host, server_post)
self.login_flag = self.telnet_client.connect_dubbo()
except Exception as e:
print("invokedubboapi init error" + str(e))
def invoke_dubbo_api(self, dubbo_service, dubbor_method, args):
api_name = dubbo_service + "." + dubbor_method + "({})"
cmd = "invoke " + api_name.format(args)
# print("调用命令是:{}".format(cmd))
resp0 = None
try:
if self.login_flag:
resp0 = self.telnet_client.execute_some_command(cmd)
# print("接口响应是,resp={}".format(resp0))
# dubbo接口返回的数据中有 elapsed: 4 ms. 耗时,需要使用elapsed 进行切割
return str(re.compile(".+").findall(resp0).pop(0)).split("elapsed").pop(0).strip()
else:
print("登陆失败!")
except Exception as e:
raise Exception("调用接口异常, 接口响应是resp={}, 异常信息为:{}".format(resp0, str(e)))
self.logout()
def logout(self):
self.telnet_client.logout_host()
class GetDubboService2(object):
def __init__(self):
pass
def get_dubbo_info2(self,content):
try:
dubbore = re.compile(r"([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+)", re.I)
result = dubbore.search(str(content)).group()
print("获取到dubbo部署信息" + result)
return {"server_host": result.split(":")[0], "server_post": result.split(":")[1]}
except Exception as e:
raise Exception("获取dubbo部署信息失败:{}".format(str(e)))
import os
from ruamel import yaml
from common.db.db import dbOP
import datetime
curpath = os.path.dirname(os.path.realpath(__file__))
rootPath = os.path.split(curpath)[0]
# 数据读入和写入文件
class FileUtils(object):
def w_info(self, info,keyname):
module=info[2]
dict = {}
value = {}
value['username'] = info[0]
value['goodsname'] = info[1]
key=keyname
dict[key] = value
w_path=rootPath+os.sep+'data'+os.sep+module
# print(w_path)
yamlpath = os.path.join(w_path, "message")
# 写入到yaml文件
with open(yamlpath, "w", encoding="utf-8") as f:
yaml.dump(dict, f, Dumper=yaml.RoundTripDumper,allow_unicode=True)
def r_info(self, module,keyname):
w_path = rootPath + os.sep + 'data' + os.sep + module
yamlpath = os.path.join(w_path, "message")
file_value = open(yamlpath, 'r',encoding='utf-8')
result = yaml.load(file_value.read(), Loader=yaml.Loader)
if result is not None:
key = keyname
if key in result:
return result[key]
else:
return None
else:
return None
if __name__ == '__main__':
# info=("aaaa","bbbbbb","mdm3-pim")
# FileUtils().w_info(info,"产品新增")
aa=FileUtils().r_info("mdm3-pim","产品新增")
print(aa['username'])
import openpyxl
class HandleExcel:
"""用来操作excel文件的类="""
def __init__(self, filename, sheetname):
"""
初始化对象属性
:param filename: excel文件路径
:param sheetname: 表单名
"""
self.filename = filename
self.sheetname = sheetname
def read_data(self):
"""读取excel中的数据"""
# 获取工作簿对象
wb = openpyxl.load_workbook(self.filename)
# 选择表单
sh = wb[self.sheetname]
# 按行获取所有的数据,转换为列表
rows_data = list(sh.rows)
# 创建一个空列表用来保存所有的用例数据
cases_data = []
# 获取表单中的表头数据,放入title这个列表中
title = []
for i in rows_data[0]:
title.append(i.value)
# 获取除表头之外的其他行数据
for item in rows_data[1:]:
# 每遍历出来一行数据,就创建一个空列表,来存放该行数据
values = []
for i in item:
values.append(i.value)
# 将该行的数据和表头进行打包,转换为字典
case = dict(zip(title, values))
# 将该行数据打包的字典,放入cases_data中
cases_data.append(case)
# 返回读取出来的所有数据
return cases_data
def write_data(self, row, column, value):
"""
写入数据
:param row: 行
:param column: 列表
:param value: 写入的值
:return:
"""
# 获取工作簿对象
wb = openpyxl.load_workbook(self.filename)
# 选择表单
sh = wb[self.sheetname]
# 根据行、列去写入内容
sh.cell(row=row, column=column, value=value)
# 把工作簿保存为文件
wb.save(self.filename)
loginusername1:
aaaa: aaaa
bbbbbb: bbbbbb
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment