Commit 16c3516a authored by 17322369953's avatar 17322369953
Browse files

air框架改造-登录案例改造

parent 9f105e0a
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4"> <module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager"> <component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" /> <content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="D:\python\python.exe" jdkType="Python SDK" /> <orderEntry type="jdk" jdkName="Python 3.7" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
</module> </module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="D:\python\python.exe" project-jdk-type="Python SDK" /> <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7" project-jdk-type="Python SDK" />
</project> </project>
\ No newline at end of file
# author:qinguanglei # author:qinguanglei
# ddate:2023/11/21 17:12 # ddate:2023/11/21 17:12
from common.dbutils import SQLHelper
from common.logger import log
class SwitchAction(object): class SwitchAction(object):
def close_relate_verify(self): def close_relate_verify(self, company_code):
# 前关联交易结果
curr = SQLHelper.fetch_one(
'select COMPANY_CODE, F_RELATION_CHECK_LEVEL from F_RELATION_CHECK_LEVEL where COMPANY_CODE = %s',
[company_code])
log.info(f'当前关联交易结果:{curr}')
# 关闭关联交易验证,常规测试,关闭该校验 # 关闭关联交易验证,常规测试,关闭该校验
pass SQLHelper.update_one('update BO_EU_BSI_COMPANY set F_RELATION_CHECK_LEVEL = %s where COMPANY_CODE = %s',
[" ", company_code])
now = SQLHelper.fetch_one(
'select COMPANY_CODE, F_RELATION_CHECK_LEVEL from F_RELATION_CHECK_LEVEL where COMPANY_CODE = %s',
[company_code])
# 关闭关联交易后结果
log.info(f'关闭关联交易结果:{now}')
def open_relate_verify(self): def open_relate_verify(self, company_code, level):
# 打开关联交易验证 # 打开关联交易验证
pass # 当前关联交易结果
curr = SQLHelper.fetch_one(
'select COMPANY_CODE, F_RELATION_CHECK_LEVEL from F_RELATION_CHECK_LEVEL where COMPANY_CODE = %s',
[company_code])
log.info(f'当前关联交易结果:{curr}')
# 关闭关联交易验证,常规测试,关闭该校验
SQLHelper.update_one('update BO_EU_BSI_COMPANY set F_RELATION_CHECK_LEVEL = %s where COMPANY_CODE = %s',
[level, company_code])
now = SQLHelper.fetch_one(
'select COMPANY_CODE, F_RELATION_CHECK_LEVEL from F_RELATION_CHECK_LEVEL where COMPANY_CODE = %s',
[company_code])
# 关闭关联交易后结果
log.info(f'打开关联交易结果:{now}')
def close_license_verify(self): def close_license_verify(self):
# 关闭证照校验,常规测试,关闭该校验 # 关闭证照校验,常规测试,关闭该校验
...@@ -19,10 +45,35 @@ class SwitchAction(object): ...@@ -19,10 +45,35 @@ class SwitchAction(object):
# 打开证照校验 # 打开证照校验
pass pass
def close_credit_control(self): def close_credit_control(self, company_address):
# 关闭信用管控,常规测试,关闭该校验 # 关闭信用管控,常规测试,关闭该校验
# 当前信用管控结果
curr = SQLHelper.fetch_one(
'select COMPANY_ADDRESS, LIMIT_CONTROL,PAYDAYS from BO_EU_BSI_CREDIT_CONSTANT where COMPANY_ADDRESS = %s',
[company_address])
log.info(f'当前信用管控结果:{curr}')
SQLHelper.update_one(
'update BO_EU_BSI_CREDIT_CONSTANT set LIMIT_CONTROL = %s and PAYDAYS = %s where COMPANY_ADDRESS = %s',
["N", "N", company_address])
# 关闭后信用管控结果
now = SQLHelper.fetch_one(
'select COMPANY_ADDRESS, LIMIT_CONTROL,PAYDAYS from BO_EU_BSI_CREDIT_CONSTANT where COMPANY_ADDRESS = %s',
[company_address])
log.info(f'关闭后信用管控结果:{curr}')
pass pass
def open_credit_control(self): def open_credit_control(self, company_address, limit_control, paydays):
# 打开信用管控 # 打开信用管控
pass # 当前信用管控结果
curr = SQLHelper.fetch_one(
'select COMPANY_ADDRESS, LIMIT_CONTROL,PAYDAYS from BO_EU_BSI_CREDIT_CONSTANT where COMPANY_ADDRESS = %s',
[company_address])
log.info(f'当前信用管控结果:{curr}')
SQLHelper.update_one(
'update BO_EU_BSI_CREDIT_CONSTANT set LIMIT_CONTROL = %s and PAYDAYS = %s where COMPANY_ADDRESS = %s',
[limit_control, paydays, company_address])
# 关闭后信用管控结果
now = SQLHelper.fetch_one(
'select COMPANY_ADDRESS, LIMIT_CONTROL,PAYDAYS from BO_EU_BSI_CREDIT_CONSTANT where COMPANY_ADDRESS = %s',
[company_address])
log.info(f'打开后信用管控结果:{now}')
# author:qinguanglei
# ddate:2023/11/17 15:54
"""
case_tag:case,case_login,case_login_no_input_userid
"""
from selenium import webdriver
from test_login.steps_login import StepsLogin
option = webdriver.ChromeOptions()
# option.add_experimental_option("detach", True) # 设置不要自动关闭浏览器
# 获取驱动器
driver = webdriver.Chrome(options=option)
# 最大化页面
driver.maximize_window()
# 载入步骤集合
steps_login = StepsLogin(driver)
# 打开浏览器
steps_login.open_browser()
# 输入账号密码,只输入密码
steps_login.enter_account_password(2)
# 输入验证码,输入正确的验证码
steps_login.enter_verify_code(0)
# 点击登录
steps_login.click_login_button()
# 判断结果
steps_login.judge_result_error(2)
# author:qinguanglei
# ddate:2023/11/20 9:56
"""
case_tag:case,case_login,case_login_no_input_verifycode
"""
from selenium import webdriver
from test_login.steps_login import StepsLogin
"""家庭作业"""
option = webdriver.ChromeOptions()
# option.add_experimental_option("detach", True) # 设置不要自动关闭浏览器
# 获取驱动器
driver = webdriver.Chrome(options=option)
# 最大化页面
driver.maximize_window()
"""开始执行案例"""
# 载入步骤集合
steps_login = StepsLogin(driver)
# 打开浏览器
steps_login.open_browser()
# 输入账号密码,都输入正确的
steps_login.enter_account_password(0)
# 输入验证码,不输入验证码
steps_login.enter_verify_code(2)
# 点击登录
steps_login.click_login_button()
# 判断结果
steps_login.judge_result_error(3)
# author:qinguanglei
# ddate:2023/11/20 9:43
"""
case_tag:case,case_login,case_login_password_error
"""
from selenium import webdriver
from test_login.steps_login import StepsLogin
"""家庭作业"""
option = webdriver.ChromeOptions()
# option.add_experimental_option("detach", True) # 设置不要自动关闭浏览器
# 获取驱动器
driver = webdriver.Chrome(options=option)
# 最大化页面
driver.maximize_window()
"""开始执行案例"""
# 载入步骤集合
steps_login = StepsLogin(driver)
# 打开浏览器
steps_login.open_browser()
# 输入账号密码,密码输入错误
steps_login.enter_account_password(4)
# 输入验证码,输入正确的验证码
steps_login.enter_verify_code(0)
# 点击登录
steps_login.click_login_button()
# 判断结果
steps_login.judge_result_error(1)
# author:qinguanglei
# ddate:2023/11/16 11:18
"""
case_tag:case,case_login,case_login_success
"""
from selenium import webdriver
from test_login.steps_login import StepsLogin
"""家庭作业"""
option = webdriver.ChromeOptions()
# option.add_experimental_option("detach", True) # 设置不要自动关闭浏览器
# 获取驱动器
driver = webdriver.Chrome(options=option)
# 最大化页面
driver.maximize_window()
"""开始执行案例"""
# 载入步骤集合
steps_login = StepsLogin(driver)
# 打开浏览器
steps_login.open_browser()
# 输入账号密码,账号与密码都输入
steps_login.enter_account_password(0)
# 输入验证码,正确的验证码
steps_login.enter_verify_code(0)
# 点击登录
steps_login.click_login_button()
# 判断结果
steps_login.judge_result()
# author:qinguanglei
# ddate:2023/11/20 9:23
"""
case_tag:case,case_login,case_login_userid_error
"""
from selenium import webdriver
from test_login.steps_login import StepsLogin
"""家庭作业"""
option = webdriver.ChromeOptions()
# option.add_experimental_option("detach", True) # 设置不要自动关闭浏览器
# 获取驱动器
driver = webdriver.Chrome(options=option)
# 最大化页面
driver.maximize_window()
"""开始执行案例"""
# 载入步骤集合
steps_login = StepsLogin(driver)
# 打开浏览器
steps_login.open_browser()
# 输入账号密码,用户名输入错误
steps_login.enter_account_password(3)
# 输入验证码,输入正确的验证码
steps_login.enter_verify_code(0)
# 点击登录
steps_login.click_login_button()
# 判断结果
steps_login.judge_result_error(1)
# author:qinguanglei
# ddate:2023/11/16 16:40
"""
case_tag:case,case_login,case_login_verify_code_error
"""
from selenium import webdriver
from test_login.steps_login import StepsLogin
"""家庭作业"""
option = webdriver.ChromeOptions()
# option.add_experimental_option("detach", True) # 设置不要自动关闭浏览器
# 获取驱动器
driver = webdriver.Chrome(options=option)
# 最大化页面
driver.maximize_window()
"""开始执行案例"""
# 载入步骤集合
steps_login = StepsLogin(driver)
# 打开浏览器
steps_login.open_browser()
# 输入账号密码,都输入正确的
steps_login.enter_account_password(0)
# 输入验证码,错误的验证码
steps_login.enter_verify_code(1)
# 点击登录
steps_login.click_login_button()
# 判断结果
steps_login.judge_result_error(0)
import re
import os
def get_message(case_path, case_name):
"""
从注释中获取某个文件的 case tag 信息
"""
tags = []
pyfile = open(case_path + os.sep + case_name + '.py', 'r', encoding='utf-8')
lines = pyfile.readlines()
for line in lines:
if 'case_tag' in line:
line = line.split('case_tag')[-1]
#对不规范的写法做一些处理,如中文分号, 中文逗号,多空格等。。
line = re.sub(' ', '', line)
line = line.replace(',', ',').replace(':', '').replace(':', '')
line = line.strip().strip('\n')
tags = line.split(',')
pyfile.close()
return tags
def get_case_tag_list(case_path, cases):
"""
遍历case目录下的文件,从文件中获取case tag,加入列表中
再加个判断,判断是跑线上还是线下
"""
tagList = cases.split('#')
env = os.environ['ENV']
case_list = []
for f in os.listdir(case_path):
nf_path = os.path.join(case_path, f)
if os.path.isdir(nf_path) and nf_path.endswith(".air") == False:
for nf in os.listdir(nf_path):
script = os.path.join(nf_path, nf)
if script.endswith(".air"):
air_name = nf.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag and (env in case_tags or env in ('sit', 'sita')):
case_hash = {}
case_hash["air_path"] = str(script)
case_hash["air_name"] = str(air_name)
case_hash["module"] = str(script.split(os.sep)[-2])
case_list.append(case_hash)
else:
script = os.path.join(case_path, f)
if f.endswith(".air"):
air_name = f.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag and (env in case_tags or env in ('sit', 'sita')):
case_hash = {}
case_hash["air_path"] = str(script)
case_hash["air_name"] = str(air_name)
case_hash["module"] = str(script.split(os.sep)[-2])
case_list.append(case_hash)
return case_list
def get_case_tag_list_2(case_path, cases):
"""
遍历case目录下的文件,从文件中获取case tag,加入列表中
"""
tagList = cases.split('#')
case_list = []
for f in os.listdir(case_path):
script = os.path.join(case_path, f)
air_name = f.replace('.air', '')
case_tags = get_message(script, air_name)
flag = False
for con in tagList:
conList = con.split(',')
if list(set(case_tags) & set(conList)):
flag = True
else:
flag = False
break
if flag:
case_list.append(air_name)
return case_list
# -*- coding: utf-8 -*-
import unittest
import os
import sys
import six
import re
import shutil
import traceback
import warnings
from io import open
from airtest.core.api import G, auto_setup, log
from airtest.core.settings import Settings as ST
from airtest.utils.compat import decode_path, script_dir_name, script_log_dir
from copy import copy
class AirtestCase(unittest.TestCase):
PROJECT_ROOT = "."
SCRIPTEXT = ".air"
TPLEXT = ".png"
@classmethod
def setUpClass(cls):
cls.args = args
setup_by_args(args)
# setup script exec scope
cls.scope = copy(globals())
cls.scope["exec_script"] = cls.exec_other_script
def setUp(self):
if self.args.log and self.args.recording:
for dev in G.DEVICE_LIST:
try:
dev.start_recording()
except:
traceback.print_exc()
def tearDown(self):
if self.args.log and self.args.recording:
for k, dev in enumerate(G.DEVICE_LIST):
try:
output = os.path.join(self.args.log, "recording_%d.mp4" % k)
dev.stop_recording(output)
except:
traceback.print_exc()
def runTest(self):
scriptpath, pyfilename = script_dir_name(self.args.script)
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
self.scope["__file__"] = pyfilepath
with open(pyfilepath, 'r', encoding="utf8") as f:
code = f.read()
pyfilepath = pyfilepath.encode(sys.getfilesystemencoding())
try:
exec(compile(code.encode("utf-8"), pyfilepath, 'exec'), self.scope)
except Exception as err:
tb = traceback.format_exc()
log("Final Error", tb)
six.reraise(*sys.exc_info())
@classmethod
def exec_other_script(cls, scriptpath):
"""run other script in test script"""
warnings.simplefilter("always")
warnings.warn("please use using() api instead.", PendingDeprecationWarning)
def _sub_dir_name(scriptname):
dirname = os.path.splitdrive(os.path.normpath(scriptname))[-1]
dirname = dirname.strip(os.path.sep).replace(os.path.sep, "_").replace(cls.SCRIPTEXT, "_sub")
return dirname
def _copy_script(src, dst):
if os.path.isdir(dst):
shutil.rmtree(dst, ignore_errors=True)
os.mkdir(dst)
for f in os.listdir(src):
srcfile = os.path.join(src, f)
if not (os.path.isfile(srcfile) and f.endswith(cls.TPLEXT)):
continue
dstfile = os.path.join(dst, f)
shutil.copy(srcfile, dstfile)
# find script in PROJECT_ROOT
scriptpath = os.path.join(ST.PROJECT_ROOT, scriptpath)
# copy submodule's images into sub_dir
sub_dir = _sub_dir_name(scriptpath)
sub_dirpath = os.path.join(cls.args.script, sub_dir)
_copy_script(scriptpath, sub_dirpath)
# read code
pyfilename = os.path.basename(scriptpath).replace(cls.SCRIPTEXT, ".py")
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
with open(pyfilepath, 'r', encoding='utf8') as f:
code = f.read()
# replace tpl filepath with filepath in sub_dir
code = re.sub("[\'\"](\w+.png)[\'\"]", "\"%s/\g<1>\"" % sub_dir, code)
exec(compile(code.encode("utf8"), pyfilepath, 'exec'), cls.scope)
def setup_by_args(args):
# init devices
if isinstance(args.device, list):
devices = args.device
elif args.device:
devices = [args.device]
else:
devices = []
# set base dir to find tpl
dirpath, _ = script_dir_name(args.script)
# set log dir
if args.log:
args.log = script_log_dir(dirpath, args.log)
print("save log in '%s'" % args.log)
else:
print("do not save log")
# guess project_root to be basedir of current .air path
project_root = os.path.dirname(args.script) if not ST.PROJECT_ROOT else None
try:
auto_setup(dirpath, devices, args.log, project_root)
except:
os.system('adb devices')
auto_setup(dirpath, devices, args.log, project_root)
def run_script(parsed_args, testcase_cls=AirtestCase):
global args # make it global deliberately to be used in AirtestCase & test scripts
args = parsed_args
suite = unittest.TestSuite()
suite.addTest(testcase_cls())
result = unittest.TextTestRunner(verbosity=0).run(suite)
if not result.wasSuccessful():
sys.exit(-1)
class ModuleName(object):
SEARCH_MERGER = "search-merger"
SEARCH_SUGGEST = "search-suggest"
SEARCH_PLATFORM_INDEX = "search-platform-index"
SEARCH_PLATFORM_WEB = "search-platform-web"
import configparser
import os
import yaml
cf = configparser.ConfigParser()
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = rootPath = os.path.split(curPath)[0]
class confOP:
def getConfValue(self, filename, section, option, type='string'):
cf.read(filename)
if (type == 'string'):
value = cf.get(section, option)
else:
value = cf.getint(section, option)
return value
def getYamlValue(self, filename):
"""
获取公共sql中的sql
:param filename:
:return:
"""
file = os.path.join(rootPath, "data" + os.sep + filename)
f = open(file, 'r', encoding='utf-8')
cont = f.read()
value = yaml.load(cont, Loader=yaml.FullLoader)
f.close()
return value
def getBusiYamlValue(self, path, filename):
"""
获取业务中的yaml文件数据
:param path:
:param filename:
:return:
"""
file = os.path.join(path, filename)
f = open(file, 'r', encoding='utf-8')
cont = f.read()
value = yaml.load(cont, Loader=yaml.FullLoader)
f.close()
return value
[meitun_db]
host = rm-bp18as20m3solyi69.mysql.rds.aliyuncs.com
port = 3306
user = test_admin
password = ahdp0b.cr76_Pje1
[bj_db]
host = 172.25.1.6
port = 3320
user = local_rw
password = baidugoogleyahoo
[promotion]
livedb = 19
liveenv = 4
liveserver = 129
predb = 19
preenv = 2
preserver = 58
[mongo]
ip = 127.0.0.1
port = 27017
[redis]
redis_host=192.168.24.31
import pymysql
import os
import sys
from common.confop import confOP
from common.db.sql.sqlByIdb import sqlByIdb
import pymongo
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)
class dbOP():
def __init__(self, path=rootPath):
self.path = path
def selectSql(self, schema, param=[], db="bj_db"):
host = mySql().getConf(db)[0]
port = mySql().getConf(db)[1]
user = mySql().getConf(db)[2]
pwd = mySql().getConf(db)[3]
value = confOP().getYamlValue("sql")
if schema not in value:
value = confOP().getBusiYamlValue(self.path, "sql")
env = os.environ["ENV"]
if env == "on" or env == "pre":
database = value[schema][0]
sql = value[schema][1]
param = param
result = sqlByIdb().selectSql(database, sql, param, env)
else:
result = mySql().selectSql(host, port, user, pwd, value[schema][0], value[schema][1], param)
return result
def ddlSql(self, schema, param=[], db="bj_db"):
host = mySql().getConf(db)[0]
port = mySql().getConf(db)[1]
user = mySql().getConf(db)[2]
pwd = mySql().getConf(db)[3]
value = confOP().getYamlValue("sql")
if schema not in value:
value = confOP().getBusiYamlValue(self.path, "sql")
result = mySql().executeUpdate(host, port, user, pwd, value[schema][0], value[schema][1], param)
return result
class mySql:
def __init__(self):
pass
def getConf(self, db="bj_db"):
host = confOP().getConfValue(curPath + "/conf.ini", db, "host")
port = confOP().getConfValue(curPath + "/conf.ini", db, "port", "int")
user = confOP().getConfValue(curPath + "/conf.ini", db, "user")
pwd = confOP().getConfValue(curPath + "/conf.ini", db, "password")
return host,port,user,pwd
def connection(self, host, port, user, pwd, database):
return pymysql.connect(host=host, port=port,user=user,passwd=pwd,db=database)
def selectSql(self, host, port, user, pwd, database, sql, param=[]):
conn = self.connection(host, port, user, pwd, database)
cursor = conn.cursor()
try:
cursor.execute(sql, param)
data = cursor.fetchall()
cols = cursor.description
col = []
for i in cols:
col.append(i[0])
data = list(map(list, data))
return data
except Exception as e:
print(e)
conn.rollback()
cursor.close()
conn.close()
def executeUpdate(self, host, port, user, pwd, database, sql, param=[]):
conn = self.connection(host, port, user, pwd, database)
cursor = conn.cursor()
try:
ret = cursor.execute(sql, param)
conn.commit()
return ret
except Exception as e:
print(e)
conn.rollback()
cursor.close()
conn.close()
class mongodb:
def connection(self,db,collect):
ip = confOP().getConfValue(curPath + "/conf.ini", "mongo", "ip")
port = confOP().getConfValue(curPath + "/conf.ini", "mongo", "port")
path = "mongodb://" + ip + ":" + port + "/"
myclient = pymongo.MongoClient(path)
mydb = myclient[db]
mycol = mydb[collect]
return mycol
#查询所有的值
def findALl(self, db, collect):
result = []
mycol = self.connection(db, collect)
for x in mycol.find():
result.append(x)
return result
#按照条件查询:条件为{}类型
def findByCon(self, db, collect, condition):
result = []
mycol = self.connection(db, collect)
for x in mycol.find(condition):
result.append(x)
return result
if __name__ == '__main__':
env = 'sit'
os.environ['ENV'] = env.lower()
path = "D:\\myCode\\autotest-airtest-local\\data\\月嫂"
#mysql的例子
result = dbOP(path).selectSql("local_worker_info")
print(result[0])
#monggdb的例子
#mongodb().findByCon("yapi","user", {"role": "admin"})
#redis的例子
#redisClass().connect()
This diff is collapsed.
# -*- encoding=utf8 -*-
import base64
import requests
import json
from common.confop import confOP
class sqlByIdb:
# 登录
def login(self):
url = "http://docp.plt.babytree-inc.com/api/v1/passport/login/"
header = {
'Content-Type': 'application/json'
}
username = "dGVzdF9jbG91ZA=="
password = "eDZpbkRITnJVRWRl"
postdata = json.dumps({
"username": base64.b64decode(username.encode('utf-8')).decode("utf-8"),
"password": base64.b64decode(password.encode('utf-8')).decode("utf-8")
})
response = requests.request("POST", url, headers=header, data=postdata)
result = json.loads(response.text.encode('utf8'))
return result
def selectSql(self, database, sql, param, env):
login_result = sqlByIdb().login()
sql = sql.replace('%d', '%s')
for pa in param:
pa = "'" + str(pa) + "'"
sql = sql.replace('%s', pa, 1)
print(sql)
value = confOP().getYamlValue("idbSet")
if env == "on":
env = "live"
instance_id = value[database][env]
url = "http://docp.plt.babytree-inc.com/api/v1/sql/mysql/query/query/"
payload = json.dumps({
"instance_id": instance_id,
"db_name": database,
"tb_name": database,
"limit_num": 3000,
"sqlContent": sql
})
headers = {
'Authorization': login_result["sid"],
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
result = json.loads(response.text.encode('utf8'))
return result["results"][0]["rows"]
# -*- encoding=utf8 -*-
import pymysql
class delByBranchId(object):
def Delete_branch_by_id(self,id):
connection = pymysql.connect(host="10.17.65.108", user="root", password="Cmic.2023", database="spd3_herp_test2",
charset="utf8")
cursor = connection.cursor()
# 删除新增的院区数据,减少垃圾数据的产生,保证脚本下次还可以正常运行,接口脚本每次都使用新增数据
sql = "DELETE from mcms_branch_info where id = '%s';" % id
print(sql)
cursor.execute(sql)
cursor.execute("commit;")
print('%s院区数据已删除成功' % id)
cursor.close()
# delByBranchId().Delete_branch_by_name('11122222')
\ No newline at end of file
# -*- encoding=utf8 -*-
import pymysql
class delKindGoodsinfo(object):
def delKindGoodsinfo(self):
connection = pymysql.connect(host="10.17.65.108", user="root", password="Cmic.2023", database="spd3_herp_test2",
charset="utf8")
cursor = connection.cursor()
# 删除耗材分类下的产品,便于脚本下次执行时再次添加产品信息到该耗材分类中
sql = "DELETE from mcms_goods_kind_collection where kind_id=(select id from mcms_goods_kind where kind_name='9999_2');"
print(sql)
cursor.execute(sql)
cursor.execute("commit;")
print('耗材分类下的产品删除完毕' )
cursor.close()
def delkind(self):
connection = pymysql.connect(host="10.17.65.108", user="root", password="Cmic.2023", database="spd3_herp_test2",
charset="utf8")
cursor = connection.cursor()
# 删除耗材分类下的产品,便于脚本下次执行时再次添加产品信息到该耗材分类中
sql = "DELETE from mcms_goods_kind where kind_code in('9999','9999_2');"
print(sql)
cursor.execute(sql)
cursor.execute("commit;")
print('耗材分类删除完毕')
# delByBranchId().Delete_branch_by_name('11122222')
\ No newline at end of file
# -*- encoding=utf8 -*-
import pymysql
class delByNoticeName(object):
def delete_ByNoticeName(self):
connection = pymysql.connect(host="10.17.65.108", user="root", password="Cmic.2023", database="spd3_herp_test2",
charset="utf8")
cursor = connection.cursor()
# 删除新增的院区数据,减少垃圾数据的产生,保证脚本下次还可以正常运行,接口脚本每次都使用新增数据
sql = "update sys_notice set del_flag=1 where title like '%test%';"
print(sql)
cursor.execute(sql)
cursor.execute("commit;")
print('%s公告数据已逻辑删除成功')
cursor.close()
# delByNoticeName().delete_ByNoticeName()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment