Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
test
autotest-airtest-web-spd
Commits
37904ae4
Commit
37904ae4
authored
Aug 01, 2024
by
xiao-hesheng
Browse files
数据库验证流程十三脚本编写
parent
c5b12575
Changes
6
Hide whitespace changes
Inline
Side-by-side
air_case/b5_spd3_core_business_flow/a_a4_流程四_基础模块设置.air/a_a4_流程四_基础模块设置.py
View file @
37904ae4
# -*- encoding=utf8 -*-
# -*- encoding=utf8 -*-
from
air_case.public1.public1.public1
import
*
from
air_case.public1.public1.public1
import
*
from
common.dateUtils
import
dateUtils
from
common.db.sql.sql_del_branch_info
import
delData
from
common.db.sql.sql_del_branch_info
import
delData
from
common.fileUtls
import
FileUtils
from
common.fileUtls
import
FileUtils
from
common.run_cmd_script
import
execute_command
from
common.run_cmd_script
import
execute_command
...
@@ -58,7 +59,9 @@ def test1(type=1):
...
@@ -58,7 +59,9 @@ def test1(type=1):
"python runner_test.py tag id2263-19,id2263-20,id2263-21,id2263-22,id2263-23,id2263-24,id2263-25,id2263-26 debug sit"
)
"python runner_test.py tag id2263-19,id2263-20,id2263-21,id2263-22,id2263-23,id2263-24,id2263-25,id2263-26 debug sit"
)
else
:
else
:
branch_add
()
branch_add
()
add_dept
()
# 新增科室
execute_command
(
"python runner_test.py tag id2263-19,id2263-20,id2263-21,id2263-22,id2263-23,id2263-24,id2263-25,id2263-26 debug sit"
)
# 一键导入所有产品
# 一键导入所有产品
execute_command
(
"python runner_test.py tag id2263-48 debug sit"
)
execute_command
(
"python runner_test.py tag id2263-48 debug sit"
)
#导入同步过来的新产品,防止因同步时间差导致新建的产品没有导入过来
#导入同步过来的新产品,防止因同步时间差导致新建的产品没有导入过来
...
@@ -324,6 +327,17 @@ def add_StockArea():#添加货位
...
@@ -324,6 +327,17 @@ def add_StockArea():#添加货位
else
:
else
:
commonFuc
().
check_text_exist
(
'error'
,
result
)
commonFuc
().
check_text_exist
(
'error'
,
result
)
def
recover_dataFile
():
delData
().
Delete_branch_by_id
(
2
)
pro_path
=
commonFuc
().
get_pro_path2
()
src
=
pro_path
+
r
'/data'
dst
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'备份路径'
,
'message29'
)[
'备份目录'
]
# 恢复目录
# 先删除文件夹
import
shutil
,
os
if
os
.
path
.
exists
(
src
):
shutil
.
rmtree
(
src
)
FileUtils
().
copy_folder_backup
(
dst
,
src
)
def
branch_add
(
token
):
def
branch_add
(
token
):
module
=
'b2_herp3_bs'
module
=
'b2_herp3_bs'
# 内网登录
# 内网登录
...
@@ -352,11 +366,17 @@ def branch_add(token):
...
@@ -352,11 +366,17 @@ def branch_add(token):
id1
=
commonFuc
().
analysis_json
(
'id'
,
commonFuc
().
analysis_json
(
'data'
,
result
))
id1
=
commonFuc
().
analysis_json
(
'id'
,
commonFuc
().
analysis_json
(
'data'
,
result
))
name
=
commonFuc
().
analysis_json
(
'name'
,
commonFuc
().
analysis_json
(
'data'
,
result
))
name
=
commonFuc
().
analysis_json
(
'name'
,
commonFuc
().
analysis_json
(
'data'
,
result
))
#读取原有院区文件中的内容
#备份当前data目录
old_branch_result
=
FileUtils
().
r_info9
(
module
,
'院区新增'
,
'message'
)
pro_path
=
commonFuc
().
get_pro_path2
()
print
(
old_branch_result
)
src
=
pro_path
+
r
'/data'
#写入到另一个文件中备份起来
dst
=
pro_path
+
r
'/data'
+
timeUtils
().
get_time_hms
(
5
)
FileUtils
().
w_info9
(
'b5_spd3_core_business_flow'
,
old_branch_result
,
'message29'
)
#将备份的目录路径写入到文件中
info
=
(
src
,
dst
)
titlename
=
(
'源目录'
,
'备份目录'
)
FileUtils
().
w_info8
(
info
,
'b5_spd3_core_business_flow'
,
'院区新增'
,
titlename
,
'message29'
)
#备份data文件
FileUtils
().
copy_folder_backup
(
src
,
dst
)
info
=
(
id1
,
name
)
info
=
(
id1
,
name
)
titlename
=
(
'院区id'
,
'院区name'
)
titlename
=
(
'院区id'
,
'院区name'
)
FileUtils
().
w_info2
(
info
,
module
,
'院区新增'
)
FileUtils
().
w_info2
(
info
,
module
,
'院区新增'
)
...
...
air_case/demo/产品检查.air/产品检查.py
View file @
37904ae4
...
@@ -58,7 +58,22 @@ import sys
...
@@ -58,7 +58,22 @@ import sys
# commonFuc().check_text_exist_result_text('error', 'succees')
# commonFuc().check_text_exist_result_text('error', 'succees')
# 第二种方式=================================
# 第二种方式=================================
old_branch_result
=
FileUtils
().
r_info9
(
'b2_herp3_bs'
,
'院区新增'
,
'message'
)
# 备份当前data目录
print
(
old_branch_result
)
pro_path
=
commonFuc
().
get_pro_path2
()
# 写入到另一个文件中
src
=
pro_path
+
r
'/data'
FileUtils
().
w_info9
(
'b5_spd3_core_business_flow'
,
old_branch_result
,
'message29'
)
dst
=
pro_path
+
r
'/data'
+
timeUtils
().
get_time_hms
(
5
)
\ No newline at end of file
#将备份的目录路径写入到文件中
info
=
(
src
,
dst
)
titlename
=
(
'源目录'
,
'备份目录'
)
FileUtils
().
w_info8
(
info
,
'b5_spd3_core_business_flow'
,
'备份路径'
,
titlename
,
'message29'
)
#备份目录
FileUtils
().
copy_folder_backup
(
src
,
dst
)
dst
=
FileUtils
().
r_info8
(
'b5_spd3_core_business_flow'
,
'备份路径'
,
'message29'
)[
'备份目录'
]
#恢复目录
#先删除文件夹
import
shutil
,
os
if
os
.
path
.
exists
(
src
):
shutil
.
rmtree
(
src
)
FileUtils
().
copy_folder_backup
(
dst
,
src
)
common/db/sql/sql_del_branch_info.py
View file @
37904ae4
...
@@ -9,13 +9,16 @@ from common.fileUtls import FileUtils
...
@@ -9,13 +9,16 @@ from common.fileUtls import FileUtils
class
delData
(
object
):
class
delData
(
object
):
def
Delete_branch_by_id
(
self
):
def
Delete_branch_by_id
(
self
,
type
=
1
):
connection
=
pymysql
.
connect
(
host
=
"10.17.65.108"
,
user
=
"root"
,
password
=
"Cmic.2023"
,
database
=
"spd3_herp_test2"
,
connection
=
pymysql
.
connect
(
host
=
"10.17.65.108"
,
user
=
"root"
,
password
=
"Cmic.2023"
,
database
=
"spd3_herp_test2"
,
charset
=
"utf8"
)
charset
=
"utf8"
)
cursor
=
connection
.
cursor
()
cursor
=
connection
.
cursor
()
# 删除新增的院区数据,减少垃圾数据的产生,保证脚本下次还可以正常运行,接口脚本每次都使用新增数据
# 删除新增的院区数据,减少垃圾数据的产生,保证脚本下次还可以正常运行,接口脚本每次都使用新增数据
# sql = "DELETE from mcms_branch_info where id = '%s';" % id
# sql = "DELETE from mcms_branch_info where id = '%s';" % id
sql
=
"DELETE from mcms_branch_info where name like '%东土城路院区_____';"
if
type
==
1
:
sql
=
"DELETE from mcms_branch_info where name like '%东土城路院区_____';"
else
:
sql
=
"DELETE from mcms_branch_info where name like '%东土城路院区______';"
print
(
sql
)
print
(
sql
)
cursor
.
execute
(
sql
)
cursor
.
execute
(
sql
)
cursor
.
execute
(
"commit;"
)
cursor
.
execute
(
"commit;"
)
...
@@ -28,7 +31,10 @@ class delData(object):
...
@@ -28,7 +31,10 @@ class delData(object):
cursor
=
connection
.
cursor
()
cursor
=
connection
.
cursor
()
# 删除新增的院区数据,减少垃圾数据的产生,保证脚本下次还可以正常运行,接口脚本每次都使用新增数据
# 删除新增的院区数据,减少垃圾数据的产生,保证脚本下次还可以正常运行,接口脚本每次都使用新增数据
# sql = "DELETE from mcms_branch_info where id = '%s';" % id
# sql = "DELETE from mcms_branch_info where id = '%s';" % id
sql
=
"DELETE from mcms_branch_info where name like '%东土城路院区_____';"
if
type
==
1
:
sql
=
"DELETE from mcms_branch_info where name like '%东土城路院区_____';"
else
:
sql
=
"DELETE from mcms_branch_info where name like '%东土城路院区______';"
print
(
sql
)
print
(
sql
)
cursor
.
execute
(
sql
)
cursor
.
execute
(
sql
)
cursor
.
execute
(
"commit;"
)
cursor
.
execute
(
"commit;"
)
...
...
common/fileUtls.py
View file @
37904ae4
...
@@ -329,7 +329,23 @@ class FileUtils(object):
...
@@ -329,7 +329,23 @@ class FileUtils(object):
# 写入到yaml文件
# 写入到yaml文件
with
open
(
yamlpath
,
"a"
,
encoding
=
"utf-8"
)
as
f
:
with
open
(
yamlpath
,
"a"
,
encoding
=
"utf-8"
)
as
f
:
yaml
.
dump
(
dict
,
f
,
Dumper
=
yaml
.
RoundTripDumper
,
allow_unicode
=
True
)
yaml
.
dump
(
dict
,
f
,
Dumper
=
yaml
.
RoundTripDumper
,
allow_unicode
=
True
)
def
copy_folder_backup
(
self
,
src
,
dst
):
import
shutil
import
os
if
os
.
path
.
exists
(
dst
):
print
(
f
"The backup directory '
{
dst
}
' already exists."
)
return
try
:
shutil
.
copytree
(
src
,
dst
)
print
(
f
"Folder '
{
src
}
' backed up successfully to '
{
dst
}
'."
)
except
OSError
as
e
:
print
(
f
"Error:
{
e
}
"
)
# 使用方法
# 假设你要备份的文件夹是 '/path/to/source',备份的目标文件夹是 '/path/to/backup'
# copy_folder_backup('/path/to/source', '/path/to/backup')
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
# info=("aaaa","bbbbbb","mdm3-pim")
# info=("aaaa","bbbbbb","mdm3-pim")
# FileUtils().w_info(info,"产品新增")
# FileUtils().w_info(info,"产品新增")
...
...
data/b5_spd3_core_business_flow/message15
View file @
37904ae4
验收单号
:
验收单号
:
CHECK_NO1
:
CHECKh034720240
73
1000
32
CHECK_NO1
:
CHECKh034720240
80
1000
06
CHECK_NO2
:
CHECKh034720240
73
1000
32
CHECK_NO2
:
CHECKh034720240
80
1000
06
main1/main_text.py
View file @
37904ae4
...
@@ -84,8 +84,19 @@ def main_text():
...
@@ -84,8 +84,19 @@ def main_text():
# sys.exit(0)
# sys.exit(0)
def
flow4
():
def
flow4
():
# execute_command("python runner_test.py tag id2302-4 debug mdm3")
# execute_command("python runner_test.py tag id2302-4 debug mdm3")
from
execute
import
flow4
# from execute import flow4
flow4
()
# flow4()
sys
.
path
.
append
(
pro_path
+
r
'/air_case/b5_spd3_core_business_flow/a_a4_流程四_基础模块设置.air'
)
using
(
pro_path
+
r
'//air_case/b5_spd3_core_business_flow/a_a4_流程四_基础模块设置.air'
)
from
a_a4_流程四_基础模块设置
import
test1
,
test2
,
dept_query_all
,
grants_user
,
set_dept_lead1
,
add_StockArea
,
recover_dataFile
test1
(
2
)
# 参数2是新增院区后缀为6位数的参数1是老模式
test2
()
# 科室添加产品信息(产品是从外网同步过来的)
dept_query_all
()
# 科室查询,将科室id写入文件中
grants_user
()
# 给用户授权
set_dept_lead1
()
# 设置一级科室负责人
add_StockArea
()
# 添加库区
recover_dataFile
()
#恢复备份的data目录
commonFuc
().
check_text_exist_result_text
(
'succees'
,
'succees'
)
print
(
'流程4执行完毕'
)
print
(
'流程4执行完毕'
)
# sys.exit(0)
# sys.exit(0)
def
flow5
():
def
flow5
():
...
@@ -192,6 +203,7 @@ def main_text():
...
@@ -192,6 +203,7 @@ def main_text():
try
:
try
:
flow5
()
flow5
()
flow6
()
flow6
()
time
.
sleep
(
20
)
flow7
()
flow7
()
flow8
()
flow8
()
flow9
()
flow9
()
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment