diff --git a/bin/wfbench b/bin/wfbench index 4bc03cba..c253d543 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright (c) 2021-2025 The WfCommons Team. @@ -20,6 +20,7 @@ import re import json import logging import pandas as pd +import psutil from io import StringIO from filelock import FileLock @@ -27,6 +28,9 @@ from pathos.helpers import mp as multiprocessing from typing import List, Optional +int32_max = 2147483647 + + # Configure logging logging.basicConfig( level=logging.INFO, # Change this to control the verbosity @@ -39,6 +43,20 @@ logging.basicConfig( this_dir = pathlib.Path(__file__).resolve().parent +def kill_process_and_children(proc): + if proc is None: + return + try: + parent = psutil.Process(proc.pid) + children = parent.children(recursive=True) + for child in children: + child.kill() + parent.kill() + + except psutil.NoSuchProcess: + pass # Process is already dead + + def log_info(msg: str): """ Log an info message to stderr @@ -165,34 +183,39 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, :rtype: List """ total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" - cpu_work_per_thread = int(cpu_work / cpu_threads) - - cpu_procs = [] - mem_procs = [] - cpu_prog = [f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"] + cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) if cpu_threads != 0 else int32_max**2 + cpu_samples = min(cpu_work_per_thread, int32_max) + cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max + if cpu_ops > int32_max: + log_info("Exceeded maximum allowed value of cpu work.") + cpu_ops = int32_max + + cpu_proc = None + mem_proc = None + + cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}", + "--monte-carlo-method", "pi", + "--monte-carlo-rand", "lcg", + "--monte-carlo-samples", f"{cpu_samples}", + "--monte-carlo-ops", f"{cpu_ops}", + "--quiet"] mem_prog = ["stress-ng", "--vm", f"{mem_threads}", - "--vm-bytes", f"{total_mem}", "--vm-keep"] + "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"] - for i in range(cpu_threads): - cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if cpu_threads > 0: + cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) if core: os.sched_setaffinity(cpu_proc.pid, {core}) - cpu_procs.append(cpu_proc) - - # Start a thread to monitor the progress of each CPU benchmark process - monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc, cpu_queue)) - monitor_thread.start() if mem_threads > 0: # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) if core: os.sched_setaffinity(mem_proc.pid, {core}) - mem_procs.append(mem_proc) - return cpu_procs, mem_procs + return [cpu_proc, mem_proc] def io_read_benchmark_user_input_data_size(inputs, @@ -446,14 +469,13 @@ def main(): log_debug(f"{args.name} acquired core {core}") mem_threads=int(10 - 10 * args.percent_cpu) - cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue, + [cpu_proc, mem_proc] = cpu_mem_benchmark(cpu_queue=cpu_queue, cpu_threads=int(10 * args.percent_cpu), mem_threads=mem_threads, - cpu_work=sys.maxsize if args.time else int(args.cpu_work), + cpu_work=int32_max**2 if args.time else int(args.cpu_work), core=core, total_mem=mem_bytes) - - procs.extend(cpu_procs) + procs.append(cpu_proc) if args.time: time.sleep(int(args.time)) for proc in procs: @@ -461,20 +483,19 @@ def main(): if proc.is_alive(): proc.terminate() elif isinstance(proc, subprocess.Popen): - proc.terminate() + kill_process_and_children(proc) else: for proc in procs: if isinstance(proc, subprocess.Popen): proc.wait() if io_proc is not None and io_proc.is_alive(): - # io_proc.terminate() + io_proc.terminate() io_proc.join() - for mem_proc in mem_procs: - try: - os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails - except subprocess.TimeoutExpired: - log_debug("Memory process did not terminate; force-killing.") + try: + kill_process_and_children(mem_proc) + except subprocess.TimeoutExpired: + log_debug("Memory process did not terminate; force-killing.") # As a fallback, use pkill if any remaining instances are stuck subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() diff --git a/pyproject.toml b/pyproject.toml index b3ceed01..caa985f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,11 +29,11 @@ dependencies = [ "networkx", "numpy", "pandas", + "psutil", "python-dateutil", "requests", "scipy>=1.16.1", "pyyaml", - "pandas", "shortuuid", "stringcase", "filelock", diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 3f45290c..966b7379 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -90,7 +90,8 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command= working_dir=working_dir, user="wfcommons", tty=True, - detach=True + detach=True, + init=True # For zombies ) # Installing WfCommons on container @@ -165,4 +166,4 @@ def _compare_workflows(workflow_1: Workflow, workflow_2: Workflow): # sys.stderr.write(f"WORKFLOW2 OUTPUT FILE: {output_file.file_id} {output_file.size}\n") workflow2_output_bytes += output_file.size assert (workflow1_input_bytes == workflow2_input_bytes) - assert (workflow1_output_bytes == workflow2_output_bytes) \ No newline at end of file + assert (workflow1_output_bytes == workflow2_output_bytes)