Commit 6c225100 authored by 17322369953's avatar 17322369953
Browse files

air报告推送

parent 16c3516a
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/common/data" charset="GBK" />
</component>
</project>
\ No newline at end of file
...@@ -2,5 +2,6 @@ ...@@ -2,5 +2,6 @@
<project version="4"> <project version="4">
<component name="VcsDirectoryMappings"> <component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" /> <mapping directory="" vcs="Git" />
<mapping directory="$PROJECT_DIR$/autotest-airtest-web" vcs="Git" />
</component> </component>
</project> </project>
\ No newline at end of file
# autotest-airtest-web-sc # autotest-airtest-web
...@@ -15,14 +15,14 @@ Already a pro? Just edit this README.md and make it your own. Want to make it ea ...@@ -15,14 +15,14 @@ Already a pro? Just edit this README.md and make it your own. Want to make it ea
``` ```
cd existing_repo cd existing_repo
git remote add origin http://10.17.65.20/gitlab/test/autotest-airtest-web-sc.git git remote add origin https://spduat.cmic.com.cn/gitlab/test/autotest-airtest-web.git
git branch -M main git branch -M main
git push -uf origin main git push -uf origin main
``` ```
## Integrate with your tools ## Integrate with your tools
- [ ] [Set up project integrations](http://10.17.65.20/gitlab/test/autotest-airtest-web-sc/-/settings/integrations) - [ ] [Set up project integrations](https://spduat.cmic.com.cn/gitlab/test/autotest-airtest-web/-/settings/integrations)
## Collaborate with your team ## Collaborate with your team
......
# -*- encoding=utf8 -*-
__author__ = "Administrator"
"""
case_tag:api,baidu,sit,on
"""
from airtest.core.api import *
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from airtest_selenium.proxy import *
driver = WebChrome()
driver.implicitly_wait(20)
driver.get("https://www.baidu.com")
sleep(4)
driver.find_element_by_id("kw").send_keys("airtest")
sleep(4)
driver.airtest_touch(Template(r"tpl1691374901526.png", record_pos=(12.3, 2.8), resolution=(100, 100)))
# -*- encoding=utf8 -*-
__author__ = "Administrator"
"""
case_tag:api,baidu,sit,on
"""
from airtest.core.api import *
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from airtest_selenium.proxy import *
driver = WebChrome()
driver.implicitly_wait(20)
driver.get("https://www.baidu.com")
sleep(4)
driver.find_element_by_id("kw").send_keys("airtest")
sleep(4)
driver.airtest_touch(Template(r"tpl1691374901526.png", record_pos=(12.3, 2.8), resolution=(100, 100)))
# -*- encoding=utf8 -*-
__author__ = "Administrator"
"""
case_tag:api,baidu,sit,on
"""
from airtest.core.api import *
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from airtest_selenium.proxy import *
driver = WebChrome()
driver.implicitly_wait(20)
driver.get("https://www.baidu.com")
sleep(4)
driver.find_element_by_id("kw").send_keys("airtest")
sleep(4)
driver.airtest_touch(Template(r"tpl1691374901526.png", record_pos=(12.3, 2.8), resolution=(100, 100)))
# -*- encoding=utf8 -*-
from common.common_func import commonFuc
__author__ = "meimengting"
"""
case_tag:api,medical-web11111,sit,on
"""
from airtest.core.api import *
# import sys
# sys.path.append('..')
from common.db.db import dbOP
# import time
# using("common_steps.air")
# import common_steps
# result_db = dbOP().selectSql('exp_expert', [0, 20])
"""
enc_user_id可替换成接口实际所需要id,默认是u779700044448
获取token和时间戳
"""
# enc_user_id = 'u97969333801'
# token = common_steps.check_token(enc_user_id)
# timestamp = str(int(time.time()))
# "按照环境将url管理起来,更改api url后的接口名即可"
# url = common_steps.get_api_url()
# url = url + "/newapi/router/medical/expert/info"
"""
querystring: 业务参数
headers: header参数
"""
# exId = result_db[0][0]
# exName = result_db[0][1]
# querystring = {"expertIds": exId}
# headers = {
# 'clientInfo': '{"birthday":"2018-11-18","screenwidth":"375","clientVersion":"2.4.2","screenheight":"667","partner":"meitunmama","clientip":"10.180.81.127","traderName":"iPhone 6S","clientAppVersion":"2.4.2","clientYunyuVersion":"7.9.6","clientSystem":"ios","nettype":"wifi","deviceCode":"1f4b3860acfa303bca0407f5128bc5ea0f529fec"}',
# 'platform': "1",
# 'signature': "144c6b3c78fc20ad57da1ebdb879615b",
# 'token': token,
# 'timestamp': timestamp,
# }
"""
get方法
"""
# result = common_steps.get(url, headers, querystring)
"""
出参校验
"""
# check_dict = {"rtn_code": 0, "rtn_msg": "请求成功", "expertName": str(exName)}
# commonFuc().check_result(check_dict, result)
\ No newline at end of file
import os
from jpype import *
import jpype
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = rootPath = os.path.split(curPath)[0]
startJVM(jpype.getDefaultJVMPath(), "-ea", "-Djava.class.path=%s" % rootPath + os.sep + "jar" + os.sep + "utils.jar")
jclass = JClass("com.babytree.AesUtils")
class aesUtils:
def encrypt(self, content):
"""
aes_ecb模式加密
:param content:
:return:
"""
result = jclass.encrypt(content)
return result
def decrypt(self, content):
"""
aes_ecb模式解密
:param content:
:return:
"""
result = jclass.decrypt(content)
return result
def shutdown(self):
"""
关闭虚拟机
:return:
"""
shutdownJVM()
if __name__ == '__main__':
sign = aesUtils().encrypt("helloword")
print("加密:", sign)
sign2 = aesUtils().decrypt(sign)
print("解密: ", sign2)
aesUtils().shutdown()
import base64
import json
import requests
from airtest.core.api import *
from common.confop import confOP
from common.dubboUtils import GetDubboService2, InvokeDubboApi
from common.rw import Rw
workspace = os.path.abspath(".")
business_path = workspace + os.sep + "data" + os.sep
class commonFuc(object):
def find_path(self, module=""):
if module == "":
return business_path
else:
return business_path + module + os.sep
def get_business_data(self, module, key, *args):
env = os.environ['ENV']
data_list = confOP().getBusiYamlValue(self.find_path(module), "data")
if args is not None and len(args) > 0:
if isinstance(data_list[key], dict):
result = json.loads(json.dumps(data_list[key]) % args)
else:
if str(data_list[key]) == "":
result = data_list[key]
else:
result = data_list[key] % args
else:
result = data_list[key]
if "_sit_" in str(result) and "_on_" in str(result):
if env == "on":
return result["_on_"]
else:
return result["_sit_"]
else:
return result
def get_message(self, module, key):
message_list = confOP().getBusiYamlValue(self.find_path(module), "message")
return message_list[key]
def get_openauth_url(self):
return "http://openauth.meitun.com/tools/sign/create"
def get_api_url(self):
"""
接口类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://m.meitun.com'
elif env == 'pre':
url = 'http://pre-m.meitun.com'
elif env == 'sita':
url = 'http://sita-m.meitun.com'
elif env == "spd3": # spd3.0
# url = 'http://sit-m.meitun.com'
url = 'http://spddev.cmic.com.cn'
elif env == "spd2": # spd2.0
url = ""
elif env == "od": # 骨科
url = ""
elif env == "bs": # 电商
url = "https://service-slb.cmic.com.cn"
else:
url = 'http://spddev.cmic.com.cn'
return url
def get_token(self, module, enc_user_id="enc_user_id"):
enc_user_id = self.get_business_data(module, enc_user_id)
token = self.check_token(enc_user_id)
return token
def check_token(self, enc_user_id='u779700044448'):
"""
多个case同一个token不用一直查数据库
:param enc_user_id:
:return:
"""
env = os.environ['ENV']
result = Rw().r_token(enc_user_id, env)
if result is None:
return Rw().w_token(enc_user_id, env)
else:
return result["token"]
def http_get(self, url, headers="", params=""):
"""
一个get请求,返回json
"""
result = requests.request("GET", url, headers=headers, params=params)
result = json.loads(result.text)
return result
def http_post(self, url, postdata=None, header=None):
"""
一个post请求,返回json
"""
# result = requests.post(url, data=postdata, headers=header)
result = requests.post(url, data=json.dumps(postdata), headers=header)
result = json.loads(result.content)
return result
#删除请求
def http_delte(self, url,header):
"""
一个post请求,返回json
"""
# result = requests.post(url, data=postdata, headers=header)
result = requests.delete(url,headers=header)
result = json.loads(result.content)
return result
def check_result(self, check_dict, result):
"""
结果检查,要检查的字段及值放在字典里, 结果是完全匹配
result 是json
"""
assert_not_equal(result, [], "只想看下result的值")
for k, v in check_dict.items():
actual_value = self.analysis_json(k, result)
assert_equal(str(v).replace(" ", ""), str(actual_value).replace(" ", ""))
def analysis_json(self, key, result):
"""
解析json
"""
res = None
if self.typeof(result) == 'dict':
if key in result.keys():
return result[key]
else:
for k, v in result.items():
res = self.analysis_json(key, v)
if res is not None:
break
elif self.typeof(result) == 'list':
for value in result:
res = self.analysis_json(key, value)
if res is not None:
break
else:
pass
return res
def typeof(self, variate):
"""
变量类型
:param variate:
:return:
"""
if isinstance(variate, int):
return "int"
elif isinstance(variate, str):
return "str"
elif isinstance(variate, float):
return "float"
elif isinstance(variate, list):
return "list"
elif isinstance(variate, tuple):
return "tuple"
elif isinstance(variate, dict):
return "dict"
elif isinstance(variate, set):
return "set"
else:
return None
def get_openApi_url(self):
"""
获取openapi的接口
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://openapi.meitun.com'
elif env == 'pre':
url = 'http://pre-openapi.meitun.com'
elif env == 'sita':
url = 'http://sita-openapi.meitun.com'
else:
url = 'http://sit-openapi.meitun.com'
return url
def get_ip_by_pool(self, poolName, type='sit'):
"""
根据hostname获取ip地址
:param poolName: ip:端口/应用名:端口
:param type: local/sit
:return:
"""
if str(type).lower() == 'local':
return poolName
else:
pool = str(poolName).split(":")[0]
print("pool: ", pool)
port = str(poolName).split(":")[1]
result = self.http_get(
"http://apollo.baobaoshu.com/apiv1/united_devices/?stage=%s&application=%s&usage=SERVER" % (type, pool))
ip = result["results"][0]["ip"]
return str(ip) + ":" + port
def run_local_dubbo(self, content, dubbo_service, dubbo_method, *args):
"""
运行本地dubbo接口
:param dubbo_service: dubbo中 服务名 如:com.zl.mall.api.IItemService
:param dubbo_method: 服务中的方法 如:updateItem
:param args: 方法请求需要的参数
:return:
"""
dubbo_info = GetDubboService2().get_dubbo_info2(content)
invokeDubboApi = InvokeDubboApi(server_host=dubbo_info.get("server_host"),
server_post=dubbo_info.get("server_post"))
return invokeDubboApi.invoke_dubbo_api(dubbo_service, dubbo_method, *args)
def get_open_url(self):
"""
获取openapi的接口
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'https://openapi.meitun.com'
elif env == 'pre':
url = 'http://pre-openapi.meitun.com'
elif env == 'sita':
url = 'http://sita-openapi.meitun.com'
else:
url = 'http://sit-openapi.meitun.com'
return url
def get_openapi_signature(self, module, params):
"""
获取openapi的验签
:param module:
:param params:
:return:
"""
private_key = self.get_business_data(module, "private_key")
app_secret = self.get_business_data(module, "app_secret")
openauth_url = self.get_openauth_url()
data = self.get_business_data(module, "openapi_data", private_key, app_secret, params)
result = commonFuc().http_post(openauth_url, data)
signature = result['signature']
return signature
def get_url(self, pool=None):
"""
根据环境或者url
:return:
"""
env = os.environ['ENV']
print(env)
if pool == "bid":
if env == 'on':
url = 'http://bid.babytree.com'
else:
url = 'https://bid.babytree-test.com'
elif pool == "advertise-go-web":
url = 'http://g.kexin001.com'
elif pool == "ad_Delivery":
url = 'http://go.kexin001.com'
elif pool == "search-platform-index":
if env == 'on':
url = 'http://search-index.babytree.com/index/build'
else:
url = 'http://search-index.babytree-test.com/index/build'
elif pool == "search-platform-web":
if env == 'on':
url = 'http://search-query.babytree.com/search/query'
else:
url = 'http://search-query.babytree-test.com/search/query'
elif pool == "search-merger":
if env == 'on':
url = 'http://merger.babytree.com/search'
else:
url = 'http://merger.babytree-test.com/search'
elif pool == "search-suggest":
if env == 'on':
url = 'http://suggest.babytree.com'
else:
url = 'http://suggest.babytree-test.com'
else:
if env == 'on':
url = 'https://backend.meitunmama.com/'
elif env == 'pre':
url = 'http://pre-backend.meitunmama.com'
elif env == 'sita':
url = 'http://sita-backend.meitunmama.com'
else:
url = 'http://sit-backend.meitunmama.com'
return url
def login_backend(self, driver):
driver.get(self.get_url())
sleep(3)
# driver.assert_template(Template(r"tpl1580989830894.png", record_pos=(3.27, 2.99), resolution=(100, 100)),
# "请填写测试点")
driver.set_window_size(1366, 868)
if os.environ['ENV'] == 'on' or os.environ['ENV'] == 'pre':
es = 'aG9uZ2xp'
driver.find_element_by_id("loginName").send_keys(base64.b64decode(es.encode('utf-8')).decode("utf-8"))
sleep(1)
es = 'aGxiYjEyMTA5Mg=='
driver.find_element_by_xpath("//input[@type='password']").send_keys(
base64.b64decode(es.encode('utf-8')).decode("utf-8"))
sleep(1)
driver.find_element_by_id("smsCode").send_keys("111111")
sleep(1)
else:
driver.find_element_by_id("loginName").send_keys('autotest')
driver.find_element_by_xpath("//input[@type='password']").send_keys('123@qwe')
driver.find_element_by_id("smsCode").send_keys("111111")
driver.find_element_by_id("sub_btn").click()
sleep(2)
# driver.assert_template(Template(r"tpl1579258499558.png", record_pos=(0.47, 0.975), resolution=(100, 100)),
# "验证登录成功了")
# 选择打开页面的路径,如: 大健康-课程包-课程包管理
def enter_channel_manage(self, driver, title, classfy, content):
driver.find_element_by_xpath("//a[@title='%s']" % title).click()
driver.find_element_by_id("east").find_element_by_xpath("//*[text()='%s']" % classfy).click()
driver.find_element_by_xpath("//a[@title='%s']" % content).click()
def enter_album_check(self, driver):
"""
进入专辑审核页
"""
driver.get(
self.get_url() + "/bighealth-service/outcourse/list.htm?source=1&linkId=big_health_audit_sync_btn1_link&tabId=big_health_audit_sync_btn1")
def remove_readonly(self, driver):
"""
去除input的只读属性
"""
inputs = driver.find_elements_by_tag_name("input")
for input in inputs:
driver.execute_script('arguments[0].removeAttribute(\"readonly\")', input)
driver.execute_script('arguments[0].removeAttribute(\"autocomplete\")', input)
def check_variable_exist(self, check_list, result):
"""
结果检查,检查字段值存在
"""
Flag = False
for variable in check_list:
if variable in result.keys():
Flag = True
assert_equal(Flag, True, '验证参数' + variable + "存在")
def check_text_exist(self, check_text, result):
"""
结果检查,检查文本内容是否存在于返回结果中
"""
Flag = False
check_text=str(check_text)
# print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values()))
if check_text in str(result.values()):
Flag = True
# print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag)
assert_equal(Flag, True, '验证文本' + check_text + "存在")
def check_text_no_exist(self, check_text, result):
"""
结果检查,检查文本内容是否存在于返回结果中
"""
Flag = False
# print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values()))
if check_text not in str(result.values()):
Flag = True
# print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag)
assert_equal(Flag, True, '验证文本' + check_text + "不存在")
def check_text_exist_result_text(self, check_text, result_text):
"""
结果检查,检查文本内容是否存在于返回结果中,返回结果也是文本
"""
Flag = False
check_text=str(check_text)
# print('bbbbbbbbbbbbbbbbbbbbbbbbb'+str(result.values()))
if check_text in result_text:
Flag = True
# print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa',Flag)
assert_equal(Flag, True, '验证文本' + check_text + "存在")
def enter_h5_page(self, driver, params):
"""
:param driver:
:param params: 请求的URL
:return:
"""
url = self.get_api_url()
driver.get(url + params)
def get_start(self, pageno, pagesize):
"""
获取limit的start数
:param pageno:
:param pagesize:
:return:
"""
return (pageno - 1) * pagesize
def click_iterm(self, driver, el_list, name):
"""
点击对应元素
"""
for el in el_list:
print(el.text)
if el.text == name:
driver.execute_script("arguments[0].scrollIntoView();", el)
el.click()
break
def get_mapi_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://mapiweb.babytree.com'
elif env == 'pre':
url = 'http://pre-mapiweb.babytree.com'
else:
url = 'http://mapiweb.babytree-test.com'
return url
def get_localhome_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://localhome.babytree.com'
elif env == 'pre':
url = 'http://pre-localhome.babytree.com'
else:
url = 'http://localhome.babytree-test.com'
return url
def get_go_babytree_url(self):
"""
接口go_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://go.babytree.com'
elif env == 'pre':
url = 'http://go.babytree.com'
else:
url = 'http://go-1.babytree-test.com'
return url
def get_inno_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://apilocal.babytree.com'
elif env == 'pre':
url = 'http://pre-apilocal.babytree.com'
else:
url = 'http://apilocal.babytree-test.com'
return url
def get_localfront_babytree_url(self):
"""
接口mapi_babytree类型的请求
:return:
"""
env = os.environ['ENV']
print(env)
if env == 'on':
url = 'http://localfront.babytree.com'
elif env == 'pre':
url = 'http://pre-localfront.babytree.com'
else:
url = 'http://localfront.babytree-test.com'
return url
def get_search_platform_hz_index_url(self):
"""
接口类型的请求
:return:
"""
env = os.environ['ENV']
if env == 'on':
url = 'http://search-index.babytree.com'
else:
url = 'http://sit-search-index.babytree.com'
return url
# 随机n位字符串,返回n位字符串
def randomString(self, n):
import string
import random
s = "".join(random.sample(string.ascii_letters + string.digits + "!@#$%^&*()", n))
return s
# 打开网页
def openBrowser(self, url, chrome):
chrome.implicitly_wait(10) # 隐式等待 10秒
chrome.get(url)
return chrome
def logIn(self, username, password, login_button,chrome):
chrome.find_element_by_xpath(username).clear()
chrome.find_element_by_xpath(username).send_keys('xhs')
chrome.find_element_by_xpath(password).clear()
chrome.find_element_by_xpath(password).send_keys('a123456!')
chrome.find_element_by_xpath(login_button).click()
def check_login_result(self,username,chrome):
for i in range(10000):
sleep(1)
try:
if (chrome.find_element_by_xpath('//div[text()=" %s "]' % username)).is_displayed():
print('看看这里显示了没有',username)
break
except:
continue
return chrome.find_element_by_xpath('//div[text()=" %s "]' % username).text
def quit_chrome(self,chrome):
chrome.quit()
# 获取当前项目的根目录的路径
def get_pro_path(self):
import os
curPath = os.path.abspath(os.path.dirname(__file__)) # 获取当前文件的所在目录的绝对路径
# print(os.path.split(curPath))
rootPath = os.path.split(curPath)[0]
return rootPath
\ No newline at end of file
import datetime
import calendar
class dateUtils:
def get_current_start_end(self):
"""
获取当日的开始时间和结束时间
:return:
"""
start, end = datetime.datetime.now().replace(hour=0, minute=0, second=0).strftime(
"%Y-%m-%d %H:%M:%S"), datetime.datetime.now().replace(hour=23, minute=59, second=59).strftime(
"%Y-%m-%d %H:%M:%S")
return start, end
def get_week_start_end(self):
"""
获取本周的第一天和最后一天
:return:
"""
monday, sunday = datetime.date.today(), datetime.date.today()
one_day = datetime.timedelta(days=1)
while monday.weekday() != 0:
monday -= one_day
while sunday.weekday() != 6:
sunday += one_day
# 返回当前的星期一和星期天的日期
monday, sunday = monday.strftime("%Y-%m-%d %H:%M:%S"), sunday.strftime("%Y-%m-%d %H:%M:%S").replace("00:00:00", "23:59:59")
return monday, sunday
def get_current_time(self):
"""
获取当前时间,返回格式是yyyy-mm-dd hh:mm:ss
:return:
"""
curr_time = datetime.datetime.now()
time = datetime.datetime.strftime(curr_time, '%Y-%m-%d %H:%M:%S')
return time
def get_current_date(self):
"""
获取当前日期,返回格式是yyyy-mm-dd
:return:
"""
curr_time = datetime.datetime.now()
date = datetime.datetime.strftime(curr_time, '%Y-%m-%d')
return date
def get_week_of_date(self, date):
"""
获取传入的日期是星期几
:param date:
:return:
"""
week_day_dict = {
0: '星期一',
1: '星期二',
2: '星期三',
3: '星期四',
4: '星期五',
5: '星期六',
6: '星期日',
}
day = datetime.datetime.strptime(date, "%Y-%m-%d").weekday()
return week_day_dict[day]
def add_days(self, date, days):
"""
获取新的日期
:param date: 日期
:param days: 天数(可正数,可负数)
:return:
"""
dt = datetime.datetime.strptime(date, "%Y-%m-%d")
out_date = (dt + datetime.timedelta(days=days)).strftime("%Y-%m-%d")
return out_date
def add_hours(self, date, hours):
"""
获取新的时间
:param date:
:param hours:
:return:
"""
dt = datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
out_date = (dt + datetime.timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")
return out_date
def get_month_start_end(self, year, month):
"""
获取本月的第一天和最后一天
:return:
"""
weekDay, monthCountDay = calendar.monthrange(year, month)
firstDay = datetime.date(year, month, day=1)
lastDay = datetime.date(year, month, day=monthCountDay)
return firstDay, lastDay
def get_every_month_start_end(self, year):
"""
获取某年的每个月的第一天和最后一天
:param year:
:return:
"""
start = []
end = []
for x in range(1, 13):
dt_start = (datetime.datetime(int(year), x, 1)).strftime("%Y%m%d")
if 12 == x:
dt_end = (datetime.datetime(int(year), 12, 31)).strftime("%Y%m%d")
else:
dt_end = (datetime.datetime(int(year), x + 1, 1) - datetime.timedelta(days=1)).strftime("%Y%m%d")
start.append(dt_start)
end.append(dt_end)
return start, end
def get_year_month(self, time):
"""
获取某个时间的年、月份
:param time:
:return:
"""
dt = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
return dt.year, dt.month
if __name__ == '__main__':
print(dateUtils().get_current_date())
print(" \n ")
print(dateUtils().get_current_start_end())
print(" \n ")
print(dateUtils().add_days("2020-10-18", 10))
print(" \n ")
print(dateUtils().add_hours("2020-10-18 00:00:00", 3))
print(" \n ")
print(dateUtils().get_current_time())
print(" \n ")
print(dateUtils().get_every_month_start_end("2020"))
print(" \n ")
print(dateUtils().get_week_of_date("2020-10-18"))
import time
import telnetlib
import re
class TelnetClient(object):
"""通过telnet连接dubbo服务, 执行shell命令, 可用来调用dubbo接口
"""
def __init__(self, server_host, server_post):
self.tn = telnetlib.Telnet()
self.server_host = server_host
self.server_port = server_post
# 此函数实现telnet登录主机
def connect_dubbo(self):
try:
print("telent连接dubbo服务端: telnet {} {} ……".format(self.server_host, self.server_port))
self.tn.open(self.server_host, port=self.server_port)
return True
except Exception as e:
print('连接失败, 原因是: {}'.format(str(e)))
return False
# 此函数实现执行传过来的命令,并输出其执行结果
def execute_some_command(self, command):
# 执行命令
cmd = (command + '\n').encode("gbk")
self.tn.write(cmd)
# 获取命令结果,字符串类型
retry_count = 0
# 如果响应未及时返回,则等待后重新读取,并记录重试次数
result = self.tn.read_very_eager().decode(encoding='gbk')
while result == '':
time.sleep(1)
result = self.tn.read_very_eager().decode(encoding='gbk')
retry_count += 1
return result
# 退出telnet
def logout_host(self):
self.tn.write(b"exit\n")
print("登出成功")
class InvokeDubboApi(object):
def __init__(self, server_host, server_post):
try:
self.telnet_client = TelnetClient(server_host, server_post)
self.login_flag = self.telnet_client.connect_dubbo()
except Exception as e:
print("invokedubboapi init error" + str(e))
def invoke_dubbo_api(self, dubbo_service, dubbor_method, args):
api_name = dubbo_service + "." + dubbor_method + "({})"
cmd = "invoke " + api_name.format(args)
# print("调用命令是:{}".format(cmd))
resp0 = None
try:
if self.login_flag:
resp0 = self.telnet_client.execute_some_command(cmd)
# print("接口响应是,resp={}".format(resp0))
# dubbo接口返回的数据中有 elapsed: 4 ms. 耗时,需要使用elapsed 进行切割
return str(re.compile(".+").findall(resp0).pop(0)).split("elapsed").pop(0).strip()
else:
print("登陆失败!")
except Exception as e:
raise Exception("调用接口异常, 接口响应是resp={}, 异常信息为:{}".format(resp0, str(e)))
self.logout()
def logout(self):
self.telnet_client.logout_host()
class GetDubboService2(object):
def __init__(self):
pass
def get_dubbo_info2(self,content):
try:
dubbore = re.compile(r"([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+)", re.I)
result = dubbore.search(str(content)).group()
print("获取到dubbo部署信息" + result)
return {"server_host": result.split(":")[0], "server_post": result.split(":")[1]}
except Exception as e:
raise Exception("获取dubbo部署信息失败:{}".format(str(e)))
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment