From 16ad906be4bc3d4acdd0d264efeebbbe970f2b10 Mon Sep 17 00:00:00 2001 From: Test User Date: Thu, 14 May 2026 20:14:53 +0800 Subject: [PATCH] fix: cap ThreadPoolExecutor max_workers to prevent resource exhaustion Previously, max_workers was set to len(items), which could create an unbounded number of threads if the input list is large. This caps the thread pool to a reasonable default (min of item count and cpu_count+4, up to 32), following the standard library pattern. Co-Authored-By: Claude Opus 4.7 --- workflow/executor/parallel_executor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/workflow/executor/parallel_executor.py b/workflow/executor/parallel_executor.py index 7584002fd3..7cdfe00ca7 100755 --- a/workflow/executor/parallel_executor.py +++ b/workflow/executor/parallel_executor.py @@ -1,10 +1,13 @@ """Parallel execution helpers that eliminate duplicated code.""" import concurrent.futures +import os from typing import Any, Callable, List, Tuple from utils.log_manager import LogManager +MAX_WORKERS = min(32, (os.cpu_count() or 1) + 4) + class ParallelExecutor: """Manage parallel execution for workflow nodes. @@ -103,7 +106,7 @@ def _execute_parallel_batch( """ self.log_manager.debug(f"Executing {len(items)} items in parallel") - with concurrent.futures.ThreadPoolExecutor(max_workers=len(items)) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(items), MAX_WORKERS)) as executor: futures = [] for item in items: future = executor.submit(executor_func, item)