Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
test
autotest-airtest-web-spd
Commits
2b9824e9
Commit
2b9824e9
authored
Sep 14, 2024
by
xiao-hesheng
Browse files
流程五脚本提交
parent
29de8abd
Changes
5
Hide whitespace changes
Inline
Side-by-side
air_case/b5_spd3_core_business_flow/a_b4流程十一请领业务流_二级库向直送.air/a_b4流程十一请领业务流_二级库向直送.py
View file @
2b9824e9
...
...
@@ -859,7 +859,8 @@ def main():
approval_center
(
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'采购计划单号试剂'
,
'message12'
))
approval_center
(
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'采购计划单号'
,
'message7'
))
approval_center
(
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'采购计划单号高值'
,
'message11'
))
#外网配送前再检查一下是否有单据未审批
approval_center_all
()
order_dp
(
1
)
# 1低值
order_dp
(
2
)
# 2高值
order_dp
(
3
)
# 3试剂
...
...
air_case/b6_spd3_core_business_flow_database_check/a_b0流程七_中心库入库结算_正负结算_数据库验证.air/a_b0流程七_中心库入库结算_正负结算_数据库验证.py
View file @
2b9824e9
...
...
@@ -18,206 +18,193 @@ case_tag:api,b6_spd3_core_business_flow_database_check,a_b0流程七_中心库
python runner_test.py tag id2303-10 debug mdm3
"""
def
check_mcms_psi_out
(
type
=
2
):
print
(
'检查中心库出库单'
)
if
type
==
1
:
bill_mode
=
16
elif
type
==
2
:
bill_mode
=
25
else
:
bill_mode
=
66
branch_id
=
get_branch_id
()
sql
=
"select * from mcms_psi where source_branch_id='%s' and bill_mode='%s' and stock_kind='CK_TH' order by CREATE_TIME desc LIMIT 1;"
%
(
branch_id
,
bill_mode
)
# 获取实际值
actual
=
check_mcms_pur_plan_sql
(
sql
)
print
(
'actual'
,
actual
)
sql
=
"select id from mcms_psi where source_branch_id='%s' and bill_mode='%s' and stock_kind='CK_TH' order by CREATE_TIME desc LIMIT 1;"
%
(
branch_id
,
bill_mode
)
id
=
get_id
(
sql
)
sql
=
"select order_no from mcms_psi where source_branch_id='%s' and bill_mode='%s' and stock_kind='CK_TH' order by CREATE_TIME desc LIMIT 1;"
%
(
branch_id
,
bill_mode
)
order_no
=
get_id
(
sql
)
target_corp_id
=
get_hosid
()
target_branch_id
=
get_branch_id
()
target_dept_id
=
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室id"
,
'message6'
)[
'deptid1'
]
target_dept_name
=
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室name"
,
'message6'
)[
'deptid1'
]
stock_name
=
get_branch_name
()
+
"中心库房"
target_stock_id
=
get_stockId_fromdb
(
get_branch_name
()
+
'中心库房'
)
target_area_code
=
get_stockAreaId_new_fromDb
(
target_dept_name
,
1
)
source_id
=
get_id
(
"select id from mcms_psi_dept where target_branch_id='%s' and stock_kind='CK_TK' order by CREATE_TIME desc LIMIT 1;"
%
branch_id
)
source_no
=
None
source_repl_id
=
None
source_corp_id
=
get_hosid
()
source_branch_id
=
get_branch_id
()
source_dept_id
=
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室id"
,
'message6'
)[
'deptid2'
]
source_dept_name
=
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室name"
,
'message6'
)[
'deptid2'
]
source_stock_id
=
get_stockId_fromdb
(
source_dept_name
+
'库房'
)
source_area_code
=
get_stockAreaId_new_fromDb
(
source_dept_name
,
2
)
accounter
=
get_login_user_uxid_bydb
(
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"用户名信息"
,
'message3'
)[
'username1'
])
account_date
=
get_create_time
(
"select account_date from mcms_psi where id='%s';"
%
id
)
buyBillId
=
get_buyBillId
(
branch_id
,
bill_mode
)
if
type
==
1
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message2'
)[
"hosGoodsId"
]
elif
type
==
2
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message1'
)[
"hosGoodsId"
]
else
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message3'
)[
"hosGoodsId"
]
pickBillId
=
get_PICK_id_KS_TK
(
branch_id
,
bill_mode
)
pickOrderNo
=
get_id
(
"select order_no from mcms_pick where id='%s';"
%
pickBillId
)
def
test1
():
# 提交采购计划
execute_command
(
"python runner_test.py tag id2302-5 debug mdm3"
)
# 外网配送
execute_command
(
"python runner_test.py tag id2302-6 debug mdm3"
)
# 内网验收入库结算
execute_command
(
"python runner_test.py tag id2302-7 debug mdm3"
)
# 中心库退货
def
center_warehouse_return
():
module
=
"b5_spd3_core_business_flow"
# # 登录获取用户id等信息,使用创建的用户登录===========开始
info
=
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"用户名信息"
,
'message3'
)
username
=
info
[
'username1'
]
password
=
commonFuc
().
get_business_data
(
"b2_herp3_bs"
,
"password"
)
print
(
'username,password'
,
username
,
password
)
# # 获取token和projectCode
token
,
projectCode
,
uxid
,
corpId
,
info
=
login
(
username
,
password
,
2
)
# 登录获取用户id等信息,使用创建的用户登录============结束
# 到库房明细查询,查询产品的udi码
get_udi
(
1
)
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"herpService_pick_submit_url"
)
print
(
'url'
,
url
)
headers
=
commonFuc
().
get_business_data
(
module
,
"json_headers2"
,
commonFuc
().
get_business_data
(
module
,
"json_contentType"
),
token
,
commonFuc
().
get_business_data
(
module
,
"X-APP-CODE"
))
#从文件中获取必要信息
info
=
FileUtils
().
r_info8
(
module
,
"条码信息"
,
'message18'
)
# print('info', info)
list_a
=
info
[
'list_a'
]
print
(
'list_a'
,
list_a
[
0
],
type
(
list_a
),
len
(
list_a
))
UTid
=
'UT'
areaCode
=
'areaCode'
createTime
=
'createTime'
hosId
=
'hosId'
stockId
=
'stockId'
areaCode
=
'areaCode'
areaName
=
'areaName'
shelfCode
=
'shelfCode'
codeType
=
'codeType'
tagType
=
'tagType'
hosGoodsId
=
'hosGoodsId'
mdmGoodsCode
=
'mdmGoodsCode'
goodsCode
=
'goodsCode'
goodsDi
=
'goodsDi'
barName
=
'barName'
batchCode
=
'batchCode'
expdtDate
=
'expdtDate'
productDate
=
'productDate'
pkgCode
=
'pkgCode'
pkgCodeSee
=
'pkgCodeSee'
ssccCode
=
'ssccCode'
tbStatus
=
'tbStatus'
pkgDefQty
=
'pkgDefQty'
purBillId
=
'purBillId'
distrBillId
=
'distrBillId'
inStockTime
=
'inStockTime'
distrDetailId
=
'distrDetailId'
purMode
=
'purMode'
goodsMfrsName
=
'goodsMfrsName'
goodsSpec
=
'goodsSpec'
unit
=
'unit'
goodsName
=
'goodsName'
printNum
=
'printNum'
made
=
'made'
provId
=
'provId'
provName
=
'provName'
subProvId
=
'subProvId'
subProvName
=
'subProvName'
hosGoodsCode
=
'hosGoodsCode'
subSendFlag
=
'subSendFlag'
onlyKey
=
'onlyKey'
pkgDefId
=
'pkgDefId'
pkgDefName
=
'pkgDefName'
print
(
'aaaaaaaaaaaaaaaaaaaaaaaaa'
,
list_a
[
0
])
list_b
=
list_a
[
0
]
for
i
in
list_b
:
print
(
i
)
UTid
=
i
[
'id'
]
areaCode
=
i
[
'areaCode'
]
createTime
=
i
[
'createTime'
]
hosId
=
i
[
'hosId'
]
stockId
=
i
[
'stockId'
]
areaCode
=
i
[
'areaCode'
]
areaName
=
i
[
'areaName'
]
shelfCode
=
i
[
'shelfCode'
]
codeType
=
i
[
'codeType'
]
tagType
=
i
[
'tagType'
]
hosGoodsId
=
i
[
'hosGoodsId'
]
mdmGoodsCode
=
i
[
'mdmGoodsCode'
]
goodsCode
=
i
[
'goodsCode'
]
goodsDi
=
i
[
'goodsDi'
]
barName
=
i
[
'barName'
]
batchCode
=
i
[
'batchCode'
]
expdtDate
=
i
[
'expdtDate'
]
productDate
=
i
[
'productDate'
]
pkgCode
=
i
[
'pkgCode'
]
pkgCodeSee
=
i
[
'pkgCodeSee'
]
ssccCode
=
i
[
'ssccCode'
]
tbStatus
=
i
[
'tbStatus'
]
onlyKey
=
i
[
'hosGoodsId'
]
ext
=
i
[
'ext'
]
print
(
'ext'
,
ext
)
distrDetailId
=
ext
[
'distrDetailId'
]
distrBillId
=
ext
[
'distrBillId'
]
purBillId
=
ext
[
'purBillId'
]
pkgDefQty
=
ext
[
'pkgDefQty'
]
pkgDefId
=
ext
[
'pkgDefId'
]
pkgDefName
=
ext
[
'pkgDefName'
]
purMode
=
i
[
'purMode'
]
goodsMfrsName
=
i
[
'goodsMfrsName'
]
goodsSpec
=
i
[
'goodsSpec'
]
unit
=
i
[
'unit'
]
goodsName
=
i
[
'goodsName'
]
pkgDefId
=
i
[
'pkgDefId'
]
pkgDefQty
=
i
[
'pkgDefQty'
]
pkgDefName
=
i
[
'pkgDefName'
]
printNum
=
i
[
'printNum'
]
made
=
i
[
'made'
]
provId
=
i
[
'provId'
]
provName
=
i
[
'provName'
]
subProvId
=
i
[
'subProvId'
]
subProvName
=
i
[
'subProvName'
]
hosGoodsCode
=
i
[
'hosGoodsCode'
]
subSendFlag
=
i
[
'subSendFlag'
]
areaName
=
i
[
'areaName'
]
hosId
=
i
[
'hosId'
]
break
deptid1
=
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
'所有科室id'
,
'message6'
)[
"deptid1"
]
deptName
=
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
'所有科室name'
,
'message6'
)[
"deptid1"
]
# # 请求体
request_body
=
commonFuc
().
get_business_data
(
module
,
"payload19"
,
areaCode
,
UTid
,
createTime
,
UTid
,
hosId
,
stockId
,
areaCode
,
areaName
,
shelfCode
,
codeType
,
tagType
,
hosGoodsId
,
mdmGoodsCode
,
goodsCode
,
goodsDi
,
barName
,
batchCode
,
expdtDate
,
productDate
,
pkgCode
,
pkgCodeSee
,
ssccCode
,
tbStatus
,
distrDetailId
,
distrBillId
,
purBillId
,
pkgDefQty
,
pkgDefId
,
pkgDefName
,
purMode
,
goodsMfrsName
,
goodsSpec
,
unit
,
goodsName
,
pkgDefId
,
pkgDefQty
,
pkgDefName
,
printNum
,
made
,
provId
,
provName
,
subProvId
,
subProvName
,
hosGoodsCode
,
subSendFlag
,
onlyKey
,
areaName
,
hosId
,
get_branch_id
(),
deptid1
,
deptName
,
stockId
)
print
(
'request_body'
,
'ccccccc'
,
request_body
)
# # # # 发送请求
result
=
commonFuc
().
http_post
(
url
,
request_body
,
headers
)
print
(
'result'
,
result
)
PICK_ID
=
commonFuc
().
analysis_json
(
'data'
,
result
)
# =====================出库复核列表查询获取pickid===============
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"herpService_pick_return_page_url"
)
print
(
'url'
,
url
)
request_body
=
commonFuc
().
get_business_data
(
module
,
"payload20"
,
get_branch_id
(),
get_hosid
(),
deptid1
)
# # # # 发送请求
result
=
commonFuc
().
http_post
(
url
,
request_body
,
headers
)
planBillId
=
get_planBillId
(
hos_goods_id
)
psiOutBillId
=
get_id
(
"select id from mcms_psi_dept where target_branch_id='%s' and stock_kind='CK_TK' order by CREATE_TIME desc LIMIT 1;"
%
branch_id
)
psiOutOrderNo
=
get_id
(
"select order_no from mcms_psi_dept where id='%s';"
%
psiOutBillId
)
buyOrderNo
=
get_buyOrderNo
(
branch_id
,
bill_mode
)
purOrderNo
=
get_purOrderNo
(
branch_id
,
bill_mode
)
purBillId
=
get_purBillId
(
branch_id
,
bill_mode
)
bill_relation_json
=
get_purBillDate2
(
branch_id
,
bill_mode
)
purBillDate
=
reGetString
(
bill_relation_json
,
r
'purBillDate":"'
,
r
'""distrBillId'
)
distrBillId
=
get_distrBillId
(
branch_id
,
bill_mode
)
distrOrderNo
=
get_distrOrderNo
(
branch_id
,
bill_mode
)
checkBillId
=
source_id
sql
=
"select order_no from mcms_check where id='%s';"
%
checkBillId
checkOrderNo
=
get_id
(
sql
)
psiInBillId
=
id
psiInOrderNo
=
order_no
create_user
=
get_login_user_uxid_bydb
(
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"用户名信息"
,
'message3'
)[
'username1'
])
create_time
=
get_create_time
(
"select create_time from mcms_psi where id='%s';"
%
id
)
last_modified
=
get_create_time
(
"select last_modified from mcms_psi where id='%s';"
%
id
)
last_modified_user
=
create_user
check_mcms_psi_batch_out2
(
id
,
type
,
order_no
,
source_id
)
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'check_mcms_psi_out_0913'
,
id
,
order_no
,
target_corp_id
,
target_branch_id
,
target_dept_id
,
target_dept_name
,
target_stock_id
,
target_area_code
,
source_id
,
source_corp_id
,
source_branch_id
,
source_dept_id
,
source_dept_name
,
source_stock_id
,
source_area_code
,
accounter
,
account_date
,
pickOrderNo
,
pickBillId
,
psiOutBillId
,
psiOutOrderNo
,
psiInBillId
,
psiInOrderNo
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
# 对比预期值和实际值是否一致
actual_value1
=
get_process_list2
(
actual
)
expected_value1
=
get_process_list2
(
expected
)
actual_value2
=
actual_value1
.
replace
(
" "
,
""
)
expected_value2
=
expected_value1
.
replace
(
" "
,
""
)
# print('actual_value2',actual_value2)
print
(
'expected_value2'
)
print
(
expected_value2
)
print
(
'actual_value2'
)
print
(
actual_value2
)
result
=
(
compare_text_index
(
actual_value2
,
expected_value2
))
print
(
'result'
,
result
)
if
actual_value2
==
expected_value2
:
print
(
'ok'
)
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
else
:
print
(
'error'
)
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
def
check_mcms_psi_batch_out2
(
DR_ID
,
type
,
order_no
,
source_id1
):
if
type
==
1
:
bill_mode
=
16
elif
type
==
2
:
bill_mode
=
25
else
:
bill_mode
=
66
branch_id
=
get_branch_id
()
sql
=
" select * from mcms_psi_batch where pid='%s';"
%
(
DR_ID
)
actual
=
check_mcms_dept_buy_sql
(
sql
)
print
(
'actual'
,
actual
)
batch_main_key
=
get_id
(
"select id from mcms_psi_batch where pid='%s';"
%
DR_ID
)
pid
=
DR_ID
# order_no = get_id("select order_no from mcms_psi_dept_batch where pid='%s';" % DR_ID)
sql
=
"select id from mcms_psi_dept where target_branch_id='%s' and stock_kind='CK_TK' order by CREATE_TIME desc LIMIT 1;"
%
branch_id
source_detail_id
=
get_id
(
sql
)
source_detail_id
=
source_detail_id
+
'0001'
pick_detail_id
=
None
if
type
==
1
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message2'
)[
"hosGoodsId"
]
elif
type
==
2
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message1'
)[
"hosGoodsId"
]
else
:
hos_goods_id
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'产品审核信息'
,
'message3'
)[
"hosGoodsId"
]
mdm_goods_code
=
get_mdm_goods_code1
(
hos_goods_id
)
goods_code
=
get_goods_code1
(
hos_goods_id
)
pkg_def_id
=
get_pkg_def_id
(
hos_goods_id
)
picker_name
=
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"用户名信息"
,
'message3'
)[
'username1'
]
picker
=
get_login_user_uxid_bydb
(
picker_name
)
source_batch_id
=
get_id
(
"select source_batch_id from mcms_psi_batch where pid='%s';"
%
DR_ID
)
distrBillId
=
get_distrBillId
(
branch_id
,
bill_mode
)
target_batch_id
=
source_batch_id
if
check_batch_id
(
source_batch_id
)
==
True
:
print
(
'ok'
)
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
else
:
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
barcode_id
=
get_id
(
"select barcode_id from mcms_psi_batch where pid='%s';"
%
DR_ID
)
if
check_barcode_id
(
barcode_id
)
==
True
:
print
(
'ok'
)
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
else
:
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
dept_name
=
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室name"
,
'message6'
)[
'deptid2'
]
shelf_code
=
get_stockAreaId_new_fromDb
(
get_branch_name
(),
1
)
create_user
=
get_login_user_uxid_bydb
(
FileUtils
().
r_info8
(
'b2_herp3_bs'
,
"用户名信息"
,
'message3'
)[
'username1'
])
create_time
=
get_create_time
(
"select create_time from mcms_psi_batch where pid='%s';"
%
DR_ID
)
last_modified
=
get_create_time
(
"select last_modified from mcms_psi_batch where pid='%s';"
%
DR_ID
)
last_modified_user
=
None
expected
=
commonFuc
().
get_business_data
(
'b6_spd3_core_business_flow_database_check'
,
'mcms_psi_batch_out_0913'
,
batch_main_key
,
pid
,
order_no
,
source_detail_id
,
pick_detail_id
,
hos_goods_id
,
mdm_goods_code
,
goods_code
,
source_batch_id
,
target_batch_id
,
barcode_id
,
shelf_code
,
create_user
,
create_time
,
last_modified
,
last_modified_user
)
print
(
'expected'
,
expected
)
print
(
'aaaaaaaaaaaaaaaaa'
)
actual_value1
=
get_process_list2
(
actual
)
expected_value1
=
get_process_list2
(
expected
)
actual_value2
=
actual_value1
.
replace
(
" "
,
""
)
expected_value2
=
expected_value1
.
replace
(
" "
,
""
)
# print('actual_value2',actual_value2)
print
(
'expected_value2'
)
print
(
expected_value2
)
print
(
'actual_value2'
)
print
(
actual_value2
)
print
(
compare_text_index
(
actual_value2
,
expected_value2
))
if
actual_value2
==
expected_value2
:
print
(
'ok'
)
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
else
:
print
(
'error'
)
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
# =====================出库复核列表查询获取pickid===============
# =======================查看出库单详细信息===============
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"herpService_pick_checkReview_url"
)
url
=
url
+
' '
.
join
(
PICK_ID
)
+
'?pickedFlag=true'
print
(
'url'
,
url
)
# # # # 发送请求
result
=
commonFuc
().
http_get
(
url
,
headers
)
print
(
'查看出库单详细信息result'
,
result
)
# =======================查看出库单详细信息===============
# ========================扫码复核
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"herpService_stock_hdiBarcode_scan_url"
)
url
=
url
+
pkgCode
print
(
'url'
,
url
)
# # # # 发送请求
result
=
commonFuc
().
http_get
(
url
,
headers
)
print
(
'扫码复核result'
,
result
)
data
=
commonFuc
().
analysis_json
(
'data'
,
result
)
UT_id
=
commonFuc
().
analysis_json
(
'id'
,
data
)
createTime
=
commonFuc
().
analysis_json
(
'createTime'
,
data
)
snCode
=
commonFuc
().
analysis_json
(
'snCode'
,
data
)
tbStatus
=
commonFuc
().
analysis_json
(
'tbStatus'
,
data
)
print
(
UT_id
)
# ========================扫码复核
# ========出库复核提交==========
url
=
commonFuc
().
get_api_add_port_url
()
+
commonFuc
().
get_business_data
(
module
,
"herpService_stock_review_pass_submit_url"
)
print
(
'url'
,
url
)
sourceId
=
' '
.
join
(
PICK_ID
)
# 请求体
request_body
=
commonFuc
().
get_business_data
(
module
,
"payload21"
,
UT_id
,
createTime
,
snCode
,
hosId
,
stockId
,
areaCode
,
areaName
,
shelfCode
,
codeType
,
tagType
,
hosGoodsId
,
mdmGoodsCode
,
goodsCode
,
goodsDi
,
barName
,
batchCode
,
expdtDate
,
productDate
,
pkgCode
,
pkgCodeSee
,
ssccCode
,
tbStatus
,
distrDetailId
,
distrBillId
,
purBillId
,
pkgDefQty
,
pkgDefId
,
pkgDefName
,
purMode
,
goodsMfrsName
,
goodsSpec
,
unit
,
goodsName
,
pkgDefId
,
pkgDefQty
,
pkgDefName
,
made
,
hosGoodsCode
,
sourceId
)
print
(
'request_body'
,
request_body
)
# # # # 发送请求
result
=
commonFuc
().
http_post
(
url
,
request_body
,
headers
)
print
(
'herpService_stock_review_pass_submit_urlresult'
,
result
)
# ========出库复核提交
pro_path
=
commonFuc
().
get_pro_path2
()
print
(
'pro_path'
,
pro_path
)
sys
.
path
.
append
(
pro_path
+
r
'/air_case/b5_spd3_core_business_flow/a_b0流程七_中心库入库结算_正负结算.air'
)
using
(
pro_path
+
r
'//air_case/b5_spd3_core_business_flow/a_b0流程七_中心库入库结算_正负结算.air'
)
from
a_b0流程七_中心库入库结算_正负结算
import
center_warehouse_return
try
:
# test1() # 提交采购计划 # 外网配送 # 内网验收入库结算
...
...
@@ -228,11 +215,15 @@ try:
set_dept_user
(
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室id"
,
'message6'
)[
'deptid2'
],
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室name"
,
'message6'
)[
'deptid2'
],
2
)
set_dept_user
(
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室id"
,
'message6'
)[
'deptid3'
],
FileUtils
().
r_info8
(
"b2_herp3_bs"
,
"所有科室name"
,
'message6'
)[
'deptid3'
],
3
)
center_warehouse_return
()
#中心库退货
time
.
sleep
(
6
)
check_mcms_psi_out
(
1
)
#验证出库单数据据数据(低值)
OBh_ID
=
settle
()
# 生成结算单
# print(OBh_ID)
# 调用审批结算单
approval_center
(
OBh_ID
)
invoice
(
3
,
OBh_ID
)
#type=2 传递结算单号,外网查询开票
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
except
:
commonFuc
().
check_text_exist_result_text
(
'error'
,
'1'
)
except
Exception
as
e
:
# 打印错误信息
print
(
f
"发生错误:
{
e
}
"
)
commonFuc
().
check_text_exist_result_text
(
'error'
,
'succees'
)
\ No newline at end of file
air_case/b6_spd3_core_business_flow_database_check/a_b4流程十一请领业务流_二级库向直送_数据库验证.air/a_b4流程十一请领业务流_二级库向直送_数据库验证.py
View file @
2b9824e9
...
...
@@ -2431,6 +2431,8 @@ def main():
check_mcms_purchase_out_and_in
(
1
)
#1主表
print
(
'检查同步到外网的采购订单数据跟内网的是否一致'
)
check_mcms_purchase_out_and_in
(
2
)
#2子表
# 外网配送前再检查一下是否有单据未审批
approval_center_all
()
order_dp
(
1
)
# 1低值 配送单发货
time
.
sleep
(
6
)
order_dp
(
2
)
# 2高值 配送单发货
...
...
air_case/demo/产品检查.air/产品检查.py
View file @
2b9824e9
...
...
@@ -58,22 +58,34 @@ import sys
# commonFuc().check_text_exist_result_text('error', 'succees')
# 第二种方式=================================
# 备份当前data目录
pro_path
=
commonFuc
().
get_pro_path2
()
src
=
pro_path
+
r
'/data'
dst
=
pro_path
+
r
'/data'
+
get_branch_name
()
+
get_branch_id
()
#将备份的目录路径写入到文件中
info
=
(
src
,
dst
)
titlename
=
(
'源目录'
,
'备份目录'
)
FileUtils
().
w_info8
(
info
,
'b5_spd3_core_business_flow'
,
'备份路径'
,
titlename
,
'message29'
)
#备份目录
FileUtils
().
copy_folder_backup
(
src
,
dst
)
dst
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'备份路径'
,
'message29'
)[
'备份目录'
]
#恢复目录
#先删除文件夹
import
shutil
,
os
if
os
.
path
.
exists
(
src
):
shutil
.
rmtree
(
src
)
# # 备份当前data目录
# pro_path = commonFuc().get_pro_path2()
# src = pro_path + r'/data'
# dst = pro_path + r'/data' + get_branch_name()+get_branch_id()
# #将备份的目录路径写入到文件中
# info = (src, dst)
# titlename = ('源目录', '备份目录')
# FileUtils().w_info8(info, 'b5_spd3_core_business_flow', '备份路径',titlename,'message29')
# #备份目录
# FileUtils().copy_folder_backup(src, dst)
#
# dst=FileUtils().r_info8('b5_spd3_core_business_flow', '备份路径','message29')['备份目录']
# #恢复目录
# #先删除文件夹
# import shutil,os
# if os.path.exists(src):
# shutil.rmtree(src)
#
# FileUtils().copy_folder_backup(dst,src)
FileUtils
().
copy_folder_backup
(
dst
,
src
)
list1
=
get_all_goodsinfo
()
print
(
len
(
list1
))
for
i
in
list1
:
# print(i)
# print(type(i))
str1
=
str
(
i
)
new_str1
=
str1
.
replace
(
'('
,
''
)
new_str1
=
new_str1
.
replace
(
')'
,
''
)
new_str1
=
new_str1
.
replace
(
"'"
,
'"'
)
# new_str1 = new_str1.replace("'", '')
print
(
new_str1
)
common/db/sql/sql_tools.py
View file @
2b9824e9
...
...
@@ -699,3 +699,14 @@ def get_last_auditor(order_no):
# except Exception as e:
# # 打印错误信息
# print(f"发生错误: {e}")
def
get_all_goodsinfo
():
db
,
cursor
=
get_sql_conn
()
sql
=
"select * from mcms_goods_info;"
res_dict
=
get_dict_data_sql
(
cursor
,
sql
)
print
(
'res_dict'
,
res_dict
)
cursor
.
close
()
db
.
close
()
return
res_dict
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment