Commit c5b12575 authored by xiao-hesheng's avatar xiao-hesheng
Browse files

数据库验证流程十三脚本编写

parent c417933e
......@@ -21,13 +21,44 @@ import time
module = "b5_spd3_core_business_flow"
def add_dept():
# 内网登录
username = commonFuc().get_business_data(module, "username")
# print('医院名称',username)
password = commonFuc().get_business_data(module, "password")
# print(username, password,'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
# 获取token和projectCode
token, projectCode, uxid, corpId, info = login_system(username, password).get_token()
# print( token,projectCode,uxid,corpId,info)
# c0_新增科室_特殊科室_设备科_中心库房
branch_id = FileUtils().r_info8(module, '院区新增type2','message')["院区id"]
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "dept_insert_url")
# 获取请求头信息
headers = commonFuc().get_business_data(module, "json_headers2",
commonFuc().get_business_data(module, "json_contentType"), token,
commonFuc().get_business_data(module, "X-APP-CODE"))
# #请求体
request_body = commonFuc().get_business_data(module, "payload14", branch_id)
print(request_body)
# # # 发送请求
result = commonFuc().http_post(url, request_body, headers)
print('result', result)
# #断言
checkDict = commonFuc().get_business_data(module, 'checkDict2')
commonFuc().check_result(checkDict, result)
def test1(type=1):
def test1():
# 院区新增
execute_command("python runner_test.py tag id2263-4 debug sit")
# 新增科室
execute_command(
"python runner_test.py tag id2263-19,id2263-20,id2263-21,id2263-22,id2263-23,id2263-24,id2263-25,id2263-26 debug sit")
if type==1:
# 院区新增
execute_command("python runner_test.py tag id2263-4 debug sit")
# 新增科室
execute_command(
"python runner_test.py tag id2263-19,id2263-20,id2263-21,id2263-22,id2263-23,id2263-24,id2263-25,id2263-26 debug sit")
else:
branch_add()
add_dept()
# 一键导入所有产品
execute_command("python runner_test.py tag id2263-48 debug sit")
#导入同步过来的新产品,防止因同步时间差导致新建的产品没有导入过来
......@@ -292,13 +323,51 @@ def add_StockArea():#添加货位
commonFuc().check_text_exist(0, result)
else:
commonFuc().check_text_exist('error', result)
def branch_add(token):
module='b2_herp3_bs'
# 内网登录
username = commonFuc().get_business_data(module, "username")
# print('医院名称',username)
password = commonFuc().get_business_data(module, "password")
# print(username, password,'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
# 获取token和projectCode
token, projectCode, uxid, corpId, info = login_system(username, password).get_token()
url=commonFuc().get_api_add_port_url()+commonFuc().get_business_data(module, "insert_url")
print('a4_院区新增',url)
# 获取请求头信息
headers = commonFuc().get_business_data(module, "json_headers2",
commonFuc().get_business_data(module, "json_contentType"), token,
commonFuc().get_business_data(module, "X-APP-CODE"))
name=commonFuc().randomString(6)#核心业务流的长度后缀设置为6位
#请求体
request_body=commonFuc().get_business_data(module, "payload",name,name)
# 发送请求
result = commonFuc().http_post(url, request_body,headers)
print('result', result)
#断言
code=commonFuc().analysis_json('code',result)
commonFuc().check_text_exist(0,result)
id1=commonFuc().analysis_json('id',commonFuc().analysis_json('data',result))
name=commonFuc().analysis_json('name',commonFuc().analysis_json('data',result))
#读取原有院区文件中的内容
old_branch_result=FileUtils().r_info9(module, '院区新增', 'message')
print(old_branch_result)
#写入到另一个文件中备份起来
FileUtils().w_info9('b5_spd3_core_business_flow',old_branch_result,'message29')
info = (id1, name)
titlename = ('院区id', '院区name')
FileUtils().w_info2(info,module,'院区新增')
def main():
try:
# ==============删除业务数据和报表数据
# delData().Delete_business_Data()#删除业务数据
# delData().Del_reporter_data() # 删除报表数据
#==============删除业务数据和报表数据
test1()
test1()#参数2是新增院区后缀为6位数的参数1是老模式
test2()# 科室添加产品信息(产品是从外网同步过来的)
dept_query_all()# 科室查询,将科室id写入文件中
grants_user()#给用户授权
......
......@@ -839,9 +839,8 @@ try:
# check_mcms_psi_dept_out_and_in(20, 20) #检查高值消耗出库单和消耗入库单
# second_dept_consume_Out_Stock(1) # 低值消耗出库
check_mcms_psi_dept_XH_CK(10,16)
second_dept_consume_in_Stock(1) # 低值消耗退回入库
check_mcms_psi_dept_XH_CK(16,16)
check_mcms_psi_dept_XH_CK(10,16)
second_dept_consume_Out_Stock(3) # 试剂消耗出库
second_dept_consume_in_Stock(3) # 试剂消耗退回入库
commonFuc().check_text_exist_result_text('SUCCESS', 'SUCCESS')
......
# -*- encoding=utf8 -*-
from common.db.sql.sql_del_branch_info import delData
from common.db.sql.sql_tools import main2
from common.fileUtls import FileUtils
from air_case.public1.public1.public1 import *
......@@ -34,27 +33,32 @@ import sys
# 第一种方式===========================
# # 第二种方式=================================
# hos_goods_id = FileUtils().r_info8('b5_spd3_core_business_flow', '产品审核信息', 'message2')["hosGoodsId"]
# actual_value1 = main2(hos_goods_id)
# print(actual_value1)
# # print('actual_value2',actual_value1)
# actual_value2 = get_process_list2(actual_value1)
# print('actual_value2', actual_value2, type(actual_value2))
# prov_hos_goods_id = '7be89fe4c5ee4689a5e249d3197b9947'
# expected_value2 = commonFuc().get_business_data('b5_spd3_core_business_flow', 'expected_value2', prov_hos_goods_id)
# expected_value2=get_process_list2(expected_value2)
# print('expected_value2', expected_value2, type(expected_value2))
# actual_value2 = actual_value2.replace(" ", "")
# expected_value2 = expected_value2.replace(" ", "")
# print(actual_value2)
# print(expected_value2)
#
# print(actual_value2 == expected_value2)
# if actual_value2 == expected_value2:
# print('ok')
# commonFuc().check_text_exist_result_text('succees', 'succees')
# else:
# print('error')
# commonFuc().check_text_exist_result_text('error', 'succees')
# 第二种方式=================================
hos_goods_id = FileUtils().r_info8('b5_spd3_core_business_flow', '产品审核信息', 'message2')["hosGoodsId"]
actual_value1 = main2(hos_goods_id)
print(actual_value1)
# print('actual_value2',actual_value1)
actual_value2 = get_process_list2(actual_value1)
print('actual_value2', actual_value2, type(actual_value2))
prov_hos_goods_id = '7be89fe4c5ee4689a5e249d3197b9947'
expected_value2 = commonFuc().get_business_data('b5_spd3_core_business_flow', 'expected_value2', prov_hos_goods_id)
expected_value2=get_process_list2(expected_value2)
print('expected_value2', expected_value2, type(expected_value2))
actual_value2 = actual_value2.replace(" ", "")
expected_value2 = expected_value2.replace(" ", "")
print(actual_value2)
print(expected_value2)
print(actual_value2 == expected_value2)
if actual_value2 == expected_value2:
print('ok')
commonFuc().check_text_exist_result_text('succees', 'succees')
else:
print('error')
commonFuc().check_text_exist_result_text('error', 'succees')
# 第二种方式=================================
\ No newline at end of file
old_branch_result=FileUtils().r_info9('b2_herp3_bs', '院区新增', 'message')
print(old_branch_result)
# 写入到另一个文件中
FileUtils().w_info9('b5_spd3_core_business_flow', old_branch_result, 'message29')
\ No newline at end of file
......@@ -306,7 +306,29 @@ class FileUtils(object):
return None
else:
return None
def r_info9(self, module, keyname, message_no):
w_path = rootPath + os.sep + 'data' + os.sep + module
yamlpath = os.path.join(w_path, message_no)
try:
file_value = open(yamlpath, 'r', encoding='utf-8')
except:
file_value = open(yamlpath, 'w', encoding='utf-8')
file_value = open(yamlpath, 'r', encoding='utf-8')
result = yaml.load(file_value.read(), Loader=yaml.Loader)
if result is not None:
return result
else:
return None
def w_info9(self, module,content,message_no):
module = module
dict = content
w_path = rootPath + os.sep + 'data' + os.sep + module
# print(w_path)
yamlpath = os.path.join(w_path, message_no)
# 写入到yaml文件
with open(yamlpath, "a", encoding="utf-8") as f:
yaml.dump(dict, f, Dumper=yaml.RoundTripDumper, allow_unicode=True)
if __name__ == '__main__':
# info=("aaaa","bbbbbb","mdm3-pim")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment