diff --git a/Drivers/Built_In_Driver.py b/Drivers/Built_In_Driver.py index dd2e894c5..dde1b15f6 100755 --- a/Drivers/Built_In_Driver.py +++ b/Drivers/Built_In_Driver.py @@ -9,14 +9,14 @@ from Framework.Built_In_Automation.Sequential_Actions import sequential_actions as sa -def sequential_actions( +async def sequential_actions( step_data, test_action_info, temp_q, debug_actions=None, ): try: - sTestStepReturnStatus = sa.Sequential_Actions( + sTestStepReturnStatus = await sa.Sequential_Actions( step_data, test_action_info, debug_actions, diff --git a/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py b/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py index 4ff0044c8..bf43ab147 100755 --- a/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py +++ b/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py @@ -279,7 +279,7 @@ def if_else_log_for_actions(left, next_level_step_data, statement="if"): return left + ".... condition matched\n" + "Running actions: " + log_actions -def If_else_action(step_data, data_set_no): +async def If_else_action(step_data, data_set_no): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME try: data_set = step_data[data_set_no] @@ -526,7 +526,7 @@ def check_operators(): ) return "zeuz_failed" if data_set_index not in inner_skip: - result, skip = Run_Sequential_Actions( + result, skip = await Run_Sequential_Actions( [data_set_index] ) # Running inner_skip = list(set(inner_skip+skip)) @@ -559,7 +559,7 @@ def sanitize_deprecated_dataset(value): return value -def for_loop_action(step_data, data_set_no): +async def for_loop_action(step_data, data_set_no): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME try: data_set = step_data[data_set_no] @@ -748,7 +748,7 @@ def for_loop_action(step_data, data_set_no): sr.Set_Shared_Variables(CommonUtil.dont_prettify_on_server[0], step_data, protected=True, pretty=False) sr.test_action_info = CommonUtil.all_action_info[step_index] return "zeuz_failed", outer_skip - result, skip = Run_Sequential_Actions([data_set_index]) + result, skip = await Run_Sequential_Actions([data_set_index]) inner_skip = list(set(inner_skip + skip)) outer_skip = list(set(outer_skip + inner_skip)) @@ -848,7 +848,7 @@ def for_loop_action(step_data, data_set_no): return CommonUtil.Exception_Handler(sys.exc_info()), [] -def While_Loop_Action(step_data, data_set_no): +async def While_Loop_Action(step_data, data_set_no): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME try: data_set = step_data[data_set_no] @@ -947,7 +947,7 @@ def While_Loop_Action(step_data, data_set_no): 3 ) return "zeuz_failed", outer_skip - result, skip = Run_Sequential_Actions( + result, skip = await Run_Sequential_Actions( [data_set_index] ) # new edit: full step data is passed. [step_data[data_set_index]]) # Recursively call this function until all called data sets are complete @@ -1049,7 +1049,7 @@ def ticker_linear_shape(seconds, callable, *args, **kwargs): seconds -= 1 -def Sequential_Actions( +async def Sequential_Actions( step_data, test_action_info, debug_actions=None, @@ -1068,13 +1068,13 @@ def Sequential_Actions( # sr.Set_Shared_Variables("test_action_info", test_action_info, protected=True, print_variable=False) sr.test_action_info = test_action_info - result, skip_for_loop = Run_Sequential_Actions([], debug_actions) + result, skip_for_loop = await Run_Sequential_Actions([], debug_actions) # empty list means run all, instead of step data we want to send the dataset no's of the step data to run write_browser_logs() return result -def Run_Sequential_Actions( +async def Run_Sequential_Actions( data_set_list=None, debug_actions=None ): # data_set_no will used in recursive conditional action call if data_set_list is None: @@ -1100,7 +1100,7 @@ def Run_Sequential_Actions( data_set_list.append(i) if len(data_set_list) == 0 and CommonUtil.debug_status and not sr.Test_Shared_Variables("selenium_driver") and ConfigModule.get_config_value("Inspector", "ai_plugin").strip().lower() in CommonUtil.affirmative_words: - return Action_Handler([["browser", "selenium action", "browser"]], ["browser", "selenium action", "browser"]), [] + return await Action_Handler([["browser", "selenium action", "browser"]], ["browser", "selenium action", "browser"]), [] for dataset_cnt in data_set_list: # For each data set within step data data_set = step_data[dataset_cnt] # Save data set to variable @@ -1196,7 +1196,7 @@ def Run_Sequential_Actions( # If middle column = action, call action handler, but always return a pass elif "optional action" in action_name: - result = Action_Handler(data_set, row) # Pass data set, and action_name to action handler + result = await Action_Handler(data_set, row) # Pass data set, and action_name to action handler if result == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Optional action failed. Returning pass anyway", 2) result = "passed" @@ -1204,7 +1204,7 @@ def Run_Sequential_Actions( # If middle column = conditional action, evaluate data set elif "conditional action" in action_name or "if else" in action_name: if action_name.lower().strip() == "windows conditional action": - result, to_skip = Conditional_Action_Handler(step_data, dataset_cnt) + result, to_skip = await Conditional_Action_Handler(step_data, dataset_cnt) skip += to_skip skip_for_loop += to_skip if result in failed_tag_list: @@ -1215,7 +1215,7 @@ def Run_Sequential_Actions( elif action_name.lower().strip() != "conditional action" and action_name.lower().strip() != "if else": # old style conditional action - result, to_skip = Conditional_Action_Handler(step_data, dataset_cnt) + result, to_skip = await Conditional_Action_Handler(step_data, dataset_cnt) skip += to_skip skip_for_loop += to_skip if result in failed_tag_list: @@ -1224,7 +1224,7 @@ def Run_Sequential_Actions( break else: - result, to_skip = If_else_action(step_data, dataset_cnt) + result, to_skip = await If_else_action(step_data, dataset_cnt) skip += to_skip skip_for_loop += to_skip if result in failed_tag_list: @@ -1235,7 +1235,7 @@ def Run_Sequential_Actions( # Simulate a while/for loop with the specified data sets elif "loop action" in action_name: if action_name.lower().strip() == "for loop action": - result, skip_for_loop = for_loop_action(step_data, dataset_cnt) + result, skip_for_loop = await for_loop_action(step_data, dataset_cnt) skip = list(set(skip + skip_for_loop)) if result in failed_tag_list: return "zeuz_failed", skip_for_loop @@ -1243,7 +1243,7 @@ def Run_Sequential_Actions( elif action_name.lower().strip() not in ("while loop action", "for loop action"): # old style loop action # CommonUtil.ExecLog(sModuleInfo,"Old style loop action found. This will not be supported in 2020, please replace them with new loop actions",2) - result, skip_for_loop = Loop_Action_Handler(data_set, row, dataset_cnt) + result, skip_for_loop = await Loop_Action_Handler(data_set, row, dataset_cnt) skip = skip_for_loop position_of_loop_action = dataset_cnt @@ -1273,7 +1273,7 @@ def Run_Sequential_Actions( return "zeuz_failed", skip_for_loop elif "loop" in action_name: if "while" in action_name.lower(): - result, skip_for_loop = While_Loop_Action(step_data, dataset_cnt) + result, skip_for_loop = await While_Loop_Action(step_data, dataset_cnt) skip = list(set(skip + skip_for_loop)) if result in failed_tag_list: return "zeuz_failed", skip_for_loop @@ -1343,7 +1343,7 @@ def Run_Sequential_Actions( # If middle column = action, call action handler elif "action" in action_name: # Must be last, since it's a single word that also exists in other action types - result = Action_Handler(data_set, row) # Pass data set, and action_name to action handler + result = await Action_Handler(data_set, row) # Pass data set, and action_name to action handler if row[0].lower().strip() in ("step exit", "testcase exit"): global step_exit_fail_called, step_exit_pass_called CommonUtil.ExecLog(sModuleInfo, f"{row[0].lower().strip()} Exit called. Stopping Test Step.", 1) @@ -1373,12 +1373,12 @@ def Run_Sequential_Actions( continue CommonUtil.ExecLog(sModuleInfo, "Action failed. Trying bypass #%d" % (i + 1), 1) - result = Action_Handler(bypass_data_set[i], bypass_row[i]) + result = await Action_Handler(bypass_data_set[i], bypass_row[i]) if result in failed_tag_list: # This also failed, so chances are first failure was real continue # Try the next bypass, if any else: # Bypass passed, which indicates there was something blocking the element in the first place CommonUtil.ExecLog(sModuleInfo, "Bypass passed. Retrying original action", 1) - result = Action_Handler(data_set, row) # Retry failed original data set + result = await Action_Handler(data_set, row) # Retry failed original data set if result in failed_tag_list: # Still a failure, give up return "zeuz_failed", skip_for_loop break # No need to process more bypasses @@ -1404,7 +1404,7 @@ def Run_Sequential_Actions( return CommonUtil.Exception_Handler(sys.exc_info()) -def Loop_Action_Handler(data, row, dataset_cnt): +async def Loop_Action_Handler(data, row, dataset_cnt): """ Performs a sub-set of the data set in a loop, similar to a for or while loop """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -1903,7 +1903,7 @@ def build_subset(new_step_data): return CommonUtil.Exception_Handler(sys.exc_info()) -def Conditional_Action_Handler(step_data, dataset_cnt): +async def Conditional_Action_Handler(step_data, dataset_cnt): """ Process conditional actions, called only by Sequential_Actions() """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -1984,6 +1984,37 @@ def Conditional_Action_Handler(step_data, dataset_cnt): logic_decision = False log_msg += "Element is not found\n" + elif module == "playwright": + try: + from Framework.Built_In_Automation.Web.Playwright import locator as PlaywrightLocator + from Framework.Built_In_Automation.Web.Playwright.BuiltInFunctions import current_page + + wait = 10 + for left, mid, right in data_set: + mid = mid.lower() + left = left.lower() + if "optional parameter" in mid and "wait" in left: + wait = float(right.strip()) + + if current_page is None: + CommonUtil.ExecLog(sModuleInfo, "No browser open for Playwright conditional action", 3) + logic_decision = False + log_msg += "Browser not open\n" + else: + Element = await PlaywrightLocator.Get_Element(data_set, current_page, element_wait=wait) + if Element == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Conditional Actions could not find the element", 3) + logic_decision = False + log_msg += "Element is not found\n" + else: + logic_decision = True + log_msg += "Element is found\n" + + except: # Element doesn't exist, proceed with the step data following the fail/false path + CommonUtil.ExecLog(sModuleInfo, "Conditional Actions could not find the element", 3) + logic_decision = False + log_msg += "Element is not found\n" + elif module == "windows": try: from Framework.Built_In_Automation.Desktop.Windows import BuiltInFunctions @@ -2186,7 +2217,7 @@ def Conditional_Action_Handler(step_data, dataset_cnt): 2 ) if data_set_index not in inner_skip: - result, skip = Run_Sequential_Actions( + result, skip = await Run_Sequential_Actions( [data_set_index] ) # Running inner_skip = list(set(inner_skip + skip)) @@ -2296,7 +2327,63 @@ def compare_variable_names(set, dataset): CommonUtil.compare_action_varnames = {"left": "Left", "right": "Right"} -def Action_Handler(_data_set, action_row, _bypass_bug=True): +def get_browser_driver_routing(action_subfield, data_set): + """ + Check if browser driver optional parameter is present and route to appropriate driver. + + Args: + action_subfield (str): The original action subfield (e.g., "selenium action", "playwright action") + data_set (list): The data set containing optional parameters + + Returns: + str: Updated action_subfield based on browser driver parameter + + This function checks if there is a "browser driver" optional parameter in the data set or a BROWSER_DRIVER in runtime parameters. + If any of them are present, it updates the action_subfield to the value specified. + If both are present, it uses the action-level optional parameter. + If neither are present, it returns the original action_subfield. + """ + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + + # If the action subfield is not for playwright or selenium, return as is + if action_subfield not in ("playwright action", "selenium action"): + return action_subfield + + # Initialize the updated action subfield with the original action subfield + updated_action_subfield = action_subfield + + # Get the runtime parameter for browser driver preference + browser_driver_runtime_parameter = sr.shared_variables.get("BROWSER_DRIVER") + + # If runtime parameter is present and valid, update the action subfield + if browser_driver_runtime_parameter and browser_driver_runtime_parameter.strip().lower() in ("playwright", "selenium"): + CommonUtil.ExecLog(sModuleInfo, "Runtime parameter for browser driver preference detected", 5) + updated_action_subfield = browser_driver_runtime_parameter.strip().lower() + " action" + + # Check if there is an optional parameter for browser driver in the data set + for left, mid, right in data_set: + # If optional parameter is present and valid, update the action subfield + if (mid.strip().lower().startswith("optional") + and left.strip().lower() == "browser driver" + and right.strip().lower() in ("playwright", "selenium")): + + # If runtime parameter is also present, action-level optional parameter will take precedence + if browser_driver_runtime_parameter: + # log a warning for browser driver preference in two places + CommonUtil.ExecLog(sModuleInfo, "Both runtime parameter and optional parameter for browser driver detected, using optional parameter", 2) + else: + CommonUtil.ExecLog(sModuleInfo, "Optional parameter for browser driver preference detected in action", 5) + updated_action_subfield = right.strip().lower() + " action" + break + + # If the action subfield has changed, log the change + if action_subfield != updated_action_subfield: + CommonUtil.ExecLog(sModuleInfo, "Browser action changed from %s to %s" % (action_subfield, updated_action_subfield), 1) + + return updated_action_subfield + + +async def Action_Handler(_data_set, action_row, _bypass_bug=True): """ Finds the appropriate function for the requested action in the step data and executes it """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -2306,6 +2393,9 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True): action_name = action_row[0] action_subfield = action_row[1] + # Apply browser driver routing if applicable + action_subfield = get_browser_driver_routing(action_subfield, _data_set) + if str(action_name).startswith("%|"): # if shared variable action_name = sr.get_previous_response_variables_in_strings(action_name) @@ -2417,6 +2507,8 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True): elif module in CommonUtil.global_sleep and "_all_" in CommonUtil.global_sleep[module]: time.sleep(CommonUtil.global_sleep[module]["_all_"]["pre"]) result = run_function(data_set) # Execute function, providing all rows in the data set + if inspect.iscoroutine(result): + result = await result if post_sleep: time.sleep(post_sleep) elif module in CommonUtil.global_sleep and "_all_" in CommonUtil.global_sleep[module]: @@ -2436,7 +2528,7 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True): compare_variable_names(False, []) if performance_action.zeuz_cycle != -1: CommonUtil.action_perf[-1]['cycle'] = performance_action.zeuz_cycle - CommonUtil.TakeScreenShot(function) + await CommonUtil.TakeScreenShot(function) CommonUtil.previous_action_name = CommonUtil.current_action_name if _bypass_bug: CommonUtil.print_execlog = False diff --git a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py index b13120e32..bfca34b7c 100644 --- a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py @@ -20,6 +20,7 @@ Author: Zeuz/AutomationSolutionz """ +import asyncio import sys import os import inspect @@ -27,8 +28,8 @@ import re from pathlib import Path -from playwright.sync_api import ( - sync_playwright, +from playwright.async_api import ( + async_playwright, Page, Browser, BrowserContext, @@ -44,6 +45,43 @@ ) from Framework.Utilities.CommonUtil import passed_tag_list, failed_tag_list from . import locator as PlaywrightLocator +from . import utils as PlaywrightUtils +from settings import ZEUZ_NODE_DOWNLOADS_DIR + +def _get_frame_locator(): + """Helper function to get current frame locator from shared variables.""" + try: + frame_locator = sr.Get_Shared_Variables("playwright_frame") + if frame_locator in failed_tag_list: + return None + return frame_locator + except: + # Variable doesn't exist yet + return None + + +def connect_selenium_to_playwright(port=9222): + """Connect Selenium to Playwright browser via CDP""" + try: + from selenium import webdriver + from selenium.webdriver.chrome.options import Options + + options = Options() + options.add_experimental_option("debuggerAddress", f"127.0.0.1:{port}") + + driver = webdriver.Chrome(options=options) + + from Framework.Built_In_Automation.Web.Selenium import BuiltInFunctions as SeleniumBuiltInFunctions + SeleniumBuiltInFunctions.selenium_driver = driver + + sr.Set_Shared_Variables("selenium_driver", driver) + + CommonUtil.ExecLog("connect_selenium_to_playwright", "Connected Selenium to Playwright", 1) + return "passed" + + except Exception as e: + CommonUtil.ExecLog("connect_selenium_to_playwright", f"Failed to connect Selenium to Playwright: {e}", 3) + return "zeuz_failed" ######################### # # @@ -75,7 +113,7 @@ ######################### @logger -def Open_Browser(step_data): +async def Open_Browser(step_data): """ Launch a new browser instance with Playwright. @@ -104,7 +142,7 @@ def Open_Browser(step_data): # Parse parameters url = None browser_name = "chromium" - headless = True + headless = False viewport = default_viewport.copy() args = [] timeout = default_timeout @@ -166,9 +204,14 @@ def Open_Browser(step_data): # Handle Selenium-style capabilities where possible pass + # Ensure Chrome for Testing is available + chrome_binary_path, success = PlaywrightUtils.ensure_chromium_downloads(sModuleInfo) + if not success: + return "zeuz_failed" + # Launch Playwright CommonUtil.ExecLog(sModuleInfo, f"Launching Playwright with {browser_name} browser", 1) - playwright_instance = sync_playwright().start() + playwright_instance = await async_playwright().start() # Browser launch options launch_options = { @@ -176,27 +219,35 @@ def Open_Browser(step_data): "slow_mo": slow_mo, "devtools": devtools, } - if args: - launch_options["args"] = args + + # Add remote debugging port for CDP connection + all_args = args + ["--remote-debugging-port=9222"] + if all_args: + launch_options["args"] = all_args if downloads_path: launch_options["downloads_path"] = downloads_path + + # Use Chrome for Testing binary if available + if chrome_binary_path and browser_name in ("chrome", "chromium"): + launch_options["executable_path"] = chrome_binary_path + CommonUtil.ExecLog(sModuleInfo, f"Using Chrome for Testing binary: {chrome_binary_path}", 1) # Select and launch browser if browser_name in ("chrome", "chromium"): - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) elif browser_name == "firefox": - browser = playwright_instance.firefox.launch(**launch_options) + browser = await playwright_instance.firefox.launch(**launch_options) elif browser_name in ("webkit", "safari"): - browser = playwright_instance.webkit.launch(**launch_options) + browser = await playwright_instance.webkit.launch(**launch_options) elif browser_name in ("edge", "msedge", "microsoft edge"): launch_options["channel"] = "msedge" - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) elif browser_name == "chrome-beta": launch_options["channel"] = "chrome-beta" - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) else: CommonUtil.ExecLog(sModuleInfo, f"Unknown browser '{browser_name}', using chromium", 2) - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) # Context options context_options = {"viewport": viewport} @@ -214,9 +265,9 @@ def Open_Browser(step_data): context_options["color_scheme"] = color_scheme # Create context and page - context = browser.new_context(**context_options) + context = await browser.new_context(**context_options) context.set_default_timeout(timeout) - current_page = context.new_page() + current_page = await context.new_page() current_page_id = page_id # Store in details @@ -229,7 +280,7 @@ def Open_Browser(step_data): # Navigate if URL provided if url: - current_page.goto(url, wait_until="domcontentloaded") + await current_page.goto(url, wait_until="domcontentloaded") CommonUtil.ExecLog(sModuleInfo, f"Navigated to: {url}", 1) # Save to shared variables for compatibility @@ -237,6 +288,12 @@ def Open_Browser(step_data): sr.Set_Shared_Variables("playwright_context", context) sr.Set_Shared_Variables("playwright_browser", browser) sr.Set_Shared_Variables("element_wait", timeout / 1000) # In seconds + + # Set screenshot variables for CommonUtil.TakeScreenShot() + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) + + # Connect Selenium to Playwright via CDP + connect_selenium_to_playwright(port=9222) CommonUtil.ExecLog(sModuleInfo, f"Browser opened successfully (page_id: {page_id})", 1) return "passed" @@ -246,7 +303,7 @@ def Open_Browser(step_data): @logger -def Go_To_Link(step_data): +async def Go_To_Link(step_data): """ Navigate to a URL. @@ -263,8 +320,11 @@ def Go_To_Link(step_data): try: if current_page is None: - CommonUtil.ExecLog(sModuleInfo, "No browser open. Use 'open browser' first.", 3) - return "zeuz_failed" + CommonUtil.ExecLog(sModuleInfo, "No browser open. Opening browser with default settings.", 2) + result = await Open_Browser(step_data) + if result == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Failed to open browser automatically", 3) + return "zeuz_failed" url = None wait_until = "domcontentloaded" @@ -275,11 +335,10 @@ def Go_To_Link(step_data): mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "input parameter": - if left_l in ("go to link", "url", "link"): + if left_l in ("go to link", "url", "link"): url = right_v elif mid_l == "optional parameter": - if left_l in ("wait until", "wait_until", "waituntil"): + if left_l in ("wait until", "wait_until", "waituntil", "wait time"): wait_until = right_v.lower() elif left_l == "timeout": timeout = int(float(right_v) * 1000) @@ -292,7 +351,11 @@ def Go_To_Link(step_data): if timeout: goto_options["timeout"] = timeout - current_page.goto(url, **goto_options) + await current_page.goto(url, **goto_options) + + # Reset frame context when navigating to a new URL + sr.Set_Shared_Variables("playwright_frame", None) + CommonUtil.ExecLog(sModuleInfo, f"Navigated to: {url}", 1) return "passed" @@ -301,7 +364,7 @@ def Go_To_Link(step_data): @logger -def Tear_Down_Playwright(step_data=None): +async def Tear_Down_Playwright(step_data=None): """ Close browser and clean up Playwright resources. @@ -318,34 +381,34 @@ def Tear_Down_Playwright(step_data=None): for page_id, details in playwright_details.items(): try: if details.get("page"): - details["page"].close() + await details["page"].close() if details.get("context"): - details["context"].close() + await details["context"].close() except Exception: pass # Close main instances try: if current_page and current_page not in [d.get("page") for d in playwright_details.values()]: - current_page.close() + await current_page.close() except Exception: pass try: if context: - context.close() + await context.close() except Exception: pass try: if browser: - browser.close() + await browser.close() except Exception: pass try: if playwright_instance: - playwright_instance.stop() + await playwright_instance.stop() except Exception: pass @@ -406,6 +469,9 @@ def Switch_Browser(step_data): sr.Set_Shared_Variables("playwright_page", current_page) sr.Set_Shared_Variables("playwright_context", context) + + # Set screenshot variables for CommonUtil.TakeScreenShot() + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) CommonUtil.ExecLog(sModuleInfo, f"Switched to page: {target_id}", 1) return "passed" @@ -421,7 +487,7 @@ def Switch_Browser(step_data): ######################### @logger -def Click_Element(step_data): +async def Click_Element(step_data): """ Click an element. @@ -492,7 +558,7 @@ def Click_Element(step_data): right_click = True # Get element - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -514,14 +580,14 @@ def Click_Element(step_data): # Perform click if double_click: - locator.dblclick(**{k: v for k, v in click_options.items() if k != "click_count"}) + await locator.dblclick(**{k: v for k, v in click_options.items() if k != "click_count"}) CommonUtil.ExecLog(sModuleInfo, "Double click performed", 1) elif right_click: click_options["button"] = "right" - locator.click(**click_options) + await locator.click(**click_options) CommonUtil.ExecLog(sModuleInfo, "Right click performed", 1) else: - locator.click(**click_options) + await locator.click(**click_options) CommonUtil.ExecLog(sModuleInfo, "Click performed", 1) return "passed" @@ -531,7 +597,7 @@ def Click_Element(step_data): @logger -def Double_Click_Element(step_data): +async def Double_Click_Element(step_data): """ Double-click an element. @@ -548,11 +614,11 @@ def Double_Click_Element(step_data): modified_step_data[i] = ("double click", mid, right) break - return Click_Element(modified_step_data) + return await Click_Element(modified_step_data) @logger -def Right_Click_Element(step_data): +async def Right_Click_Element(step_data): """ Right-click (context click) an element. @@ -567,11 +633,11 @@ def Right_Click_Element(step_data): modified_step_data[i] = ("right click", mid, right) break - return Click_Element(modified_step_data) + return await Click_Element(modified_step_data) @logger -def Hover_Over_Element(step_data): +async def Hover_Over_Element(step_data): """ Hover over an element. @@ -606,7 +672,7 @@ def Hover_Over_Element(step_data): elif left_l == "timeout": timeout = int(float(right_v) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -619,7 +685,7 @@ def Hover_Over_Element(step_data): if timeout: hover_options["timeout"] = timeout - locator.hover(**hover_options) + await locator.hover(**hover_options) CommonUtil.ExecLog(sModuleInfo, "Hover performed", 1) return "passed" @@ -634,7 +700,7 @@ def Hover_Over_Element(step_data): ######################### @logger -def Enter_Text_In_Text_Box(step_data): +async def Enter_Text_In_Text_Box(step_data): """ Enter text in a text field. @@ -683,7 +749,7 @@ def Enter_Text_In_Text_Box(step_data): elif left_l == "timeout": timeout = int(float(right.strip()) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -691,17 +757,17 @@ def Enter_Text_In_Text_Box(step_data): # Enter text based on options if use_js: # Use JavaScript to set value directly - locator.evaluate(f"el => {{ el.value = `{text_value}`; }}") + await locator.evaluate(f"el => {{ el.value = `{text_value}`; }}") # Trigger events - locator.dispatch_event("input") - locator.dispatch_event("change") + await locator.dispatch_event("input") + await locator.dispatch_event("change") CommonUtil.ExecLog(sModuleInfo, f"Text entered via JS: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) elif clear: # fill() clears and sets value - recommended approach fill_options = {} if timeout: fill_options["timeout"] = timeout - locator.fill(text_value, **fill_options) + await locator.fill(text_value, **fill_options) CommonUtil.ExecLog(sModuleInfo, f"Text filled: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) else: # type() appends to existing value @@ -710,7 +776,7 @@ def Enter_Text_In_Text_Box(step_data): type_options["delay"] = int(delay * 1000) if timeout: type_options["timeout"] = timeout - locator.type(text_value, **type_options) + await locator.type(text_value, **type_options) CommonUtil.ExecLog(sModuleInfo, f"Text typed: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) return "passed" @@ -720,7 +786,7 @@ def Enter_Text_In_Text_Box(step_data): @logger -def Keystroke_For_Element(step_data): +async def Keystroke_For_Element(step_data): """ Send keystrokes to an element or the page. @@ -827,18 +893,18 @@ def Keystroke_For_Element(step_data): key = key_map.get(key, keystroke_value) if has_element: - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" for _ in range(key_count): - locator.press(key) + await locator.press(key) if delay > 0: time.sleep(delay) else: for _ in range(key_count): - current_page.keyboard.press(key) + await current_page.keyboard.press(key) if delay > 0: time.sleep(delay) @@ -850,13 +916,13 @@ def Keystroke_For_Element(step_data): type_options["delay"] = int(delay * 1000) if has_element: - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - locator.type(keystroke_value, **type_options) + await locator.type(keystroke_value, **type_options) else: - current_page.keyboard.type(keystroke_value, **type_options) + await current_page.keyboard.type(keystroke_value, **type_options) CommonUtil.ExecLog(sModuleInfo, f"Typed chars: {keystroke_value[:50]}{'...' if len(keystroke_value) > 50 else ''}", 1) @@ -873,7 +939,7 @@ def Keystroke_For_Element(step_data): ######################### @logger -def Validate_Text(step_data): +async def Validate_Text(step_data): """ Validate that an element contains expected text. @@ -919,19 +985,21 @@ def Validate_Text(step_data): case_insensitive = True elif left_l.startswith("*"): partial_match = True + elif "partial" in left_l: + partial_match = True expected_text = right_v elif mid_l == "optional parameter": if left_l == "timeout": timeout = int(float(right_v) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" # Get actual text - actual_text = locator.text_content() or "" + actual_text = await locator.text_content() or "" # Compare match = False @@ -962,7 +1030,7 @@ def Validate_Text(step_data): @logger -def if_element_exists(step_data): +async def if_element_exists(step_data): """ Check if an element exists on the page. @@ -990,14 +1058,14 @@ def if_element_exists(step_data): if mid_l == "optional parameter" and left_l == "timeout": timeout = int(float(right_v) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page, element_wait=timeout/1000) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, element_wait=timeout/1000, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element does not exist", 1) return "zeuz_failed" try: - count = locator.count() + count = await locator.count() if count > 0: CommonUtil.ExecLog(sModuleInfo, f"Element exists ({count} found)", 1) return "passed" @@ -1013,7 +1081,7 @@ def if_element_exists(step_data): @logger -def Save_Attribute(step_data): +async def Save_Attribute(step_data): """ Save an element's attribute value to a shared variable. @@ -1043,16 +1111,18 @@ def Save_Attribute(step_data): attribute_name = None save_variable = None + save_attribute = None for left, mid, right in step_data: left_l = left.strip().lower() mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "input parameter": + if mid_l in ["input parameter", "element parameter"]: attribute_name = left.strip() # Keep original case elif mid_l == "save parameter": - save_variable = left.strip() + save_variable = right_v + save_attribute = left_l if not attribute_name: CommonUtil.ExecLog(sModuleInfo, "No attribute name specified", 3) @@ -1062,7 +1132,7 @@ def Save_Attribute(step_data): CommonUtil.ExecLog(sModuleInfo, "No save variable specified", 3) return "zeuz_failed" - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -1070,30 +1140,30 @@ def Save_Attribute(step_data): # Get attribute value based on name attr_lower = attribute_name.lower() if attr_lower == "text": - value = locator.text_content() + value = await locator.text_content() elif attr_lower == "innertext": - value = locator.inner_text() + value = await locator.inner_text() elif attr_lower == "innerhtml": - value = locator.inner_html() + value = await locator.inner_html() elif attr_lower == "outerhtml": - value = locator.evaluate("el => el.outerHTML") + value = await locator.evaluate("el => el.outerHTML") elif attr_lower == "value": - value = locator.input_value() + value = await locator.input_value() elif attr_lower == "checked": - value = locator.is_checked() + value = await locator.is_checked() elif attr_lower == "selected": - value = locator.evaluate("el => el.selected") + value = await locator.evaluate("el => el.selected") elif attr_lower == "visible": - value = locator.is_visible() + value = await locator.is_visible() elif attr_lower == "enabled": - value = locator.is_enabled() + value = await locator.is_enabled() elif attr_lower == "disabled": - value = locator.is_disabled() + value = await locator.is_disabled() else: - value = locator.get_attribute(attribute_name) + value = await locator.get_attribute(save_attribute) sr.Set_Shared_Variables(save_variable, value) - CommonUtil.ExecLog(sModuleInfo, f"Saved '{attribute_name}' = '{value}' to '{save_variable}'", 1) + CommonUtil.ExecLog(sModuleInfo, f"Saved '{save_attribute}' = '{value}' to '{save_variable}'", 1) return "passed" except Exception: @@ -1101,7 +1171,7 @@ def Save_Attribute(step_data): @logger -def get_element_info(step_data): +async def get_element_info(step_data): """ Get detailed information about an element. @@ -1124,23 +1194,23 @@ def get_element_info(step_data): if mid.strip().lower() == "save parameter": save_variable = left.strip() - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" # Gather element info info = { - "tag_name": locator.evaluate("el => el.tagName"), - "text": locator.text_content(), - "inner_html": locator.inner_html(), - "visible": locator.is_visible(), - "enabled": locator.is_enabled(), - "bounding_box": locator.bounding_box(), + "tag_name": await locator.evaluate("el => el.tagName"), + "text": await locator.text_content(), + "inner_html": await locator.inner_html(), + "visible": await locator.is_visible(), + "enabled": await locator.is_enabled(), + "bounding_box": await locator.bounding_box(), } # Get all attributes - attributes = locator.evaluate("""el => { + attributes = await locator.evaluate("""el => { const attrs = {}; for (const attr of el.attributes) { attrs[attr.name] = attr.value; @@ -1324,7 +1394,7 @@ def Scroll(step_data): @logger -def scroll_to_element(step_data): +async def scroll_to_element(step_data): """ Scroll an element into view. @@ -1356,15 +1426,15 @@ def scroll_to_element(step_data): elif left_l == "align to top": align_to_top = right_v.lower() in ("true", "yes", "1") - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" if use_js: - locator.evaluate(f"el => el.scrollIntoView({str(align_to_top).lower()})") + await locator.evaluate(f"el => el.scrollIntoView({str(align_to_top).lower()})") else: - locator.scroll_into_view_if_needed() + await locator.scroll_into_view_if_needed() CommonUtil.ExecLog(sModuleInfo, "Scrolled element into view", 1) return "passed" @@ -1380,7 +1450,7 @@ def scroll_to_element(step_data): ######################### @logger -def Select_Deselect(step_data): +async def Select_Deselect(step_data): """ Select or deselect options in a dropdown/select element. @@ -1436,7 +1506,7 @@ def Select_Deselect(step_data): CommonUtil.ExecLog(sModuleInfo, "No selection value provided", 3) return "zeuz_failed" - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -1451,7 +1521,7 @@ def Select_Deselect(step_data): if is_deselect: # Playwright doesn't have direct deselect, use JavaScript - locator.evaluate(f"""el => {{ + await locator.evaluate(f"""el => {{ for (const opt of el.options) {{ if (opt.{'value' if select_type == 'value' else 'text'} === '{select_value}') {{ opt.selected = false; @@ -1461,7 +1531,7 @@ def Select_Deselect(step_data): }}""") CommonUtil.ExecLog(sModuleInfo, f"Deselected: {select_value}", 1) else: - locator.select_option(**option) + await locator.select_option(**option) CommonUtil.ExecLog(sModuleInfo, f"Selected: {select_value}", 1) return "passed" @@ -1477,7 +1547,7 @@ def Select_Deselect(step_data): ######################### @logger -def check_uncheck(step_data): +async def check_uncheck(step_data): """ Check or uncheck a checkbox/radio button. @@ -1514,7 +1584,7 @@ def check_uncheck(step_data): if left_l == "use js": use_js = right_v in ("true", "yes", "1") - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -1524,10 +1594,10 @@ def check_uncheck(step_data): options["force"] = True if action == "check": - locator.check(**options) + await locator.check(**options) CommonUtil.ExecLog(sModuleInfo, "Checkbox checked", 1) else: - locator.uncheck(**options) + await locator.uncheck(**options) CommonUtil.ExecLog(sModuleInfo, "Checkbox unchecked", 1) return "passed" @@ -1598,6 +1668,7 @@ def switch_window_or_tab(step_data): current_page = page page.bring_to_front() sr.Set_Shared_Variables("playwright_page", current_page) + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) CommonUtil.ExecLog(sModuleInfo, f"Switched to tab: {page_title}", 1) return "passed" else: @@ -1605,6 +1676,7 @@ def switch_window_or_tab(step_data): current_page = page page.bring_to_front() sr.Set_Shared_Variables("playwright_page", current_page) + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) CommonUtil.ExecLog(sModuleInfo, f"Switched to tab: {page_title}", 1) return "passed" @@ -1616,6 +1688,7 @@ def switch_window_or_tab(step_data): current_page = pages[switch_by_index] current_page.bring_to_front() sr.Set_Shared_Variables("playwright_page", current_page) + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) CommonUtil.ExecLog(sModuleInfo, f"Switched to tab index {switch_by_index}: {current_page.title()}", 1) return "passed" else: @@ -1659,6 +1732,7 @@ def open_new_tab(step_data): new_page = context.new_page() current_page = new_page sr.Set_Shared_Variables("playwright_page", current_page) + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) if url: new_page.goto(url) @@ -1737,6 +1811,7 @@ def close_tab(step_data): if pages: current_page = pages[-1] sr.Set_Shared_Variables("playwright_page", current_page) + CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) return "passed" @@ -1751,7 +1826,7 @@ def close_tab(step_data): ######################### @logger -def switch_iframe(step_data): +async def switch_iframe(step_data): """ Switch to an iframe or back to main content. @@ -1799,6 +1874,7 @@ def switch_iframe(step_data): if switch_to_default: # In Playwright, we work with the main page directly # Store a flag or reset frame locator + sr.Set_Shared_Variables("playwright_frame", None) CommonUtil.ExecLog(sModuleInfo, "Switched to default content", 1) return "passed" @@ -1936,7 +2012,7 @@ def handle_dialog(dialog): ######################### @logger -def drag_and_drop(step_data): +async def drag_and_drop(step_data): """ Drag and drop an element to a target. @@ -1955,33 +2031,31 @@ def drag_and_drop(step_data): return "zeuz_failed" # Separate source and target parameters - source_params = [] - target_params = [] + source_param = None + target_param = None for left, mid, right in step_data: mid_l = mid.strip().lower() - if mid_l == "target parameter": - target_params.append((left, "element parameter", right)) - elif mid_l == "element parameter": - source_params.append((left, mid, right)) - else: - source_params.append((left, mid, right)) - target_params.append((left, mid, right)) + if "element parameter" in mid_l: + if mid_l.startswith("dst"): + target_param = (left, mid, right) + elif mid_l.startswith("src"): + source_param = (left, mid, right) # Get source element - source_locator = PlaywrightLocator.Get_Element(source_params, current_page) + source_locator = await PlaywrightLocator.Get_Element([source_param], current_page, frame_locator=_get_frame_locator()) if source_locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Source element not found", 3) return "zeuz_failed" # Get target element - target_locator = PlaywrightLocator.Get_Element(target_params, current_page) + target_locator = await PlaywrightLocator.Get_Element([target_param], current_page, frame_locator=_get_frame_locator()) if target_locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Target element not found", 3) return "zeuz_failed" # Perform drag and drop - source_locator.drag_to(target_locator) + await source_locator.drag_to(target_locator) CommonUtil.ExecLog(sModuleInfo, "Drag and drop completed", 1) return "passed" @@ -1996,7 +2070,7 @@ def drag_and_drop(step_data): ######################### @logger -def take_screenshot_playwright(step_data): +async def take_screenshot_playwright(step_data): """ Take a screenshot. @@ -2048,13 +2122,13 @@ def take_screenshot_playwright(step_data): # Take screenshot if has_element: - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - locator.screenshot(path=screenshot_path) + await locator.screenshot(path=screenshot_path) else: - current_page.screenshot(path=screenshot_path, full_page=full_page) + await current_page.screenshot(path=screenshot_path, full_page=full_page) if save_variable: sr.Set_Shared_Variables(save_variable, screenshot_path) @@ -2073,7 +2147,7 @@ def take_screenshot_playwright(step_data): ######################### @logger -def execute_javascript(step_data): +async def execute_javascript(step_data): """ Execute JavaScript code in the browser. @@ -2118,13 +2192,13 @@ def execute_javascript(step_data): # Execute JS if has_element: - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - result = locator.evaluate(js_code) + result = await locator.evaluate(js_code) else: - result = current_page.evaluate(js_code) + result = await current_page.evaluate(js_code) if save_variable: sr.Set_Shared_Variables(save_variable, result) @@ -2145,7 +2219,7 @@ def execute_javascript(step_data): ######################### @logger -def upload_file(step_data): +async def upload_file(step_data): """ Upload a file via file input. @@ -2188,12 +2262,12 @@ def upload_file(step_data): CommonUtil.ExecLog(sModuleInfo, f"File not found: {file_path}", 3) return "zeuz_failed" - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - locator.set_input_files(file_path) + await locator.set_input_files(file_path) CommonUtil.ExecLog(sModuleInfo, f"File uploaded: {file_path}", 1) return "passed" @@ -2272,7 +2346,7 @@ def resize_window(step_data): ######################### @logger -def Wait_For_Element(step_data): +async def Wait_For_Element(step_data): """ Wait for an element to appear/disappear. @@ -2309,11 +2383,13 @@ def Wait_For_Element(step_data): if mid_l == "input parameter": if left_l in ("wait", "state"): state = right_v.lower() - elif mid_l == "optional parameter": - if left_l == "timeout": - timeout = int(float(right_v) * 1000) + elif left_l == "wait for element": + timeout = int(right_v) - locator = PlaywrightLocator.Get_Element(step_data, current_page, element_wait=0.1) + if timeout: + await asyncio.sleep(timeout) + + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": # For hidden/detached states, element not found is actually success @@ -2323,11 +2399,11 @@ def Wait_For_Element(step_data): CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - wait_options = {"state": state} - if timeout: - wait_options["timeout"] = timeout + # wait_options = {"state": state} + # if timeout: + # wait_options["timeout"] = timeout - locator.wait_for(**wait_options) + # locator.wait_for(**wait_options) CommonUtil.ExecLog(sModuleInfo, f"Element reached state: {state}", 1) return "passed" @@ -2497,7 +2573,7 @@ def handle_route(route): ######################### @logger -def Extract_Table_Data(step_data): +async def Extract_Table_Data(step_data): """ Extract data from an HTML table. @@ -2532,13 +2608,13 @@ def Extract_Table_Data(step_data): elif left_l == "column": col_filter = right_v - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, frame_locator=_get_frame_locator()) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Table element not found", 3) return "zeuz_failed" # Extract table data using JavaScript - table_data = locator.evaluate("""table => { + table_data = await locator.evaluate("""table => { const data = []; const rows = table.querySelectorAll('tr'); rows.forEach(row => { diff --git a/Framework/Built_In_Automation/Web/Playwright/locator.py b/Framework/Built_In_Automation/Web/Playwright/locator.py index 5e7420d83..9acbe5b84 100644 --- a/Framework/Built_In_Automation/Web/Playwright/locator.py +++ b/Framework/Built_In_Automation/Web/Playwright/locator.py @@ -25,7 +25,7 @@ MODULE_NAME = inspect.getmodulename(__file__) -def Get_Element(step_data, page, return_all=False, element_wait=None): +async def Get_Element(step_data, page, return_all=False, element_wait=None, frame_locator=None): """ Get element using Playwright's native Locator API. @@ -38,6 +38,7 @@ def Get_Element(step_data, page, return_all=False, element_wait=None): page: Playwright Page object return_all: If True, return list of all matching ElementHandles element_wait: Override default wait timeout (in seconds) + frame_locator: Optional frame locator for iframe context Returns: Locator | List[ElementHandle] | "zeuz_failed" @@ -75,7 +76,7 @@ def Get_Element(step_data, page, return_all=False, element_wait=None): return "zeuz_failed" # Build the locator - locator = _build_locator(page, step_data, params) + locator = _build_locator(page, step_data, params, frame_locator) if locator is None: CommonUtil.ExecLog(sModuleInfo, "Could not build locator from step data", 3) @@ -119,7 +120,7 @@ def Get_Element(step_data, page, return_all=False, element_wait=None): # Return all elements if requested if return_all: try: - elements = locator.all() + elements = await locator.all() CommonUtil.ExecLog(sModuleInfo, f"Found {len(elements)} elements", 1) return elements except Exception as e: @@ -128,7 +129,7 @@ def Get_Element(step_data, page, return_all=False, element_wait=None): # Check if element exists (with timeout) try: - count = locator.count() + count = await locator.count() if count == 0: CommonUtil.ExecLog(sModuleInfo, "No elements found matching locator", 3) return "zeuz_failed" @@ -198,7 +199,7 @@ def _parse_element_params(step_data): params['wait'] = float(right_stripped) # Element parameters - elif mid_lower == "element parameter": + elif mid_lower == "element parameter" or "element parameter" in mid_lower: if left_lower == "index": try: params['index'] = int(right_stripped) @@ -230,7 +231,7 @@ def _parse_element_params(step_data): return params -def _build_locator(page, step_data, params): +def _build_locator(page, step_data, params, frame_locator=None): """ Build a Playwright Locator from step data. @@ -238,15 +239,24 @@ def _build_locator(page, step_data, params): 1. Playwright-native selectors (test-id, role, text, etc.) - fastest 2. Direct xpath/css if provided 3. Build xpath from element parameters using existing logic + + Args: + page: Playwright Page object + step_data: Step data for building xpath + params: Parsed element parameters + frame_locator: Optional frame locator for iframe context """ + # Use frame locator if provided, otherwise use page + base_locator = frame_locator if frame_locator else page + # Strategy 1: Check for Playwright-native selectors (fastest path) for left, right in params['element_params']: left_lower = left.lower() # Test ID selectors if left_lower in ("test-id", "testid", "data-testid", "data-test-id"): - return page.get_by_test_id(right) + return base_locator.get_by_test_id(right) # Role selector if left_lower == "role": @@ -257,54 +267,54 @@ def _build_locator(page, step_data, params): name = r break if name: - return page.get_by_role(right, name=name) - return page.get_by_role(right) + return base_locator.get_by_role(right, name=name) + return base_locator.get_by_role(right) # Text selectors if left_lower == "text": - return page.get_by_text(right, exact=True) + return base_locator.get_by_text(right, exact=True) if left_lower == "*text": - return page.get_by_text(right, exact=False) + return base_locator.get_by_text(right, exact=False) if left_lower == "**text": # Case-insensitive partial match - return page.get_by_text(re.compile(re.escape(right), re.IGNORECASE)) + return base_locator.get_by_text(re.compile(re.escape(right), re.IGNORECASE)) # Label selector if left_lower == "label": - return page.get_by_label(right) + return base_locator.get_by_label(right) # Placeholder selector if left_lower == "placeholder": - return page.get_by_placeholder(right) + return base_locator.get_by_placeholder(right) # Alt text selector if left_lower in ("alt", "alt text", "alt-text"): - return page.get_by_alt_text(right) + return base_locator.get_by_alt_text(right) # Title selector if left_lower == "title" and "parameter" not in params.get('mid', ''): - return page.get_by_title(right) + return base_locator.get_by_title(right) # Direct xpath if left_lower == "xpath": - return page.locator(f"xpath={right}") + return base_locator.locator(f"xpath={right}") # Direct CSS selector if left_lower in ("css", "css selector", "css_selector"): - return page.locator(right) + return base_locator.locator(right) - # Strategy 2: Check for unique parameters - for left, right in params['unique_params']: + # Strategy 2: Check for unique parameters and element parameters + for left, right in params['unique_params'] + params['element_params']: left_lower = left.lower() if left_lower == "id": - return page.locator(f"#{right}") + return base_locator.locator(f"#{right}") elif left_lower == "name": - return page.locator(f"[name='{right}']") + return base_locator.locator(f"[name='{right}']") elif left_lower == "class": - return page.locator(f".{right}") + return base_locator.locator(f".{right}") elif left_lower == "tag": - return page.locator(right) + return base_locator.locator(right) # Strategy 3: Build xpath from element/parent/child parameters xpath = _build_xpath_from_params(step_data, params) @@ -314,7 +324,7 @@ def _build_locator(page, step_data, params): f"Built xpath from parameters: {xpath}", 5 ) - return page.locator(f"xpath={xpath}") + return base_locator.locator(f"xpath={xpath}") # Strategy 4: Simple element parameters as xpath if params['element_params']: @@ -348,9 +358,9 @@ def _build_locator(page, step_data, params): if xpath_parts: xpath = f"//{tag}[{' and '.join(xpath_parts)}]" - return page.locator(f"xpath={xpath}") + return base_locator.locator(f"xpath={xpath}") elif tag != "*": - return page.locator(tag) + return base_locator.locator(tag) return None @@ -471,7 +481,7 @@ def _extract_sr_index(mid_value): return 1 -def wait_for_element(step_data, page, state="visible", timeout=None): +async def wait_for_element(step_data, page, state="visible", timeout=None): """ Wait for element to reach a specific state. @@ -487,7 +497,7 @@ def wait_for_element(step_data, page, state="visible", timeout=None): sModuleInfo = "wait_for_element" try: - locator = Get_Element(step_data, page) + locator = await Get_Element(step_data, page) if locator == "zeuz_failed": return "zeuz_failed" diff --git a/Framework/Built_In_Automation/Web/Playwright/utils.py b/Framework/Built_In_Automation/Web/Playwright/utils.py new file mode 100644 index 000000000..731b88fa6 --- /dev/null +++ b/Framework/Built_In_Automation/Web/Playwright/utils.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +""" +Playwright Utility Functions for Zeuz Node + +This module provides utility functions for Playwright automation, +including browser download and setup functionality. + +Author: Zeuz/AutomationSolutionz +""" + +import os +from pathlib import Path +from Framework.Utilities import CommonUtil +from Framework.Built_In_Automation.Web.Selenium.utils import ChromeForTesting + +# Initialize Chrome for Testing instance +chrome_for_testing = ChromeForTesting() + + +def ensure_chromium_downloads(sModuleInfo): + """ + Ensure Chrome for Testing is available for Playwright. + + Args: + sModuleInfo: Module information for logging + + Returns: + tuple: (chrome_binary_path, success_flag) where success_flag is True if successful + """ + try: + CommonUtil.ExecLog(sModuleInfo, "Setting up Chrome for Testing for Playwright...", 1) + + # Use Chrome for Testing to get Chrome binary + chrome_bin, driver_bin = chrome_for_testing.setup_chrome_for_testing() + + if chrome_bin and chrome_bin.exists(): + CommonUtil.ExecLog(sModuleInfo, f"Chrome for Testing ready: {chrome_bin}", 1) + return str(chrome_bin), True + else: + CommonUtil.ExecLog(sModuleInfo, "Failed to setup Chrome for Testing", 3) + return None, False + + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Error setting up Chrome for Testing: {str(e)}", 3) + return None, False diff --git a/Framework/Built_In_Automation/Web/Selenium/BuiltInFunctions.py b/Framework/Built_In_Automation/Web/Selenium/BuiltInFunctions.py index bb523ea00..f7c831dda 100644 --- a/Framework/Built_In_Automation/Web/Selenium/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Web/Selenium/BuiltInFunctions.py @@ -71,6 +71,8 @@ from Framework.AI.NLP import binary_classification from .utils import ChromeForTesting, ChromeExtensionDownloader +from playwright.async_api import async_playwright + ######################### # # # Global Variables # @@ -656,8 +658,27 @@ def headless(): return options +async def connect_playwright_to_selenium(port=9222): + playwright_instance = await async_playwright().start() + browser = await playwright_instance.chromium.connect_over_cdp(f"http://localhost:{port}") + context = browser.contexts[0] + page = context.pages[0] + + from Framework.Built_In_Automation.Web.Playwright import BuiltInFunctions as PlaywrightBuiltInFunctions + PlaywrightBuiltInFunctions.playwright_instance = playwright_instance + PlaywrightBuiltInFunctions.browser = browser + PlaywrightBuiltInFunctions.context = context + PlaywrightBuiltInFunctions.current_page = page + + Shared_Resources.Set_Shared_Variables("playwright_context", context) + Shared_Resources.Set_Shared_Variables("playwright_browser", browser) + Shared_Resources.Set_Shared_Variables("playwright_page", page) + + return "passed" + + @logger -def Open_Browser(browser, browser_options: BrowserOptions): +async def Open_Browser(browser, browser_options: BrowserOptions): """Launch browser from options and service object""" try: global selenium_driver @@ -687,6 +708,9 @@ def Open_Browser(browser, browser_options: BrowserOptions): options = generate_options(browser, browser_options) + # Enable remote debugging / CDP + options.add_argument("--remote-debugging-port=9222") + if browser in ("android", "chrome", "chromeheadless"): from selenium.webdriver.chrome.service import Service @@ -756,6 +780,13 @@ def Open_Browser(browser, browser_options: BrowserOptions): ) return "zeuz_failed" + # Connect Playwright to Selenium via CDP + try: + await connect_playwright_to_selenium(port=9222) + CommonUtil.ExecLog(sModuleInfo, "Connected Playwright to Selenium", 1) + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Failed to connect Playwright to Selenium: {e}", 3) + CommonUtil.ExecLog(sModuleInfo, f"Started {browser} browser", 1) Shared_Resources.Set_Shared_Variables("selenium_driver", selenium_driver) CommonUtil.set_screenshot_vars(Shared_Resources.Shared_Variable_Export()) @@ -915,7 +946,7 @@ def parse_and_verify_datatype(left: str, right: str, chrome_version=None): @logger -def Go_To_Link(dataset: Dataset) -> ReturnType: +async def Go_To_Link(dataset: Dataset) -> ReturnType: try: sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME window_size_X = None @@ -1117,7 +1148,7 @@ def Go_To_Link(dataset: Dataset) -> ReturnType: sModuleInfo, "Browser not previously opened, doing so now", 1 ) - if Open_Browser(dependency["Browser"], browser_options) == "zeuz_failed": + if await Open_Browser(dependency["Browser"], browser_options) == "zeuz_failed": return "zeuz_failed" if ConfigModule.get_config_value( @@ -1191,7 +1222,7 @@ def Go_To_Link(dataset: Dataset) -> ReturnType: # If the browser is closed but selenium instance is on, relaunch selenium_driver if Shared_Resources.Test_Shared_Variables("dependency"): dependency = Shared_Resources.Get_Shared_Variables("dependency") - result = Open_Browser(dependency["Browser"], browser_options) + result = await Open_Browser(dependency["Browser"], browser_options) else: result = "zeuz_failed" diff --git a/Framework/MainDriverApi.py b/Framework/MainDriverApi.py index f7734a5fe..729368cb5 100644 --- a/Framework/MainDriverApi.py +++ b/Framework/MainDriverApi.py @@ -281,7 +281,7 @@ def terminate_thread(thread): # To kill running thread raise SystemError("PyThreadState_SetAsyncExc failed") # call the function of a test step that is in its driver file -def call_driver_function_of_test_step( +async def call_driver_function_of_test_step( sModuleInfo, all_step_info, StepSeq, @@ -312,6 +312,7 @@ def call_driver_function_of_test_step( try: # importing functions from driver functionTocall = getattr(module_name, step_name) + print(functionTocall) except Exception as e: CommonUtil.Exception_Handler( sys.exc_info(), @@ -379,12 +380,20 @@ def call_driver_function_of_test_step( CommonUtil.Exception_Handler(sys.exc_info()) else: # run sequentially - sStepResult = functionTocall( - test_steps_data, - test_action_info, - simple_queue, - debug_actions, - ) + if inspect.iscoroutinefunction(functionTocall): + sStepResult = await functionTocall( + test_steps_data, + test_action_info, + simple_queue, + debug_actions, + ) + else: + sStepResult = functionTocall( + test_steps_data, + test_action_info, + simple_queue, + debug_actions, + ) except: CommonUtil.Exception_Handler(sys.exc_info()) # handle exceptions sStepResult = "zeuz_failed" @@ -411,7 +420,7 @@ def call_driver_function_of_test_step( # runs all test steps of a test case -def run_all_test_steps_in_a_test_case( +async def run_all_test_steps_in_a_test_case( testcase_info, test_case, sModuleInfo, @@ -606,7 +615,7 @@ def run_all_test_steps_in_a_test_case( CommonUtil.ExecLog(sModuleInfo, "STEP-%s is skipped" % StepSeq, 2) sStepResult = "skipped" else: - sStepResult = call_driver_function_of_test_step( + sStepResult = await call_driver_function_of_test_step( sModuleInfo, all_step_info, StepSeq, @@ -771,13 +780,20 @@ def zip_and_delete_tc_folder( FL.DeleteFolder(path) -def cleanup_driver_instances(): # cleans up driver(selenium, appium) instances +async def cleanup_driver_instances(): # cleans up driver(selenium, playwright, appium) instances try: # if error happens. we don't care, main driver should not stop, pass in exception import Framework.Built_In_Automation.Web.Selenium.BuiltInFunctions as Selenium + import Framework.Built_In_Automation.Web.Playwright.BuiltInFunctions as Playwright try: Selenium.Tear_Down_Selenium() except: pass + + try: + await Playwright.Tear_Down_Playwright() + except: + pass + if shared.Test_Shared_Variables("appium_details"): import Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions as Appium driver = shared.Remove_From_Shared_Variables("appium_details") @@ -936,7 +952,7 @@ def check_test_skip(run_id, tc_num, skip_remaining=True) -> bool: return False -def run_test_case( +async def run_test_case( TestCaseID, sModuleInfo, run_id, @@ -994,7 +1010,7 @@ def run_test_case( if check_test_skip(run_id, tc_num): sTestStepResultList = ['SKIPPED' for i in range(len(testcase_info['steps']))] else: - sTestStepResultList = run_all_test_steps_in_a_test_case( + sTestStepResultList = await run_all_test_steps_in_a_test_case( testcase_info, test_case, sModuleInfo, @@ -1107,7 +1123,7 @@ def run_test_case( else: CommonUtil.Join_Thread_and_Return_Result("screenshot") if str(shared.Get_Shared_Variables("zeuz_auto_teardown")).strip().lower() not in ("off", "no", "false", "disable"): - cleanup_driver_instances() + await cleanup_driver_instances() shared.Clean_Up_Shared_Variables(run_id) if ConfigModule.get_config_value("RunDefinition", "local_run") == "False": @@ -1806,7 +1822,7 @@ def download_or_copy(attachment): # main function -def main(device_dict, all_run_id_info): +async def main(device_dict, all_run_id_info): try: # get module info sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -1919,7 +1935,7 @@ def main(device_dict, all_run_id_info): if "debug_step_actions" in run_id_info: debug_info["debug_step_actions"] = run_id_info["debug_step_actions"] if run_id_info["debug_clean"] == "YES": - cleanup_driver_instances() + await cleanup_driver_instances() shared.Clean_Up_Shared_Variables(run_id) driver_list = ["Not needed currently"] @@ -1940,7 +1956,7 @@ def main(device_dict, all_run_id_info): shared.Set_Shared_Variables("zeuz_auto_teardown", "on") if not CommonUtil.debug_status and str(shared.Get_Shared_Variables("zeuz_auto_teardown")).strip().lower() not in ("off", "no", "false", "disable"): - cleanup_driver_instances() + await cleanup_driver_instances() if not shared.Test_Shared_Variables("zeuz_collect_browser_log"): shared.Set_Shared_Variables("zeuz_collect_browser_log", "on") @@ -2093,7 +2109,7 @@ def kill(process): ) except Exception as e: CommonUtil.ExecLog(sModuleInfo, str(e), 3) - run_test_case( + await run_test_case( test_case_no, sModuleInfo, run_id, @@ -2111,7 +2127,7 @@ def kill(process): else: - run_test_case( + await run_test_case( test_case_no, sModuleInfo, run_id, diff --git a/Framework/Utilities/CommonUtil.py b/Framework/Utilities/CommonUtil.py index 2edf195fc..de2847a4b 100644 --- a/Framework/Utilities/CommonUtil.py +++ b/Framework/Utilities/CommonUtil.py @@ -869,14 +869,16 @@ def set_screenshot_vars(shared_variables): screen_capture_driver = appium_details[device_id][ "driver" ] # Driver for selected device - if screen_capture_type == "web": # Selenium driver object + if screen_capture_type == "web": # Selenium or Playwright driver object if "selenium_driver" in shared_variables: screen_capture_driver = shared_variables["selenium_driver"] + elif "playwright_page" in shared_variables: + screen_capture_driver = shared_variables["playwright_page"] except: ExecLog(sModuleInfo, "Error setting screenshot variables", 3) -def TakeScreenShot(function_name, local_run=False): +async def TakeScreenShot(function_name, local_run=False): """ Puts TakeScreenShot into a thread, so it doesn't block test case execution """ if not ws_ss_log or performance_testing: return try: @@ -925,8 +927,7 @@ def TakeScreenShot(function_name, local_run=False): break image_name = "Step#" + current_step_no + "_Action#" + current_action_no + "_" + filename - thread = executor.submit(Thread_ScreenShot, function_name, image_folder, Method, Driver, image_name) - SaveThread("screenshot", thread) + await Thread_ScreenShot(function_name, image_folder, Method, Driver, image_name) except: return Exception_Handler(sys.exc_info()) @@ -939,7 +940,7 @@ def pil_image_to_bytearray(img): return img_byte_array -def Thread_ScreenShot(function_name, image_folder, Method, Driver, image_name): +async def Thread_ScreenShot(function_name, image_folder, Method, Driver, image_name): """ Capture screen of mobile or desktop """ if performance_testing: return sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -985,7 +986,11 @@ def Thread_ScreenShot(function_name, image_folder, Method, Driver, image_name): # Capture screenshot of web browser elif Method == "web": - Driver.get_screenshot_as_file(ImageName) # Must be .png, otherwise an exception occurs + # Check if it's a Playwright page or Selenium driver + if hasattr(Driver, 'screenshot'): # Playwright page + await Driver.screenshot(path=ImageName, full_page=True) + else: # Selenium driver + Driver.get_screenshot_as_file(ImageName) # Must be .png, otherwise an exception occurs # Capture screenshot of mobile elif Method == "mobile": diff --git a/node_cli.py b/node_cli.py index d4e1ce890..e8b13301e 100755 --- a/node_cli.py +++ b/node_cli.py @@ -440,7 +440,7 @@ async def response_callback(response: str): # 3. Call MainDriver device_info = All_Device_Info.get_all_connected_device_info() await install_handler.cancel_run() - MainDriverApi.main( + await MainDriverApi.main( device_dict=device_info, all_run_id_info=node_json, )