Commit 5271a37a authored by 周念东's avatar 周念东
Browse files

20240617

parents ced008e1 c3acdd56
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc批次号无效_通过批号获取上报数据,00,00-63,sit,be
主数据平台:后台运营系统获取首营客户信息接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import random
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步获取首营客户信息
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url52")
request_body = commonFuc().get_business_data(module, "payload52")
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
# 生成随机数
customer_random = random.randint(0, len(result["data"]) - 1)
# 获取客户信息
customerCode = result["data"][customer_random]["customerCode"]
# 随机生成数据
batchNumber = random.randint(0, 1000)
# 第三步通过批次号获取上报数据
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url63")
request_body = commonFuc().get_business_data(module, "payload63", batchNumber, customerCode)
"""
场景: 批次号无效_通过批号获取上报数据
用例名称:批次号无效_通过批号获取上报数据
输出:{"message":"生产批号:%s 不存在或已上报"}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 将接口响应时间添加至result
result = json.loads(result.content)
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict63_1", batchNumber)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc批量导入新增生产批号验证,00,00-68,sit,be
主数据平台:后台运营系统生产批号新增接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
from common.handle_excel import HandleExcel
from common.db.db import mySql
import requests
import json
import os
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers_file", cmdc_access_token)
# 数据库操作
mysql_handle = mySql()
# 获取conf.ini文件中配置的数据库信息
host, port, user, pwd = mysql_handle.getConf(db="cmdc_db_be")
# 在数据库中已存在且未被上报的生产批号信息
sql = "SELECT t.customerCode,t.productCode,t.batchNumber,t.productLineCode FROM `cmdc-order`.tc_report_batch t " \
"WHERE deleteSign = 0"
customerCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[1][0]
productCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[1][1]
batchNumber = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[1][2]
productLineCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[1][3]
# 获取文件
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
file_path = BASE_DIR + "/data/cmdc_files/植入生产批号导入模板.xlsx"
# 将商品信息写入文件中
excel = HandleExcel(file_path, "上报植入")
excel.write_data(row=2, column=10, value=productLineCode)
excel.write_data(row=2, column=11, value=customerCode)
excel.write_data(row=2, column=12, value=productCode)
excel.write_data(row=2, column=13, value=batchNumber)
# 第三步批量导入生产批号新增
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url68")
request_body = commonFuc().get_business_data(module, "payload68", productLineCode)
# 获取文件
file = open(file_path, "rb")
files = {"file": file}
"""
场景: 生产批号新增_手动添加_后台
用例名称:生产批号新增_手动添加_后台
输出:{"batchNumber":"%s"}
"""
# 发送请求
result = requests.post(url, files=files, headers=headers, data=request_body)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取生产批号
result = json.loads(result.content)
# print(result)
result = {"batchNumber": result["data"]["reportBatchList"][0]["batchNumber"]}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict68", batchNumber)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc批量配货模板下载,00,00-7,sit,be
主数据平台:后台运营系统批量配货模板下载接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
from common.handle_excel import HandleExcel
import requests
import random
import json
import os
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_do_be")
password = commonFuc().get_business_data(module, "password_do_be")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步获取需求单列表
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url3")
request_body = commonFuc().get_business_data(module, "payload3")
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
# 生成随机数
demand_random = random.randint(0, len(result["data"]["list"]) - 1)
# 获取需求单信息
demandCode = result["data"]["list"][demand_random]["demandCode"]
# 第三步进行批量配货模板下载
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url7")
request_body = commonFuc().get_business_data(module, "payload7", demandCode)
"""
场景: 批量配货模板下载
用例名称:批量配货模板下载
输出:{"codes":"%s"}
"""
# 发送请求
result = requests.get(url, params=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取文件
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
file_path = BASE_DIR + "/data/cmdc_files/批量导入配货模板.xlsx"
# 获取到导出文件存入cmdc_files文件中
with open(file_path, 'wb') as f:
f.write(result.content)
# 获取导出文件中子需求单号
excel = HandleExcel(file_path, "批量导入配货模板")
result = {"codes": excel.read_data()[0].get("子需求单号\n(系统带出,不可修改)")}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict7", demandCode)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc查看物流详情_三方需求单,00,00-29,sit,be
主数据平台:后台运营系统查看物流信息接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import datetime
import requests
import random
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_do_be")
password = commonFuc().get_business_data(module, "password_do_be")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步获取待发货列表
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url27")
request_body = commonFuc().get_business_data(module, "payload27")
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
# 生成随机数
so_random = random.randint(0, len(result["data"]["list"]) - 1)
# 获取三方需求单信息
soCode = result["data"]["list"][so_random]["soCode"]
demandParentCode = result["data"]["list"][so_random]["demandParentCode"]
# 第二步查看物流
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url29")
request_body = commonFuc().get_business_data(module, "payload29", soCode, demandParentCode)
"""
场景: 查看物流详情_三方需求单
用例名称:查看物流详情_三方需求单
输出:{"sddoco":"%s"}
"""
# 发送请求
result = requests.get(url, params=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取物流信息
result = json.loads(result.content)
# print(result)
result = {"sddoco": result["data"]["detail"][0]["SDDOCO"]}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict29", soCode)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc植入上报导出功能验证,00,00-34,sit,be
主数据平台:后台运营系统植入上报导出接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
from common.handle_excel import HandleExcel
from common.db.db import mySql
import random
import requests
import json
import os
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_do_be")
password = commonFuc().get_business_data(module, "password_do_be")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 数据库操作
mysql_handle = mySql()
# 获取conf.ini文件中配置的数据库信息
host, port, user, pwd = mysql_handle.getConf(db="cmdc_db_be")
# 在数据库中已存在的植入上报数据对应的产品线信息
sql = "SELECT t.productLineCode, t.companyCode FROM `cmdc-order`.tc_report t where deleteSign = 0"
productLineCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][0]
companyCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][1]
# 第二步获取上报植入列表
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url33")
request_body = commonFuc().get_business_data(module, "payload33_1", companyCode, productLineCode)
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
# print(result)
# 生成随机数
report_random = random.randint(0, len(result["data"]["list"]) - 1)
# 获取上报植入信息
batchNumber = result["data"]["list"][report_random]["batchNumber"]
# 第三步进行植入上报信息导出
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url34")
request_body = commonFuc().get_business_data(module, "payload34", companyCode, productLineCode, batchNumber)
"""
场景: 植入上报导出功能验证
用例名称:植入上报导出功能验证
输出:{"batchNumber":"%s"}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取文件
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
file_path = BASE_DIR + "/data/cmdc_files/上报植入导出明细.xlsx"
# 获取到导出文件存入cmdc_files文件中
with open(file_path, 'wb') as f:
f.write(result.content)
# 获取导出文件中生产批次号
excel = HandleExcel(file_path, "Sheet1")
result = {"batchNumber": excel.read_data()[0].get("生产批号")}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict34", batchNumber)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc植入上报日期_上报植入筛选,00,00-59,sit,be
主数据平台:后台运营系统获取上报植入列表接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
from common.db.db import mySql
import datetime
import random
import requests
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 数据库操作
mysql_handle = mySql()
# 获取conf.ini文件中配置的数据库信息
host, port, user, pwd = mysql_handle.getConf(db="cmdc_db_be")
# 在数据库中已存在的植入上报数据对应的产品线信息
sql = "SELECT t.productLineCode, t.companyCode FROM `cmdc-order`.tc_report t where deleteSign = 0"
productLineCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][0]
companyCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][1]
# 第二步获取上报植入列表
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url33")
request_body = commonFuc().get_business_data(module, "payload33_1", companyCode, productLineCode)
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
# print(result)
# 生成随机数
report_random = random.randint(0, len(result["data"]["list"]) - 1)
# 获取上报植入信息
batchNumber = result["data"]["list"][report_random]["batchNumber"]
# 判断植入上报日期是否为空
if result["data"]["list"][report_random]["reportDateStr"]:
reportDateStart = result["data"]["list"][report_random]["reportDateStr"][:10] + " 00:00:00"
reportDateEnd = datetime.datetime.now().strftime("%Y-%m-%d") + " 23:59:59"
# 第三步进行列表查询条件验证
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url59")
request_body = commonFuc().get_business_data(module, "payload59", companyCode, productLineCode, reportDateStart,
reportDateEnd)
# print(request_body)
"""
场景: 植入上报日期_上报植入筛选
用例名称:植入上报日期_上报植入筛选
输出:{"batchNumber":"%s"}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取上报植入信息
result = json.loads(result.content)
# print(result)
# result = {"batchNumber": result["data"]["list"][0]["batchNumber"]}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_text_exist(batchNumber, result)
else:
# 第三步进行列表查询条件验证
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url59")
request_body = commonFuc().get_business_data(module, "payload59_1", companyCode, productLineCode)
# print(request_body)
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取上报植入信息
result = json.loads(result.content)
# print(result)
# result = {"batchNumber": result["data"]["list"][0]["batchNumber"]}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_text_exist(batchNumber, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc植入上报附件下载功能验证,00,00-34,sit,be
主数据平台:后台运营系统植入上报附件下载接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
from common.db.db import mySql
import random
import requests
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 数据库操作
mysql_handle = mySql()
# 获取conf.ini文件中配置的数据库信息
host, port, user, pwd = mysql_handle.getConf(db="cmdc_db_be")
# 在数据库中已存在的植入上报数据对应的产品线信息
sql = "SELECT t.productLineCode, t.companyCode FROM `cmdc-order`.tc_report t where deleteSign = 0"
productLineCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][0]
companyCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][1]
# 第二步获取上报植入列表
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url33")
request_body = commonFuc().get_business_data(module, "payload33_1", companyCode, productLineCode)
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
# print(result)
# 生成随机数
report_random = random.randint(0, len(result["data"]["list"]) - 1)
# 获取上报植入信息
reportId = result["data"]["list"][report_random]["reportId"]
batchNumber = result["data"]["list"][report_random]["batchNumber"]
# 第三步进行植入附件下载
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url35")
request_body = commonFuc().get_business_data(module, "payload35", reportId)
# print(request_body)
"""
场景: 植入上报附件下载功能验证
用例名称:植入上报附件下载功能验证
输出:{"status":1}
"""
# 发送请求
result = requests.get(url, params=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取植入附件下载状态
result = json.loads(result.content)
print(result)
# 增加判断,审核不通过的不支持附件下载
if result["success"]:
result = {"status": result["data"]["status"]}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict35")
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
else:
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict35_2", batchNumber)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc植入产品线修改功能验证,00,00-49,sit,be
主数据平台:后台运营系统植入产品线修改接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步获取植入产品线列表
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url45")
request_body = commonFuc().get_business_data(module, "payload45")
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
# 获取产品线信息
reportProductLineId = result["data"]["list"][0]["reportProductLineId"]
status = result["data"]["list"][0]["status"]
# 第三步植入产品线修改操作
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url49")
request_body = commonFuc().get_business_data(module, "payload49", reportProductLineId, status)
"""
场景: 植入产品线修改功能验证
用例名称:植入产品线修改功能验证
输出:{"success":true,"code":"200","message":"OK","data":1}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 将接口响应时间添加至result
result = json.loads(result.content)
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict49")
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc植入产品线新增验证,00,00-51,sit,be
主数据平台:后台运营系统植入产品线新增接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
from common.db.db import mySql
import requests
import random
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步获取新增植入产品线下拉数据
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url50")
request_body = commonFuc().get_business_data(module, "payload50")
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
# 生成随机数
line_random = random.randint(0, len(result["data"]) - 1)
# 获取产品线信息
productLineCode = result["data"][line_random]["productLineCode"]
productLineName = result["data"][line_random]["productLineName"]
# 第三步进行植入产品线新增操作
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url51")
request_body = [commonFuc().get_business_data(module, "payload51", productLineCode, productLineName)]
"""
场景: 植入产品线新增验证
用例名称:植入产品线新增验证
输出:{"success":true,"code":"200","message":"OK","data":{"errList":null,"success":true}}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
result = json.loads(result.content)
if result["data"]["success"]:
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict51")
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# 进行测试数据清除操作
# 数据库操作
mysql_handle = mySql()
# 获取conf.ini文件中配置的数据库信息
host, port, user, pwd = mysql_handle.getConf(db="cmdc_db_be")
# 在数据库中删除新增的植入产品线信息
sql = "DELETE FROM `cmdc-order`.tc_report_productline " \
"WHERE productLineCode = '121' and status = 0 and companyCode = '00102' ORDER BY createTime DESC"
mysql_handle.executeUpdate(host, port, user, pwd, "cmdc-order", sql)
else:
# print(result)
result = {"errList": result["data"]["errList"][0]}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict47", productLineName)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc植入审核时间_上报植入筛选,00,00-60,sit,be
主数据平台:后台运营系统获取上报植入列表接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
from common.db.db import mySql
import datetime
import random
import requests
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 数据库操作
mysql_handle = mySql()
# 获取conf.ini文件中配置的数据库信息
host, port, user, pwd = mysql_handle.getConf(db="cmdc_db_be")
# 在数据库中已存在的植入上报数据对应的产品线信息
sql = "SELECT t.productLineCode, t.companyCode FROM `cmdc-order`.tc_report t where deleteSign = 0"
productLineCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][0]
companyCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][1]
# 第二步获取上报植入列表
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url33")
request_body = commonFuc().get_business_data(module, "payload33_1", companyCode, productLineCode)
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
# print(result)
# 生成随机数
report_random = random.randint(0, len(result["data"]["list"]) - 1)
# 获取上报植入信息
batchNumber = result["data"]["list"][report_random]["batchNumber"]
# 判断植入审核时间是否为空
if result["data"]["list"][report_random]["reportAuditTimeStr"]:
reportAuditTimeStart = result["data"]["list"][report_random]["reportAuditTimeStr"][:10] + " 00:00:00"
reportAuditTimeEnd = datetime.datetime.now().strftime("%Y-%m-%d") + " 23:59:59"
# 第三步进行列表查询条件验证
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url60")
request_body = commonFuc().get_business_data(module, "payload60", companyCode, productLineCode, reportAuditTimeStart,
reportAuditTimeEnd)
# print(request_body)
"""
场景: 植入审核时间_上报植入筛选
用例名称:植入审核时间_上报植入筛选
输出:{"batchNumber":"%s"}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取上报植入信息
result = json.loads(result.content)
# 将接口响应时间添加至result
result["api_time"] = api_time
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_text_exist(batchNumber, result)
else:
# 第三步进行列表查询条件验证
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url60")
request_body = commonFuc().get_business_data(module, "payload60_1", companyCode, productLineCode)
# print(request_body)
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取上报植入信息
result = json.loads(result.content)
# 将接口响应时间添加至result
result["api_time"] = api_time
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_text_exist(batchNumber, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc模板上传接口验证,00,00-13,sit,be
主数据平台:后台运营系统模板上传接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import json
import os
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin")
password = commonFuc().get_business_data(module, "password_admin")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers_file", cmdc_access_token)
# 获取文件地址
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
file_path = BASE_DIR + "/data/cmdc_files/随货同行单模板.xlsx"
# 进行图片附件上传操作
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url13")
# 获取文件
file = open(file_path, "rb")
files = {"file": file}
"""
场景: 模板上传接口验证
用例名称:模板上传接口验证
输出:{"success":true,"code":"200","message":"OK"}
"""
# 发送请求
result = requests.post(url, files=files, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
result = json.loads(result.content)
# print(result)
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict13")
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc模板为空_集配模板新增,00,00-16,sit,be
主数据平台:后台运营系统获取集配模板列表接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin")
password = commonFuc().get_business_data(module, "password_admin")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步进行集配模板新增操作
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url16")
request_body = commonFuc().get_business_data(module, "payload16")
"""
场景: 模板为空_集配模板新增
用例名称:模板为空_集配模板新增
输出:{"success":false,"code":"ERROR","message":"保存失败","data":null,"freshToken":null}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 将接口响应时间添加至result
result = json.loads(result.content)
print(result)
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict16")
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc模板类型为空_集配模板新增,00,00-14,sit,be
主数据平台:后台运营系统获取集配模板列表接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin")
password = commonFuc().get_business_data(module, "password_admin")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步进行集配模板新增操作
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url14")
request_body = commonFuc().get_business_data(module, "payload14")
"""
场景: 模板类型为空_集配模板新增
用例名称:模板类型为空_集配模板新增
输出:{"success":false,"code":"error","message":"模板类型不能为空"}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 将接口响应时间添加至result
result = json.loads(result.content)
# print(result)
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict14")
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc模糊搜索_集配模板新增集配客户下拉框,00,00-22,sit,be
主数据平台:后台运营系统获取集配模板新增集配客户下拉选项接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import random
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin")
password = commonFuc().get_business_data(module, "password_admin")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步获取集配模板新增集配客户下拉选项
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url23")
request_body = commonFuc().get_business_data(module, "payload23")
# 发送请求
result = requests.get(url, params=request_body, headers=headers)
result = json.loads(result.content)
# 生成随机数
company_random = random.randint(0, len(result["data"]) - 1)
# 获取集配客户信息
cusCompanyName = result["data"][company_random]["cusCompanyName"][:2]
# 第三步进行模糊查询
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url22")
request_body = commonFuc().get_business_data(module, "payload22", cusCompanyName)
"""
场景: 模糊搜索_集配模板新增集配客户下拉框
用例名称:模糊搜索_集配模板新增集配客户下拉框
输出:{"cusCompanyName":"%s"}
"""
# 发送请求
result = requests.get(url, params=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取集配客户信息
result = json.loads(result.content)
# print(result)
result = {"cusCompanyName": result["data"][0]["cusCompanyName"][:2]}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict22", cusCompanyName)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc模糊查询_获取首营客户信息,00,00-53,sit,be
主数据平台:后台运营系统获取首营客户信息接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import random
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 生成随机数
number_random = random.randint(1, 5)
# 第二步获取首营客户信息
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url53")
request_body = commonFuc().get_business_data(module, "payload53", number_random)
"""
场景: 模糊查询_获取首营客户信息
用例名称:模糊查询_获取首营客户信息
输出:{"customerCode":"%s"}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取客户信息
result = json.loads(result.content)
result = {"customerCode": result["data"][0]["customerCode"]}
# 将接口响应时间添加至result
result["api_time"] = api_time
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().analysis_json(number_random, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc生产批号信息为空_新增植入批号,00,00-55,sit,be
主数据平台:后台运营系统待植入批号新增接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步进行待植入批号新增
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url55")
request_body = commonFuc().get_business_data(module, "payload55")
"""
场景: 生产批号信息为空_新增植入批号
用例名称:生产批号信息为空_新增植入批号
输出:{"success":false,"code":"ERROR","message":"提交数据不能为空","data":null}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 将接口响应时间添加至result
result = json.loads(result.content)
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict55")
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc生产批号已存在_生产批号新增,00,00-66,sit,be
主数据平台:后台运营系统生产批号新增接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
from common.db.db import mySql
import requests
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 数据库操作
mysql_handle = mySql()
# 获取conf.ini文件中配置的数据库信息
host, port, user, pwd = mysql_handle.getConf(db="cmdc_db_be")
# 在数据库中已存在且未被上报的生产批号信息
sql = "SELECT t.batchNumber,t.customerCode FROM `cmdc-order`.tc_report_batch t " \
"WHERE deleteSign = 0"
batchNumber = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][0]
customerCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][1]
# 第二步生产批号新增
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url66")
request_body = commonFuc().get_business_data(module, "payload66", customerCode, batchNumber)
"""
场景: 生产批号已存在_生产批号新增
用例名称:生产批号已存在_生产批号新增
输出:{"success":false,"code":"ERROR","message":"生产批号:%s已存在,不可录入"}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 将接口响应时间添加至result
result = json.loads(result.content)
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict66", batchNumber)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc生产批号新增_手动添加_后台,00,00-67,sit,be
主数据平台:后台运营系统生产批号新增接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
from common.db.db import mySql
import requests
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin2")
password = commonFuc().get_business_data(module, "password_admin2")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 数据库操作
mysql_handle = mySql()
# 获取conf.ini文件中配置的数据库信息
host, port, user, pwd = mysql_handle.getConf(db="cmdc_db_be")
# 在数据库中已存在且未被上报的生产批号信息
sql = "SELECT t.customerCode,t.productCode,t.batchNumber,t.productLineCode FROM `cmdc-order`.tc_report_batch t " \
"WHERE deleteSign = 0"
customerCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][0]
productCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][1]
batchNumber = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][2]
productLineCode = mysql_handle.selectSql(host, port, user, pwd, "cmdc-order", sql)[0][3]
# 第二步生产批号新增
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url67")
request_body = [
commonFuc().get_business_data(module, "payload67", customerCode, productCode, batchNumber, productLineCode)]
"""
场景: 生产批号新增_手动添加_后台
用例名称:生产批号新增_手动添加_后台
输出:{"batchNumber":"%s"}
"""
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取生产批号
result = json.loads(result.content)
result = {"batchNumber": result["data"]["reportBatchList"][0]["batchNumber"]}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict67", batchNumber)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc站点信息为空_获取集配客户信息,00,00-83,sit,be
主数据平台:后台运营系统获取集配客户信息接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_admin")
password = commonFuc().get_business_data(module, "password_admin")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步获取集配客户信息
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url83")
request_body = commonFuc().get_business_data(module, "payload83")
"""
场景: 站点信息为空_获取集配客户信息
用例名称:站点信息为空_获取集配客户信息
输出:{"success":true,"code":"200","message":"OK","data":[]}
"""
# 发送请求
result = requests.get(url, params=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 将接口响应时间添加至result
result = json.loads(result.content)
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict83")
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
# -*- encoding=utf8 -*-
__author__ = "liguangyu"
"""
case_tag:cmdc_api,cmdc获取三方需求单审核明细信息,00,00-6,sit,be
主数据平台:后台运营系统获取需求单列表接口
"""
from common.common_func import commonFuc
from air_case.cmdc_login.后台管理系统登录.后台管理系统登录 import CmdcDoLogin
import requests
import random
import json
module = "cmdc_special_version"
# 第一步登录后台运营系统获取token
username = commonFuc().get_business_data(module, "username_do_be")
password = commonFuc().get_business_data(module, "password_do_be")
# 获取登录后Cmdc_access_token
cmdc_access_token = CmdcDoLogin(username, password).get_token()
headers = commonFuc().get_business_data(module, "json_headers", cmdc_access_token)
# 第二步获取需求单列表
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url2")
request_body = commonFuc().get_business_data(module, "payload2")
# 发送请求
result = requests.post(url, json=request_body, headers=headers)
result = json.loads(result.content)
# 生成随机数
demand_random = random.randint(0, len(result["data"]["list"]) - 1)
# 获取需求单信息
demandId = result["data"]["list"][demand_random]["demandId"]
# 第三步获取三方需求单审核明细信息
url = commonFuc().get_api_url() + commonFuc().get_business_data(module, "url6")
request_body = commonFuc().get_business_data(module, "payload6", demandId)
"""
场景: 获取三方需求单审核明细信息
用例名称:获取三方需求单审核明细信息
输出:{"demandId":"%s"}
"""
# 发送请求
result = requests.get(url, params=request_body, headers=headers)
# 获取接口响应时间
api_time = float(result.elapsed.total_seconds())
# 获取需求单审核明细信息
result = json.loads(result.content)
result = {"demandId": result["data"]["demandId"]}
# 将接口响应时间添加至result
result["api_time"] = api_time
# 获取预期结果
check_dict = commonFuc().get_business_data(module, "checkDict6", demandId)
# print(check_dict)
# 断言实际结果中是否包含预期结果的内容
commonFuc().check_result(check_dict, result)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment