库存预占边界测试.py 1.58 KiB
import threading
import time
from airtest.core.api import *
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from airtest_selenium.proxy import *
def job3():
    print("这是一个需要执行的任务。。。。。")
    create_salesorder(2)
    print("当前线程的个数:", threading.active_count() )
    time.sleep(1)
    print("当前线程的信息:", threading.current_thread())
def job1():
    print("这是一个需要执行的任务。。。。。")
    create_salesorder1(2)
    print("当前线程的个数:", threading.active_count() )
    time.sleep(1)
    print("当前线程的信息:", threading.current_thread())
if __name__ == "__main__":
    # 创建多线程时, 需要制定该线程执行的任务.name线程名字 target目标函数名
    from testcase.od.库存预占边界测试.销售订单1 import *
    from testcase.od.库存预占边界测试.调拨出库单1 import *
    # t1 = threading.Thread(target=create_salesorder,name="job1")
    # t2 = threading.Thread(target=create_salesorder1,name="job2")
    # t1.start()
    # t2.start()
    count = 4  #4个人下单,只有3个商品
    # semaphore = threading.Semaphore(value=1)
    for i in range(count):
        if i<2:
            t = threading.Thread(target=job3)
            t.start()
        if i>=2:
            t = threading.Thread(target=job1)
            t.start()
        # t.join()
    print(threading.active_count())
    # sleep(30)
    # sys.exit(0)
    # print("程序执行结束.....")
    # for i in range(10):
    #     from testcase.od.销售订单 import *
    #     start1()