import threading import time from airtest.core.api import * from selenium import webdriver from selenium.webdriver.common.keys import Keys from airtest_selenium.proxy import * def job3(): print("这是一个需要执行的任务。。。。。") create_salesorder(2) print("当前线程的个数:", threading.active_count() ) time.sleep(1) print("当前线程的信息:", threading.current_thread()) def job1(): print("这是一个需要执行的任务。。。。。") create_salesorder1(2) print("当前线程的个数:", threading.active_count() ) time.sleep(1) print("当前线程的信息:", threading.current_thread()) if __name__ == "__main__": # 创建多线程时, 需要制定该线程执行的任务.name线程名字 target目标函数名 from testcase.od.库存预占边界测试.销售订单1 import * from testcase.od.库存预占边界测试.调拨出库单1 import * # t1 = threading.Thread(target=create_salesorder,name="job1") # t2 = threading.Thread(target=create_salesorder1,name="job2") # t1.start() # t2.start() count = 4 #4个人下单,只有3个商品 # semaphore = threading.Semaphore(value=1) for i in range(count): if i<2: t = threading.Thread(target=job3) t.start() if i>=2: t = threading.Thread(target=job1) t.start() # t.join() print(threading.active_count()) # sleep(30) # sys.exit(0) # print("程序执行结束.....") # for i in range(10): # from testcase.od.销售订单 import * # start1()