# runner.py (3.72 KiB)
# -*- encoding=utf8 -*-
import os
import sys
import platform
import shutil
import common.run_case_conditions as run
import common.case_tag_get as case_tag_get
from common.db.sql.sql_del_branch_info import delData
# Append the parent of this script's directory to the module search path.
# NOTE(review): this runs after the `common.*` imports above, so it cannot
# influence how those imports are resolved.
curPath = os.path.dirname(os.path.abspath(__file__))
rootPath = os.path.dirname(curPath)
sys.path.append(rootPath)
def parse_cli_args(argv):
    """Map the raw command line onto named run settings.

    Two positional layouts are supported, selected by ``argv[1]``:

    * ``sc``: ``sc <module> <case> <scenario> <log_level> <env>``
    * anything else (``all`` / ``tag``):
      ``<type> <cases-or-tag> <log_level> <env> [job_url]``

    Args:
        argv: Full argument vector with the program name at index 0.

    Returns:
        A dict with the keys ``case_type``, ``module_name``, ``case_name``,
        ``scenario_name``, ``log_level``, ``env`` and ``job_url``.
        ``log_level`` and ``env`` are lower-cased. Fields that do not exist
        in the selected layout are empty strings.

    Raises:
        IndexError: A required positional argument is missing.
    """
    case_type = argv[1]
    if case_type == 'sc':
        # Scenario mode shifts log level and environment two slots to the
        # right, so it must be parsed with its own layout.
        return {
            'case_type': case_type,
            'module_name': argv[2],
            'case_name': argv[3],
            'scenario_name': argv[4],
            'log_level': argv[5].lower(),
            'env': argv[6].lower(),
            'job_url': "",
        }
    return {
        'case_type': case_type,
        'module_name': "",
        # Comma-separated case names, or the tag when case_type is 'tag'.
        'case_name': argv[2],
        'scenario_name': "",
        'log_level': argv[3].lower(),
        'env': argv[4].lower(),
        # An optional sixth element carries the job link.
        'job_url': argv[5] if len(argv) > 5 else "",
    }


def install_hosts_file():
    """Put ``data/hosts`` into the system hosts directory via a shell command.

    Returns:
        The status value reported by ``os.system`` for the command.
    """
    if platform.system() == 'Windows':
        # Raw string: the backslashes are path separators, not escapes. The
        # trailing space keeps the literal from ending in a backslash.
        cmd = r"xcopy /s /y  data\hosts  C:\Windows\System32\drivers\etc\ "
    else:
        # NOTE(review): `mv` removes data/hosts from the workspace, so a
        # second run in the same checkout has nothing left to move, while
        # the Windows branch copies. Confirm whether `cp` was intended.
        cmd = 'mv data/hosts /etc/'
    return os.system(cmd)


def prepare_log_dir(workspace):
    """Ensure ``<workspace>/log`` and an empty ``runtime.txt`` in it exist.

    The previous code tested for ``runtime.txt`` but then created
    ``run_time.txt`` through the Windows-only ``type nul`` command, so the
    file handed to the test runner was never the one being created.

    Args:
        workspace: Directory under which the ``log`` folder lives.

    Returns:
        A ``(log_path, runtime_log)`` tuple of paths.
    """
    log_path = os.path.join(workspace, 'log')
    runtime_log = os.path.join(log_path, 'runtime.txt')
    os.makedirs(log_path, exist_ok=True)
    # Append mode creates the file when missing and never truncates it.
    with open(runtime_log, 'a'):
        pass
    return log_path, runtime_log


def remove_generated_files(case_path):
    """Delete leftover files and scenario folders below *case_path*.

    For every sub-directory of *case_path*: each regular file directly
    inside it is removed, and each non-file entry whose name starts with
    "场景" is removed as a whole tree when it contains ``<name>.py``
    (``<name>`` being the entry name with ".air" stripped).

    A missing *case_path* is ignored so later cleanup steps can still run.

    NOTE(review): the indentation of the original cleanup code was lost;
    removing the folder only when its script exists is the conservative
    reading. Confirm against the upstream file.
    """
    if not os.path.isdir(case_path):
        return
    for entry in os.listdir(case_path):
        module_dir = os.path.join(case_path, entry)
        if not os.path.isdir(module_dir):
            continue
        for name in os.listdir(module_dir):
            item_path = os.path.join(module_dir, name)
            if os.path.isfile(item_path):
                os.remove(item_path)
                print('删了什么文件', item_path)
            elif name.startswith("场景"):
                script = os.path.join(item_path, name.replace(".air", "") + ".py")
                if os.path.exists(script):
                    # Removing the tree also removes the script inside it.
                    shutil.rmtree(item_path)
                    print('删了什么文件', script)


if __name__ == '__main__':
    # Parse once, with the layout that matches the run type, so the runner
    # below is constructed with the real log level in every mode.
    args = parse_cli_args(sys.argv)
    # Export the target environment as the ENV environment variable.
    os.environ['ENV'] = args['env']

    hosts_status = install_hosts_file()
    if hosts_status != 0:
        print("WARNING: hosts file command exited with status " + str(hosts_status))

    workspace = os.path.abspath(".")
    case_path = os.path.join(workspace, 'air_case')
    log_path, runtime_log = prepare_log_dir(workspace)
    print("workspace: " + workspace)
    print("air_case_path: " + case_path)
    print("air_log_path: " + log_path)
    print("caseType:" + args['case_type'])
    test = run.CustomAirTestCase(workspace, case_path, log_path, runtime_log, args['log_level'])
    try:
        if args['case_type'] == 'sc':
            test.run_by_scenario(args['module_name'], args['case_name'], args['scenario_name'])
        else:
            if args['case_type'] == 'tag':
                # In tag mode the second argument is the tag to look up.
                case_list = case_tag_get.get_case_tag_list(case_path, args['case_name'])
            else:
                case_list = args['case_name'].split(',')
            run_case, case_list, case_scenario_name = test.merge_new_file(case_list)
            test.run_case(run_case, case_list, args['job_url'], case_scenario_name)
    except Exception as e:
        # Top-level boundary: report the failure and still run the cleanup.
        print("Exception:", e)
    finally:
        # Remove surplus .py files and scenario folders left in air_case.
        remove_generated_files(case_path)
        # Delete the data produced by the automation scripts.
        delData().Delete_branch_by_id()
        delData().Delete_goods_change_info()
        delData().Delete_probeInfo_other()
        delData().Del_supply_relation()
        print('结束了')