From 641738d19b0e5019da4a345ed712f51befe71847 Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 7 May 2026 16:52:36 +0530 Subject: [PATCH 1/2] UN-3452 [FEAT] Characterise the seams: dispatch + chord call sites MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sub-task A under #1.2 — characterisation suite for the seams that upcoming spine PRs will refactor. Two new test files, zero production changes. Dispatch seam (unblocks PR #8 — @shared_task -> @worker_task migration): - workers/tests/test_dispatch_sites_characterisation.py (276 lines, 11 tests) - Locks contract on the two raw current_app.send_task call sites: - shared/patterns/notification/helper.py:76 (webhook dispatch) - scheduler/tasks.py:157 (scheduled workflow async dispatch) - Tests pin: task name, positional args layout, kwargs layout, target queue, return-value semantics, error-path behaviour - Inventory canary: fails if a third raw current_app.send_task site appears anywhere in workers/ source Chord seam (unblocks PR #13 — chord -> Barrier lift): - workers/tests/test_chord_sites_characterisation.py (316 lines, 9 tests) - Locks contract on the chord pattern via: - WorkflowOrchestrationUtils.create_chord_execution (centralised helper) - WorkflowOrchestrationMixin.create_chord (mixin wrapper) - Tests pin: empty-batch short-circuit (existing defense against silent task drops at scale — Pain Point #2 in the PG Queue decision doc), callback-signature construction, return-value semantics, error propagation, mixin's app extraction + RuntimeError on missing app - Inventory canaries: fail if a third chord(...) call site OR a third `from celery import chord` import appears anywhere in workers/ source - api-deployment/tasks.py:673 inline chord covered only by inventory (direct unit-testing requires heavy mocking of the 273-line _run_workflow_api enclosing function — out of scope here, the canary still catches it for PR #13) Total: 20 tests, ~2s runtime, 0 production changes. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test_chord_sites_characterisation.py | 316 ++++++++++++++++++ .../test_dispatch_sites_characterisation.py | 276 +++++++++++++++ 2 files changed, 592 insertions(+) create mode 100644 workers/tests/test_chord_sites_characterisation.py create mode 100644 workers/tests/test_dispatch_sites_characterisation.py diff --git a/workers/tests/test_chord_sites_characterisation.py b/workers/tests/test_chord_sites_characterisation.py new file mode 100644 index 0000000000..f226fc643d --- /dev/null +++ b/workers/tests/test_chord_sites_characterisation.py @@ -0,0 +1,316 @@ +"""Characterisation tests for the two ``celery.chord`` call sites. + +These are the two places in ``workers/`` source that invoke ``chord(...)`` +directly. PR #13 will replace both with a transport-agnostic ``Barrier`` +abstraction matching the labs target architecture's ``DECR remaining:{exec_id}`` +pattern. + +This test suite locks down the **current** chord invocation contract so the +migration can be proved equivalent. Chord is the highest-risk Celery construct +called out in the PG Queue decision doc (silent task drops at ~130K-task scale) +— characterising it before refactor is critical. + +Sites: +1. ``shared/workflow/execution/orchestration_utils.py`` — ``WorkflowOrchestrationUtils.create_chord_execution`` (the centralised helper). +2. ``api-deployment/tasks.py`` — inline chord inside ``_run_workflow_api``. + Site 2 is exercised only via the inventory test (full characterisation + requires heavy mocking of the enclosing 273-line function — out of scope + for a smoke/characterisation pass). +""" + +from unittest.mock import MagicMock, patch + +import pytest + + +# --- Site 1: WorkflowOrchestrationUtils.create_chord_execution --- + + +class TestCreateChordExecution: + """Characterise the centralised chord helper. + + This is the easily-testable site. PR #13 should route both this helper + AND the inline call (Site 2) through the new Barrier abstraction. 
+ """ + + def _make_app_instance(self): + """Build a Celery app-shaped mock with a working ``.signature(...)``.""" + app = MagicMock() + # signature() returns an opaque "signature" object — represent it as a + # MagicMock that we can assert on by identity later. + app.signature.return_value = MagicMock(name="callback_signature") + return app + + def test_empty_batch_tasks_returns_none_and_skips_chord(self): + """Zero files: helper short-circuits with None and never calls chord().""" + from shared.workflow.execution.orchestration_utils import ( + WorkflowOrchestrationUtils, + ) + + app = self._make_app_instance() + + with patch( + "shared.workflow.execution.orchestration_utils.chord" + ) as mock_chord: + result = WorkflowOrchestrationUtils.create_chord_execution( + batch_tasks=[], + callback_task_name="process_batch_callback", + callback_kwargs={"execution_id": "exec-1", "pipeline_id": "pipe-1"}, + callback_queue="file_processing_callback", + app_instance=app, + ) + + assert result is None + mock_chord.assert_not_called() + + def test_non_empty_batch_tasks_invokes_chord(self): + """Non-empty batch: chord(batch_tasks)(callback_signature) is called.""" + from shared.workflow.execution.orchestration_utils import ( + WorkflowOrchestrationUtils, + ) + + app = self._make_app_instance() + batch_tasks = [MagicMock(name="batch_task_1"), MagicMock(name="batch_task_2")] + + with patch( + "shared.workflow.execution.orchestration_utils.chord" + ) as mock_chord: + # chord(batch_tasks) returns a chord object; calling it with the + # callback signature returns the chord result. Both calls must + # happen for this characterisation. 
+ chord_obj = MagicMock(name="chord_object") + mock_chord.return_value = chord_obj + + WorkflowOrchestrationUtils.create_chord_execution( + batch_tasks=batch_tasks, + callback_task_name="process_batch_callback", + callback_kwargs={"execution_id": "exec-1"}, + callback_queue="file_processing_callback", + app_instance=app, + ) + + # First call: chord(batch_tasks) + mock_chord.assert_called_once_with(batch_tasks) + # Second call: chord_obj(callback_signature) — applies the chord + chord_obj.assert_called_once_with(app.signature.return_value) + + def test_callback_signature_is_built_with_correct_kwargs(self): + """The callback signature must be created with the exact task name, + kwargs, and queue passed in. PR #13 must preserve these.""" + from shared.workflow.execution.orchestration_utils import ( + WorkflowOrchestrationUtils, + ) + + app = self._make_app_instance() + batch_tasks = [MagicMock(name="batch")] + callback_kwargs = { + "execution_id": "exec-42", + "pipeline_id": "pipe-7", + "organization_id": "org-x", + } + + with patch( + "shared.workflow.execution.orchestration_utils.chord" + ) as mock_chord: + mock_chord.return_value = MagicMock() + WorkflowOrchestrationUtils.create_chord_execution( + batch_tasks=batch_tasks, + callback_task_name="process_batch_callback_api", + callback_kwargs=callback_kwargs, + callback_queue="api_file_processing_callback", + app_instance=app, + ) + + app.signature.assert_called_once_with( + "process_batch_callback_api", + kwargs=callback_kwargs, + queue="api_file_processing_callback", + ) + + def test_returns_chord_result_object(self): + """The helper must return whatever ``chord(...)(...)`` returns — + callers depend on this return value to track the chord.""" + from shared.workflow.execution.orchestration_utils import ( + WorkflowOrchestrationUtils, + ) + + app = self._make_app_instance() + + with patch( + "shared.workflow.execution.orchestration_utils.chord" + ) as mock_chord: + chord_obj = MagicMock() + chord_result = 
MagicMock(name="chord_result_object") + chord_obj.return_value = chord_result + mock_chord.return_value = chord_obj + + result = WorkflowOrchestrationUtils.create_chord_execution( + batch_tasks=[MagicMock()], + callback_task_name="process_batch_callback", + callback_kwargs={"execution_id": "exec-1"}, + callback_queue="file_processing_callback", + app_instance=app, + ) + + assert result is chord_result + + def test_chord_failure_is_re_raised_after_logging(self): + """If chord() raises, the helper logs and re-raises (not swallowed).""" + from shared.workflow.execution.orchestration_utils import ( + WorkflowOrchestrationUtils, + ) + + app = self._make_app_instance() + + with patch( + "shared.workflow.execution.orchestration_utils.chord" + ) as mock_chord: + mock_chord.side_effect = RuntimeError("broker exploded") + + with pytest.raises(RuntimeError, match="broker exploded"): + WorkflowOrchestrationUtils.create_chord_execution( + batch_tasks=[MagicMock()], + callback_task_name="process_batch_callback", + callback_kwargs={"execution_id": "exec-1"}, + callback_queue="file_processing_callback", + app_instance=app, + ) + + +# --- Site 1 (mixin wrapper): WorkflowOrchestrationMixin.create_chord --- + + +class TestWorkflowOrchestrationMixinCreateChord: + """Characterise the mixin wrapper around ``create_chord_execution``. + + The mixin adds two unique behaviours over the static helper: + 1. Extracts ``self.app`` from the task context. + 2. Raises ``RuntimeError`` when no app is bound to the task. + + Both must be preserved by PR #13 when it lifts the mixin to use Barrier. + """ + + def test_create_chord_extracts_app_from_self_and_delegates(self): + """The mixin must read ``self.app`` and forward it to the static helper.""" + from shared.workflow.execution.orchestration_utils import ( + WorkflowOrchestrationMixin, + WorkflowOrchestrationUtils, + ) + + # Build a synthetic task-like object carrying an `app` attribute. 
+ task = type("FakeTask", (WorkflowOrchestrationMixin,), {})() + task.app = MagicMock(name="celery_app") + task.app.signature.return_value = MagicMock(name="callback_signature") + + with patch.object( + WorkflowOrchestrationUtils, "create_chord_execution" + ) as mock_static: + mock_static.return_value = MagicMock(name="chord_result") + batch = [MagicMock()] + kwargs = {"execution_id": "exec-mixin"} + task.create_chord( + batch_tasks=batch, + callback_task_name="process_batch_callback", + callback_kwargs=kwargs, + callback_queue="file_processing_callback", + ) + + mock_static.assert_called_once_with( + batch, + "process_batch_callback", + kwargs, + "file_processing_callback", + task.app, + ) + + def test_create_chord_raises_when_no_app_bound(self): + """No ``self.app`` (e.g., called outside a Celery task context): + the mixin must raise ``RuntimeError`` rather than silently failing + or passing ``None`` downstream to chord().""" + from shared.workflow.execution.orchestration_utils import ( + WorkflowOrchestrationMixin, + ) + + task = type("FakeTask", (WorkflowOrchestrationMixin,), {})() + # Deliberately do NOT set task.app — leave it absent. + + with pytest.raises(RuntimeError, match="Celery app instance not available"): + task.create_chord( + batch_tasks=[MagicMock()], + callback_task_name="process_batch_callback", + callback_kwargs={"execution_id": "exec-1"}, + callback_queue="file_processing_callback", + ) + + +# --- Cross-site invariant: chord call-site inventory --- + + +class TestChordSiteInventory: + """Exactly two chord call sites must exist in workers/ source. + + If a third appears, this test fails so PR #13's migration can't silently + miss it. + """ + + def test_only_two_known_chord_call_sites_in_workers(self): + """Count chord(...) 
invocations (not imports) in workers/ source.""" + import pathlib + import re + + workers_root = pathlib.Path(__file__).parent.parent + skip_dirs = {"tests", "__pycache__", "htmlcov", ".venv"} + + # Match `chord(` as a function call — excludes the bare import line + # `from celery import chord` and helper method names like `create_chord`. + # The regex requires `chord` to be preceded by start-of-line / whitespace + # / `=` / `(` (i.e., a true call expression), not as part of a longer + # identifier such as `create_chord`. + pattern = re.compile(r"(?:^|[\s=(])chord\(") + + hits = [] + for py in workers_root.rglob("*.py"): + if any(part in skip_dirs for part in py.parts): + continue + text = py.read_text() + for line_no, line in enumerate(text.splitlines(), start=1): + if pattern.search(line): + hits.append(f"{py.relative_to(workers_root)}:{line_no}") + + # Expected exactly two — in orchestration_utils.py and api-deployment/tasks.py. + assert len(hits) == 2, ( + f"Expected exactly 2 chord(...) 
call sites in workers/, found " + f"{len(hits)}:\n " + "\n ".join(hits) + ) + joined = " ".join(hits) + assert "shared/workflow/execution/orchestration_utils.py" in joined + assert "api-deployment/tasks.py" in joined + + def test_chord_import_only_in_two_files(self): + """`from celery import chord` should appear in exactly the two files + that actually invoke chord — no other imports lurking.""" + import pathlib + import re + + workers_root = pathlib.Path(__file__).parent.parent + skip_dirs = {"tests", "__pycache__", "htmlcov", ".venv"} + + pattern = re.compile(r"^\s*from\s+celery\s+import\s+chord\b") + + hits = [] + for py in workers_root.rglob("*.py"): + if any(part in skip_dirs for part in py.parts): + continue + text = py.read_text() + for line_no, line in enumerate(text.splitlines(), start=1): + if pattern.search(line): + hits.append(f"{py.relative_to(workers_root)}:{line_no}") + + assert len(hits) == 2, ( + f"Expected `from celery import chord` in exactly 2 files, found " + f"{len(hits)}:\n " + "\n ".join(hits) + ) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/workers/tests/test_dispatch_sites_characterisation.py b/workers/tests/test_dispatch_sites_characterisation.py new file mode 100644 index 0000000000..62f1fcd27b --- /dev/null +++ b/workers/tests/test_dispatch_sites_characterisation.py @@ -0,0 +1,276 @@ +"""Characterisation tests for the two raw `current_app.send_task` dispatch sites. + +These are the only two places in workers/ that bypass `@shared_task`-based +dispatch and call `current_app.send_task(...)` directly with a string task +name. PR #8 (mass `@shared_task` -> `@worker_task` migration) will replace +both with a unified `dispatch()` helper. + +This test suite locks down the **current** dispatch contract — task name, +positional args, keyword args, and target queue — so the migration can +be proved equivalent. It does NOT exercise the receiving tasks; it only +captures what is dispatched. 
+ +Sites characterised: +1. ``shared/patterns/notification/helper.py:76`` — webhook notification +2. ``scheduler/tasks.py:157`` — scheduled workflow async dispatch +""" + +from unittest.mock import MagicMock, patch + +import pytest + + +# --- Site 1: shared/patterns/notification/helper.py --- + + +class TestNotificationDispatchSite: + """Characterise ``send_notification_to_worker`` -> ``current_app.send_task``.""" + + def _make_payload(self): + """Build a minimal NotificationPayload-shaped mock. + + The function under test only invokes ``payload.to_webhook_payload()`` + and reads ``payload.pipeline_id`` (for log output). Anything else + about the payload object is irrelevant to dispatch behaviour. + """ + payload = MagicMock() + payload.to_webhook_payload.return_value = {"event": "test_event", "id": 42} + payload.pipeline_id = "pipe-001" + return payload + + def test_dispatch_task_name_and_queue(self): + from shared.patterns.notification.helper import send_notification_to_worker + + with patch( + "shared.patterns.notification.helper.current_app" + ) as mock_app: + send_notification_to_worker( + url="https://example.com/hook", + payload=self._make_payload(), + auth_type="NONE", + auth_key=None, + auth_header=None, + ) + + mock_app.send_task.assert_called_once() + call = mock_app.send_task.call_args + assert call.args[0] == "send_webhook_notification" + assert call.kwargs["queue"] == "notifications" + + def test_dispatch_positional_args_layout(self): + """Positional args MUST be [url, payload_dict, headers, timeout].""" + from shared.patterns.notification.helper import send_notification_to_worker + + with patch( + "shared.patterns.notification.helper.current_app" + ) as mock_app: + send_notification_to_worker( + url="https://example.com/hook", + payload=self._make_payload(), + auth_type="BEARER", + auth_key="token-abc", + auth_header=None, + ) + + args = mock_app.send_task.call_args.kwargs["args"] + assert len(args) == 4 + assert args[0] == "https://example.com/hook" 
# url + assert args[1] == {"event": "test_event", "id": 42} # payload_dict + assert args[2]["Authorization"] == "Bearer token-abc" # headers + assert args[3] == 10 # timeout (hard-coded) + + def test_dispatch_kwargs_layout(self): + """Kwargs MUST contain max_retries, retry_delay, platform.""" + from shared.patterns.notification.helper import send_notification_to_worker + + with patch( + "shared.patterns.notification.helper.current_app" + ) as mock_app: + send_notification_to_worker( + url="https://example.com/hook", + payload=self._make_payload(), + auth_type="NONE", + auth_key=None, + auth_header=None, + max_retries=5, + platform="SLACK", + ) + + kwargs = mock_app.send_task.call_args.kwargs["kwargs"] + assert kwargs == { + "max_retries": 5, + "retry_delay": 10, + "platform": "SLACK", + } + + def test_dispatch_returns_true_on_success(self): + from shared.patterns.notification.helper import send_notification_to_worker + + with patch("shared.patterns.notification.helper.current_app"): + result = send_notification_to_worker( + url="https://example.com/hook", + payload=self._make_payload(), + auth_type="NONE", + auth_key=None, + auth_header=None, + ) + + assert result is True + + def test_dispatch_returns_false_on_send_task_failure(self): + from shared.patterns.notification.helper import send_notification_to_worker + + with patch( + "shared.patterns.notification.helper.current_app" + ) as mock_app: + mock_app.send_task.side_effect = RuntimeError("broker down") + result = send_notification_to_worker( + url="https://example.com/hook", + payload=self._make_payload(), + auth_type="NONE", + auth_key=None, + auth_header=None, + ) + + assert result is False + + +# --- Site 2: scheduler/tasks.py --- + + +class TestSchedulerDispatchSite: + """Characterise ``_execute_scheduled_workflow`` -> ``current_app.send_task``.""" + + def _make_context(self): + """Minimal ScheduledPipelineContext-shaped mock. 
+ + The dispatch site reads: + - context.organization_id, .workflow_id, .pipeline_id, .pipeline_name + - context.use_file_history + """ + ctx = MagicMock() + ctx.organization_id = "org-test" + ctx.workflow_id = "wf-001" + ctx.pipeline_id = "pipe-007" + ctx.pipeline_name = "scheduled-test-pipeline" + ctx.use_file_history = False + return ctx + + def _make_api_client(self, execution_id="exec-123"): + """Mock api_client whose create_workflow_execution returns a dict + with the expected execution_id.""" + api = MagicMock() + api.create_workflow_execution.return_value = {"execution_id": execution_id} + return api + + def test_dispatch_task_name(self): + from scheduler.tasks import _execute_scheduled_workflow + + with patch("celery.current_app") as mock_app: + _execute_scheduled_workflow(self._make_api_client(), self._make_context()) + + mock_app.send_task.assert_called_once() + assert mock_app.send_task.call_args.args[0] == "async_execute_bin" + + def test_dispatch_routes_to_general_queue(self): + from scheduler.tasks import _execute_scheduled_workflow + from shared.enums.worker_enums import QueueName + + with patch("celery.current_app") as mock_app: + _execute_scheduled_workflow(self._make_api_client(), self._make_context()) + + assert mock_app.send_task.call_args.kwargs["queue"] == QueueName.GENERAL + + def test_dispatch_positional_args_layout(self): + """Positional args MUST be [org_id, workflow_id, execution_id, {}, True].""" + from scheduler.tasks import _execute_scheduled_workflow + + with patch("celery.current_app") as mock_app: + _execute_scheduled_workflow( + self._make_api_client(execution_id="exec-xyz"), + self._make_context(), + ) + + args = mock_app.send_task.call_args.kwargs["args"] + assert len(args) == 5 + assert args[0] == "org-test" # organization_id (schema_name) + assert args[1] == "wf-001" # workflow_id + assert args[2] == "exec-xyz" # execution_id (from api_client return) + assert args[3] == {} # hash_values_of_files (always empty for scheduled) + 
assert args[4] is True # scheduled flag (always True here) + + def test_dispatch_kwargs_layout(self): + """Kwargs MUST contain use_file_history and pipeline_id.""" + from scheduler.tasks import _execute_scheduled_workflow + + ctx = self._make_context() + ctx.use_file_history = True + + with patch("celery.current_app") as mock_app: + _execute_scheduled_workflow(self._make_api_client(), ctx) + + kwargs = mock_app.send_task.call_args.kwargs["kwargs"] + assert kwargs == { + "use_file_history": True, + "pipeline_id": "pipe-007", + } + + def test_no_dispatch_when_execution_creation_fails(self): + """If api_client.create_workflow_execution returns no execution_id, + the function bails out and never calls send_task.""" + from scheduler.tasks import _execute_scheduled_workflow + from shared.models.scheduler_models import SchedulerExecutionStatus + + api = MagicMock() + api.create_workflow_execution.return_value = {} # no execution_id + + with patch("celery.current_app") as mock_app: + result = _execute_scheduled_workflow(api, self._make_context()) + + # The dispatch contract: nothing is sent when execution creation fails. + mock_app.send_task.assert_not_called() + # And the function returns an error result (not raised exception). + assert result.status == SchedulerExecutionStatus.ERROR + + +# --- Cross-site invariant --- + + +class TestDispatchSiteInventory: + """If a third raw current_app.send_task site appears, this test breaks + so PR #8's migration can't silently miss it.""" + + def test_only_two_known_dispatch_sites_in_workers(self): + """Verify the count of raw current_app.send_task references in + workers/ source matches the two we have characterised.""" + import pathlib + import re + + workers_root = pathlib.Path(__file__).parent.parent + # Skip tests/ and __pycache__/ etc. 
+ skip_dirs = {"tests", "__pycache__", "htmlcov", ".venv"} + + pattern = re.compile(r"current_app\.send_task\b") + hits = [] + for py in workers_root.rglob("*.py"): + if any(part in skip_dirs for part in py.parts): + continue + text = py.read_text() + for line_no, line in enumerate(text.splitlines(), start=1): + if pattern.search(line): + hits.append(f"{py.relative_to(workers_root)}:{line_no}") + + # Expected exactly two — in helper.py and scheduler/tasks.py. + # If a third appears, this test fails so PR #8 doesn't miss it. + assert len(hits) == 2, ( + f"Expected exactly 2 raw current_app.send_task sites in workers/, found " + f"{len(hits)}:\n " + "\n ".join(hits) + ) + # Sanity: the two we know about + joined = " ".join(hits) + assert "shared/patterns/notification/helper.py" in joined + assert "scheduler/tasks.py" in joined + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From 6697fe5ab688cd2a5bedf17bddcbac4406c462f0 Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 7 May 2026 17:01:59 +0530 Subject: [PATCH 2/2] UN-3452 [FEAT] Address Greptile review on seam characterisation tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three P2 findings from Greptile, all fixed: 1. test_chord_import_only_in_two_files: add file-identity assertions matching the sibling call-site canary. Without these, the canary would silently pass if the two imports moved to entirely different files while count remained 2 — exactly the silent-miss scenario the Barrier migration could trigger. 2. TestSchedulerDispatchSite: add test_dispatch_returns_error_result_when_send_task_raises. The scheduler site has a real error branch in scheduler/tasks.py:185-192 that catches send_task exceptions and returns SchedulerExecutionResult.error(...) — without a characterisation test the upcoming dispatch() migration could silently change error semantics (re-raise instead of returning an error result, or swallow silently).
Mirrors the equivalent notification-site test_dispatch_returns_false_on_send_task_failure. 3. skip_dirs check anchored to top-level dir relative to workers_root in all three inventory tests. The previous `any(part in skip_dirs for part in py.parts)` check would have erroneously excluded any path with a component named `tests` (e.g. workers/shared/tests/foo.py). 21 tests now (was 20), runtime ~3s. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test_chord_sites_characterisation.py | 19 ++++++++--- .../test_dispatch_sites_characterisation.py | 33 +++++++++++++++++-- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/workers/tests/test_chord_sites_characterisation.py b/workers/tests/test_chord_sites_characterisation.py index f226fc643d..7c9d653c22 100644 --- a/workers/tests/test_chord_sites_characterisation.py +++ b/workers/tests/test_chord_sites_characterisation.py @@ -259,7 +259,10 @@ def test_only_two_known_chord_call_sites_in_workers(self): import re workers_root = pathlib.Path(__file__).parent.parent - skip_dirs = {"tests", "__pycache__", "htmlcov", ".venv"} + # Anchor to the top-level directory relative to workers_root so we + # don't accidentally exclude legitimately-named subdirectories like + # `workers/shared/tests_helpers/`. + skip_top_dirs = {"tests", "__pycache__", "htmlcov", ".venv"} # Match `chord(` as a function call — excludes the bare import line # `from celery import chord` and helper method names like `create_chord`.
@@ -270,7 +273,8 @@ def test_only_two_known_chord_call_sites_in_workers(self): hits = [] for py in workers_root.rglob("*.py"): - if any(part in skip_dirs for part in py.parts): + rel_parts = py.relative_to(workers_root).parts + if rel_parts and rel_parts[0] in skip_top_dirs: continue text = py.read_text() for line_no, line in enumerate(text.splitlines(), start=1): @@ -293,13 +297,14 @@ def test_chord_import_only_in_two_files(self): import re workers_root = pathlib.Path(__file__).parent.parent - skip_dirs = {"tests", "__pycache__", "htmlcov", ".venv"} + skip_top_dirs = {"tests", "__pycache__", "htmlcov", ".venv"} pattern = re.compile(r"^\s*from\s+celery\s+import\s+chord\b") hits = [] for py in workers_root.rglob("*.py"): - if any(part in skip_dirs for part in py.parts): + rel_parts = py.relative_to(workers_root).parts + if rel_parts and rel_parts[0] in skip_top_dirs: continue text = py.read_text() for line_no, line in enumerate(text.splitlines(), start=1): @@ -310,6 +315,12 @@ def test_chord_import_only_in_two_files(self): f"Expected `from celery import chord` in exactly 2 files, found " f"{len(hits)}:\n " + "\n ".join(hits) ) + # Sanity: same files as the call-site canary above. If the imports + # ever migrate to different files while count stays at 2, this catches + # it — preventing a silent miss during the Barrier migration. + joined = " ".join(hits) + assert "shared/workflow/execution/orchestration_utils.py" in joined + assert "api-deployment/tasks.py" in joined if __name__ == "__main__": diff --git a/workers/tests/test_dispatch_sites_characterisation.py b/workers/tests/test_dispatch_sites_characterisation.py index 62f1fcd27b..95278a81b6 100644 --- a/workers/tests/test_dispatch_sites_characterisation.py +++ b/workers/tests/test_dispatch_sites_characterisation.py @@ -232,6 +232,30 @@ def test_no_dispatch_when_execution_creation_fails(self): # And the function returns an error result (not raised exception). 
assert result.status == SchedulerExecutionStatus.ERROR + def test_dispatch_returns_error_result_when_send_task_raises(self): + """If current_app.send_task raises (broker down, queue gone, etc.), + _execute_scheduled_workflow MUST catch it, log, and return a + SchedulerExecutionResult.error(...) — NOT propagate the exception. + + PR #8 (dispatch migration) must preserve this semantic. If the new + dispatch() helper either swallows the error silently or re-raises a + different exception type, this test fails. + """ + from scheduler.tasks import _execute_scheduled_workflow + from shared.models.scheduler_models import SchedulerExecutionStatus + + with patch("celery.current_app") as mock_app: + mock_app.send_task.side_effect = RuntimeError("broker down") + result = _execute_scheduled_workflow( + self._make_api_client(), self._make_context() + ) + + # Exception was caught (not propagated). + assert result.status == SchedulerExecutionStatus.ERROR + # Execution itself was created before the dispatch failed, so the + # error result is about the dispatch, not the creation. + assert result.execution_id == "exec-123" or result.execution_id is None + # --- Cross-site invariant --- @@ -247,13 +271,16 @@ def test_only_two_known_dispatch_sites_in_workers(self): import re workers_root = pathlib.Path(__file__).parent.parent - # Skip tests/ and __pycache__/ etc. - skip_dirs = {"tests", "__pycache__", "htmlcov", ".venv"} + # Anchor skip to the top-level directory relative to workers_root so + # we don't accidentally exclude legitimately-named subdirectories + # (e.g. workers/shared/tests_helpers/). 
+ skip_top_dirs = {"tests", "__pycache__", "htmlcov", ".venv"} pattern = re.compile(r"current_app\.send_task\b") hits = [] for py in workers_root.rglob("*.py"): - if any(part in skip_dirs for part in py.parts): + rel_parts = py.relative_to(workers_root).parts + if rel_parts and rel_parts[0] in skip_top_dirs: continue text = py.read_text() for line_no, line in enumerate(text.splitlines(), start=1):