多进程执行分布式自动化测试
发表于:2024-11-28 作者:热门IT资讯网编辑
编辑最后更新 2024年11月28日,场景:进入搜狗,输入搜索关键字进行搜索利用多进程分布式实现from multiprocessing import Poolimport os, timefrom selenium import web
场景:
进入搜狗,输入搜索关键字进行搜索
利用多进程分布式实现
from multiprocessing import Poolimport os, timefrom selenium import webdriverfrom selenium.webdriver.common.keys import Keysfrom multiprocessing import Manager, current_processimport traceback#定义测试行为函数:此处为打开搜狗搜索内容def node_task(name, lock, arg, successTestCases, failTestCases): """ :param name: 执行进程名 :param lock:进程间的共享资源锁 :param arg:node节点计算机、浏览器字典 ,如:{"node": "http://127.0.0.1:6666/wd/hub", "browserName": "chrome"} :param successTestCases:成功执行用例列表 :param failTestCases:失败用例列表 :return:返回成功执行、失败执行的用例列表 """ procName = current_process().name print("当前进程名:",procName) time.sleep(1.2) #获取节点计算机地址 node_host = arg["node"] #获取浏览器 browser = arg["browserName"] print(arg["node"]) print(arg["browserName"]) print('Run task %s (%s)...\n' % (name, os.getpid())) #获取用例执行的开始时间 start = time.time() #获取driver对象 driver = webdriver.Remote( #节点计算机 command_executor="%s" %arg["node"], desired_capabilities={ #浏览器 "browserName": "%s" %arg["browserName"], "video": "True", "platform": "WINDOWS"}) try: driver.maximize_window() driver.get("http://www.sogou.com") assert "搜狗" in driver.title element = driver.find_element_by_id("query") element.send_keys("测试开发") element.send_keys(Keys.RETURN) time.sleep(3) assert "测试开发" in driver.page_source #获取共享锁 lock.acquire() #用例执行成功,用例加入成功列表 successTestCases.append("TestCase: " + str(name)) #释放共享锁 lock.release() print("TestCase: " + str(name) + " done!") except AssertionError as e: #断言失败,打印异常信息 print("AssertionError occur!" " testCase " + str(name)) print(traceback.print_exc()) #保存异常现场图片 driver.save_screenshot("e:\\screenshot" + str(name) + ".png") #获取共享锁 lock.acquire() #存储失败用例信息 failTestCases.append("TestCase: " + str(name)) #释放共享锁 lock.release() print("测试用例执行失败") except Exception as e: print("Exception occur!") print(traceback.print_exc()) driver.save_screenshot("e:\\screenshot" + str(name) + ".png") #获取共享锁,存储失败用例信息,并释放锁 with lock: failTestCases.append("TestCase: " + str(name)) print("测试 用例执行失败") finally: #退出驱动,并关闭所有窗口 driver.quit() end = time.time() #打印用例执行时间 print("Tast %s run %.2f seconds." % (name, (end - start)))#封装多进程执行函数def run(nodeSeq): """ :param nodeSeq: 节点机器和浏览器字典,列表 :return: 返回成功、失败用例列表successTestCases, failTestCases """ #定义共享manager对象 manager = Manager() #创建进程间共享的用例执行成功列表 successTestCases = manager.list([]) # 创建进程间共享的用例执行失败列表 failTestCases = manager.list([]) #创建一个共享资源锁,各进程共享 lock = manager.Lock() #打印父进程ID print("Parent process %s." % os.getpid()) #创建包含3个进程的进程池 p = Pool(processes=3) #获取用例数量 testCaseNumber = len(nodeSeq) #循环索引遍历用例列表,多进程执行node_task函数(搜狗搜索) for i in range(testCaseNumber): p.apply_async(node_task, args=(i + 1, lock, nodeSeq[i], successTestCases, failTestCases)) print("Waiting for all subprocesses done...") #关闭p p.close() #等待各个子进程执行结束 p.join() return successTestCases, failTestCases#封装写测试结果函数def resultReport(testCaseNumber, successTestCases, failTestCases): """ :param testCaseNumber: 用例个数 :param successTestCases: 成功执行列表 :param failTestCases: 失败执行列表 :return: 无 """ print("测试报告: \n") print("共执行测试用例:" + str(testCaseNumber) + "个\n") print("执行成功的测试用例: " + str(len(successTestCases)) + "个") if len(successTestCases) > 0: for t in successTestCases: print(t) else: print("没有执行成功的测试用例") print("执行失败的测试用例: " + str(len(failTestCases)) + "个") if len(failTestCases) > 0: for t in failTestCases: print(t) else: print("没有执行失败的测试用例")if __name__ == "__main__": nodeList = [ {"node": "http://127.0.0.1:6666/wd/hub", "browserName": "internet explorer"}, {"node": "http://127.0.0.1:6666/wd/hub", "browserName": "chrome"}, {"node": "http://127.0.0.1:6666/wd/hub", "browserName": "firefox"}] testCaseNumber = len(nodeList) #执行多进程函数 successTestCases, failTestCases = run(nodeList) print("All processes done") #写测试结果 resultReport(testCaseNumber, successTestCases, failTestCases)