Commit 22b60c14 authored by 齐 振鋆's avatar 齐 振鋆
Browse files

个人代码迁移

parent c6ba7a8b
Pipeline #10140 failed with stages
in 3 seconds
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2022/11/14
# 描述: 骨科一体化项目
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2022/8/11
[pytest]
python_files = test_*.py
python_classes = Test*
python_functions = test
log_cli = 1
log_cli_level = INFO
log_cli_format = %(asctime)s [%(levelname)s] %(message)s (%(filename)s:%(lineno)s)
log_cli_date_format=%Y-%m-%d %H:%M:%S
markers =
incremental: "mast have"
run: "ordering"
debug: "debug"
smoke: "smoke test"
surgery: "gkyth_ssgt"
purchase: "gkyth_cg_op"
transfer: "gkyth_db"
verify: "warehouse verify"
transferin: 'test'
transferoutred:'test'
addopts = -vs --strict-markers --disable-warnings --alluredir allure-results --clean-alluredir
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2023/3/27
# 描述:
import os
import pytest
from unit.public.DataDic import DataDic
dd = DataDic()
def py_run():
tab_code = dd.get_value('tab')
if tab_code is None:
pytest.main(['cases/UI/{}/{}'.format(dd.get_value('project'), dd.get_value('module'))])
else:
pytest.main(
['cases/UI/{}/{}'.format(dd.get_value('project'), dd.get_value('module')), '-m', tab_code])
# allure generate allure-results -o allure-report --clean
os.system("allure generate allure-results -o allure-report --clean")
\ No newline at end of file
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2022/8/11
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2022/11/10
# 描述:
import logging
import re
import pytest
from unit.public.UI.BasePage import BasePage
log = logging.getLogger('gyxc')
class VerifyTool(BasePage):
verify_object = {
're': '正则匹配',
'in': '包含',
'large': '大于',
'less': '小于',
'eq': '等于'
}
def verify_text(self, actual_result, expect_result, verify_type, fail_msg):
"""
param: actual_result 实际结果
param: expect_result 预期结果
param: verify_type 校验类型[re:正则,in:包含,large:大于,less:小于,eq:等于]
param: fail_msg 失败信息
"""
res_flag = False
if verify_type == 're':
res = re.search(expect_result, actual_result)
if res:
res_flag = True
if verify_type == 'in':
res_flag = expect_result in actual_result
if verify_type == 'large':
res_flag = actual_result > expect_result
if verify_type == 'less':
res_flag = actual_result < expect_result
if verify_type == 'eq':
res_flag = actual_result == expect_result
log.info('{} {} {},结果为:{}'.format(actual_result, self.verify_object[verify_type], expect_result, res_flag))
if res_flag is False:
self.screenshot()
pytest.fail(fail_msg)
def verify_attribute(self):
pass
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2023/1/9
# 描述:
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2023/1/9
# 描述:
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2023/1/10
# 描述:
from random import sample
class BaseUnit(object):
def __init__(self):
pass
@staticmethod
def random_string(str_len=32):
return ''.join(sample('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890', str_len))
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2023/1/10
# 描述:
import pymysql
from unit.public.YamlUtil import UtilYaml
class MysqlDataBase(object):
__init_flag = False
__instance = None
def __new__(cls, connect_info):
cls.__instance = object.__new__(cls) if cls.__instance is None else cls.__instance
return cls.__instance
def __init__(self, connect_info):
if not MysqlDataBase.__init_flag:
MysqlDataBase.__init_flag = True
self.__db = pymysql.connect(host=connect_info['DB_HOST'],
port=connect_info['DB_PORT'],
user=connect_info['DB_USER'],
passwd=connect_info['DB_PASSWD'],
db=connect_info['DB_NAME'], charset='utf8')
self.cursor = self.__db.cursor()
def exec(self, sql):
self.cursor.execute(sql)
return self.cursor.fetchall()
def commit(self):
self.__db.commit()
def final(self):
self.cursor.close()
self.__db.close()
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2022/11/9
# 描述:
class DataDic(object):
__init_flag = False
__instance = None
def __new__(cls):
if cls.__instance is None:
cls.__instance = object.__new__(cls)
return cls.__instance
def __init__(self):
if not DataDic.__init_flag:
DataDic.__init_flag = True
self.global_data = {}
def set_value(self, key, value):
self.global_data.setdefault(key, value)
def get_value(self, key):
return self.global_data.get(key)
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2023/1/10
# 描述:
import warnings
from openpyxl import load_workbook
class Excel(object):
__init_flag = False
__instance = None
def __new__(cls, file, sheet_name):
cls.__instance = object.__new__(cls) if cls.__instance is None else cls.__instance
return cls.__instance
def __init__(self, file, sheet_name):
if not Excel.__init_flag:
Excel.__init_flag = True
self.__file = file
with warnings.catch_warnings(record=True):
self.__wb = load_workbook(file)
self.__sheet = self.__wb[sheet_name]
def excel_save(self):
self.__wb.save(self.__file)
def excel_close(self):
self.__wb.close()
def excel_write(self, i, j, value):
"""
excel写
:param i: 行
:param j: 列
:param value: 值
:return:
"""
item = self.__sheet.cell(row=i, column=j)
item.value = value
def excel_get_max_line(self):
"""
获取sheet最大行
:return:
"""
return self.__sheet.max_row
def excel_read(self, i, j):
"""
excel读取单元格
:param i: 行
:param j: 列
:return:
"""
item = self.__sheet.cell(row=i, column=j)
return item.value
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2022/11/12
# 描述:
import time
from datetime import datetime, timedelta
class TimeUnit(object):
@staticmethod
def get_date_str(time_diff=0):
"""
param: time_diff - 时间差
"""
now = datetime.now()
dt = now + timedelta(days=time_diff)
return dt.strftime('%Y-%m-%d')
@staticmethod
def get_time_str(time_diff=0):
"""
param: time_diff - 时间差
"""
now = datetime.now()
dt = now + timedelta(hours=time_diff)
return dt.strftime('%H-%M-%S')
@staticmethod
def get_timezone():
return int(time.time() * 1000)
@staticmethod
def get_datetime(format_type='X'):
now = datetime.now()
dt = time.strftime('%Y-%m-%d %X') if format_type == 'X' else time.strftime(
'%Y-%m-%d') if format_type == 'D' else '{}{:0>2}{:0>2}{:0>2}{:0>2}{:0>2}{:0>3}'.format(now.year, now.month,
now.day, now.hour,
now.minute,
now.second,
int(now.microsecond / 1000))
return dt
@staticmethod
def str_to_datetime(date_str):
return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2022/9/23
import logging
import os
import time
import allure
import pyautogui
import pytest
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.ui import WebDriverWait
from unit.public.DataDic import DataDic
from unit.public.TimeUnit import TimeUnit
log = logging.getLogger(__name__)
class BasePage(object):
def __init__(self):
self.locator_dict = {
'id': By.ID,
'name': By.NAME,
'class': By.CLASS_NAME,
'tag': By.TAG_NAME,
'link': By.LINK_TEXT,
'plink': By.PARTIAL_LINK_TEXT,
'xpath': By.XPATH,
'css': By.CSS_SELECTOR,
}
self.dd = DataDic()
self.tu = TimeUnit()
self.try_count = 0
def split_locator(self, locator):
"""
分解定位表达式,如'(xpath,//div[@class="username"])',拆分后返回'By.XPATH'和定位表达式'//div[@class="username"]'
:param locator: 定位方法+定位表达式组合字符串,如'css,.username'
:return: locator_dict[by], value:返回定位方式和定位表达式
"""
if type(locator) is tuple:
by = locator[0]
value = locator[1]
if by not in self.locator_dict.keys():
raise NameError(
"wrong locator!'id','name','class','tag','link','plink','xpath','css',exp:'id,username'")
element_obj = (self.locator_dict[by], value)
else:
element_obj = (self.locator_dict['xpath'], locator)
# log.debug('操作元素:{}'.format(element_obj))
return element_obj
def wait_element(self, locator, wait_time=None):
"""
等待元素出现
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'(xpath,//div[@class="username"])'
:param wait_time: 等待时间
"""
element_obj = self.split_locator(locator)
try:
wait = self.dd.get_value('element_wait_time') if wait_time is None else wait_time
WebDriverWait(self.dd.get_value('driver'), wait, 1).until(ec.visibility_of_element_located(element_obj))
log.debug('等待元素:{}'.format(locator))
return True
except TimeoutException:
return False
except Exception as e:
log.info(e)
self.dd.get_value('driver').quit()
def retry_get_element(self, locator):
if self.try_count < 2:
log.debug("等待1秒后,第{}次未找到元素,继续尝试。".format(self.try_count + 1))
time.sleep(1)
self.try_count += 1
self.get_element(locator)
else:
log.debug("第{}次未找到元素,测试失败。".format(self.try_count + 1))
self.fail_screenshot()
pytest.fail('元素获取失败...')
self.dd.get_value('driver').quit()
def fail_screenshot(self):
log.debug("失败截图")
allure.attach(self.dd.get_value('driver').get_screenshot_as_png(), '{}'.format(self.tu.get_timezone()),
allure.attachment_type.PNG)
def get_element(self, locator, retry=False):
"""
获取一个元素
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'(xpath,//div[@class="username"])'
:param retry: 重试标识
:return: 元素可找到返回element对象,否则再次查找
"""
# if self.wait_element(locator, sec) is True:
# log.debug('获取元素:{}'.format(locator))
# with allure.step('获取元素:{}'.format(locator)):
element_obj = self.split_locator(locator)
try:
element = WebDriverWait(self.dd.get_value('driver'), self.dd.get_value('element_wait_time'), 1).until(
ec.visibility_of_element_located(element_obj))
self.try_count = 0
return element
except Exception as e:
if retry is True:
self.retry_get_element(locator)
else:
self.fail_screenshot()
raise e
def get_element_coordinate(self, locator):
"""获取元素坐标"""
return self.get_element(locator).location
def get_elements(self, locator):
"""
获取一组元素
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'(xpath,//div[@class="username"])'
:return: elements
"""
element_objs = self.split_locator(locator)
try:
elements = WebDriverWait(self.dd.get_value('driver'), self.dd.get_value('element_wait_time'), 1).until(
ec.visibility_of_all_elements_located(element_objs))
# log.debug('获取元素列表:{}'.format(locator))
return elements
except Exception as e:
self.fail_screenshot()
raise e
def open(self, url):
"""
打开网址
:param url: 网址连接
"""
log.debug('打开网址:{}'.format(url))
self.dd.get_value('driver').get(url)
def clear(self, locator):
"""
清除元素中的内容
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'css,.username'
"""
log.debug('清空内容:{}'.format(locator))
self.get_element(locator).clear()
def type(self, locator, text):
"""
在元素中输入内容
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'css,.username'
:param text: 输入的内容
"""
self.clear(locator)
log.debug('向元素 {} 输入文字:{}'.format(locator, text))
self.get_element(locator).send_keys(text)
def enter(self, locator):
"""
在元素上按回车键
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'css,.username'
"""
log.debug('在元素 {} 上按回车'.format(locator))
self.get_element(locator).send_keys(Keys.ENTER)
def click(self, locator):
"""
在元素上单击
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'css,.username'
"""
log.debug('点击元素:{}'.format(locator))
self.get_element(locator).click()
time.sleep(1)
def right_click(self, locator):
"""
鼠标右击元素
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'css,.username'
"""
log.debug('在元素上右击:{}'.format(locator))
element = self.get_element(locator)
ActionChains(self.dd.get_value('driver')).context_click(element).perform()
def double_click(self, locator):
"""
双击元素
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'css,.username'
"""
log.debug('在元素上双击:{}'.format(locator))
element = self.get_element(locator)
ActionChains(self.dd.get_value('driver')).double_click(element).perform()
def move_to_element(self, locator):
"""
鼠标指向元素
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'css,.username'
"""
log.debug('指向元素 {}'.format(locator))
element = self.get_element(locator)
ActionChains(self.dd.get_value('driver')).move_to_element(element).perform()
time.sleep(2)
def drag_and_drop(self, locator, target_locator):
"""
拖动一个元素到另一个元素位置
:param locator: 要拖动元素的定位
:param target_locator: 目标位置元素的定位
"""
log.debug('把元素 {} 拖至元素 {}'.format(locator, target_locator))
element = self.get_element(locator)
target_element = self.get_element(target_locator)
ActionChains(self.dd.get_value('driver')).drag_and_drop(element, target_element).perform()
def drag_and_drop_by_offset(self, locator, xoffset, yoffset):
"""
拖动一个元素向右下移动x,y个偏移量
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'css,.username'
:param xoffset: X offset to move to
:param yoffset: Y offset to move to
"""
log.debug('把元素 {} 拖至坐标:{}, {}'.format(locator, xoffset, yoffset))
element = self.get_element(locator)
ActionChains(self.dd.get_value('driver')).drag_and_drop_by_offset(element, xoffset, yoffset).perform()
def click_link(self, text):
"""
按部分链接文字查找并点击链接
:param text: 链接的部分文字
"""
log.debug('点击连接:{}'.format(text))
self.get_element('plink,' + text).click()
def alert_text(self):
"""
返回alert文本
:return: alert文本
"""
log.debug('获取弹框文本:{}'.format(self.dd.get_value('driver').switch_to.alert.text))
return self.dd.get_value('driver').switch_to.alert.text
def alert_accept(self):
"""
alert点确认
"""
log.debug('点击弹框确认')
self.dd.get_value('driver').switch_to.alert.accept()
def alert_dismiss(self):
"""
alert点取消
"""
log.debug('点击弹框取消')
self.dd.get_value('driver').switch_to.alert.dismiss()
def get_attribute(self, locator, attribute):
"""
返回元素某属性的值
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'css,.username'
:param attribute: 属性名称
:return: 属性值
"""
value = self.get_element(locator).get_attribute(attribute)
log.debug('获取元素 {} 的属性值 {} 为:{}'.format(locator, attribute, value))
return value
def get_ele_text(self, locator):
"""
返回元素的文本
:param locator: 定位方法+定位表达式组合字符串,用逗号分隔,如'css,.username'
:return: 元素的文本
"""
text = self.get_element(locator).text
log.debug('获取元素 {} 的文本为:{}'.format(locator, text))
return self.get_element(locator).text
def is_display(self, locator):
"""判断元素是否显示"""
return self.get_element(locator).is_displayed()
def to_frame(self, locator):
"""
进入frame
:param locator: 定位方法+定位表达式组合字符串,如'css,.username'
"""
log.debug('进入frame:{}'.format(locator))
e = self.get_element(locator)
WebDriverWait(self.dd.get_value('driver'), DataDic().get_value('element_wait_time'), 1).until(
ec.frame_to_be_available_and_switch_to_it(e))
def to_parent_frame(self):
"""
进入父frame
"""
log.info('进入父frame')
self.dd.get_value('driver').switch_to.parent_frame()
def out_frame(self):
"""
返回主文档
"""
log.info('退出frame返回默认文档')
self.dd.get_value('driver').switch_to.default_content()
def open_new_window_by_locator(self, locator):
"""
点击元素打开新窗口,并将句柄切换到新窗口
:param locator: 定位方法+定位表达式组合字符串,如'css,.username'
"""
log.debug('点击元素 {} 打开新窗口'.format(locator))
self.get_element(locator).click()
self.dd.get_value('driver').switch_to.window(self.dd.get_value('driver').window_handles[-1])
# old_handle = self.dd.get_value('driver').current_window_handle
# self.get_element(locator).click()
# all_handles = self.dd.get_value('driver').window_handles
# for handle in all_handles:
# if handle != old_handle:
# self.dd.get_value('driver').switch_to.window(handle)
def open_new_window_by_element(self, element):
"""
点击元素打开新窗口,并将句柄切换到新窗口
:param element: 元素对象
"""
log.debug('点击元素打开新窗口')
element.click()
self.dd.get_value('driver').switch_to.window(self.dd.get_value('driver').window_handles[-1])
def js(self, script):
"""
执行JavaScript
:param script:js语句
"""
log.debug('执行JS语句:{}'.format(script))
self.dd.get_value('driver').execute_script(script)
@staticmethod
def zoom(z_type, times):
"""放大缩小"""
for i in range(times):
pyautogui.hotkey('ctrl', z_type)
time.sleep(1)
time.sleep(3)
def set_attribute(self, locator, attr, val):
self.dd.get_value('driver').execute_script("arguments[0].setAttribute('{}', '{}')".format(attr, val),
self.get_element(locator))
def scroll_element(self, locator):
"""
拖动滚动条至目标元素
:param locator: 定位方法+定位表达式组合字符串,如'css,.username'
"""
log.debug('滚动至元素:{}'.format(locator))
script = "arguments[0].scrollIntoView()"
element = self.get_element(locator)
self.dd.get_value('driver').execute_script(script, element)
def scroll_top(self):
"""
滚动至顶部
"""
log.debug('滚动至顶部')
self.js("window.scrollTo(document.body.scrollHeight,0)")
def scroll_bottom(self):
"""
滚动至底部
"""
log.debug('滚动至底部')
self.js("window.scrollTo(0,document.body.scrollHeight)")
def back(self):
"""
页面后退
"""
log.debug('页面后退')
self.dd.get_value('driver').back()
def forward(self):
"""
页面向前
"""
log.debug('页面向前')
self.dd.get_value('driver').forward()
def is_text_on_page(self, text):
"""
返回页面源代码
:return: 页面源代码
"""
if text in self.dd.get_value('driver').page_source:
log.debug('判断页面上有文本:{}'.format(text))
return True
else:
log.debug('判断页面上没有文本:{}'.format(text))
return False
def refresh(self):
"""
刷新页面
"""
log.debug('刷新页面')
self.dd.get_value('driver').refresh()
def screenshot(self, file_name='err@{}.png'.format(int(time.time() * 1000))):
"""
截图,起名为:文件名-方法名-注释
:param file_name: 文件名
"""
log.debug(u'截图')
screen_path = DataDic().get_value('screenshot_path') # 从全局变量取截图文件夹位置
if not os.path.exists(screen_path):
os.makedirs(screen_path)
filepath = os.path.join(screen_path, file_name)
self.dd.get_value('driver').save_screenshot(filepath)
with open(filepath, 'rb') as f:
file = f.read()
allure.attach(file, '异常截图{}'.format(self.tu.get_timezone()), attachment_type=allure.attachment_type.PNG)
def close(self):
"""
关闭当前页
"""
log.debug('关闭当前Tab')
self.dd.get_value('driver').close()
self.dd.get_value('driver').switch_to.window(self.dd.get_value('driver').window_handles[0])
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2023/1/9
# 描述:
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2023/1/9
# 描述:
import win32api
import win32con
import win32gui
import win32print
class WindowsCommon:
"""
windows的一些函数
"""
def __init__(self):
self.hdc = win32gui.GetDC(0)
def get_screen_resolution(self):
"""
得到屏幕的真实分辨率
:return:
"""
sx = win32print.GetDeviceCaps(self.hdc, win32con.DESKTOPHORZRES)
sy = win32print.GetDeviceCaps(self.hdc, win32con.DESKTOPVERTRES)
return sx, sy
@staticmethod
def get_screen_current():
"""
得到屏幕缩放后的分辨率
:return:
"""
sx = win32api.GetSystemMetrics(win32con.SM_CXSCREEN)
sy = win32api.GetSystemMetrics(win32con.SM_CYSCREEN)
return sx, sy
def get_screen_scale_rate(self):
"""
得到当前屏幕的缩放比例
:return:
"""
screen_x, screen_y = self.get_screen_resolution()
current_x, current_y = self.get_screen_current()
rate = round(screen_x / current_x, 2)
return rate
if __name__ == '__main__':
windows_common = WindowsCommon()
print(windows_common.get_screen_scale_rate())
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2022/11/1
import logging
import yaml
log = logging.getLogger(__name__)
class UtilYaml(object):
def __init__(self, file_path):
self.yaml_file = file_path
def load_yaml(self):
with open(self.yaml_file, 'r', encoding='utf-8') as f:
text = yaml.load(f, Loader=yaml.FullLoader)
return text
def get_yaml_value(self, key_lev):
value = self.load_yaml()
for key in key_lev:
try:
value = value[key]
except KeyError:
log.info('没有key:{}'.format(key))
value = None
except TypeError:
log.info('{}不是json字符串'.format(value))
value = None
return value
def write_yaml(self, data):
with open(self.yaml_file, 'a', encoding='utf-8') as f:
text = yaml.dump(data, f, allow_unicode=True)
def clean_yaml(self):
with open(self.yaml_file, 'w', encoding='utf-8') as f:
f.truncate()
if __name__ == '__main__':
yaml_file = '../../data/surgeryTestData.yaml'
res = UtilYaml(yaml_file).get_yaml_value(['goods', 'val'])
print(res)
# -*- coding:utf-8 -*-
# 作者:齐振鋆
# 日期:2023/1/9
# 描述:
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment