From b9d87fcf2d9a22c9f5aa7c84491d3267550520db Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Mon, 9 Feb 2026 14:37:32 -0500 Subject: [PATCH 1/7] Replace cpu-benchmark with similar stress-ng monte-carlo test --- bin/wfbench | 65 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 4bc03cba..e30273d3 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright (c) 2021-2025 The WfCommons Team. @@ -20,6 +20,7 @@ import re import json import logging import pandas as pd +import psutil from io import StringIO from filelock import FileLock @@ -27,6 +28,9 @@ from pathos.helpers import mp as multiprocessing from typing import List, Optional +int32_max = 2147483647 + + # Configure logging logging.basicConfig( level=logging.INFO, # Change this to control the verbosity @@ -39,6 +43,16 @@ logging.basicConfig( this_dir = pathlib.Path(__file__).resolve().parent +def kill_process_and_children(proc): + try: + parent = psutil.Process(proc.pid) + children = parent.children(recursive=True) + for child in children: + child.kill() + parent.kill() + except psutil.NoSuchProcess: + pass # Process is already dead + def log_info(msg: str): """ Log an info message to stderr @@ -165,34 +179,38 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, :rtype: List """ total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" - cpu_work_per_thread = int(cpu_work / cpu_threads) - - cpu_procs = [] - mem_procs = [] - cpu_prog = [f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"] + cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) + cpu_samples = min(cpu_work_per_thread, int32_max) + cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max + if cpu_ops > int32_max: + log_info("Exceeded maximum number of cpu work.") + cpu_ops = int32_max + + cpu_proc = None + mem_proc = None + + cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}", + "--monte-carlo-method", "pi", + "--monte-carlo-rand", "lcg", + "--monte-carlo-samples", f"{cpu_samples}", + "--monte-carlo-ops", f"{cpu_ops}"] mem_prog = ["stress-ng", "--vm", f"{mem_threads}", "--vm-bytes", f"{total_mem}", "--vm-keep"] - for i in range(cpu_threads): - cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if cpu_threads > 0: + cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) if core: os.sched_setaffinity(cpu_proc.pid, {core}) - cpu_procs.append(cpu_proc) - - # Start a thread to monitor the progress of each CPU benchmark process - monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc, cpu_queue)) - monitor_thread.start() if mem_threads > 0: # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) if core: os.sched_setaffinity(mem_proc.pid, {core}) - mem_procs.append(mem_proc) - return cpu_procs, mem_procs + return cpu_proc, mem_proc def io_read_benchmark_user_input_data_size(inputs, @@ -446,14 +464,14 @@ def main(): log_debug(f"{args.name} acquired core {core}") mem_threads=int(10 - 10 * args.percent_cpu) - cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue, + cpu_proc, mem_proc = cpu_mem_benchmark(cpu_queue=cpu_queue, cpu_threads=int(10 * args.percent_cpu), mem_threads=mem_threads, - cpu_work=sys.maxsize if args.time else int(args.cpu_work), + cpu_work=int32_max**2 if args.time else int(args.cpu_work), core=core, total_mem=mem_bytes) - procs.extend(cpu_procs) + procs.append(cpu_proc) if args.time: time.sleep(int(args.time)) for proc in procs: @@ -461,7 +479,7 @@ def main(): if proc.is_alive(): proc.terminate() elif isinstance(proc, subprocess.Popen): - proc.terminate() + kill_process_and_children(proc) else: for proc in procs: if isinstance(proc, subprocess.Popen): @@ -470,11 +488,10 @@ def main(): # io_proc.terminate() io_proc.join() - for mem_proc in mem_procs: - try: - os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails - except subprocess.TimeoutExpired: - log_debug("Memory process did not terminate; force-killing.") + try: + os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails + except subprocess.TimeoutExpired: + log_debug("Memory process did not terminate; force-killing.") # As a fallback, use pkill if any remaining instances are stuck subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() From d3778c2fb3170e96d8d1da9b17960a8c5b81afe4 Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Tue, 10 Feb 2026 14:33:17 -0500 Subject: [PATCH 2/7] Add dependency psutil --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b3ceed01..caa985f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,11 +29,11 @@ dependencies = [ "networkx", "numpy", "pandas", + "psutil", "python-dateutil", "requests", "scipy>=1.16.1", "pyyaml", - "pandas", "shortuuid", "stringcase", "filelock", From 827c1893f05cffda3d062442ad2e3af9b8266d15 Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Tue, 10 Feb 2026 14:38:32 -0500 Subject: [PATCH 3/7] terminate io process --- bin/wfbench | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index e30273d3..34aa5425 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -44,6 +44,8 @@ this_dir = pathlib.Path(__file__).resolve().parent def kill_process_and_children(proc): + if proc is None: + return try: parent = psutil.Process(proc.pid) children = parent.children(recursive=True) @@ -470,7 +472,6 @@ def main(): cpu_work=int32_max**2 if args.time else int(args.cpu_work), core=core, total_mem=mem_bytes) - procs.append(cpu_proc) if args.time: time.sleep(int(args.time)) @@ -485,11 +486,11 @@ def main(): if isinstance(proc, subprocess.Popen): proc.wait() if io_proc is not None and io_proc.is_alive(): - # io_proc.terminate() + io_proc.terminate() io_proc.join() try: - os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails + kill_process_and_children(mem_proc) except subprocess.TimeoutExpired: log_debug("Memory process did not terminate; force-killing.") # As a fallback, use pkill if any remaining instances are stuck From e41861c335b0a5c309cf587e34f0613f709b5c42 Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Tue, 10 Feb 2026 14:54:56 -0500 Subject: [PATCH 4/7] check for division by zero --- bin/wfbench | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 34aa5425..fdc7177e 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -181,11 +181,11 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, :rtype: List """ total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" - cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) + cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) if cpu_threads != 0 else int32_max**2 cpu_samples = min(cpu_work_per_thread, int32_max) cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max if cpu_ops > int32_max: - log_info("Exceeded maximum number of cpu work.") + log_info("Exceeded maximum allowed value of cpu work.") cpu_ops = int32_max cpu_proc = None From 901e87bcebe598eae9e3c60eedc4d5a702812dcb Mon Sep 17 00:00:00 2001 From: Steven Hahn Date: Tue, 10 Feb 2026 15:35:42 -0500 Subject: [PATCH 5/7] mute stress-ng --- bin/wfbench | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bin/wfbench b/bin/wfbench index fdc7177e..74194106 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -195,7 +195,8 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, "--monte-carlo-method", "pi", "--monte-carlo-rand", "lcg", "--monte-carlo-samples", f"{cpu_samples}", - "--monte-carlo-ops", f"{cpu_ops}"] + "--monte-carlo-ops", f"{cpu_ops}", + "--quiet"] mem_prog = ["stress-ng", "--vm", f"{mem_threads}", "--vm-bytes", f"{total_mem}", "--vm-keep"] From 8c560de6eec7fa5cf61f496819adca54e3fac0fe Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Tue, 10 Feb 2026 16:05:36 -1000 Subject: [PATCH 6/7] Added a --quiet flag to another stress-ng invocation --- bin/wfbench | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/wfbench b/bin/wfbench index 74194106..7b3853d6 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -198,7 +198,7 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, "--monte-carlo-ops", f"{cpu_ops}", "--quiet"] mem_prog = ["stress-ng", "--vm", f"{mem_threads}", - "--vm-bytes", f"{total_mem}", "--vm-keep"] + "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"] if cpu_threads > 0: cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) From ffac645e0466e61e341fa4bf1f1dd4cb2a14be08 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 11 Feb 2026 11:50:10 -1000 Subject: [PATCH 7/7] Fixed the zombie problem --- bin/wfbench | 6 ++++-- tests/test_helpers.py | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/bin/wfbench b/bin/wfbench index 7b3853d6..c253d543 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -52,9 +52,11 @@ def kill_process_and_children(proc): for child in children: child.kill() parent.kill() + except psutil.NoSuchProcess: pass # Process is already dead + def log_info(msg: str): """ Log an info message to stderr @@ -213,7 +215,7 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, if core: os.sched_setaffinity(mem_proc.pid, {core}) - return cpu_proc, mem_proc + return [cpu_proc, mem_proc] def io_read_benchmark_user_input_data_size(inputs, @@ -467,7 +469,7 @@ def main(): log_debug(f"{args.name} acquired core {core}") mem_threads=int(10 - 10 * args.percent_cpu) - cpu_proc, mem_proc = cpu_mem_benchmark(cpu_queue=cpu_queue, + [cpu_proc, mem_proc] = cpu_mem_benchmark(cpu_queue=cpu_queue, cpu_threads=int(10 * args.percent_cpu), mem_threads=mem_threads, cpu_work=int32_max**2 if args.time else int(args.cpu_work), diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 3f45290c..966b7379 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -90,7 +90,8 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command= working_dir=working_dir, user="wfcommons", tty=True, - detach=True + detach=True, + init=True # For zombies ) # Installing WfCommons on container @@ -165,4 +166,4 @@ def _compare_workflows(workflow_1: Workflow, workflow_2: Workflow): # sys.stderr.write(f"WORKFLOW2 OUTPUT FILE: {output_file.file_id} {output_file.size}\n") workflow2_output_bytes += output_file.size assert (workflow1_input_bytes == workflow2_input_bytes) - assert (workflow1_output_bytes == workflow2_output_bytes) \ No newline at end of file + assert (workflow1_output_bytes == workflow2_output_bytes)