main.py 1.76 KiB
import threading
import time
from airtest.core.api import *
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from airtest_selenium.proxy import *
def job3():
    print("这是一个需要执行的任务。。。。。")
    from testcase.od.多明细.销售订单_多明细100 import create_salesorder
    create_salesorder(2)
    print("当前线程的个数:", threading.active_count() )
    time.sleep(1)
    print("当前线程的信息:", threading.current_thread())
def job1():
    print("这是一个需要执行的任务。。。。。")
    # from testcase.od.多明细.调拨出库单多明细100 import create_salesorder1
    from testcase.od.多明细.手术请领单多明细100 import create_salesorder
    create_salesorder(2)
    print("当前线程的个数:", threading.active_count() )
    time.sleep(1)
    print("当前线程的信息:", threading.current_thread())
if __name__ == "__main__":
    # 创建多线程时, 需要制定该线程执行的任务.name线程名字 target目标函数名
    # t1 = threading.Thread(target=create_salesorder,name="job1")
    # t2 = threading.Thread(target=create_salesorder1,name="job2")
    # t1.start()
    # t2.start()
    count = 25
    # semaphore = threading.Semaphore(value=1)
    # for i in range(count):
    #     if i<15:
    #         t = threading.Thread(target=job3)
    #         t.start()
    #     if i>=15:
    #         t = threading.Thread(target=job1)
    #         t.start()
    #     # t.join()
    for i in range(count):
        t = threading.Thread(target=job1)
        t.start()
    print(threading.active_count())
    # sleep(30)
    # sys.exit(0)
    # print("程序执行结束.....")
    # for i in range(10):
    #     from testcase.od.销售订单 import *
    #     start1()