import threading import time from airtest.core.api import * from selenium import webdriver from selenium.webdriver.common.keys import Keys from airtest_selenium.proxy import * def job3(): print("这是一个需要执行的任务。。。。。") from testcase.od.多明细.销售订单_多明细100 import create_salesorder create_salesorder(2) print("当前线程的个数:", threading.active_count() ) time.sleep(1) print("当前线程的信息:", threading.current_thread()) def job1(): print("这是一个需要执行的任务。。。。。") # from testcase.od.多明细.调拨出库单多明细100 import create_salesorder1 from testcase.od.多明细.手术请领单多明细100 import create_salesorder create_salesorder(2) print("当前线程的个数:", threading.active_count() ) time.sleep(1) print("当前线程的信息:", threading.current_thread()) if __name__ == "__main__": # 创建多线程时, 需要制定该线程执行的任务.name线程名字 target目标函数名 # t1 = threading.Thread(target=create_salesorder,name="job1") # t2 = threading.Thread(target=create_salesorder1,name="job2") # t1.start() # t2.start() count = 25 # semaphore = threading.Semaphore(value=1) # for i in range(count): # if i<15: # t = threading.Thread(target=job3) # t.start() # if i>=15: # t = threading.Thread(target=job1) # t.start() # # t.join() for i in range(count): t = threading.Thread(target=job1) t.start() print(threading.active_count()) # sleep(30) # sys.exit(0) # print("程序执行结束.....") # for i in range(10): # from testcase.od.销售订单 import * # start1()