Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
test
autotest-airtest-web-spd
Commits
7221ea16
Commit
7221ea16
authored
Oct 08, 2024
by
xiao-hesheng
Browse files
流程五脚本提交
parent
8fc3ac18
Changes
4
Hide whitespace changes
Inline
Side-by-side
air_case/b6_spd3_core_business_flow_database_check/a_a6流程五_中心库入库结算_正结算2_外网配送_数据库验证.air/a_a6流程五_中心库入库结算_正结算2_外网配送_数据库验证.py
View file @
7221ea16
...
...
@@ -386,11 +386,11 @@ from a_a6流程五_中心库入库结算_正结算2_外网配送 import order_dp
try
:
token
,
projectCode1
,
uxid1
,
corpId1
,
info1
=
call_login
(
1
)
#
order_dp(1, token, projectCode1, uxid1) # 1低值
#
order_dp(2, token, projectCode1, uxid1) # 2高值
#
order_dp(3, token, projectCode1, uxid1) # 3试剂
#
order_dp
(
1
,
token
,
projectCode1
,
uxid1
)
# 1低值
order_dp
(
2
,
token
,
projectCode1
,
uxid1
)
# 2高值
order_dp
(
3
,
token
,
projectCode1
,
uxid1
)
# 3试剂
time
.
sleep
(
12
)
list_data
=
[
'16'
,
'20'
,
'66'
]
for
i
in
list_data
:
time
.
sleep
(
6
)
...
...
air_case/b6_spd3_core_business_flow_database_check/a_a7流程五_中心库入库结算_正结算3_内网验收入库结算_数据库验证.air/a_a7流程五_中心库入库结算_正结算3_内网验收入库结算_数据库验证.py
View file @
7221ea16
...
...
@@ -21,394 +21,529 @@ from airtest.core.api import using
import
sys
import
time
module
=
"b6_spd3_core_business_flow_database_check"
def
check_accept_order
(
type
=
1
):
# execute_command("python runner_test.py tag id2302-6 debug mdm3")
# # 登录获取用户id等信息,使用创建的用户登录===========开始
info
=
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"用户名信息"
,
'message3'
)
# print(info)
username
=
info
[
'username1'
]
password
=
commonFuc
().
get_business_data
(
"b2_herp3_bs"
,
"password"
)
# username = '001f90380'
# password='1qaz!QAZ'
print
(
'username,password'
,
username
,
password
)
# # print(username, password,'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
# # 获取token和projectCode
token
,
projectCode
,
uxid
,
corpId
,
info
=
login_system
(
username
,
password
).
get_token
()
# 登录获取用户id等信息,使用创建的用户登录============结束
# =========================查询待验收订单======================
module
=
"b5_spd3_core_business_flow"
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"check_wait_page_url"
)
print
(
'url'
,
url
)
# 获取请求头信息
headers
=
commonFuc
().
get_business_data
(
module
,
"json_headers2"
,
commonFuc
().
get_business_data
(
module
,
"json_contentType"
),
token
,
projectCode
)
branch_id
=
FileUtils
().
r_info
(
'b2_herp3_bs'
,
'院区新增'
)[
"branch_id"
]
hos_id
=
branch_id
[
0
:
5
]
def
check_mcms_check
(
type
=
1
):
if
type
==
1
:
sourceOrderNo
=
FileUtils
().
r_info8
(
module
,
'低值配送单号'
,
'message10'
)[
"distrBillId1"
]
bill_mode
=
16
elif
type
==
2
:
sourceOrderNo
=
FileUtils
().
r_info8
(
module
,
'高值配送单号'
,
'message10'
)[
"distrBillId1"
]
elif
type
==
3
:
sourceOrderNo
=
FileUtils
().
r_info8
(
module
,
'试剂配送单号'
,
'message10'
)[
"distrBillId1"
]
bill_mode
=
20
else
:
bill_mode
=
66
print
(
'检查验收单数据库写值'
)
check_no
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'验收单号'
,
'message15'
)[
'CHECK_NO1'
]
sql
=
"select * from mcms_check where id = '%s';"
%
check_no
order_no
=
get_id
(
"select order_no from mcms_check where id = '%s';"
%
check_no
)
# 获取实际值
actual
=
check_mcms_pur_plan_sql
(
sql
)
print
(
'actual'
,
actual
)
if
type
==
1
:
source_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'低值配送单号'
,
'message10'
)[
'distrBillId1'
]
elif
type
==
2
:
source_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'高值配送单号'
,
'message10'
)[
'distrBillId1'
]
else
:
source_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'试剂配送单号'
,
'message10'
)[
'distrBillId1'
]
source_no
=
source_id
prov_id
=
get_prov_id
()
prov_name
=
get_prov_name
()
rec_org_id
=
get_rec_org_id
(
2
)
rec_org_name
=
get_rec_org_name
(
2
)
dept_id
=
rec_org_id
dept_name
=
rec_org_name
branch_id
=
get_branch_id
()
buyBillId
=
get_buyBillId
(
branch_id
,
bill_mode
)
buyOrderNo
=
get_buyOrderNo
(
branch_id
,
bill_mode
)
if
type
==
1
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message2'
)[
"hosGoodsId"
]
elif
type
==
2
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message1'
)[
"hosGoodsId"
]
else
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message3'
)[
"hosGoodsId"
]
planBillId
=
get_planBillId
(
hos_goods_id
)
planOrderNo
=
get_planOrderNo
(
hos_goods_id
)
purOrderNo
=
get_purOrderNo
(
branch_id
,
bill_mode
)
purBillId
=
get_purBillId
(
branch_id
,
bill_mode
)
bill_relation_json
=
get_mcms_chek_purBillDate
(
branch_id
,
bill_mode
)
purBillDate
=
reGetString
(
bill_relation_json
,
r
'purBillDate":"'
,
r
'""distrBillId'
)
distrBillId
=
get_distrBillId
(
branch_id
,
bill_mode
)
distrOrderNo
=
get_distrOrderNo
(
branch_id
,
bill_mode
)
checkBillId
=
check_no
checkOrderNo
=
order_no
create_user
=
get_login_user_uxid
()
sql
=
"select create_time from mcms_check where id='%s';"
%
check_no
create_time
=
get_create_time
(
sql
)
sql
=
"select last_modified from mcms_check where id='%s';"
%
check_no
last_modified
=
get_create_time
(
sql
)
last_modified_user
=
create_user
if
type
==
1
:
check_mcms_check_batch
(
type
,
check_no
,
order_no
,
hos_goods_id
)
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'mcms_check'
,
check_no
,
order_no
,
get_hosid
(),
get_branch_id
(),
source_id
,
source_no
,
prov_id
,
prov_name
,
rec_org_id
,
rec_org_name
,
dept_id
,
dept_name
,
buyBillId
,
planBillId
,
planOrderNo
,
buyOrderNo
,
purOrderNo
,
purBillId
,
purBillDate
,
distrBillId
,
distrOrderNo
,
checkBillId
,
checkOrderNo
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
elif
type
==
2
:
check_mcms_check_batch
(
type
,
check_no
,
order_no
,
hos_goods_id
)
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'mcms_check1'
,
check_no
,
order_no
,
get_hosid
(),
get_branch_id
(),
source_id
,
source_no
,
prov_id
,
prov_name
,
rec_org_id
,
rec_org_name
,
dept_id
,
dept_name
,
buyBillId
,
planBillId
,
planOrderNo
,
buyOrderNo
,
purOrderNo
,
purBillId
,
purBillDate
,
distrBillId
,
distrOrderNo
,
checkBillId
,
checkOrderNo
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
else
:
check_mcms_check_batch
(
type
,
check_no
,
order_no
,
hos_goods_id
)
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'mcms_check2'
,
check_no
,
order_no
,
get_hosid
(),
get_branch_id
(),
source_id
,
source_no
,
prov_id
,
prov_name
,
rec_org_id
,
rec_org_name
,
dept_id
,
dept_name
,
buyBillId
,
planBillId
,
planOrderNo
,
buyOrderNo
,
purOrderNo
,
purBillId
,
purBillDate
,
distrBillId
,
distrOrderNo
,
checkBillId
,
checkOrderNo
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
# 对比预期值和实际值是否一致
# comparison_result(actual, expected)
actual_value1
=
get_process_list2
(
actual
)
expected_value1
=
get_process_list2
(
expected
)
actual_value2
=
actual_value1
.
replace
(
" "
,
""
)
expected_value2
=
expected_value1
.
replace
(
" "
,
""
)
# print('actual_value2',actual_value2)
print
(
'expected_value2'
)
print
(
expected_value2
)
print
(
'actual_value2'
)
print
(
actual_value2
)
result
=
(
compare_text_index
(
actual_value2
,
expected_value2
))
print
(
'result'
,
result
)
if
actual_value2
==
expected_value2
:
print
(
'ok'
)
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
else
:
sourceOrderNo
=
FileUtils
().
r_info8
(
module
,
'低值配送单号'
,
'message10'
)[
"distrBillId1"
]
request_body
=
commonFuc
().
get_business_data
(
module
,
"payload"
,
hos_id
,
branch_id
,
sourceOrderNo
)
print
(
'request_body'
,
request_body
)
# 发送请求
while
True
:
result
=
commonFuc
().
http_post
(
url
,
request_body
,
headers
)
print
(
'result'
,
result
)
# 断言
code
=
commonFuc
().
analysis_json
(
'code'
,
result
)
data
=
commonFuc
().
analysis_json
(
'data'
,
result
)
total
=
commonFuc
().
analysis_json
(
'total'
,
data
)
if
total
==
1
:
break
else
:
time
.
sleep
(
6
)
print
(
'error'
)
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
data
=
commonFuc
().
analysis_json
(
'data'
,
result
)
data1
=
commonFuc
().
analysis_json
(
'data'
,
data
)
list_a
=
[]
list_b
=
[]
list_c
=
[]
list_a
.
append
(
data1
)
list_b
.
append
(
data1
)
list_c
.
append
(
data1
)
# print('list_a', len(list_a), list_a)
info
=
(
list_a
,
list_b
,
list_c
)
titlename
=
(
'list_a'
,
'list_b'
,
'list_c'
)
# 将验收单信息写入到文件中
FileUtils
().
w_info8
(
info
,
'b5_spd3_core_business_flow'
,
'验收单信息'
,
titlename
,
'message13'
)
# 从message13文件中获取信息
info
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
"验收单信息"
,
'message13'
)
# print('info', info)
list_a
=
info
[
'list_a'
]
print
(
'list_a'
,
list_a
)
sourceType
=
"sourceType"
billMode
=
'billMode'
recOrgId
=
'recOrgId'
recOrgName
=
'recOrgName'
sourceOrgId
=
'sourceOrgId'
sourceOrgName
=
'sourceOrgName'
sourceVersion
=
'sourceVersion'
tbStatus
=
'tbStatus'
for
i
in
list_a
:
print
(
'i'
,
i
)
for
j
in
i
:
sourceType
=
j
[
"sourceType"
]
billMode
=
j
[
"billMode"
]
recOrgId
=
j
[
'recOrgId'
]
recOrgName
=
j
[
'recOrgName'
]
sourceOrgId
=
j
[
'sourceOrgId'
]
sourceOrgName
=
j
[
'sourceOrgName'
]
sourceVersion
=
j
[
'sourceVersion'
]
tbStatus
=
j
[
'tbStatus'
]
break
# =========================查询待验收订单======================
# ==============================查看订单详情==================
def
check_mcms_check_batch
(
type
,
check_no
,
order_no
,
hos_goods_id
):
print
(
'数据库检查验收单子表'
)
if
type
==
1
:
bill_mode
=
16
elif
type
==
2
:
bill_mode
=
20
else
:
bill_mode
=
66
sql
=
"select * from mcms_check_batch where pid='%s'"
%
check_no
# 获取实际值
actual
=
check_mcms_pur_plan_sql
(
sql
)
print
(
'actual'
,
actual
)
id
=
get_id
(
"select id from mcms_check_batch where pid='%s'"
%
check_no
)
pid
=
check_no
branch_id
=
get_branch_id
()
distr_detail_id
=
get_distr_detail_id
(
branch_id
,
bill_mode
)
source_detail_id
=
distr_detail_id
mdm_goods_code
=
get_mdm_goods_code1
(
hos_goods_id
)
goods_code
=
get_goods_code1
(
hos_goods_id
)
sql
=
"select id from hdi_barcode where code_type='%s' and branch_id='%s' order by create_time desc LIMIT 1;"
%
(
bill_mode
,
branch_id
)
barcode_id
=
get_id
(
sql
)
create_user
=
get_login_user_uxid
()
create_time
=
get_create_time
(
"select create_time from mcms_check_batch where pid='%s'"
%
check_no
)
last_modified
=
get_create_time
(
"select last_modified from mcms_check_batch where pid='%s'"
%
check_no
)
last_modified_user
=
create_user
if
type
==
1
:
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'mcms_check_batch'
,
id
,
pid
,
order_no
,
source_detail_id
,
distr_detail_id
,
hos_goods_id
,
mdm_goods_code
,
goods_code
,
barcode_id
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
elif
type
==
2
:
sourceId
=
sourceOrderNo
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"check_wait_detail_url"
)
print
(
'url'
,
url
)
request_body
=
commonFuc
().
get_business_data
(
module
,
"payload1"
,
sourceId
,
sourceType
,
billMode
,
branch_id
)
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'mcms_check_batch1'
,
id
,
pid
,
order_no
,
source_detail_id
,
distr_detail_id
,
hos_goods_id
,
mdm_goods_code
,
goods_code
,
barcode_id
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
else
:
# print('request_body',request_body)
# 发送请求
while
True
:
time
.
sleep
(
6
)
result
=
commonFuc
().
http_post
(
url
,
request_body
,
headers
)
code
=
commonFuc
().
analysis_json
(
'code'
,
result
)
if
code
==
0
:
break
else
:
print
(
'验收单数据未同步完成,等待6秒'
)
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'mcms_check_batch2'
,
id
,
pid
,
order_no
,
source_detail_id
,
distr_detail_id
,
hos_goods_id
,
mdm_goods_code
,
goods_code
,
barcode_id
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
# 对比预期值和实际值是否一致
# comparison_result(actual, expected)
actual_value1
=
get_process_list2
(
actual
)
expected_value1
=
get_process_list2
(
expected
)
actual_value2
=
actual_value1
.
replace
(
" "
,
""
)
expected_value2
=
expected_value1
.
replace
(
" "
,
""
)
# print('actual_value2',actual_value2)
print
(
'expected_value2'
)
print
(
expected_value2
)
print
(
'actual_value2'
)
print
(
actual_value2
)
result
=
(
compare_text_index
(
actual_value2
,
expected_value2
))
print
(
'result'
,
result
)
# 断言
data
=
commonFuc
().
analysis_json
(
'data'
,
result
)
list_a
=
[]
list_b
=
[]
list_c
=
[]
list_a
.
append
(
data
)
list_b
.
append
(
data
)
list_c
.
append
(
data
)
# print('list_a', len(list_a), list_a)
info
=
(
list_a
,
list_b
,
list_c
)
titlename
=
(
'list_a'
,
'list_b'
,
'list_c'
)
# 将验收单详细信息写入到文件中
FileUtils
().
w_info8
(
info
,
'b5_spd3_core_business_flow'
,
'验收单详情信息'
,
titlename
,
'message14'
)
# 读取viewId
info
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
"验收单详情信息"
,
'message14'
)
# print('info', info)
list_a
=
info
[
'list_a'
]
print
(
'list_a'
,
list_a
)
viewId
=
"viewId"
settlement
=
'settlement'
recTemperature
=
'recTemperature'
recHumidity
=
'recHumidity'
hosGoodsId
=
'hosGoodsId'
distrQty
=
'distrQty'
distrPkgQty
=
'distrPkgQty'
for
i
in
list_a
:
for
j
in
i
:
viewId
=
j
[
"viewId"
]
settlement
=
j
[
"settlement"
]
recTemperature
=
j
[
'recTemperature'
]
recHumidity
=
j
[
'recHumidity'
]
hosGoodsId
=
j
[
'hosGoodsId'
]
distrQty
=
j
[
'distrQty'
]
distrPkgQty
=
j
[
'distrPkgQty'
]
if
actual_value2
==
expected_value2
:
print
(
'ok'
)
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
else
:
print
(
'error'
)
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
break
# ==============================查看订单详情==================
# ========设置验收全部合格
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"check_barcode_save_url"
)
print
(
'check_barcode_save_url'
,
url
)
sourceBillId
=
sourceId
request_body
=
commonFuc
().
get_business_data
(
module
,
"payload2"
,
sourceBillId
,
viewId
)
print
(
'设置验收全部合格request_body'
,
request_body
)
# 发送请求
time
.
sleep
(
6
)
def
check_mcms_psi
(
type
=
1
):
print
(
'检查中心库入库单和出库单'
)
if
type
==
1
:
bill_mode
=
16
elif
type
==
2
:
bill_mode
=
20
else
:
bill_mode
=
66
branch_id
=
get_branch_id
()
sql
=
"select * from mcms_psi where target_branch_id='%s' and bill_mode='%s' and stock_kind='RK_YS' order by CREATE_TIME desc LIMIT 1;"
%
(
branch_id
,
bill_mode
)
# 获取实际值
actual
=
check_mcms_pur_plan_sql
(
sql
)
print
(
'actual'
,
actual
)
sql
=
"select id from mcms_psi where target_branch_id='%s' and bill_mode='%s' and stock_kind='RK_YS' order by CREATE_TIME desc LIMIT 1;"
%
(
branch_id
,
bill_mode
)
id
=
get_id
(
sql
)
sql
=
"select order_no from mcms_psi where target_branch_id='%s' and bill_mode='%s' and stock_kind='RK_YS' order by CREATE_TIME desc LIMIT 1;"
%
(
branch_id
,
bill_mode
)
order_no
=
get_id
(
sql
)
target_corp_id
=
get_hosid
()
target_branch_id
=
get_branch_id
()
target_dept_id
=
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室id"
,
'message6'
)[
'deptid1'
]
target_dept_name
=
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室name"
,
'message6'
)[
'deptid1'
]
stock_name
=
get_branch_name
()
+
"中心库房"
target_stock_id
=
get_stockId_fromdb
(
stock_name
)
target_area_code
=
get_stockAreaId_new_fromDb
(
target_dept_name
,
1
)
source_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'验收单号'
,
'message15'
)[
'CHECK_NO1'
]
source_no
=
get_distrBillId
(
branch_id
,
bill_mode
)
source_repl_id
=
source_no
source_corp_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'供货关系申请2'
,
'message'
)[
"e_corpId"
]
source_dept_id
=
source_corp_id
source_dept_name
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'供货关系申请2'
,
'message'
)[
"e_username"
]
source_stock_id
=
source_corp_id
accounter
=
get_login_user_uxid_bydb
(
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"用户名信息"
,
'message3'
)[
'username1'
])
account_date
=
get_create_time
(
"select account_date from mcms_psi where id='%s';"
%
id
)
while
True
:
result
=
commonFuc
().
http_post
(
url
,
request_body
,
headers
)
print
(
'result'
,
result
)
data
=
commonFuc
().
analysis_json
(
'data'
,
result
)
code
=
commonFuc
().
analysis_json
(
'code'
,
result
)
print
(
code
)
if
code
==
0
and
data
==
100
:
break
else
:
print
(
'设置产品合格出现问题'
)
time
.
sleep
(
6
)
continue
buyBillId
=
get_buyBillId
(
branch_id
,
bill_mode
)
if
type
==
1
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message2'
)[
"hosGoodsId"
]
elif
type
==
2
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message1'
)[
"hosGoodsId"
]
else
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message3'
)[
"hosGoodsId"
]
planBillId
=
get_planBillId
(
hos_goods_id
)
planOrderNo
=
get_planOrderNo
(
hos_goods_id
)
buyOrderNo
=
get_buyOrderNo
(
branch_id
,
bill_mode
)
purOrderNo
=
get_purOrderNo
(
branch_id
,
bill_mode
)
purBillId
=
get_purBillId
(
branch_id
,
bill_mode
)
bill_relation_json
=
get_purBillDate2
(
branch_id
,
bill_mode
)
purBillDate
=
reGetString
(
bill_relation_json
,
r
'purBillDate":"'
,
r
'""distrBillId'
)
distrBillId
=
get_distrBillId
(
branch_id
,
bill_mode
)
distrOrderNo
=
get_distrOrderNo
(
branch_id
,
bill_mode
)
checkBillId
=
source_id
sql
=
"select order_no from mcms_check where id='%s';"
%
checkBillId
checkOrderNo
=
get_id
(
sql
)
psiInBillId
=
id
psiInOrderNo
=
order_no
create_user
=
get_login_user_uxid_bydb
(
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"用户名信息"
,
'message3'
)[
'username1'
])
create_time
=
get_create_time
(
"select create_time from mcms_psi where id='%s';"
%
id
)
last_modified
=
get_create_time
(
"select last_modified from mcms_psi where id='%s';"
%
id
)
last_modified_user
=
create_user
if
type
==
1
:
# 检查子表
check_mcms_psi_batch_in
(
id
,
type
,
order_no
,
source_id
)
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'check_mcms_psi1'
,
id
,
order_no
,
target_corp_id
,
target_branch_id
,
target_dept_id
,
target_dept_name
,
target_stock_id
,
target_area_code
,
source_id
,
source_no
,
source_repl_id
,
source_corp_id
,
source_dept_id
,
source_dept_name
,
source_stock_id
,
accounter
,
account_date
,
buyBillId
,
planBillId
,
planOrderNo
,
buyOrderNo
,
purOrderNo
,
purBillId
,
purBillDate
,
distrBillId
,
distrOrderNo
,
checkBillId
,
checkOrderNo
,
psiInBillId
,
psiInOrderNo
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
elif
type
==
2
:
check_mcms_psi_batch_in
(
id
,
type
,
order_no
,
source_id
)
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'check_mcms_psi2'
,
id
,
order_no
,
target_corp_id
,
target_branch_id
,
target_dept_id
,
target_dept_name
,
target_stock_id
,
target_area_code
,
source_id
,
source_no
,
source_repl_id
,
source_corp_id
,
source_dept_id
,
source_dept_name
,
source_stock_id
,
accounter
,
account_date
,
buyBillId
,
planBillId
,
planOrderNo
,
buyOrderNo
,
purOrderNo
,
purBillId
,
purBillDate
,
distrBillId
,
distrOrderNo
,
checkBillId
,
checkOrderNo
,
psiInBillId
,
psiInOrderNo
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
else
:
check_mcms_psi_batch_in
(
id
,
type
,
order_no
,
source_id
)
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'check_mcms_psi3'
,
id
,
order_no
,
target_corp_id
,
target_branch_id
,
target_dept_id
,
target_dept_name
,
target_stock_id
,
target_area_code
,
source_id
,
source_no
,
source_repl_id
,
source_corp_id
,
source_dept_id
,
source_dept_name
,
source_stock_id
,
accounter
,
account_date
,
buyBillId
,
planBillId
,
planOrderNo
,
buyOrderNo
,
purOrderNo
,
purBillId
,
purBillDate
,
distrBillId
,
distrOrderNo
,
checkBillId
,
checkOrderNo
,
psiInBillId
,
psiInOrderNo
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
# 对比预期值和实际值是否一致
actual_value1
=
get_process_list2
(
actual
)
expected_value1
=
get_process_list2
(
expected
)
actual_value2
=
actual_value1
.
replace
(
" "
,
""
)
expected_value2
=
expected_value1
.
replace
(
" "
,
""
)
# print('actual_value2',actual_value2)
print
(
'expected_value2'
)
print
(
expected_value2
)
print
(
'actual_value2'
)
print
(
actual_value2
)
result
=
(
compare_text_index
(
actual_value2
,
expected_value2
))
print
(
'result'
,
result
)
if
actual_value2
==
expected_value2
:
print
(
'ok'
)
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
else
:
print
(
'error'
)
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
# # 检查中心库出库单====================================================================
# sql = "select * from mcms_psi where target_branch_id='%s' and bill_mode='%s' and stock_kind='CK_QL' order by CREATE_TIME desc LIMIT 1;" % (
# branch_id, bill_mode)
# # 获取实际值
# actual = check_mcms_pur_plan_sql(sql)
# print('actual', actual)
# sql = "select id from mcms_psi where target_branch_id='%s' and bill_mode='%s' and stock_kind='CK_QL' order by CREATE_TIME desc LIMIT 1;" % (
# branch_id, bill_mode)
# id_out = get_id(sql)
# sql = "select order_no from mcms_psi where target_branch_id='%s' and bill_mode='%s' and stock_kind='CK_QL' order by CREATE_TIME desc LIMIT 1;" % (
# branch_id, bill_mode)
# order_no_out = get_id(sql)
# target_corp_id = get_hosid()
# target_branch_id = get_branch_id()
# target_dept_id = FileUtils().r_info8("b2_herp3_bs", "所有科室id", 'message6')['deptid2']
# target_dept_name = FileUtils().r_info8("b2_herp3_bs", "所有科室name", 'message6')['deptid2']
# stock_name = target_dept_name + "库房"
# target_stock_id = get_stockId_fromdb(stock_name)
# target_area_code = get_stockAreaId_new_fromDb(target_dept_name, 2)
# source_id_out = id
# source_no_out = order_no
# source_repl_id = source_no
# source_corp_id = get_hosid()
# source_dept_id = FileUtils().r_info8("b2_herp3_bs", "所有科室id", 'message6')['deptid1']
# source_dept_name = FileUtils().r_info8("b2_herp3_bs", "所有科室name", 'message6')['deptid1']
# source_stock_id = get_stockId_fromdb(get_branch_name() + '中心库房')
# accounter = get_login_user_uxid_bydb(FileUtils().r_info8('b2_herp3_bs', "用户名信息", 'message3')['username1'])
# account_date = get_create_time("select account_date from mcms_psi where id='%s';" % id_out)
#
# buyBillId = get_buyBillId(branch_id, bill_mode)
# if type == 1:
# hos_goods_id = FileUtils().r_info8('b5_spd3_core_business_flow', '产品审核信息', 'message2')["hosGoodsId"]
# elif type == 2:
# hos_goods_id = FileUtils().r_info8('b5_spd3_core_business_flow', '产品审核信息', 'message1')["hosGoodsId"]
# else:
# hos_goods_id = FileUtils().r_info8('b5_spd3_core_business_flow', '产品审核信息', 'message3')["hosGoodsId"]
# planBillId = get_planBillId(hos_goods_id)
# planOrderNo = get_planOrderNo(hos_goods_id)
# buyOrderNo = get_buyOrderNo(branch_id, bill_mode)
# purOrderNo = get_purOrderNo(branch_id, bill_mode)
# purBillId = get_purBillId(branch_id, bill_mode)
# bill_relation_json = get_purBillDate2(branch_id, bill_mode)
# purBillDate = reGetString(bill_relation_json, r'purBillDate":"', r'""distrBillId')
# distrBillId = get_distrBillId(branch_id, bill_mode)
# distrOrderNo = get_distrOrderNo(branch_id, bill_mode)
# checkBillId = source_id
# sql = "select order_no from mcms_check where id='%s';" % checkBillId
# checkOrderNo = get_id(sql)
# psiInBillId = id
# psiInOrderNo = order_no
# psiOutBillId = id_out
# psiOutOrderNo = order_no_out
# create_user = get_login_user_uxid_bydb(FileUtils().r_info8('b2_herp3_bs', "用户名信息", 'message3')['username1'])
# create_time = get_create_time("select create_time from mcms_psi where id='%s';" % id_out)
# last_modified = get_create_time("select last_modified from mcms_psi where id='%s';" % id_out)
# last_modified_user = create_user
# source_branch_id = get_branch_id()
# source_area_code = get_stockAreaId_new_fromDb(source_dept_name)
# if type == 1:
# check_mcms_psi_batch_out(id, type, order_no, source_id)
# expected = commonFuc().get_business_data('b6_spd3_core_business_flow_database_check', 'check_mcms_psi_out1',
# id_out,
# order_no_out, target_corp_id, target_branch_id, target_dept_id,
# target_dept_name, target_stock_id
# , target_area_code, source_id_out, source_no_out, source_repl_id,
# source_corp_id, source_branch_id, source_dept_id, source_dept_name
# , source_stock_id, source_area_code, buyBillId, planBillId,
# planOrderNo, buyOrderNo, purOrderNo, purBillId
# , purBillDate, distrBillId, distrOrderNo, checkBillId, checkOrderNo,
# psiInBillId, psiInOrderNo, psiOutBillId, psiOutOrderNo, create_user
# , create_time, last_modified, last_modified_user)
# elif type == 2:
# check_mcms_psi_batch_out(id, type, order_no, source_id)
# expected = commonFuc().get_business_data('b6_spd3_core_business_flow_database_check', 'check_mcms_psi_out2',
# id_out,
# order_no_out, target_corp_id, target_branch_id, target_dept_id,
# target_dept_name, target_stock_id
# , target_area_code, source_id_out, source_no_out, source_repl_id,
# source_corp_id, source_branch_id, source_dept_id, source_dept_name
# , source_stock_id, source_area_code, buyBillId, planBillId,
# planOrderNo, buyOrderNo, purOrderNo, purBillId
# , purBillDate, distrBillId, distrOrderNo, checkBillId, checkOrderNo,
# psiInBillId, psiInOrderNo, psiOutBillId, psiOutOrderNo, create_user
# , create_time, last_modified, last_modified_user)
# else:
# check_mcms_psi_batch_out(id, type, order_no, source_id)
# expected = commonFuc().get_business_data('b6_spd3_core_business_flow_database_check', 'check_mcms_psi_out3',
# id_out,
# order_no_out, target_corp_id, target_branch_id, target_dept_id,
# target_dept_name, target_stock_id
# , target_area_code, source_id_out, source_no_out, source_repl_id,
# source_corp_id, source_branch_id, source_dept_id, source_dept_name
# , source_stock_id, source_area_code, buyBillId, planBillId,
# planOrderNo, buyOrderNo, purOrderNo, purBillId
# , purBillDate, distrBillId, distrOrderNo, checkBillId, checkOrderNo,
# psiInBillId, psiInOrderNo, psiOutBillId, psiOutOrderNo, create_user
# , create_time, last_modified, last_modified_user)
# # 对比预期值和实际值是否一致
# actual_value1 = get_process_list2(actual)
# expected_value1 = get_process_list2(expected)
# actual_value2 = actual_value1.replace(" ", "")
# expected_value2 = expected_value1.replace(" ", "")
# # print('actual_value2',actual_value2)
# print('expected_value2')
# print(expected_value2)
# print('actual_value2')
# print(actual_value2)
# result = (compare_text_index(actual_value2, expected_value2))
# print('result', result)
# if actual_value2 == expected_value2:
# print('ok')
# commonFuc().check_text_exist_result_text('succees', 'succees')
# else:
# print('error')
# commonFuc().check_text_exist_result_text('error', 'succees')
# ========设置验收全部合格
# =============提交验收单========================
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"check_submit_url"
)
deptId
=
recOrgId
deptName
=
recOrgName
provId
=
sourceOrgId
provName
=
sourceOrgName
createUser
=
uxid
sourceNo
=
sourceId
checkQty
=
distrQty
checkPkgQty
=
distrPkgQty
time
.
sleep
(
6
)
request_body
=
commonFuc
().
get_business_data
(
module
,
"payload3"
,
billMode
,
branch_id
,
deptId
,
deptName
,
hos_id
,
provId
,
provName
,
recOrgId
,
recOrgName
,
sourceId
,
sourceVersion
,
sourceType
,
tbStatus
,
createUser
,
sourceNo
,
viewId
,
hosGoodsId
,
viewId
,
settlement
,
checkQty
,
distrPkgQty
,
checkPkgQty
,
recTemperature
,
recHumidity
)
print
(
'request_body'
,
'提交验收单'
,
request_body
)
# 发送请求
while
True
:
time
.
sleep
(
3
)
result
=
commonFuc
().
http_post
(
url
,
request_body
,
headers
)
print
(
'result'
,
result
)
CHECK_NO
=
commonFuc
().
analysis_json
(
'data'
,
result
)
code
=
commonFuc
().
analysis_json
(
'code'
,
result
)
if
code
==
0
:
break
info
=
(
CHECK_NO
,
CHECK_NO
)
titlename
=
(
'CHECK_NO1'
,
'CHECK_NO2'
)
FileUtils
().
w_info8
(
info
,
'b5_spd3_core_business_flow'
,
'验收单号'
,
titlename
,
'message15'
)
# 入库上架
def
putaway
():
# # 登录获取用户id等信息,使用创建的用户登录===========开始
info
=
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"用户名信息"
,
'message3'
)
# print(info)
username
=
info
[
'username1'
]
password
=
commonFuc
().
get_business_data
(
"b2_herp3_bs"
,
"password"
)
# username = '001f90380'
# password='1qaz!QAZ'
print
(
'username,password'
,
username
,
password
)
# # print(username, password,'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
# # 获取token和projectCode
token
,
projectCode
,
uxid
,
corpId
,
info
=
login_system
(
username
,
password
).
get_token
()
# 登录获取用户id等信息,使用创建的用户登录============结束
# =====================上架入库列表查询==========================
headers
=
commonFuc
().
get_business_data
(
module
,
"json_headers2"
,
commonFuc
().
get_business_data
(
module
,
"json_contentType"
),
token
,
projectCode
)
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"stock_in_page_url"
)
print
(
url
)
branch_id
=
FileUtils
().
r_info
(
'b2_herp3_bs'
,
'院区新增'
)[
"branch_id"
]
hosId
=
branch_id
[
0
:
5
]
request_body
=
commonFuc
().
get_business_data
(
module
,
"payload6"
,
branch_id
,
hosId
)
result
=
commonFuc
().
http_post
(
url
,
request_body
,
headers
)
print
(
'result'
,
result
)
data
=
commonFuc
().
analysis_json
(
'data'
,
result
)
total
=
commonFuc
().
analysis_json
(
'total'
,
data
)
if
total
>=
1
:
# 将上架入库列表数据写入文件
data
=
commonFuc
().
analysis_json
(
'data'
,
result
)
data1
=
commonFuc
().
analysis_json
(
'data'
,
data
)
list_a
=
[]
list_b
=
[]
list_c
=
[]
list_a
.
append
(
data1
)
list_b
.
append
(
data1
)
list_c
.
append
(
data1
)
# print('list_a', len(list_a), list_a)
info
=
(
list_a
,
list_b
,
list_c
)
titlename
=
(
'list_a'
,
'list_b'
,
'list_c'
)
# 将验收单详细信息写入到文件中
FileUtils
().
w_info8
(
info
,
'b5_spd3_core_business_flow'
,
'上架入库列表数据'
,
titlename
,
'message16'
)
# =====================上架入库列表查询==========================
# =============查看验收单详情========================
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"stock_detail_url"
)
print
(
'查看验收单详情'
,
url
)
deptid1
=
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
'所有科室id'
,
'message6'
)[
"deptid1"
]
areaCode
=
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
'shelfinfo'
,
'message7'
)[
"shelfCode"
]
branch_id
=
FileUtils
().
r_info
(
'b2_herp3_bs'
,
'院区新增'
)[
"branch_id"
]
hosId
=
branch_id
[
0
:
5
]
# 从message16文件中获取信息
info
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
"上架入库列表数据"
,
'message16'
)
list_a
=
info
[
'list_a'
]
print
(
'list_a'
,
list_a
)
sourceId
=
'sourceId'
for
i
in
list_a
:
for
j
in
i
:
sourceId
=
j
[
'sourceId'
]
break
# 从message13文件中获取信息
info
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
"验收单信息"
,
'message13'
)
# print('info', info)
list_a
=
info
[
'list_a'
]
print
(
'list_a'
,
list_a
)
sourceType
=
"sourceType"
for
i
in
list_a
:
print
(
'i'
,
i
)
for
j
in
i
:
sourceType
=
j
[
"sourceType"
]
break
def
check_mcms_psi_batch_in
(
DR_ID
,
type
,
order_no
,
source_id1
):
if
type
==
1
:
bill_mode
=
16
elif
type
==
2
:
bill_mode
=
20
else
:
bill_mode
=
66
branch_id
=
get_branch_id
()
sql
=
" select * from mcms_psi_batch where pid='%s';"
%
(
DR_ID
)
actual
=
check_mcms_dept_buy_sql
(
sql
)
print
(
'actual'
,
actual
)
batch_main_key
=
get_id
(
"select id from mcms_psi_batch where pid='%s';"
%
DR_ID
)
pid
=
DR_ID
# order_no = get_id("select order_no from mcms_psi_dept_batch where pid='%s';" % DR_ID)
source_detail_id
=
source_id1
+
'0001'
pick_detail_id
=
None
if
type
==
1
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message2'
)[
"hosGoodsId"
]
elif
type
==
2
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message1'
)[
"hosGoodsId"
]
else
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message3'
)[
"hosGoodsId"
]
mdm_goods_code
=
get_mdm_goods_code1
(
hos_goods_id
)
goods_code
=
get_goods_code1
(
hos_goods_id
)
pkg_def_id
=
get_pkg_def_id
(
hos_goods_id
)
picker_name
=
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"用户名信息"
,
'message3'
)[
'username1'
]
picker
=
get_login_user_uxid_bydb
(
picker_name
)
source_batch_id
=
get_id
(
"select source_batch_id from mcms_psi_batch where pid='%s';"
%
DR_ID
)
distrBillId
=
get_distrBillId
(
branch_id
,
bill_mode
)
target_batch_id
=
get_id
(
"select id from mcms_hos_batch where hos_goods_id='%s' order by create_time desc LIMIT 1;"
%
hos_goods_id
)
if
check_batch_id
(
source_batch_id
)
==
True
:
print
(
'ok'
)
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
else
:
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
barcode_id
=
get_id
(
"select barcode_id from mcms_psi_batch where pid='%s';"
%
DR_ID
)
if
check_barcode_id
(
barcode_id
)
==
True
:
print
(
'ok'
)
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
else
:
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
dept_name
=
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室name"
,
'message6'
)[
'deptid2'
]
shelf_code
=
get_stockAreaId_new_fromDb
(
get_branch_name
(),
1
)
create_user
=
get_login_user_uxid_bydb
(
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"用户名信息"
,
'message3'
)[
'username1'
])
create_time
=
get_create_time
(
"select create_time from mcms_psi_batch where pid='%s';"
%
DR_ID
)
last_modified
=
get_create_time
(
"select last_modified from mcms_psi_batch where pid='%s';"
%
DR_ID
)
last_modified_user
=
create_user
if
type
==
1
:
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'mcms_psi_batch_in1'
,
batch_main_key
,
pid
,
order_no
,
source_detail_id
,
hos_goods_id
,
mdm_goods_code
,
goods_code
,
source_batch_id
,
target_batch_id
,
barcode_id
,
shelf_code
,
create_user
,
create_time
,
last_modified
,
last_modified_user
request_body
=
commonFuc
().
get_business_data
(
module
,
"payload4"
,
deptid1
,
areaCode
,
hosId
,
sourceId
,
sourceType
)
)
elif
type
==
2
:
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'mcms_psi_batch_in2'
,
batch_main_key
,
pid
,
order_no
,
source_detail_id
,
hos_goods_id
,
mdm_goods_code
,
goods_code
,
source_batch_id
,
target_batch_id
,
barcode_id
,
shelf_code
,
create_user
,
create_time
,
last_modified
,
last_modified_user
print
(
'request_body'
,
'd'
,
request_body
)
result
=
commonFuc
().
http_post
(
url
,
request_body
,
headers
)
print
(
'result'
,
result
)
# 将验收单详情写入文件
data
=
commonFuc
().
analysis_json
(
'data'
,
result
)
list_a
=
[]
list_b
=
[]
list_c
=
[]
list_a
.
append
(
data
)
list_b
.
append
(
data
)
list_c
.
append
(
data
)
# print('list_a', len(list_a), list_a)
info
=
(
list_a
,
list_b
,
list_c
)
titlename
=
(
'list_a'
,
'list_b'
,
'list_c'
)
# 将验收单详细信息写入到文件中
FileUtils
().
w_info8
(
info
,
'b5_spd3_core_business_flow'
,
'验收单库存详情信息'
,
titlename
,
'message17'
)
time
.
sleep
(
6
)
# ============= # =============查看验收单详情================================================
# ================================入库上架===================
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"stock_detail_submit_url"
)
print
(
url
)
viewId
=
'viewId'
shelfCode
=
'shelfCode'
sourceId
=
'sourceId'
sourceType
=
'PSD'
sourceVersion
=
'0'
targetDeptId
=
deptid1
targetDeptName
=
'targetDeptName'
targetAreaCode
=
'targetAreaCode'
targetAreaName
=
'targetAreaName'
# 从message17文件中获取信息
info
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
"验收单库存详情信息"
,
'message17'
)
# print('info', info)
list_a
=
info
[
'list_a'
]
print
(
'list_a'
,
list_a
)
for
i
in
list_a
:
print
(
'i'
,
i
)
for
j
in
i
:
viewId
=
j
[
"viewId"
]
shelfCode
=
j
[
'shelfCode'
]
sourceId
=
j
[
'id'
]
break
# 从message16文件中获取信息
info
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
"上架入库列表数据"
,
'message16'
)
list_a
=
info
[
'list_a'
]
print
(
'list_a'
,
list_a
)
for
i
in
list_a
:
for
j
in
i
:
sourceId
=
j
[
'sourceId'
]
targetAreaName
=
j
[
'areaName'
]
targetAreaCode
=
j
[
'areaCode'
]
break
request_body
=
commonFuc
().
get_business_data
(
module
,
"payload5"
,
viewId
,
shelfCode
,
sourceId
,
sourceType
,
sourceVersion
,
targetDeptId
,
targetDeptName
,
targetAreaCode
,
targetAreaName
)
print
(
'request_body'
,
request_body
)
time
.
sleep
(
3
)
result
=
commonFuc
().
http_post
(
url
,
request_body
,
headers
)
print
(
'result'
,
result
)
# ================================入库上架================================
)
else
:
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'mcms_psi_batch_in3'
,
batch_main_key
,
pid
,
order_no
,
source_detail_id
,
hos_goods_id
,
mdm_goods_code
,
goods_code
,
source_batch_id
,
target_batch_id
,
barcode_id
,
shelf_code
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
# def settle(): # 结算相关
# # # 登录获取用户id等信息,使用创建的用户登录===========开始
# info = FileUtils().r_info8('b2_herp3_bs', "用户名信息", 'message3')
# # print(info)
# username = info['username1']
# password = commonFuc().get_business_data("b2_herp3_bs", "password")
#
# print('username,password', username, password)
# # # print(username, password,'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
# # # 获取token和projectCode
# token, projectCode, uxid, corpId, info = login_system(username, password).get_token()
# # 登录获取用户id等信息,使用创建的用户登录============结束
# headers = commonFuc().get_business_data(module, "json_headers2",
# commonFuc().get_business_data(module, "json_contentType"), token,
# projectCode)
# # ================生成结算单列表查询===================
# url = commonFuc().get_api_add_port_url() + commonFuc().get_business_data(module, "mcmsOutUnBalance_listVoPage_url")
# print(url)
# branch_id = FileUtils().r_info('b2_herp3_bs', '院区新增')["branch_id"]
# hosId = branch_id[0:5]
# request_body = commonFuc().get_business_data(module, "payload7", hosId, hosId)
# result = commonFuc().http_post(url, request_body, headers)
# print('result', result)
# data = commonFuc().analysis_json('data', result)
# data1 = commonFuc().analysis_json('data', data)
# # 取结算单id
# list_a = []
# for i in data1:
# id = commonFuc().analysis_json('id', i)
# list_a.append(id)
# print(list_a)
# print(list_a[0], list_a[1], type(list_a[0]))
# # ===================生成结算单列表查询======================
# # ================== 生成结算单==============================
# url = commonFuc().get_api_add_port_url() + commonFuc().get_business_data(module,
# "mcmsOutUnBalance_genBalanceBill_url")
# print(url)
# request_body = commonFuc().get_business_data(module, "payload8", hosId, list_a[0], list_a[1], hosId)
# print('request_body', request_body)
# result = commonFuc().http_post(url, request_body, headers)
# print('result', result)
# # ================== 生成结算单==============================
print
(
'expected'
,
expected
)
print
(
'aaaaaaaaaaaaaaaaa'
)
actual_value1
=
get_process_list2
(
actual
)
expected_value1
=
get_process_list2
(
expected
)
actual_value2
=
actual_value1
.
replace
(
" "
,
""
)
expected_value2
=
expected_value1
.
replace
(
" "
,
""
)
# print('actual_value2',actual_value2)
print
(
'expected_value2'
)
print
(
expected_value2
)
print
(
'actual_value2'
)
print
(
actual_value2
)
print
(
compare_text_index
(
actual_value2
,
expected_value2
))
if
actual_value2
==
expected_value2
:
print
(
'ok'
)
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
else
:
print
(
'error'
)
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
pro_path
=
commonFuc
().
get_pro_path2
()
print
(
'pro_path'
,
pro_path
)
sys
.
path
.
append
(
pro_path
+
r
'/air_case/b5_spd3_core_business_flow/a_a7流程五_中心库入库结算_正结算3_内网验收入库结算.air'
)
using
(
pro_path
+
r
'//air_case/b5_spd3_core_business_flow/a_a7流程五_中心库入库结算_正结算3_内网验收入库结算.air'
)
from
a_a7流程五_中心库入库结算_正结算3_内网验收入库结算
import
check_accept_order
,
putaway
try
:
# 验收
check_accept_order
(
1
)
# 低值
time
.
sleep
(
6
)
check_mcms_check
(
1
)
time
.
sleep
(
3
)
check_accept_order
(
2
)
# 高值
time
.
sleep
(
6
)
check_mcms_check
(
2
)
time
.
sleep
(
3
)
check_accept_order
(
3
)
# 试剂
time
.
sleep
(
6
)
check_mcms_check
(
3
)
# 入库上架
putaway
()
putaway
()
...
...
common/db/sql/sql_del_branch_info.py
View file @
7221ea16
...
...
@@ -829,15 +829,19 @@ class delData(object):
# 查询脚本创建的的院区
sql1
=
"select id from mcms_branch_info where name like '%%东土城路院区%%' and id <>'%s';"
%
no_branch_id
cursor
.
execute
(
sql1
)
results
=
cursor
.
fetchone
()
str1
=
str
(
results
)
new_str1
=
str1
.
replace
(
',)'
,
''
)
new_str1
=
new_str1
.
replace
(
'('
,
''
)
new_str1
=
new_str1
.
replace
(
','
,
''
)
new_str1
=
new_str1
.
replace
(
"'"
,
''
)
# results = cursor.fetchone()
results
=
cursor
.
fetchall
()
list_1
=
[]
for
i
in
results
:
str1
=
str
(
i
)
new_str1
=
str1
.
replace
(
',)'
,
''
)
new_str1
=
new_str1
.
replace
(
'('
,
''
)
new_str1
=
new_str1
.
replace
(
','
,
''
)
new_str1
=
new_str1
.
replace
(
"'"
,
''
)
list_1
.
append
(
new_str1
)
# print(new_str1)
cursor
.
close
()
return
new_
st
r
1
return
li
st
_
1
# delData().Delete_branch_by_id()
# delData().Delete_goods_change_info()
...
...
main1/main_text.py
View file @
7221ea16
...
...
@@ -301,15 +301,17 @@ def main_text():
root
.
geometry
(
"+{}+{}"
.
format
(
x
,
y
))
root
.
resizable
(
False
,
False
)
combo
=
ttk
.
Combobox
(
root
)
branch_id
=
delData
().
get_branch_id_bydb
(
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"院区新增"
,
'message'
)[
'branch_id'
])
combo
[
"values"
]
=
(
branch_id
)
combo
.
current
(
0
)
# 设置默认选项
combo
.
bind
(
"<<ComboboxSelected>>"
,
dropdown_changed
)
combo
.
pack
()
button
=
tk
.
Button
(
root
,
text
=
"删除此院区数据"
,
command
=
lambda
:
button_clicked
(
branch_id
))
button
.
pack
()
branch_id_list
=
delData
().
get_branch_id_bydb
(
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"院区新增"
,
'message'
)[
'branch_id'
])
for
i
in
branch_id_list
:
combo
[
"values"
]
=
(
i
)
combo
.
current
(
0
)
# 设置默认选项
combo
.
bind
(
"<<ComboboxSelected>>"
,
dropdown_changed
)
combo
.
pack
()
button
=
tk
.
Button
(
root
,
text
=
"删除此院区数据"
,
command
=
lambda
:
button_clicked
(
i
))
button
.
pack
()
root
.
mainloop
()
root
.
mainloop
()
def
button_clicked
(
branch_id
):
result
=
confirm
()
...
...
@@ -326,6 +328,7 @@ def main_text():
delData
().
Delete_business_Data_All2
()
delData
().
Delete_branch_by_id
(
3
,
branch_id
)
print
(
branch_id
,
'删除完毕'
)
def
get_input
():
import
tkinter
as
tk
from
tkinter
import
simpledialog
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment