diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index fc1facdd..e77ec195 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -41,6 +41,45 @@ def make_k8s_name(name: str) -> str: return name +def get_multiprocess_config(pipeline_config: dict[str, Any]) -> tuple[int | None, list[int]]: + """ + Extract multiprocessing configuration from pipeline config. + + Iterates through all segments in the pipeline configuration and looks for + parallelism.multi_process.processes configuration in any step. + + Examples: + >>> config = {"pipeline": {"segments": [{"steps_config": {"step1": {"parallelism": {"multi_process": {"processes": 4}}}}}]}} + >>> get_multiprocess_config(config) + (4, [0]) + """ + segments_with_parallelism: list[int] = [] + process_count: int | None = None + + segments = pipeline_config["pipeline"]["segments"] + + for segment_idx, segment in enumerate(segments): + steps_config = segment.get("steps_config", {}) + + for step_config in steps_config.values(): + parallelism = step_config.get("parallelism") + if not parallelism or not isinstance(parallelism, dict): + continue + + multi_process = parallelism.get("multi_process") + if not multi_process: + continue + + processes = multi_process.get("processes") + if processes is not None: + segments_with_parallelism.append(segment_idx) + if process_count is None: + process_count = processes + break # Found parallelism in this segment, move to next segment + + return process_count, segments_with_parallelism + + def build_container( container_template: dict[str, Any], pipeline_name: str, @@ -49,6 +88,7 @@ def build_container( cpu_per_process: int, memory_per_process: int, segment_id: int, + process_count: int | None = None, ) -> dict[str, Any]: """ Build a complete container specification for the pipeline step. 
@@ -59,10 +99,34 @@ def build_container( some standard parameters like securityContext 3. building the streaming pipeline specific parameters and merging them onto the result of step 2. + """ base_container = load_base_template("container") container = deepmerge(base_container, container_template) + # CPU and memory are provided per process, so we need to multiply them + # by the number of processes to get the total resources. + cpu_total = cpu_per_process * (process_count or 1) + memory_total = memory_per_process * (process_count or 1) + + volume_mounts: list[dict[str, Any]] = [ + { + "name": "pipeline-config", + "mountPath": "/etc/pipeline-config", + "readOnly": True, + } + ] + + # Shared memory volume is needed to allow the communication between processes. + # via shared memory. Only needed when in multiprocess mode. + if process_count is not None and process_count > 1: + volume_mounts.append( + { + "name": "dshm", + "mountPath": "/dev/shm", + } + ) + pipeline_additions = { "name": "pipeline-consumer", "image": image_name, @@ -80,20 +144,14 @@ def build_container( ], "resources": { "requests": { - "cpu": f"{cpu_per_process}m", - "memory": f"{memory_per_process}Mi", + "cpu": f"{cpu_total}m", + "memory": f"{memory_total}Mi", }, "limits": { - "memory": f"{memory_per_process}Mi", + "memory": f"{memory_total}Mi", }, }, - "volumeMounts": [ - { - "name": "pipeline-config", - "mountPath": "/etc/pipeline-config", - "readOnly": True, - } - ], + "volumeMounts": volume_mounts, } return deepmerge(container, pipeline_additions) @@ -259,7 +317,13 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: replicas = ctx["replicas"] emergency_patch = ctx.get("emergency_patch", {}) - # Create deployment + process_count, segments_with_parallelism = get_multiprocess_config(pipeline_config) + if len(segments_with_parallelism) > 1: + raise ValueError( + f"Multi-processing configuration can only be specified in one segment. 
" + f"Found parallelism configuration in {len(segments_with_parallelism)} segments " + f"(segment indices: {segments_with_parallelism})." + ) container = build_container( container_template, @@ -269,6 +333,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: cpu_per_process, memory_per_process, segment_id, + process_count, ) base_deployment = load_base_template("deployment") @@ -279,6 +344,25 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: } configmap_name = make_k8s_name(f"{service_name}-pipeline-{pipeline_name}") + volumes: list[dict[str, Any]] = [ + { + "name": "pipeline-config", + "configMap": { + "name": configmap_name, + }, + } + ] + + # Shared memory volume is needed to allow the communication between processes. + # via shared memory. Only needed when in multiprocess mode. + if process_count is not None and process_count > 1: + volumes.append( + { + "name": "dshm", + "emptyDir": {"medium": "Memory"}, + } + ) + pipeline_additions = { "metadata": { "name": make_k8s_name(f"{service_name}-pipeline-{pipeline_name}-{segment_id}"), @@ -295,14 +379,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: }, "spec": { "containers": [container], - "volumes": [ - { - "name": "pipeline-config", - "configMap": { - "name": configmap_name, - }, - } - ], + "volumes": volumes, }, }, }, @@ -329,7 +406,6 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: if emergency_patch: deployment = deepmerge(deployment, emergency_patch) - # Create configmap configmap = { "apiVersion": "v1", "kind": "ConfigMap", @@ -342,7 +418,6 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: }, } - # Add namespace if present in deployment template if "namespace" in deployment.get("metadata", {}): metadata = cast(dict[str, Any], configmap["metadata"]) metadata["namespace"] = deployment["metadata"]["namespace"] diff --git a/sentry_streams_k8s/tests/test_pipeline_step.py b/sentry_streams_k8s/tests/test_pipeline_step.py index efa30795..a6802d93 100644 --- 
def _mp_step(processes: int, batch_size: int, batch_time: float) -> dict[str, Any]:
    """Build a segment-starting step config that requests multi-process parallelism."""
    return {
        "starts_segment": True,
        "parallelism": {
            "multi_process": {
                "processes": processes,
                "batch_size": batch_size,
                "batch_time": batch_time,
            }
        },
    }


def _mp_kafka_step(**extra: Any) -> dict[str, Any]:
    """Build a segment-starting step config with bootstrap servers plus any extra keys."""
    step: dict[str, Any] = {
        "starts_segment": True,
        "bootstrap_servers": ["127.0.0.1:9092"],
    }
    step.update(extra)
    return step


def _mp_run_context(segments: list[dict[str, Any]]) -> dict[str, Any]:
    """Build the PipelineStep.run() context shared by the multiprocessing tests."""
    return {
        "service_name": "my-service",
        "pipeline_name": "profiles",
        "deployment_template": {},
        "container_template": {},
        "pipeline_config": {
            "env": {},
            "pipeline": {"segments": segments},
        },
        "pipeline_module": "sbc.profiles",
        "image_name": "my-image:latest",
        "cpu_per_process": 1000,
        "memory_per_process": 512,
        "segment_id": 0,
    }


def _mp_build_container(process_count: int | None) -> dict[str, Any]:
    """Call build_container() with fixed arguments and the given process count."""
    empty_template: dict[str, Any] = {}
    return build_container(
        container_template=empty_template,
        pipeline_name="profiles",
        pipeline_module="sbc.profiles",
        image_name="my-image:latest",
        cpu_per_process=1000,
        memory_per_process=512,
        segment_id=0,
        process_count=process_count,
    )


def test_get_multiprocess_config_detects_processes() -> None:
    """get_multiprocess_config() returns the process count and the owning segment."""
    from sentry_streams_k8s.pipeline_step import get_multiprocess_config

    only_segment = {
        "steps_config": {
            # A numeric "parallelism" must be ignored by the lookup.
            "myinput": _mp_kafka_step(parallelism=1),
            "parser": _mp_step(4, 1000, 0.2),
            "mysink": _mp_kafka_step(),
        }
    }
    pipeline_config = {
        "env": {},
        "pipeline": {"segments": [only_segment]},
    }

    found_count, found_segments = get_multiprocess_config(pipeline_config)

    assert found_count == 4
    assert found_segments == [0]


def test_build_container_with_multiprocessing() -> None:
    """build_container() scales resources and mounts /dev/shm when process_count is 4."""
    container = _mp_build_container(process_count=4)

    # 4 processes x 1000m CPU / 512Mi memory each.
    requested = container["resources"]["requests"]
    assert requested["cpu"] == "4000m"
    assert requested["memory"] == "2048Mi"
    assert container["resources"]["limits"]["memory"] == "2048Mi"

    # The shared memory mount sits next to the pipeline config mount.
    mounts = container["volumeMounts"]
    mount_names = [mount["name"] for mount in mounts]
    assert "pipeline-config" in mount_names
    assert "dshm" in mount_names

    shm_mount = next(mount for mount in mounts if mount["name"] == "dshm")
    assert shm_mount["mountPath"] == "/dev/shm"


def test_build_container_without_multiprocessing() -> None:
    """build_container() leaves resources and mounts untouched when process_count is None."""
    container = _mp_build_container(process_count=None)

    # Per-process values are used as-is.
    requested = container["resources"]["requests"]
    assert requested["cpu"] == "1000m"
    assert requested["memory"] == "512Mi"
    assert container["resources"]["limits"]["memory"] == "512Mi"

    mount_names = [mount["name"] for mount in container["volumeMounts"]]
    assert "pipeline-config" in mount_names
    assert "dshm" not in mount_names


def test_run_with_multiprocessing() -> None:
    """run() produces a deployment with scaled resources and a shared memory volume."""
    context = _mp_run_context(
        [
            {
                "steps_config": {
                    "myinput": _mp_kafka_step(),
                    "parser": _mp_step(4, 1000, 0.2),
                    "mysink": _mp_kafka_step(),
                }
            }
        ]
    )

    result = PipelineStep().run(context)
    pod_spec = result["deployment"]["spec"]["template"]["spec"]

    # Resources are multiplied by the process count.
    container = pod_spec["containers"][0]
    requested = container["resources"]["requests"]
    assert requested["cpu"] == "4000m"
    assert requested["memory"] == "2048Mi"

    # The memory-backed volume is declared on the pod...
    volumes = pod_spec["volumes"]
    volume_names = [volume["name"] for volume in volumes]
    assert "pipeline-config" in volume_names
    assert "dshm" in volume_names

    shm_volume = next(volume for volume in volumes if volume["name"] == "dshm")
    assert shm_volume["emptyDir"]["medium"] == "Memory"

    # ...and mounted into the container.
    mount_names = [mount["name"] for mount in container["volumeMounts"]]
    assert "dshm" in mount_names


def test_run_rejects_multiple_segments_with_parallelism() -> None:
    """run() raises ValueError when more than one segment configures multi-processing."""
    context = _mp_run_context(
        [
            {"steps_config": {"step1": _mp_step(4, 1000, 0.2)}},
            {"steps_config": {"step2": _mp_step(2, 500, 0.1)}},
        ]
    )
    step = PipelineStep()

    with pytest.raises(
        ValueError,
        match=r"Multi-processing configuration can only be specified in one segment",
    ):
        step.run(context)