diff --git a/bindings/sysman/python/source/examples/pyzes_black_box_test.py b/bindings/sysman/python/source/examples/pyzes_black_box_test.py index 978f19d0..4ab024de 100755 --- a/bindings/sysman/python/source/examples/pyzes_black_box_test.py +++ b/bindings/sysman/python/source/examples/pyzes_black_box_test.py @@ -208,6 +208,103 @@ def get_engine_type_string(engine_type): return type_map.get(engine_type, f"UNKNOWN_ENGINE_TYPE_{engine_type}") +def get_power_domain_string(power_domain): + """Convert power domain enum to string""" + domain_map = { + pz.ZES_POWER_DOMAIN_UNKNOWN: "ZES_POWER_DOMAIN_UNKNOWN", + pz.ZES_POWER_DOMAIN_CARD: "ZES_POWER_DOMAIN_CARD", + pz.ZES_POWER_DOMAIN_PACKAGE: "ZES_POWER_DOMAIN_PACKAGE", + pz.ZES_POWER_DOMAIN_STACK: "ZES_POWER_DOMAIN_STACK", + pz.ZES_POWER_DOMAIN_MEMORY: "ZES_POWER_DOMAIN_MEMORY", + pz.ZES_POWER_DOMAIN_GPU: "ZES_POWER_DOMAIN_GPU", + } + return domain_map.get(power_domain, f"UNKNOWN_POWER_DOMAIN_{power_domain}") + + +def get_power_level_string(power_level): + """Convert power level enum to string""" + level_map = { + pz.ZES_POWER_LEVEL_UNKNOWN: "ZES_POWER_LEVEL_UNKNOWN", + pz.ZES_POWER_LEVEL_SUSTAINED: "ZES_POWER_LEVEL_SUSTAINED", + pz.ZES_POWER_LEVEL_BURST: "ZES_POWER_LEVEL_BURST", + pz.ZES_POWER_LEVEL_PEAK: "ZES_POWER_LEVEL_PEAK", + pz.ZES_POWER_LEVEL_INSTANTANEOUS: "ZES_POWER_LEVEL_INSTANTANEOUS", + } + return level_map.get(power_level, f"UNKNOWN_POWER_LEVEL_{power_level}") + + +def get_power_source_string(power_source): + """Convert power source enum to string""" + source_map = { + pz.ZES_POWER_SOURCE_ANY: "ZES_POWER_SOURCE_ANY", + pz.ZES_POWER_SOURCE_MAINS: "ZES_POWER_SOURCE_MAINS", + pz.ZES_POWER_SOURCE_BATTERY: "ZES_POWER_SOURCE_BATTERY", + } + return source_map.get(power_source, f"UNKNOWN_POWER_SOURCE_{power_source}") + + +def get_limit_unit_string(limit_unit): + """Convert power limit unit enum to string""" + unit_map = { + pz.ZES_LIMIT_UNIT_UNKNOWN: "ZES_LIMIT_UNIT_UNKNOWN", + pz.ZES_LIMIT_UNIT_CURRENT: "ZES_LIMIT_UNIT_CURRENT", + pz.ZES_LIMIT_UNIT_POWER: "ZES_LIMIT_UNIT_POWER", + } + return unit_map.get(limit_unit, f"UNKNOWN_LIMIT_UNIT_{limit_unit}") + + +def get_ecc_state_string(ecc_state): + """Convert ECC state enum to string""" + state_map = { + pz.ZES_DEVICE_ECC_STATE_UNAVAILABLE: "ZES_DEVICE_ECC_STATE_UNAVAILABLE", + pz.ZES_DEVICE_ECC_STATE_ENABLED: "ZES_DEVICE_ECC_STATE_ENABLED", + pz.ZES_DEVICE_ECC_STATE_DISABLED: "ZES_DEVICE_ECC_STATE_DISABLED", + } + return state_map.get(ecc_state, f"UNKNOWN_ECC_STATE_{ecc_state}") + + +def get_device_action_string(action): + """Convert device action enum to string""" + action_map = { + pz.ZES_DEVICE_ACTION_NONE: "ZES_DEVICE_ACTION_NONE", + pz.ZES_DEVICE_ACTION_WARM_CARD_RESET: "ZES_DEVICE_ACTION_WARM_CARD_RESET", + pz.ZES_DEVICE_ACTION_COLD_CARD_RESET: "ZES_DEVICE_ACTION_COLD_CARD_RESET", + pz.ZES_DEVICE_ACTION_COLD_SYSTEM_REBOOT: "ZES_DEVICE_ACTION_COLD_SYSTEM_REBOOT", + } + return action_map.get(action, f"UNKNOWN_DEVICE_ACTION_{action}") + + +def get_ras_error_category_string(category): + """Convert RAS error category enum to string""" + category_map = { + pz.ZES_RAS_ERROR_CATEGORY_EXP_RESET: "ZES_RAS_ERROR_CATEGORY_EXP_RESET", + pz.ZES_RAS_ERROR_CATEGORY_EXP_PROGRAMMING_ERRORS: "ZES_RAS_ERROR_CATEGORY_EXP_PROGRAMMING_ERRORS", + pz.ZES_RAS_ERROR_CATEGORY_EXP_DRIVER_ERRORS: "ZES_RAS_ERROR_CATEGORY_EXP_DRIVER_ERRORS", + pz.ZES_RAS_ERROR_CATEGORY_EXP_COMPUTE_ERRORS: "ZES_RAS_ERROR_CATEGORY_EXP_COMPUTE_ERRORS", + pz.ZES_RAS_ERROR_CATEGORY_EXP_NON_COMPUTE_ERRORS: "ZES_RAS_ERROR_CATEGORY_EXP_NON_COMPUTE_ERRORS", + pz.ZES_RAS_ERROR_CATEGORY_EXP_CACHE_ERRORS: "ZES_RAS_ERROR_CATEGORY_EXP_CACHE_ERRORS", + pz.ZES_RAS_ERROR_CATEGORY_EXP_DISPLAY_ERRORS: "ZES_RAS_ERROR_CATEGORY_EXP_DISPLAY_ERRORS", + pz.ZES_RAS_ERROR_CATEGORY_EXP_MEMORY_ERRORS: "ZES_RAS_ERROR_CATEGORY_EXP_MEMORY_ERRORS", + pz.ZES_RAS_ERROR_CATEGORY_EXP_SCALE_ERRORS: "ZES_RAS_ERROR_CATEGORY_EXP_SCALE_ERRORS", + pz.ZES_RAS_ERROR_CATEGORY_EXP_L3FABRIC_ERRORS: "ZES_RAS_ERROR_CATEGORY_EXP_L3FABRIC_ERRORS", + } + return category_map.get(category, f"UNKNOWN_RAS_CATEGORY_{category}") + + +def is_root_user(): + """Return whether the current user has root privileges on platforms that support it""" + geteuid = getattr(os, "geteuid", None) + return bool(geteuid and geteuid() == 0) + + +def check_rc_allow_action_required(label, rc): + """Accept ZE_RESULT_SUCCESS and ZE_RESULT_WARNING_ACTION_REQUIRED""" + if rc in (pz.ZE_RESULT_SUCCESS, pz.ZE_RESULT_WARNING_ACTION_REQUIRED): + return True + print(f"ERROR: {label} failed with ze_result_t={rc}") + return False + + def initialize_sysman_and_get_devices(): """Initialize Sysman and enumerate drivers/devices. Returns (drivers, driver_count, devices, device_count).""" if not initialize_sysman(): @@ -456,6 +553,219 @@ def test_global_operation(driver_handle, device_handle, device_index): return True +def test_pci_module(device_handle, device_index): + """Test PCI properties and stats operations""" + print(f"\n---- Device {device_index} PCI Test ----") + + properties = pz.zes_pci_properties_t() + properties.stype = pz.ZES_STRUCTURE_TYPE_PCI_PROPERTIES + properties.pNext = None + + rc = pz.zesDevicePciGetProperties(device_handle, byref(properties)) + if not check_rc(f"zesDevicePciGetProperties(device {device_index})", rc): + return False + + print_verbose("PCI Properties:") + print_verbose(f" Domain: 0x{properties.address.domain:X}") + print_verbose(f" Bus: 0x{properties.address.bus:X}") + print_verbose(f" Device: 0x{properties.address.device:X}") + print_verbose(f" Function: 0x{properties.address.function:X}") + print_verbose(f" Max Gen: {properties.maxSpeed.gen}") + print_verbose(f" Max Width: {properties.maxSpeed.width}") + print_verbose(f" Max Bandwidth: {properties.maxSpeed.maxBandwidth}") + print_verbose( + f" Have Bandwidth Counters: {bool(properties.haveBandwidthCounters)}" + ) + print_verbose(f" Have Packet Counters: {bool(properties.havePacketCounters)}") + print_verbose(f" Have Replay Counters: {bool(properties.haveReplayCounters)}") + + stats = pz.zes_pci_stats_t() + rc = pz.zesDevicePciGetStats(device_handle, byref(stats)) + if not check_rc(f"zesDevicePciGetStats(device {device_index})", rc): + return False + + print_verbose("PCI Stats:") + if properties.haveReplayCounters: + print_verbose(f" Replay Counter: {stats.replayCounter}") + if properties.havePacketCounters: + print_verbose(f" Packet Counter: {stats.packetCounter}") + if properties.haveBandwidthCounters: + print_verbose(f" RX Counter: {stats.rxCounter}") + print_verbose(f" TX Counter: {stats.txCounter}") + print_verbose(f" Timestamp: {stats.timestamp}") + print_verbose(f" Current Gen: {stats.speed.gen}") + print_verbose(f" Current Width: {stats.speed.width}") + print_verbose(f" Current Max Bandwidth: {stats.speed.maxBandwidth}") + + return True + + +def test_ecc_module(device_handle, device_index): + """Test ECC availability, configurability, state, and set-state operations""" + print(f"\n---- Device {device_index} ECC Test ----") + + ecc_available = pz.ze_bool_t(0) + rc = pz.zesDeviceEccAvailable(device_handle, byref(ecc_available)) + if not check_rc(f"zesDeviceEccAvailable(device {device_index})", rc): + return False + + print_verbose(f"ECC Available: {bool(ecc_available.value)}") + if not ecc_available.value: + print_verbose("ECC is not available on this device") + return True + + ecc_configurable = pz.ze_bool_t(0) + rc = pz.zesDeviceEccConfigurable(device_handle, byref(ecc_configurable)) + if not check_rc(f"zesDeviceEccConfigurable(device {device_index})", rc): + return False + + print_verbose(f"ECC Configurable: {bool(ecc_configurable.value)}") + + get_state = pz.zes_device_ecc_properties_t() + get_state.stype = pz.ZES_STRUCTURE_TYPE_DEVICE_ECC_PROPERTIES + get_state.pNext = None + + rc = pz.zesDeviceGetEccState(device_handle, byref(get_state)) + if not check_rc(f"zesDeviceGetEccState(device {device_index})", rc): + return False + + print_verbose("ECC State:") + print_verbose(f" Current State: {get_ecc_state_string(get_state.currentState)}") + print_verbose(f" Pending State: {get_ecc_state_string(get_state.pendingState)}") + print_verbose( + f" Pending Action: {get_device_action_string(get_state.pendingAction)}" + ) + + if not ecc_configurable.value: + print_verbose("ECC is not configurable on this device") + return True + + if not is_root_user(): + print_verbose( + "Skipping zesDeviceSetEccState test due to insufficient permissions" + ) + return True + + restore_state = get_state.pendingState + if restore_state not in ( + pz.ZES_DEVICE_ECC_STATE_ENABLED, + pz.ZES_DEVICE_ECC_STATE_DISABLED, + ): + restore_state = get_state.currentState + + if restore_state == pz.ZES_DEVICE_ECC_STATE_ENABLED: + test_state = pz.ZES_DEVICE_ECC_STATE_DISABLED + else: + test_state = pz.ZES_DEVICE_ECC_STATE_ENABLED + + new_state = pz.zes_device_ecc_desc_t() + new_state.stype = pz.ZES_STRUCTURE_TYPE_DEVICE_ECC_DESC + new_state.pNext = None + new_state.state = test_state + + set_state = pz.zes_device_ecc_properties_t() + set_state.stype = pz.ZES_STRUCTURE_TYPE_DEVICE_ECC_PROPERTIES + set_state.pNext = None + + rc = pz.zesDeviceSetEccState(device_handle, byref(new_state), byref(set_state)) + if not check_rc_allow_action_required( + f"zesDeviceSetEccState(device {device_index}, test)", rc + ): + return False + + print_verbose("ECC Set State Result:") + print_verbose(f" Current State: {get_ecc_state_string(set_state.currentState)}") + print_verbose(f" Pending State: {get_ecc_state_string(set_state.pendingState)}") + print_verbose( + f" Pending Action: {get_device_action_string(set_state.pendingAction)}" + ) + + if restore_state != test_state: + new_state.state = restore_state + rc = pz.zesDeviceSetEccState(device_handle, byref(new_state), byref(set_state)) + if not check_rc_allow_action_required( + f"zesDeviceSetEccState(device {device_index}, restore)", rc + ): + return False + print_verbose("ECC configuration restored to original state") + + return True + + +def test_ras_module(device_handle, device_index): + """Test RAS handle enumeration, state retrieval, and clear-state operations""" + print(f"\n---- Device {device_index} RAS Test ----") + + ras_count = c_uint32(0) + rc = pz.zesDeviceEnumRasErrorSets(device_handle, byref(ras_count), None) + if not check_rc(f"zesDeviceEnumRasErrorSets(device {device_index}, count)", rc): + return False + + if ras_count.value == 0: + print_verbose("No RAS error sets found on this device") + return True + + print_verbose(f"Found {ras_count.value} RAS error set(s)") + + RasArray = pz.zes_ras_handle_t * ras_count.value + ras_handles = RasArray() + + rc = pz.zesDeviceEnumRasErrorSets(device_handle, byref(ras_count), ras_handles) + if not check_rc(f"zesDeviceEnumRasErrorSets(device {device_index}, handles)", rc): + return False + + for i in range(ras_count.value): + print_verbose(f"\n RAS Handle {i}:") + + state_count = c_uint32(0) + rc = pz.zesRasGetStateExp(ras_handles[i], byref(state_count), None) + if not check_rc(f"zesRasGetStateExp(handle {i}, count)", rc): + continue + + if state_count.value == 0: + print_verbose(" No RAS categories reported for this handle") + continue + + RasStateArray = pz.zes_ras_state_exp_t * state_count.value + ras_states = RasStateArray() + + rc = pz.zesRasGetStateExp(ras_handles[i], byref(state_count), ras_states) + if not check_rc(f"zesRasGetStateExp(handle {i}, states)", rc): + continue + + print_verbose(" RAS States:") + for ras_state in ras_states: + print_verbose( + f" {get_ras_error_category_string(ras_state.category)}: {ras_state.errorCounter}" + ) + + if not is_root_user(): + print_verbose( + " Skipping zesRasClearStateExp due to insufficient permissions" + ) + continue + + for ras_state in ras_states: + rc = pz.zesRasClearStateExp(ras_handles[i], ras_state.category) + if not check_rc( + f"zesRasClearStateExp(handle {i}, {get_ras_error_category_string(ras_state.category)})", + rc, + ): + return False + + rc = pz.zesRasGetStateExp(ras_handles[i], byref(state_count), ras_states) + if not check_rc(f"zesRasGetStateExp(handle {i}, verify)", rc): + continue + + print_verbose(" RAS States After Clear:") + for ras_state in ras_states: + print_verbose( + f" {get_ras_error_category_string(ras_state.category)}: {ras_state.errorCounter}" + ) + + return True + + def test_device_processes(device_handle, device_index): """Test device processes state""" print(f"\n---- Device {device_index} Processes Test ----") @@ -638,7 +948,7 @@ def test_memory_modules(device_handle, device_index): def test_power_module(device_handle, device_index): - """Test power domain enumeration and energy counter operations""" + """Test power domain enumeration, energy counter, and power limit extension operations""" print(f"\n---- Device {device_index} Power Domains Test ----") # Get power domain count @@ -665,6 +975,56 @@ def test_power_module(device_handle, device_index): for i in range(power_count.value): print_verbose(f"\n Power Domain {i}:") + limit_count = c_uint32(0) + rc = pz.zesPowerGetLimitsExt(power_handles[i], byref(limit_count), None) + if not check_rc(f"zesPowerGetLimitsExt(power {i}, count)", rc): + continue + + print_verbose(f" Power Limit Descriptor Count: {limit_count.value}") + + limit_descs = None + if limit_count.value > 0: + PowerLimitArray = pz.zes_power_limit_ext_desc_t * limit_count.value + limit_descs = PowerLimitArray() + + for limit_index in range(limit_count.value): + limit_descs[limit_index].stype = ( + pz.ZES_STRUCTURE_TYPE_POWER_LIMIT_EXT_DESC + ) + limit_descs[limit_index].pNext = None + + rc = pz.zesPowerGetLimitsExt( + power_handles[i], byref(limit_count), limit_descs + ) + if not check_rc(f"zesPowerGetLimitsExt(power {i}, descriptors)", rc): + continue + + print_verbose(" Power Limit Descriptors:") + for limit_index in range(limit_count.value): + limit_desc = limit_descs[limit_index] + print_verbose(f" Descriptor {limit_index}:") + print_verbose( + f" Level: {get_power_level_string(limit_desc.level)}" + ) + print_verbose( + f" Source: {get_power_source_string(limit_desc.source)}" + ) + print_verbose( + f" Limit Unit: {get_limit_unit_string(limit_desc.limitUnit)}" + ) + print_verbose( + f" Enabled State Locked: {bool(limit_desc.enabledStateLocked)}" + ) + print_verbose(f" Enabled: {bool(limit_desc.enabled)}") + print_verbose( + f" Interval Value Locked: {bool(limit_desc.intervalValueLocked)}" + ) + print_verbose(f" Interval: {limit_desc.interval}") + print_verbose( + f" Limit Value Locked: {bool(limit_desc.limitValueLocked)}" + ) + print_verbose(f" Limit: {limit_desc.limit}") + # Test power energy counter energy_counter = pz.zes_power_energy_counter_t() @@ -691,11 +1051,29 @@ def test_power_module(device_handle, device_index): f" Energy Delta: {energy_delta} over {time_delta} microseconds" ) + if limit_descs is None: + continue + + if not is_root_user(): + print_verbose( + " Skipping zesPowerSetLimitsExt due to insufficient permissions" + ) + continue + + set_count = c_uint32(limit_count.value) + rc = pz.zesPowerSetLimitsExt(power_handles[i], byref(set_count), limit_descs) + if not check_rc(f"zesPowerSetLimitsExt(power {i})", rc): + return False + + print_verbose( + f" Re-applied {set_count.value} power limit descriptor(s) successfully" + ) + return True def test_frequency_domains(device_handle, device_index): - """Test frequency domain enumeration and state operations""" + """Test frequency domain enumeration and state, range, and clock operations""" print(f"\n---- Device {device_index} Frequency Domains Test ----") # Get frequency domain count @@ -726,6 +1104,47 @@ def test_frequency_domains(device_handle, device_index): for i in range(freq_count.value): print_verbose(f"\n Frequency Domain {i}:") + available_clock_count = c_uint32(0) + rc = pz.zesFrequencyGetAvailableClocks( + freq_handles[i], byref(available_clock_count), None + ) + if not check_rc(f"zesFrequencyGetAvailableClocks(frequency {i}, count)", rc): + continue + + available_clocks = None + if available_clock_count.value > 0: + AvailableClockArray = c_double * available_clock_count.value + available_clocks = AvailableClockArray() + rc = pz.zesFrequencyGetAvailableClocks( + freq_handles[i], byref(available_clock_count), available_clocks + ) + if not check_rc( + f"zesFrequencyGetAvailableClocks(frequency {i}, clocks)", rc + ): + continue + + print_verbose(" Available Clocks:") + for clock_index in range(available_clock_count.value): + print_verbose(f" {available_clocks[clock_index]:.1f} MHz") + + freq_range = pz.zes_freq_range_t() + rc = pz.zesFrequencyGetRange(freq_handles[i], byref(freq_range)) + if not check_rc(f"zesFrequencyGetRange(frequency {i})", rc): + continue + + print_verbose(" Frequency Range:") + print_verbose(f" Min: {freq_range.min:.1f} MHz") + print_verbose(f" Max: {freq_range.max:.1f} MHz") + + throttle_time = pz.zes_freq_throttle_time_t() + rc = pz.zesFrequencyGetThrottleTime(freq_handles[i], byref(throttle_time)) + if not check_rc(f"zesFrequencyGetThrottleTime(frequency {i})", rc): + continue + + print_verbose(" Throttle Time:") + print_verbose(f" Throttle Time: {throttle_time.throttleTime}") + print_verbose(f" Timestamp: {throttle_time.timestamp} microseconds") + # Test frequency state freq_state = pz.zes_freq_state_t() freq_state.stype = pz.ZES_STRUCTURE_TYPE_FREQ_STATE @@ -765,6 +1184,49 @@ def test_frequency_domains(device_handle, device_index): f" Throttle Reasons: {get_throttle_reasons_string(freq_state.throttleReasons)}" ) + if available_clocks is None or available_clock_count.value == 0: + continue + + if not is_root_user(): + print_verbose( + " Skipping zesFrequencySetRange test due to insufficient permissions" + ) + continue + + original_range = pz.zes_freq_range_t() + original_range.min = freq_range.min + original_range.max = freq_range.max + + test_range = pz.zes_freq_range_t() + test_range.min = available_clocks[0] + test_range.max = available_clocks[0] + + rc = pz.zesFrequencySetRange(freq_handles[i], byref(test_range)) + if not check_rc(f"zesFrequencySetRange(frequency {i}, test)", rc): + return False + + verify_range = pz.zes_freq_range_t() + rc = pz.zesFrequencyGetRange(freq_handles[i], byref(verify_range)) + if not check_rc(f"zesFrequencyGetRange(frequency {i}, verify)", rc): + return False + + print_verbose(" Frequency Range After Set:") + print_verbose(f" Min: {verify_range.min:.1f} MHz") + print_verbose(f" Max: {verify_range.max:.1f} MHz") + + rc = pz.zesFrequencySetRange(freq_handles[i], byref(original_range)) + if not check_rc(f"zesFrequencySetRange(frequency {i}, restore)", rc): + return False + + restored_range = pz.zes_freq_range_t() + rc = pz.zesFrequencyGetRange(freq_handles[i], byref(restored_range)) + if not check_rc(f"zesFrequencyGetRange(frequency {i}, restored)", rc): + return False + + print_verbose(" Frequency Range Restored:") + print_verbose(f" Min: {restored_range.min:.1f} MHz") + print_verbose(f" Max: {restored_range.max:.1f} MHz") + return True @@ -894,6 +1356,15 @@ def run_all_tests(): # Test global device operations (properties and processes) test_global_operation(drivers[driver_idx], devices[device_idx], device_idx) + # Test PCI module + test_pci_module(devices[device_idx], device_idx) + + # Test ECC module + test_ecc_module(devices[device_idx], device_idx) + + # Test RAS module + test_ras_module(devices[device_idx], device_idx) + # Test memory modules test_memory_modules(devices[device_idx], device_idx) @@ -921,7 +1392,10 @@ def main(): %(prog)s -a # Run all tests %(prog)s -m # Memory tests only %(prog)s -g # Global operations (device properties and processes) only - %(prog)s -p # Power tests only + %(prog)s -p # PCI tests only + %(prog)s -C # ECC tests only + %(prog)s -R # RAS tests only + %(prog)s -o # Power tests only %(prog)s -f # Frequency tests only %(prog)s -t # Temperature tests only %(prog)s -e # Engine tests only @@ -940,8 +1414,11 @@ def main(): help="Run only global operations (device properties and processes)", ) parser.add_argument( - "-p", "--power", action="store_true", help="Run only power-related tests" + "-o", "--power", action="store_true", help="Run only power-related tests" ) + parser.add_argument("-p", "--pci", action="store_true", help="Run only PCI tests") + parser.add_argument("-C", "--ecc", action="store_true", help="Run only ECC tests") + parser.add_argument("-R", "--ras", action="store_true", help="Run only RAS tests") parser.add_argument( "-f", "--frequency", @@ -967,6 +1444,9 @@ def main(): specific_test = ( args.memory or getattr(args, "global", False) + or args.pci + or args.ecc + or args.ras or args.power or args.frequency or args.temperature @@ -1002,6 +1482,15 @@ def main(): if getattr(args, "global", False): test_global_operation(drivers[0], devices[device_idx], device_idx) + if args.pci: + test_pci_module(devices[device_idx], device_idx) + + if args.ecc: + test_ecc_module(devices[device_idx], device_idx) + + if args.ras: + test_ras_module(devices[device_idx], device_idx) + if args.memory: test_memory_modules(devices[device_idx], device_idx) diff --git a/bindings/sysman/python/source/pyzes.py b/bindings/sysman/python/source/pyzes.py index f7841f50..26d090ae 100644 --- a/bindings/sysman/python/source/pyzes.py +++ b/bindings/sysman/python/source/pyzes.py @@ -151,6 +151,10 @@ class zes_engine_handle_t(c_void_p): pass +class zes_ras_handle_t(c_void_p): + pass + + ## ze_bool_t = c_uint32 @@ -224,6 +228,26 @@ class zes_engine_handle_t(c_void_p): ZES_POWER_DOMAIN_GPU = 5 ZES_POWER_DOMAIN_FORCE_UINT32 = 0x7FFFFFFF +zes_power_level_t = c_int32 +ZES_POWER_LEVEL_UNKNOWN = 0 +ZES_POWER_LEVEL_SUSTAINED = 1 +ZES_POWER_LEVEL_BURST = 2 +ZES_POWER_LEVEL_PEAK = 3 +ZES_POWER_LEVEL_INSTANTANEOUS = 4 +ZES_POWER_LEVEL_FORCE_UINT32 = 0x7FFFFFFF + +zes_power_source_t = c_int32 +ZES_POWER_SOURCE_ANY = 0 +ZES_POWER_SOURCE_MAINS = 1 +ZES_POWER_SOURCE_BATTERY = 2 +ZES_POWER_SOURCE_FORCE_UINT32 = 0x7FFFFFFF + +zes_limit_unit_t = c_int32 +ZES_LIMIT_UNIT_UNKNOWN = 0 +ZES_LIMIT_UNIT_CURRENT = 1 +ZES_LIMIT_UNIT_POWER = 2 +ZES_LIMIT_UNIT_FORCE_UINT32 = 0x7FFFFFFF + ## Frequency domain enums ## zes_freq_domain_t = c_int32 ZES_FREQ_DOMAIN_GPU = 0 @@ -271,6 +295,20 @@ class zes_engine_handle_t(c_void_p): ZES_ENGINE_GROUP_MEDIA_CODEC_SINGLE = 14 ZES_ENGINE_GROUP_FORCE_UINT32 = 0x7FFFFFFF +## RAS error category enums ## +zes_ras_error_category_exp_t = c_int32 +ZES_RAS_ERROR_CATEGORY_EXP_RESET = 0 +ZES_RAS_ERROR_CATEGORY_EXP_PROGRAMMING_ERRORS = 1 +ZES_RAS_ERROR_CATEGORY_EXP_DRIVER_ERRORS = 2 +ZES_RAS_ERROR_CATEGORY_EXP_COMPUTE_ERRORS = 3 +ZES_RAS_ERROR_CATEGORY_EXP_NON_COMPUTE_ERRORS = 4 +ZES_RAS_ERROR_CATEGORY_EXP_CACHE_ERRORS = 5 +ZES_RAS_ERROR_CATEGORY_EXP_DISPLAY_ERRORS = 6 +ZES_RAS_ERROR_CATEGORY_EXP_MEMORY_ERRORS = 7 +ZES_RAS_ERROR_CATEGORY_EXP_SCALE_ERRORS = 8 +ZES_RAS_ERROR_CATEGORY_EXP_L3FABRIC_ERRORS = 9 +ZES_RAS_ERROR_CATEGORY_EXP_FORCE_UINT32 = 0x7FFFFFFF + ze_result_t = c_int32 ZE_RESULT_SUCCESS = 0 ZE_RESULT_NOT_READY = 1 @@ -334,14 +372,19 @@ class zes_engine_handle_t(c_void_p): # Structure type enum values ZES_STRUCTURE_TYPE_DEVICE_PROPERTIES = 0x1 +ZES_STRUCTURE_TYPE_PCI_PROPERTIES = 0x2 +ZES_STRUCTURE_TYPE_DEVICE_ECC_DESC = 0x25 +ZES_STRUCTURE_TYPE_DEVICE_ECC_PROPERTIES = 0x26 +ZES_STRUCTURE_TYPE_POWER_LIMIT_EXT_DESC = 0x27 +ZES_STRUCTURE_TYPE_POWER_EXT_PROPERTIES = 0x28 ZES_STRUCTURE_TYPE_DEVICE_EXT_PROPERTIES = 0x2D # from zes_structure_type_t ZES_STRUCTURE_TYPE_SUBDEVICE_EXP_PROPERTIES = 0x2E # Experimental subdevice properties ZES_STRUCTURE_TYPE_MEM_PROPERTIES = 0xB ZES_STRUCTURE_TYPE_MEM_STATE = 0x1E ZES_STRUCTURE_TYPE_FREQ_PROPERTIES = 0x9 ZES_STRUCTURE_TYPE_FREQ_STATE = 0x1B -ZES_STRUCTURE_TYPE_TEMP_PROPERTIES = 0xA -ZES_STRUCTURE_TYPE_TEMP_CONFIG = 0x1C +ZES_STRUCTURE_TYPE_TEMP_PROPERTIES = 0x14 +ZES_STRUCTURE_TYPE_TEMP_CONFIG = 0x23 ZES_STRUCTURE_TYPE_ENGINE_PROPERTIES = 0x5 @@ -406,6 +449,96 @@ class zes_process_state_t(_PrintableStructure): _fmt_ = {"memSize": "%d bytes", "sharedMemSize": "%d bytes"} +## PCI structures ## +class zes_pci_address_t(_PrintableStructure): + _fields_ = [ + ("domain", c_uint32), + ("bus", c_uint32), + ("device", c_uint32), + ("function", c_uint32), + ] + + +class zes_pci_speed_t(_PrintableStructure): + _fields_ = [ + ("gen", c_int32), + ("width", c_int32), + ("maxBandwidth", c_int64), + ] + _fmt_ = {"maxBandwidth": "%d"} + + +class zes_pci_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("address", zes_pci_address_t), + ("maxSpeed", zes_pci_speed_t), + ("haveBandwidthCounters", ze_bool_t), + ("havePacketCounters", ze_bool_t), + ("haveReplayCounters", ze_bool_t), + ] + + +class zes_pci_stats_t(_PrintableStructure): + _fields_ = [ + ("timestamp", c_uint64), + ("replayCounter", c_uint64), + ("packetCounter", c_uint64), + ("rxCounter", c_uint64), + ("txCounter", c_uint64), + ("speed", zes_pci_speed_t), + ] + _fmt_ = { + "timestamp": "%d", + "replayCounter": "%d", + "packetCounter": "%d", + "rxCounter": "%d", + "txCounter": "%d", + } + + +## RAS structures ## +class zes_ras_state_exp_t(_PrintableStructure): + _fields_ = [ + ("category", zes_ras_error_category_exp_t), + ("errorCounter", c_uint64), + ] + + +## ECC enums and structures ## +zes_device_ecc_state_t = c_int32 +ZES_DEVICE_ECC_STATE_UNAVAILABLE = 0 +ZES_DEVICE_ECC_STATE_ENABLED = 1 +ZES_DEVICE_ECC_STATE_DISABLED = 2 +ZES_DEVICE_ECC_STATE_FORCE_UINT32 = 0x7FFFFFFF + +zes_device_action_t = c_int32 +ZES_DEVICE_ACTION_NONE = 0 +ZES_DEVICE_ACTION_WARM_CARD_RESET = 1 +ZES_DEVICE_ACTION_COLD_CARD_RESET = 2 +ZES_DEVICE_ACTION_COLD_SYSTEM_REBOOT = 3 +ZES_DEVICE_ACTION_FORCE_UINT32 = 0x7FFFFFFF + + +class zes_device_ecc_desc_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("state", zes_device_ecc_state_t), + ] + + +class zes_device_ecc_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("currentState", zes_device_ecc_state_t), + ("pendingState", zes_device_ecc_state_t), + ("pendingAction", zes_device_action_t), + ] + + ## Sysman zes_uuid_t ## class zes_uuid_t(_PrintableStructure): _fields_ = [("id", c_ubyte * ZES_MAX_UUID_SIZE)] @@ -478,6 +611,31 @@ class zes_mem_bandwidth_t(_PrintableStructure): ## Power structures ## +class zes_power_limit_ext_desc_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("level", zes_power_level_t), + ("source", zes_power_source_t), + ("limitUnit", zes_limit_unit_t), + ("enabledStateLocked", ze_bool_t), + ("enabled", ze_bool_t), + ("intervalValueLocked", ze_bool_t), + ("interval", c_int32), + ("limitValueLocked", ze_bool_t), + ("limit", c_int32), + ] + + +class zes_power_ext_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("domain", zes_power_domain_t), + ("defaultLimit", POINTER(zes_power_limit_ext_desc_t)), + ] + + class zes_power_energy_counter_t(_PrintableStructure): _fields_ = [ ("energy", c_uint64), # monotonic energy counter in microjoules @@ -487,6 +645,14 @@ class zes_power_energy_counter_t(_PrintableStructure): ## Frequency structures ## +class zes_freq_range_t(_PrintableStructure): + _fields_ = [ + ("min", c_double), + ("max", c_double), + ] + _fmt_ = {"min": "%.1f MHz", "max": "%.1f MHz"} + + class zes_freq_state_t(_PrintableStructure): _fields_ = [ ("stype", c_int32), # ZES_STRUCTURE_TYPE_FREQ_STATE @@ -508,6 +674,14 @@ class zes_freq_state_t(_PrintableStructure): } +class zes_freq_throttle_time_t(_PrintableStructure): + _fields_ = [ + ("throttleTime", c_uint64), + ("timestamp", c_uint64), + ] + _fmt_ = {"throttleTime": "%d", "timestamp": "%d microseconds"} + + ## Temperature structures ## class zes_temp_properties_t(_PrintableStructure): _fields_ = [ @@ -635,6 +809,118 @@ def zesDeviceGetProperties(hDevice, pProperties): return retVal +## PCI management functions ## +def zesDevicePciGetProperties(hDevice, pProperties): + """Wraps API: + ze_result_t zesDevicePciGetProperties(zes_device_handle_t hDevice, zes_pci_properties_t* pProperties) + + Parameters: + hDevice: device handle + pProperties: POINTER(zes_pci_properties_t) - PCI properties structure to fill + Returns: + ze_result_t - return code only, properties are filled into pProperties + """ + funcPtr = getFunctionPointerList("zesDevicePciGetProperties") + funcPtr.argtypes = [zes_device_handle_t, POINTER(zes_pci_properties_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pProperties) + return retVal + + +def zesDevicePciGetStats(hDevice, pStats): + """Wraps API: + ze_result_t zesDevicePciGetStats(zes_device_handle_t hDevice, zes_pci_stats_t* pStats) + + Parameters: + hDevice: device handle + pStats: POINTER(zes_pci_stats_t) - PCI stats structure to fill + Returns: + ze_result_t - return code only, stats are filled into pStats + """ + funcPtr = getFunctionPointerList("zesDevicePciGetStats") + funcPtr.argtypes = [zes_device_handle_t, POINTER(zes_pci_stats_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pStats) + return retVal + + +## ECC management functions ## +def zesDeviceEccAvailable(hDevice, pAvailable): + """Wraps API: + ze_result_t zesDeviceEccAvailable(zes_device_handle_t hDevice, ze_bool_t* pAvailable) + + Parameters: + hDevice: device handle + pAvailable: POINTER(ze_bool_t) - ECC availability flag to fill + Returns: + ze_result_t - return code only, availability is filled into pAvailable + """ + funcPtr = getFunctionPointerList("zesDeviceEccAvailable") + funcPtr.argtypes = [zes_device_handle_t, POINTER(ze_bool_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pAvailable) + return retVal + + +def zesDeviceEccConfigurable(hDevice, pConfigurable): + """Wraps API: + ze_result_t zesDeviceEccConfigurable(zes_device_handle_t hDevice, ze_bool_t* pConfigurable) + + Parameters: + hDevice: device handle + pConfigurable: POINTER(ze_bool_t) - ECC configurability flag to fill + Returns: + ze_result_t - return code only, configurability is filled into pConfigurable + """ + funcPtr = getFunctionPointerList("zesDeviceEccConfigurable") + funcPtr.argtypes = [zes_device_handle_t, POINTER(ze_bool_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pConfigurable) + return retVal + + +def zesDeviceGetEccState(hDevice, pState): + """Wraps API: + ze_result_t zesDeviceGetEccState(zes_device_handle_t hDevice, zes_device_ecc_properties_t* pState) + + Parameters: + hDevice: device handle + pState: POINTER(zes_device_ecc_properties_t) - ECC state structure to fill + Returns: + ze_result_t - return code only, ECC state is filled into pState + """ + funcPtr = getFunctionPointerList("zesDeviceGetEccState") + funcPtr.argtypes = [zes_device_handle_t, POINTER(zes_device_ecc_properties_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pState) + return retVal + + +def zesDeviceSetEccState(hDevice, newState, pState): + """Wraps API: + ze_result_t zesDeviceSetEccState( + zes_device_handle_t hDevice, + const zes_device_ecc_desc_t* newState, + zes_device_ecc_properties_t* pState) + + Parameters: + hDevice: device handle + newState: POINTER(zes_device_ecc_desc_t) - desired ECC state descriptor + pState: POINTER(zes_device_ecc_properties_t) - resulting ECC state structure to fill + Returns: + ze_result_t - return code only, ECC state is filled into pState + """ + funcPtr = getFunctionPointerList("zesDeviceSetEccState") + funcPtr.argtypes = [ + zes_device_handle_t, + POINTER(zes_device_ecc_desc_t), + POINTER(zes_device_ecc_properties_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, newState, pState) + return retVal + + def zesDeviceGetSubDevicePropertiesExp(hDevice, pCount, pSubdeviceProps): """Wraps API: ze_result_t zesDeviceGetSubDevicePropertiesExp( @@ -714,6 +1000,76 @@ def zesDeviceProcessesGetState(hDevice, pCount, pProcesses): return retVal +## RAS management functions ## +def zesDeviceEnumRasErrorSets(hDevice, pCount, phRas): + """Wraps API: + ze_result_t zesDeviceEnumRasErrorSets( + zes_device_handle_t hDevice, + uint32_t* pCount, + zes_ras_handle_t* phRas) + + Parameters: + hDevice: device handle + pCount: POINTER(c_uint32) + phRas: POINTER(zes_ras_handle_t) or None + Returns: + ze_result_t - return code only, RAS handles are filled into phRas + """ + funcPtr = getFunctionPointerList("zesDeviceEnumRasErrorSets") + funcPtr.argtypes = [ + zes_device_handle_t, + POINTER(c_uint32), + POINTER(zes_ras_handle_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pCount, phRas) + return retVal + + +def zesRasGetStateExp(hRas, pCount, pState): + """Wraps API: + ze_result_t zesRasGetStateExp( + zes_ras_handle_t hRas, + uint32_t* pCount, + zes_ras_state_exp_t* pState) + + Parameters: + hRas: RAS handle + pCount: POINTER(c_uint32) + pState: POINTER(zes_ras_state_exp_t) or None + Returns: + ze_result_t - return code only, RAS states are filled into pState + """ + funcPtr = getFunctionPointerList("zesRasGetStateExp") + funcPtr.argtypes = [ + zes_ras_handle_t, + POINTER(c_uint32), + POINTER(zes_ras_state_exp_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hRas, pCount, pState) + return retVal + + +def zesRasClearStateExp(hRas, category): + """Wraps API: + ze_result_t zesRasClearStateExp( + zes_ras_handle_t hRas, + zes_ras_error_category_exp_t category) + + Parameters: + hRas: RAS handle + category: RAS error category to clear + Returns: + ze_result_t - return code only + """ + funcPtr = getFunctionPointerList("zesRasClearStateExp") + funcPtr.argtypes = [zes_ras_handle_t, zes_ras_error_category_exp_t] + funcPtr.restype = ze_result_t + retVal = funcPtr(hRas, category) + return retVal + + ## Memory management functions ## def zesDeviceEnumMemoryModules(hDevice, pCount, phMemory): """Wraps API: @@ -839,6 +1195,58 @@ def zesPowerGetEnergyCounter(hPower, pEnergy): return retVal +def zesPowerGetLimitsExt(hPower, pCount, pSustained): + """Wraps API: + ze_result_t zesPowerGetLimitsExt( + zes_pwr_handle_t hPower, + uint32_t* pCount, + zes_power_limit_ext_desc_t* pSustained) + + Parameters: + hPower: power handle + pCount: POINTER(c_uint32) + pSustained: POINTER(zes_power_limit_ext_desc_t) or None + Returns: + ze_result_t - return code only, power limit descriptors are filled into pSustained + """ + funcPtr = getFunctionPointerList("zesPowerGetLimitsExt") + funcPtr.argtypes = [ + zes_pwr_handle_t, + POINTER(c_uint32), + POINTER(zes_power_limit_ext_desc_t), + ] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hPower, pCount, pSustained) + return retVal + + +def zesPowerSetLimitsExt(hPower, pCount, pSustained): + """Wraps API: + ze_result_t zesPowerSetLimitsExt( + zes_pwr_handle_t hPower, + uint32_t* pCount, + zes_power_limit_ext_desc_t* pSustained) + + Parameters: + hPower: power handle + pCount: POINTER(c_uint32) + pSustained: POINTER(zes_power_limit_ext_desc_t) or None + Returns: + ze_result_t - return code only + """ + funcPtr = getFunctionPointerList("zesPowerSetLimitsExt") + funcPtr.argtypes = [ + zes_pwr_handle_t, + POINTER(c_uint32), + POINTER(zes_power_limit_ext_desc_t), + ] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hPower, pCount, pSustained) + return retVal + + ## Frequency module functions ## def zesDeviceEnumFrequencyDomains(hDevice, pCount, phFrequency): """Wraps API: @@ -883,6 +1291,88 @@ def zesFrequencyGetState(hFrequency, pState): return retVal +def zesFrequencyGetAvailableClocks(hFrequency, pCount, phFrequency): + """Wraps API: + ze_result_t zesFrequencyGetAvailableClocks( + zes_freq_handle_t hFrequency, + uint32_t* pCount, + double* phFrequency) + + Parameters: + hFrequency: frequency handle + pCount: POINTER(c_uint32) + phFrequency: POINTER(c_double) or None + Returns: + ze_result_t - return code only, available clocks are filled into phFrequency + """ + funcPtr = getFunctionPointerList("zesFrequencyGetAvailableClocks") + funcPtr.argtypes = [zes_freq_handle_t, POINTER(c_uint32), POINTER(c_double)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hFrequency, pCount, phFrequency) + return retVal + + +def zesFrequencyGetRange(hFrequency, pLimits): + """Wraps API: + ze_result_t zesFrequencyGetRange( + zes_freq_handle_t hFrequency, + zes_freq_range_t* pLimits) + + Parameters: + hFrequency: frequency handle + pLimits: POINTER(zes_freq_range_t) + Returns: + ze_result_t - return code only, frequency range is filled into pLimits + """ + funcPtr = getFunctionPointerList("zesFrequencyGetRange") + funcPtr.argtypes = [zes_freq_handle_t, POINTER(zes_freq_range_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hFrequency, pLimits) + return retVal + + +def zesFrequencySetRange(hFrequency, pLimits): + """Wraps API: + ze_result_t zesFrequencySetRange( + zes_freq_handle_t hFrequency, + const zes_freq_range_t* pLimits) + + Parameters: + hFrequency: frequency handle + pLimits: POINTER(zes_freq_range_t) + Returns: + ze_result_t - return code only + """ + funcPtr = getFunctionPointerList("zesFrequencySetRange") + funcPtr.argtypes = [zes_freq_handle_t, POINTER(zes_freq_range_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hFrequency, pLimits) + return retVal + + +def zesFrequencyGetThrottleTime(hFrequency, pThrottleTime): + """Wraps API: + ze_result_t zesFrequencyGetThrottleTime( + zes_freq_handle_t hFrequency, + zes_freq_throttle_time_t* pThrottleTime) + + Parameters: + hFrequency: frequency handle + pThrottleTime: POINTER(zes_freq_throttle_time_t) + Returns: + ze_result_t - return code only, throttle time is filled into pThrottleTime + """ + funcPtr = getFunctionPointerList("zesFrequencyGetThrottleTime") + funcPtr.argtypes = [zes_freq_handle_t, POINTER(zes_freq_throttle_time_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hFrequency, pThrottleTime) + return retVal + + ## Temperature sensor functions ## def zesDeviceEnumTemperatureSensors(hDevice, pCount, phTemperature): """Wraps API: diff --git a/bindings/sysman/python/test/unit_tests/test_frequency.py b/bindings/sysman/python/test/unit_tests/test_frequency.py index 2df47743..8ea937ea 100644 --- a/bindings/sysman/python/test/unit_tests/test_frequency.py +++ b/bindings/sysman/python/test/unit_tests/test_frequency.py @@ -21,7 +21,6 @@ @patch("pyzes.getFunctionPointerList") class TestFrequencyFunctions(unittest.TestCase): - def setUp(self): import pyzes @@ -87,6 +86,109 @@ def mock_get_state(frequency_handle, state_ptr): mock_get_func.assert_called_with("zesFrequencyGetState") mock_func.assert_called_once() + def test_GivenValidFrequencyHandleWhenCallingZesFrequencyGetAvailableClocksThenCallSucceedsWithClockList( + self, mock_get_func + ): + mock_clocks = [300.0, 600.0, 1200.0] + + def mock_get_available_clocks(frequency_handle, count_ptr, clocks_ptr): + count_ptr._obj.value = len(mock_clocks) + if clocks_ptr: + for index, clock in enumerate(mock_clocks): + clocks_ptr[index] = clock + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_available_clocks) + mock_get_func.return_value = mock_func + + frequency_handle = self.pyzes.zes_freq_handle_t() + count = c_uint32(len(mock_clocks)) + clock_array = (c_double * len(mock_clocks))() + + result = self.pyzes.zesFrequencyGetAvailableClocks( + frequency_handle, byref(count), clock_array + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, len(mock_clocks)) + self.assertEqual(list(clock_array), mock_clocks) + mock_get_func.assert_called_with("zesFrequencyGetAvailableClocks") + mock_func.assert_called_once() + + def test_GivenValidFrequencyHandleWhenCallingZesFrequencyGetRangeThenCallSucceedsWithRange( + self, mock_get_func + ): + mock_min = 400.0 + mock_max = 1700.0 + + def mock_get_range(frequency_handle, range_ptr): + range_ptr._obj.min = mock_min + range_ptr._obj.max = mock_max + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_range) + mock_get_func.return_value = mock_func + + frequency_handle = self.pyzes.zes_freq_handle_t() + freq_range = self.pyzes.zes_freq_range_t() + + result = self.pyzes.zesFrequencyGetRange(frequency_handle, byref(freq_range)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(freq_range.min, mock_min) + self.assertEqual(freq_range.max, mock_max) + mock_get_func.assert_called_with("zesFrequencyGetRange") + mock_func.assert_called_once() + + def test_GivenValidFrequencyHandleWhenCallingZesFrequencySetRangeThenCallSucceeds( + self, mock_get_func + ): + def mock_set_range(frequency_handle, range_ptr): + self.assertEqual(range_ptr._obj.min, 500.0) + self.assertEqual(range_ptr._obj.max, 1500.0) + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_set_range) + mock_get_func.return_value = mock_func + + frequency_handle = self.pyzes.zes_freq_handle_t() + freq_range = self.pyzes.zes_freq_range_t() + freq_range.min = 500.0 + freq_range.max = 1500.0 + + result = self.pyzes.zesFrequencySetRange(frequency_handle, byref(freq_range)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + mock_get_func.assert_called_with("zesFrequencySetRange") + mock_func.assert_called_once() + + def test_GivenValidFrequencyHandleWhenCallingZesFrequencyGetThrottleTimeThenCallSucceedsWithThrottleTime( + self, mock_get_func + ): + mock_throttle_time = 12345 + mock_timestamp = 67890 + + def mock_get_throttle_time(frequency_handle, throttle_time_ptr): + throttle_time_ptr._obj.throttleTime = mock_throttle_time + throttle_time_ptr._obj.timestamp = mock_timestamp + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_throttle_time) + mock_get_func.return_value = mock_func + + frequency_handle = self.pyzes.zes_freq_handle_t() + throttle_time = self.pyzes.zes_freq_throttle_time_t() + + result = self.pyzes.zesFrequencyGetThrottleTime( + frequency_handle, byref(throttle_time) + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(throttle_time.throttleTime, mock_throttle_time) + self.assertEqual(throttle_time.timestamp, mock_timestamp) + mock_get_func.assert_called_with("zesFrequencyGetThrottleTime") + mock_func.assert_called_once() + if __name__ == "__main__": unittest.main() diff --git a/bindings/sysman/python/test/unit_tests/test_global_operations.py b/bindings/sysman/python/test/unit_tests/test_global_operations.py index b74a5db5..33094855 100644 --- a/bindings/sysman/python/test/unit_tests/test_global_operations.py +++ b/bindings/sysman/python/test/unit_tests/test_global_operations.py @@ -21,7 +21,6 @@ @patch("pyzes.getFunctionPointerList") class TestGlobalOperations(unittest.TestCase): - def setUp(self): import pyzes @@ -102,6 +101,100 @@ def mock_get_properties(device_handle, properties_ptr): mock_get_func.assert_called_with("zesDeviceGetProperties") mock_func.assert_called_once() + def test_GivenValidDeviceHandleWhenCallingZesDevicePciGetPropertiesThenCallSucceedsWithValidProperties( + self, mock_get_func + ): + mock_domain = 0 + mock_bus = 3 + mock_device = 0x1F + mock_function = 0 + mock_gen = 5 + mock_width = 16 + mock_max_bandwidth = 63_015_384_000 + mock_have_bandwidth_counters = 1 + mock_have_packet_counters = 1 + mock_have_replay_counters = 0 + + def mock_get_pci_properties(device_handle, properties_ptr): + properties_ptr._obj.address.domain = mock_domain + properties_ptr._obj.address.bus = mock_bus + properties_ptr._obj.address.device = mock_device + properties_ptr._obj.address.function = mock_function + properties_ptr._obj.maxSpeed.gen = mock_gen + properties_ptr._obj.maxSpeed.width = mock_width + properties_ptr._obj.maxSpeed.maxBandwidth = mock_max_bandwidth + properties_ptr._obj.haveBandwidthCounters = mock_have_bandwidth_counters + properties_ptr._obj.havePacketCounters = mock_have_packet_counters + properties_ptr._obj.haveReplayCounters = mock_have_replay_counters + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_pci_properties) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + pci_props = self.pyzes.zes_pci_properties_t() + + result = self.pyzes.zesDevicePciGetProperties(device_handle, byref(pci_props)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(pci_props.address.domain, mock_domain) + self.assertEqual(pci_props.address.bus, mock_bus) + self.assertEqual(pci_props.address.device, mock_device) + self.assertEqual(pci_props.address.function, mock_function) + self.assertEqual(pci_props.maxSpeed.gen, mock_gen) + self.assertEqual(pci_props.maxSpeed.width, mock_width) + self.assertEqual(pci_props.maxSpeed.maxBandwidth, mock_max_bandwidth) + self.assertEqual(pci_props.haveBandwidthCounters, mock_have_bandwidth_counters) + self.assertEqual(pci_props.havePacketCounters, mock_have_packet_counters) + self.assertEqual(pci_props.haveReplayCounters, mock_have_replay_counters) + + mock_get_func.assert_called_with("zesDevicePciGetProperties") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDevicePciGetStatsThenCallSucceedsWithValidStats( + self, mock_get_func + ): + mock_timestamp = 123456789 + mock_replay_counter = 7 + mock_packet_counter = 19 + mock_rx_counter = 4096 + mock_tx_counter = 8192 + mock_gen = 4 + mock_width = 8 + mock_max_bandwidth = 16_000_000_000 + + def mock_get_pci_stats(device_handle, stats_ptr): + stats_ptr._obj.timestamp = mock_timestamp + stats_ptr._obj.replayCounter = mock_replay_counter + stats_ptr._obj.packetCounter = mock_packet_counter + stats_ptr._obj.rxCounter = mock_rx_counter + stats_ptr._obj.txCounter = mock_tx_counter + stats_ptr._obj.speed.gen = mock_gen + stats_ptr._obj.speed.width = mock_width + stats_ptr._obj.speed.maxBandwidth = mock_max_bandwidth + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_pci_stats) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + pci_stats = self.pyzes.zes_pci_stats_t() + + result = self.pyzes.zesDevicePciGetStats(device_handle, byref(pci_stats)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(pci_stats.timestamp, mock_timestamp) + self.assertEqual(pci_stats.replayCounter, mock_replay_counter) + self.assertEqual(pci_stats.packetCounter, mock_packet_counter) + self.assertEqual(pci_stats.rxCounter, mock_rx_counter) + self.assertEqual(pci_stats.txCounter, mock_tx_counter) + self.assertEqual(pci_stats.speed.gen, mock_gen) + self.assertEqual(pci_stats.speed.width, mock_width) + self.assertEqual(pci_stats.speed.maxBandwidth, mock_max_bandwidth) + + mock_get_func.assert_called_with("zesDevicePciGetStats") + mock_func.assert_called_once() + def test_GivenValidDeviceHandleWhenCallingZesDeviceProcessesGetStateThenCallSucceedsWithProcessCount( self, mock_get_func ): diff --git a/bindings/sysman/python/test/unit_tests/test_power.py b/bindings/sysman/python/test/unit_tests/test_power.py index b4b98734..391a8702 100644 --- a/bindings/sysman/python/test/unit_tests/test_power.py +++ b/bindings/sysman/python/test/unit_tests/test_power.py @@ -21,7 +21,6 @@ @patch("pyzes.getFunctionPointerList") class TestPowerFunctions(unittest.TestCase): - def setUp(self): import pyzes @@ -74,6 +73,80 @@ def mock_get_energy(power_handle, energy_ptr): mock_get_func.assert_called_with("zesPowerGetEnergyCounter") mock_func.assert_called_once() + def test_GivenValidPowerHandleWhenCallingZesPowerGetLimitsExtThenCallSucceedsWithDescriptors( + self, mock_get_func + ): + mock_count = 2 + + def mock_get_limits_ext(power_handle, count_ptr, limits_ptr): + count_ptr._obj.value = mock_count + limits_ptr[0].level = self.pyzes.ZES_POWER_LEVEL_SUSTAINED + limits_ptr[0].source = self.pyzes.ZES_POWER_SOURCE_MAINS + limits_ptr[0].limitUnit = self.pyzes.ZES_LIMIT_UNIT_POWER + limits_ptr[0].enabled = 1 + limits_ptr[0].interval = 1000 + limits_ptr[0].limit = 250000 + limits_ptr[1].level = self.pyzes.ZES_POWER_LEVEL_BURST + limits_ptr[1].source = self.pyzes.ZES_POWER_SOURCE_ANY + limits_ptr[1].limitUnit = self.pyzes.ZES_LIMIT_UNIT_POWER + limits_ptr[1].enabled = 0 + limits_ptr[1].interval = 0 + limits_ptr[1].limit = 300000 + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_limits_ext) + mock_get_func.return_value = mock_func + + power_handle = self.pyzes.zes_pwr_handle_t() + count = c_uint32(mock_count) + limits = (self.pyzes.zes_power_limit_ext_desc_t * mock_count)() + + result = self.pyzes.zesPowerGetLimitsExt(power_handle, byref(count), limits) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_count) + self.assertEqual(limits[0].level, self.pyzes.ZES_POWER_LEVEL_SUSTAINED) + self.assertEqual(limits[0].source, self.pyzes.ZES_POWER_SOURCE_MAINS) + self.assertEqual(limits[0].limit, 250000) + self.assertEqual(limits[1].level, self.pyzes.ZES_POWER_LEVEL_BURST) + self.assertEqual(limits[1].enabled, 0) + self.assertEqual(limits[1].limit, 300000) + mock_get_func.assert_called_with("zesPowerGetLimitsExt") + mock_func.assert_called_once() + + def test_GivenValidPowerHandleWhenCallingZesPowerSetLimitsExtThenCallSucceeds( + self, mock_get_func + ): + def mock_set_limits_ext(power_handle, count_ptr, limits_ptr): + self.assertEqual(count_ptr._obj.value, 1) + self.assertEqual(limits_ptr[0].level, self.pyzes.ZES_POWER_LEVEL_SUSTAINED) + self.assertEqual(limits_ptr[0].source, self.pyzes.ZES_POWER_SOURCE_MAINS) + self.assertEqual(limits_ptr[0].limitUnit, self.pyzes.ZES_LIMIT_UNIT_POWER) + self.assertEqual(limits_ptr[0].enabled, 1) + self.assertEqual(limits_ptr[0].interval, 2000) + self.assertEqual(limits_ptr[0].limit, 275000) + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_set_limits_ext) + mock_get_func.return_value = mock_func + + power_handle = self.pyzes.zes_pwr_handle_t() + count = c_uint32(1) + limits = (self.pyzes.zes_power_limit_ext_desc_t * 1)() + limits[0].stype = self.pyzes.ZES_STRUCTURE_TYPE_POWER_LIMIT_EXT_DESC + limits[0].level = self.pyzes.ZES_POWER_LEVEL_SUSTAINED + limits[0].source = self.pyzes.ZES_POWER_SOURCE_MAINS + limits[0].limitUnit = self.pyzes.ZES_LIMIT_UNIT_POWER + limits[0].enabled = 1 + limits[0].interval = 2000 + limits[0].limit = 275000 + + result = self.pyzes.zesPowerSetLimitsExt(power_handle, byref(count), limits) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + mock_get_func.assert_called_with("zesPowerSetLimitsExt") + mock_func.assert_called_once() + if __name__ == "__main__": unittest.main()