From 39c740d321ed831fc458bcc74333197b9846f20a Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 30 Jan 2026 17:08:49 -0800 Subject: [PATCH 1/2] feat(k8s): Add multi-processing support to PipelineStep macro Automatically detect and configure multiprocessing based on pipeline configuration. When parallelism.multi_process.processes is specified: - Multiply CPU and memory resources by process count - Mount /dev/shm shared memory volume for IPC - Validate that only one segment specifies parallelism Fixes STREAM-707 --- .../sentry_streams_k8s/pipeline_step.py | 118 ++++++++-- .../tests/test_pipeline_step.py | 218 ++++++++++++++++++ 2 files changed, 318 insertions(+), 18 deletions(-) diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index c24e6837..117ecd0d 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -41,6 +41,48 @@ def make_k8s_name(name: str) -> str: return name +def get_multiprocess_config(pipeline_config: dict[str, Any]) -> tuple[int | None, list[int]]: + """ + Extract multiprocessing configuration from pipeline config. + + Iterates through all segments in the pipeline configuration and looks for + parallelism.multi_process.processes configuration in any step. 
+ + Examples: + >>> config = {"pipeline": {"segments": [{"steps_config": {"step1": {"parallelism": {"multi_process": {"processes": 4}}}}}]}} + >>> get_multiprocess_config(config) + (4, [0]) + """ + segments_with_parallelism: list[int] = [] + process_count: int | None = None + + # Pipeline and segments are guaranteed to exist by schema validation + segments = pipeline_config["pipeline"]["segments"] + + for segment_idx, segment in enumerate(segments): + steps_config = segment.get("steps_config", {}) + + for step_config in steps_config.values(): + # Check if this step has multiprocessing parallelism configured + parallelism = step_config.get("parallelism") + if not parallelism or not isinstance(parallelism, dict): + continue + + multi_process = parallelism.get("multi_process") + if not multi_process: + continue + + processes = multi_process.get("processes") + if processes is not None: + segments_with_parallelism.append(segment_idx) + # Store the first process count we find + if process_count is None: + process_count = processes + break # Found parallelism in this segment, move to next segment + + return process_count, segments_with_parallelism + + def build_container( container_template: dict[str, Any], pipeline_name: str, @@ -49,6 +91,7 @@ def build_container( cpu_per_process: int, memory_per_process: int, segment_id: int, + process_count: int | None = None, ) -> dict[str, Any]: """ Build a complete container specification for the pipeline step. @@ -59,10 +102,33 @@ def build_container( some standard parameters like securityContext 3. building the streaming pipeline specific parameters and merging them onto the result of step 2. 
+ """ base_container = load_base_template("container") container = deepmerge(base_container, container_template) + # Calculate total resources based on process count + cpu_total = cpu_per_process * (process_count or 1) + memory_total = memory_per_process * (process_count or 1) + + # Build volume mounts - add shared memory volume for multiprocessing + volume_mounts: list[dict[str, Any]] = [ + { + "name": "pipeline-config", + "mountPath": "/etc/pipeline-config", + "readOnly": True, + } + ] + + # Add shared memory volume when multiprocessing is enabled + if process_count is not None and process_count > 1: + volume_mounts.append( + { + "name": "dshm", + "mountPath": "/dev/shm", + } + ) + pipeline_additions = { "name": "pipeline-consumer", "image": image_name, @@ -80,20 +146,14 @@ def build_container( ], "resources": { "requests": { - "cpu": f"{cpu_per_process}m", - "memory": f"{memory_per_process}Mi", + "cpu": f"{cpu_total}m", + "memory": f"{memory_total}Mi", }, "limits": { - "memory": f"{memory_per_process}Mi", + "memory": f"{memory_total}Mi", }, }, - "volumeMounts": [ - { - "name": "pipeline-config", - "mountPath": "/etc/pipeline-config", - "readOnly": True, - } - ], + "volumeMounts": volume_mounts, } return deepmerge(container, pipeline_additions) @@ -245,6 +305,15 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: segment_id = ctx["segment_id"] service_name = ctx["service_name"] + # Detect and validate multiprocessing configuration + process_count, segments_with_parallelism = get_multiprocess_config(pipeline_config) + if len(segments_with_parallelism) > 1: + raise ValueError( + f"Multi-processing configuration can only be specified in one segment. " + f"Found parallelism configuration in {len(segments_with_parallelism)} segments " + f"(segment indices: {segments_with_parallelism})." 
+ ) + # Create deployment container = build_container( @@ -255,6 +324,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: cpu_per_process, memory_per_process, segment_id, + process_count, ) base_deployment = load_base_template("deployment") @@ -266,6 +336,25 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: } configmap_name = make_k8s_name(f"{service_name}-pipeline-{pipeline_name}") + # Build volumes list - add shared memory volume for multiprocessing + volumes: list[dict[str, Any]] = [ + { + "name": "pipeline-config", + "configMap": { + "name": configmap_name, + }, + } + ] + + # Add shared memory volume when multiprocessing is enabled + if process_count is not None and process_count > 1: + volumes.append( + { + "name": "dshm", + "emptyDir": {"medium": "Memory"}, + } + ) + pipeline_additions = { "metadata": { "name": make_k8s_name(f"{service_name}-pipeline-{pipeline_name}-{segment_id}"), @@ -281,14 +370,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: }, "spec": { "containers": [container], - "volumes": [ - { - "name": "pipeline-config", - "configMap": { - "name": configmap_name, - }, - } - ], + "volumes": volumes, }, }, }, diff --git a/sentry_streams_k8s/tests/test_pipeline_step.py b/sentry_streams_k8s/tests/test_pipeline_step.py index ccfffe4c..43f5ddbd 100644 --- a/sentry_streams_k8s/tests/test_pipeline_step.py +++ b/sentry_streams_k8s/tests/test_pipeline_step.py @@ -573,3 +573,221 @@ def test_user_volumes_and_containers_preserved() -> None: volume_mount_names = [vm["name"] for vm in pipeline_container["volumeMounts"]] assert "user-volume" in volume_mount_names assert "pipeline-config" in volume_mount_names + + +def test_get_multiprocess_config_detects_processes() -> None: + """Test that get_multiprocess_config() correctly extracts process count.""" + from sentry_streams_k8s.pipeline_step import get_multiprocess_config + + pipeline_config = { + "env": {}, + "pipeline": { + "segments": [ + { + "steps_config": { + "myinput": { + 
"starts_segment": True, + "bootstrap_servers": ["127.0.0.1:9092"], + "parallelism": 1, + }, + "parser": { + "starts_segment": True, + "parallelism": { + "multi_process": { + "processes": 4, + "batch_size": 1000, + "batch_time": 0.2, + } + }, + }, + "mysink": { + "starts_segment": True, + "bootstrap_servers": ["127.0.0.1:9092"], + }, + } + } + ] + }, + } + + process_count, segments = get_multiprocess_config(pipeline_config) + assert process_count == 4 + assert segments == [0] + + +def test_build_container_with_multiprocessing() -> None: + """Test that build_container() correctly handles multiprocessing configuration.""" + container_template: dict[str, Any] = {} + + container = build_container( + container_template=container_template, + pipeline_name="profiles", + pipeline_module="sbc.profiles", + image_name="my-image:latest", + cpu_per_process=1000, + memory_per_process=512, + segment_id=0, + process_count=4, + ) + + # Check resources are multiplied by process count + assert container["resources"]["requests"]["cpu"] == "4000m" + assert container["resources"]["requests"]["memory"] == "2048Mi" + assert container["resources"]["limits"]["memory"] == "2048Mi" + + # Check shared memory volume is added + volume_mount_names = [vm["name"] for vm in container["volumeMounts"]] + assert "pipeline-config" in volume_mount_names + assert "dshm" in volume_mount_names + + dshm_mount = next(vm for vm in container["volumeMounts"] if vm["name"] == "dshm") + assert dshm_mount["mountPath"] == "/dev/shm" + + +def test_build_container_without_multiprocessing() -> None: + """Test that build_container() works normally when multiprocessing is not configured.""" + container_template: dict[str, Any] = {} + + container = build_container( + container_template=container_template, + pipeline_name="profiles", + pipeline_module="sbc.profiles", + image_name="my-image:latest", + cpu_per_process=1000, + memory_per_process=512, + segment_id=0, + process_count=None, + ) + + # Check resources are NOT 
multiplied + assert container["resources"]["requests"]["cpu"] == "1000m" + assert container["resources"]["requests"]["memory"] == "512Mi" + assert container["resources"]["limits"]["memory"] == "512Mi" + + volume_mount_names = [vm["name"] for vm in container["volumeMounts"]] + assert "pipeline-config" in volume_mount_names + assert "dshm" not in volume_mount_names + + +def test_run_with_multiprocessing() -> None: + """Test complete deployment generation with multiprocessing configuration.""" + context: dict[str, Any] = { + "service_name": "my-service", + "pipeline_name": "profiles", + "deployment_template": {}, + "container_template": {}, + "pipeline_config": { + "env": {}, + "pipeline": { + "segments": [ + { + "steps_config": { + "myinput": { + "starts_segment": True, + "bootstrap_servers": ["127.0.0.1:9092"], + }, + "parser": { + "starts_segment": True, + "parallelism": { + "multi_process": { + "processes": 4, + "batch_size": 1000, + "batch_time": 0.2, + } + }, + }, + "mysink": { + "starts_segment": True, + "bootstrap_servers": ["127.0.0.1:9092"], + }, + } + } + ] + }, + }, + "pipeline_module": "sbc.profiles", + "image_name": "my-image:latest", + "cpu_per_process": 1000, + "memory_per_process": 512, + "segment_id": 0, + } + + pipeline_step = PipelineStep() + result = pipeline_step.run(context) + + deployment = result["deployment"] + + # Check resources are multiplied + container = deployment["spec"]["template"]["spec"]["containers"][0] + assert container["resources"]["requests"]["cpu"] == "4000m" + assert container["resources"]["requests"]["memory"] == "2048Mi" + + # Check shared memory volume is in deployment + volumes = deployment["spec"]["template"]["spec"]["volumes"] + volume_names = [v["name"] for v in volumes] + assert "pipeline-config" in volume_names + assert "dshm" in volume_names + + dshm_volume = next(v for v in volumes if v["name"] == "dshm") + assert dshm_volume["emptyDir"]["medium"] == "Memory" + + volume_mount_names = [vm["name"] for vm in 
container["volumeMounts"]] + assert "dshm" in volume_mount_names + + +def test_run_rejects_multiple_segments_with_parallelism() -> None: + """Test that run() rejects configuration with parallelism in multiple segments.""" + context: dict[str, Any] = { + "service_name": "my-service", + "pipeline_name": "profiles", + "deployment_template": {}, + "container_template": {}, + "pipeline_config": { + "env": {}, + "pipeline": { + "segments": [ + { + "steps_config": { + "step1": { + "starts_segment": True, + "parallelism": { + "multi_process": { + "processes": 4, + "batch_size": 1000, + "batch_time": 0.2, + } + }, + } + } + }, + { + "steps_config": { + "step2": { + "starts_segment": True, + "parallelism": { + "multi_process": { + "processes": 2, + "batch_size": 500, + "batch_time": 0.1, + } + }, + } + } + }, + ] + }, + }, + "pipeline_module": "sbc.profiles", + "image_name": "my-image:latest", + "cpu_per_process": 1000, + "memory_per_process": 512, + "segment_id": 0, + } + + pipeline_step = PipelineStep() + + with pytest.raises( + ValueError, + match=r"Multi-processing configuration can only be specified in one segment", + ): + pipeline_step.run(context) From 9fea014d37e9d621873942516704121938159772 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Tue, 10 Feb 2026 17:21:07 -0800 Subject: [PATCH 2/2] Clean up comments --- .../sentry_streams_k8s/pipeline_step.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index f26c05ca..e77ec195 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -56,14 +56,12 @@ def get_multiprocess_config(pipeline_config: dict[str, Any]) -> tuple[int | None segments_with_parallelism: list[int] = [] process_count: int | None = None - # Pipeline and segments are guaranteed to exist by schema validation segments = 
pipeline_config["pipeline"]["segments"] for segment_idx, segment in enumerate(segments): steps_config = segment.get("steps_config", {}) for step_config in steps_config.values(): - # Check if this step has multiprocessing parallelism configured parallelism = step_config.get("parallelism") if not parallelism or not isinstance(parallelism, dict): continue @@ -75,7 +73,6 @@ def get_multiprocess_config(pipeline_config: dict[str, Any]) -> tuple[int | None processes = multi_process.get("processes") if processes is not None: segments_with_parallelism.append(segment_idx) - # Store the first process count we find if process_count is None: process_count = processes break # Found parallelism in this segment, move to next segment @@ -107,11 +104,11 @@ def build_container( base_container = load_base_template("container") container = deepmerge(base_container, container_template) - # Calculate total resources based on process count + # CPU and memory are provided per process, so we need to multiply them + # by the number of processes to get the total resources. cpu_total = cpu_per_process * (process_count or 1) memory_total = memory_per_process * (process_count or 1) - # Build volume mounts - add shared memory volume for multiprocessing volume_mounts: list[dict[str, Any]] = [ { "name": "pipeline-config", @@ -120,7 +117,8 @@ def build_container( } ] - # Add shared memory volume when multiprocessing is enabled + # Shared memory volume is needed to allow communication between processes + # via shared memory. Only needed when in multiprocess mode.
if process_count is not None and process_count > 1: volume_mounts.append( { @@ -319,7 +317,6 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: replicas = ctx["replicas"] emergency_patch = ctx.get("emergency_patch", {}) - # Detect and validate multiprocessing configuration process_count, segments_with_parallelism = get_multiprocess_config(pipeline_config) if len(segments_with_parallelism) > 1: raise ValueError( @@ -328,8 +325,6 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: f"(segment indices: {segments_with_parallelism})." ) - # Create deployment - container = build_container( container_template, pipeline_name, @@ -349,7 +344,6 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: } configmap_name = make_k8s_name(f"{service_name}-pipeline-{pipeline_name}") - # Build volumes list - add shared memory volume for multiprocessing volumes: list[dict[str, Any]] = [ { "name": "pipeline-config", @@ -359,7 +353,8 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: } ] - # Add shared memory volume when multiprocessing is enabled + # Shared memory volume is needed to allow communication between processes + # via shared memory. Only needed when in multiprocess mode. if process_count is not None and process_count > 1: volumes.append( { @@ -411,7 +406,6 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: if emergency_patch: deployment = deepmerge(deployment, emergency_patch) - # Create configmap configmap = { "apiVersion": "v1", "kind": "ConfigMap", @@ -424,7 +418,6 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: }, } - # Add namespace if present in deployment template if "namespace" in deployment.get("metadata", {}): metadata = cast(dict[str, Any], configmap["metadata"]) metadata["namespace"] = deployment["metadata"]["namespace"]