From b16696b58901a6e1da11c061d7e30cb63d9b4f6d Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 22 May 2026 11:13:11 -0400 Subject: [PATCH 01/11] fix(huey): Fix group and chord handling in enqueue The Huey integration was not properly handling task groups and chords when enqueuing. When a group/chord is enqueued, we would attempt to access the `name` attribute of the group/chord object for the span. They don't have one, causing an AttributeError. Fixes PY-2426 Fixes #6310 --- sentry_sdk/integrations/huey.py | 16 ++++++--- tests/integrations/huey/test_huey.py | 54 ++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/huey.py b/sentry_sdk/integrations/huey.py index a8d932200c..9704ffab49 100644 --- a/sentry_sdk/integrations/huey.py +++ b/sentry_sdk/integrations/huey.py @@ -30,6 +30,7 @@ try: from huey.api import Huey, PeriodicTask, Result, ResultGroup, Task + from huey.api import group as HueyGroup from huey.exceptions import CancelExecution, RetryTask, TaskLockedException except ImportError: raise DidNotEnable("Huey is not installed") @@ -53,22 +54,27 @@ def patch_enqueue() -> None: @ensure_integration_enabled(HueyIntegration, old_enqueue) def _sentry_enqueue( - self: "Huey", task: "Task" + self: "Huey", item: "Union[Task, HueyGroup]" ) -> "Optional[Union[Result, ResultGroup]]": + span_name = "Huey Task Group" if type(item) is HueyGroup else item.name with sentry_sdk.start_span( op=OP.QUEUE_SUBMIT_HUEY, - name=task.name, + name=span_name, origin=HueyIntegration.origin, ): - if not isinstance(task, PeriodicTask): + if not isinstance(item, PeriodicTask) and not isinstance(item, HueyGroup): # Attach trace propagation data to task kwargs. We do # not do this for periodic tasks, as these don't # really have an originating transaction. - task.kwargs["sentry_headers"] = { + # Additionally, we do not do this for Huey groups, as enqueue will + # recursively call this method for each task within the group, resulting + # in the trace propagation data being attached to each task individually ( + # which we want) + item.kwargs["sentry_headers"] = { BAGGAGE_HEADER_NAME: get_baggage(), SENTRY_TRACE_HEADER_NAME: get_traceparent(), } - return old_enqueue(self, task) + return old_enqueue(self, item) Huey.enqueue = _sentry_enqueue diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index 7440280623..51d58c45b8 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -2,6 +2,7 @@ import pytest from huey import __version__ as HUEY_VERSION +from huey import group from huey.api import MemoryHuey, Result from huey.exceptions import RetryTask @@ -222,3 +223,56 @@ def propagated_trace_task(): (event,) = events assert event["contexts"]["trace"]["origin"] == "auto.queue.huey" + + +def test_huey_enqueue_group(init_huey, capture_events): + huey = init_huey() + + events = capture_events() + + @huey.task() + def task1(): + pass + + @huey.task() + def task2(): + pass + + with start_transaction() as transaction: + huey.enqueue(group([task1.s(), task2.s()])) + + for _ in range(2): + task = huey.dequeue() + huey.execute(task) + + assert len(events) == 3 + + # Assert enqueue spans were successfully recorded + producer_event = events[0] + assert producer_event["type"] == "transaction" + assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert producer_event["contexts"]["trace"]["origin"] == "manual" + + spans = producer_event["spans"] + assert len(spans) == 3 + assert spans[0]["op"] == "queue.submit.huey" + assert spans[0]["description"] == "Huey Task Group" + assert spans[1]["op"] == "queue.submit.huey" + assert spans[1]["description"] == "task1" + assert spans[2]["op"] == "queue.submit.huey" + assert spans[2]["description"] == "task2" + + # Consumer transaction assertions (one per task) + consumer_events = sorted(events[1:], key=lambda e: e["transaction"]) + for i, (consumer_event, expected_name) in enumerate( + zip(consumer_events, ["task1", "task2"]) + ): + assert consumer_event["type"] == "transaction" + assert consumer_event["transaction"] == expected_name + assert consumer_event["transaction_info"] == {"source": "task"} + assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey" + assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey" + assert consumer_event["contexts"]["trace"]["status"] == "ok" + assert consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert "huey_task_id" in consumer_event["tags"] + assert consumer_event["tags"]["huey_task_retry"] is False From 14a8cc8d9bc014f485d524269e758339c5a282ff Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 22 May 2026 12:04:33 -0400 Subject: [PATCH 02/11] Handle chords correctly and include test. Groups and chords weren't introduced until Huey 3.0, so handle that gracefully. --- sentry_sdk/integrations/huey.py | 32 +++++++++---- tests/integrations/huey/test_huey.py | 68 ++++++++++++++++++++++++++-- 2 files changed, 89 insertions(+), 11 deletions(-) diff --git a/sentry_sdk/integrations/huey.py b/sentry_sdk/integrations/huey.py index 9704ffab49..7a1ac19232 100644 --- a/sentry_sdk/integrations/huey.py +++ b/sentry_sdk/integrations/huey.py @@ -30,11 +30,17 @@ try: from huey.api import Huey, PeriodicTask, Result, ResultGroup, Task - from huey.api import group as HueyGroup from huey.exceptions import CancelExecution, RetryTask, TaskLockedException except ImportError: raise DidNotEnable("Huey is not installed") +try: + from huey.api import chord as HueyChord + from huey.api import group as HueyGroup +except ImportError: + HueyChord = None + HueyGroup = None + HUEY_CONTROL_FLOW_EXCEPTIONS = (CancelExecution, RetryTask, TaskLockedException) @@ -54,22 +60,32 @@ def patch_enqueue() -> None: @ensure_integration_enabled(HueyIntegration, old_enqueue) def _sentry_enqueue( - self: "Huey", item: "Union[Task, HueyGroup]" + self: "Huey", item: "Union[Task, HueyGroup, HueyChord]" ) -> "Optional[Union[Result, ResultGroup]]": - span_name = "Huey Task Group" if type(item) is HueyGroup else item.name + if HueyChord is not None and isinstance(item, HueyChord): + span_name = "Huey Chord" + elif HueyGroup is not None and isinstance(item, HueyGroup): + span_name = "Huey Task Group" + else: + span_name = item.name + with sentry_sdk.start_span( op=OP.QUEUE_SUBMIT_HUEY, name=span_name, origin=HueyIntegration.origin, ): - if not isinstance(item, PeriodicTask) and not isinstance(item, HueyGroup): + if ( + not isinstance(item, PeriodicTask) + and not (HueyGroup is not None and isinstance(item, HueyGroup)) + and not (HueyChord is not None and isinstance(item, HueyChord)) + ): # Attach trace propagation data to task kwargs. We do # not do this for periodic tasks, as these don't # really have an originating transaction. - # Additionally, we do not do this for Huey groups, as enqueue will - # recursively call this method for each task within the group, resulting - # in the trace propagation data being attached to each task individually ( - # which we want) + # Additionally, we do not do this for Huey groups or chords, as enqueue will + # recursively call this method for each task within the list, resulting + # in the trace propagation data being attached to each task individually + # (which we want) item.kwargs["sentry_headers"] = { BAGGAGE_HEADER_NAME: get_baggage(), SENTRY_TRACE_HEADER_NAME: get_traceparent(), diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index 51d58c45b8..070608779d 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -2,7 +2,6 @@ import pytest from huey import __version__ as HUEY_VERSION -from huey import group from huey.api import MemoryHuey, Result from huey.exceptions import RetryTask @@ -12,6 +11,11 @@ HUEY_VERSION = parse_version(HUEY_VERSION) +try: + from huey.api import chord, group +except ImportError: + chord = None + group = None @pytest.fixture def init_huey(sentry_init): @@ -225,6 +229,7 @@ def propagated_trace_task(): assert event["contexts"]["trace"]["origin"] == "auto.queue.huey" +@pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="group was added in 3.0") def test_huey_enqueue_group(init_huey, capture_events): huey = init_huey() @@ -263,8 +268,65 @@ def task2(): assert spans[2]["description"] == "task2" # Consumer transaction assertions (one per task) - consumer_events = sorted(events[1:], key=lambda e: e["transaction"]) - for i, (consumer_event, expected_name) in enumerate( + consumer_events = events[1:] + for _, (consumer_event, expected_name) in enumerate( + zip(consumer_events, ["task1", "task2"]) + ): + assert consumer_event["type"] == "transaction" + assert consumer_event["transaction"] == expected_name + assert consumer_event["transaction_info"] == {"source": "task"} + assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey" + assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey" + assert consumer_event["contexts"]["trace"]["status"] == "ok" + assert consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert "huey_task_id" in consumer_event["tags"] + assert consumer_event["tags"]["huey_task_retry"] is False + + +@pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="chord was added in 3.0") +def test_huey_enqueue_chord(init_huey, capture_events): + huey = init_huey() + + events = capture_events() + + @huey.task() + def task1(): + pass + + @huey.task() + def task2(results): + pass + + with start_transaction() as transaction: + huey.enqueue(chord([task1.s()], task2.s())) + + for _ in range(2): + task = huey.dequeue() + huey.execute(task) + + assert len(events) == 3 + + # Enqueue spans + producer_event = events[0] + assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert producer_event["contexts"]["trace"]["origin"] == "manual" + + spans = producer_event["spans"] + assert len(spans) == 2 + assert spans[0]["op"] == "queue.submit.huey" + assert spans[0]["description"] == "Huey Chord" + assert spans[1]["op"] == "queue.submit.huey" + assert spans[1]["description"] == "task1" + + task1_event = events[1] + # Confirm the first task enqueued the chord callback + task1_spans = task1_event["spans"] + assert len(task1_spans) == 1 + assert task1_spans[0]["op"] == "queue.submit.huey" + assert task1_spans[0]["description"] == "task2" + + consumer_events = events[1:] + for _, (consumer_event, expected_name) in enumerate( zip(consumer_events, ["task1", "task2"]) ): assert consumer_event["type"] == "transaction" From 092af9dd4a6b4f8c7c4c5b96e61ccda4b482301a Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 22 May 2026 13:18:16 -0400 Subject: [PATCH 03/11] lint --- tests/integrations/huey/test_huey.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index 070608779d..e2cc81e755 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -14,8 +14,9 @@ try: from huey.api import chord, group except ImportError: - chord = None - group = None + chord = None + group = None + @pytest.fixture def init_huey(sentry_init): From 857ba47626fd89c6c02d88e4c680d9cc79d4ad32 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Mon, 25 May 2026 11:05:27 -0400 Subject: [PATCH 04/11] ensure data in the event processor is added to spans. ensure status is set correctly on segments in _capture_exception method --- sentry_sdk/integrations/huey.py | 97 +++-- tests/integrations/huey/test_huey.py | 513 +++++++++++++++++++++------ 2 files changed, 471 insertions(+), 139 deletions(-) diff --git a/sentry_sdk/integrations/huey.py b/sentry_sdk/integrations/huey.py index 7a1ac19232..9eada7bbf3 100644 --- a/sentry_sdk/integrations/huey.py +++ b/sentry_sdk/integrations/huey.py @@ -7,11 +7,13 @@ from sentry_sdk.consts import OP, SPANSTATUS from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.traces import SegmentSource, SpanStatus, StreamedSpan from sentry_sdk.tracing import ( BAGGAGE_HEADER_NAME, SENTRY_TRACE_HEADER_NAME, TransactionSource, ) +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import ( SENSITIVE_DATA_SUBSTITUTE, capture_internal_exceptions, @@ -60,7 +62,7 @@ def patch_enqueue() -> None: @ensure_integration_enabled(HueyIntegration, old_enqueue) def _sentry_enqueue( - self: "Huey", item: "Union[Task, HueyGroup, HueyChord]" + self: "Huey", item: "Any" ) -> "Optional[Union[Result, ResultGroup]]": if HueyChord is not None and isinstance(item, HueyChord): span_name = "Huey Chord" @@ -69,16 +71,31 @@ def _sentry_enqueue( else: span_name = item.name - with sentry_sdk.start_span( - op=OP.QUEUE_SUBMIT_HUEY, - name=span_name, - origin=HueyIntegration.origin, - ): - if ( - not isinstance(item, PeriodicTask) - and not (HueyGroup is not None and isinstance(item, HueyGroup)) - and not (HueyChord is not None and isinstance(item, HueyChord)) - ): + is_span_streaming_enabled = has_span_streaming_enabled( + sentry_sdk.get_client().options + ) + + span_ctx = None + if is_span_streaming_enabled: + span_ctx = sentry_sdk.traces.start_span( + name=span_name, + attributes={ + "sentry.op": OP.QUEUE_SUBMIT_HUEY, + "sentry.origin": HueyIntegration.origin, + }, + ) + else: + span_ctx = sentry_sdk.start_span( + op=OP.QUEUE_SUBMIT_HUEY, + name=span_name, + origin=HueyIntegration.origin, + ) + + no_headers_types = (PeriodicTask,) + tuple( + t for t in [HueyGroup, HueyChord] if t is not None + ) + with span_ctx: + if not isinstance(item, no_headers_types): # Attach trace propagation data to task kwargs. We do # not do this for periodic tasks, as these don't # really have an originating transaction. @@ -124,12 +141,22 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]": def _capture_exception(exc_info: "ExcInfo") -> None: scope = sentry_sdk.get_current_scope() + is_span_streaming_enabled = has_span_streaming_enabled( + sentry_sdk.get_client().options + ) if exc_info[0] in HUEY_CONTROL_FLOW_EXCEPTIONS: - scope.transaction.set_status(SPANSTATUS.ABORTED) + if not is_span_streaming_enabled: + scope.transaction.set_status(SPANSTATUS.ABORTED) + elif type(scope._span) is StreamedSpan: + scope._span._segment.status = SpanStatus.OK return - scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR) + if not is_span_streaming_enabled: + scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR) + elif type(scope._span) is StreamedSpan: + scope._span._segment.status = SpanStatus.ERROR + event, hint = event_from_exception( exc_info, client_options=sentry_sdk.get_client().options, @@ -167,21 +194,47 @@ def _sentry_execute( scope.add_event_processor(_make_event_processor(task)) sentry_headers = task.kwargs.pop("sentry_headers", None) - - transaction = continue_trace( - sentry_headers or {}, - name=task.name, - op=OP.QUEUE_TASK_HUEY, - source=TransactionSource.TASK, - origin=HueyIntegration.origin, + is_span_streaming_enabled = has_span_streaming_enabled( + sentry_sdk.get_client().options ) - transaction.set_status(SPANSTATUS.OK) + + if is_span_streaming_enabled: + headers = sentry_headers or {} + sentry_sdk.traces.continue_trace(headers) + span_ctx = sentry_sdk.traces.start_span( + name=task.name, + attributes={ + "sentry.op": OP.QUEUE_TASK_HUEY, + "sentry.origin": HueyIntegration.origin, + "sentry.span.source": SegmentSource.TASK, + "messaging.message.id": task.id, + "messaging.message.system": "huey", + "messaging.message.retry.count": (task.default_retries or 0) + - task.retries, + "messaging.message.args": task.args + if should_send_default_pii() + else SENSITIVE_DATA_SUBSTITUTE, + "messaging.message.kwargs": task.kwargs + if should_send_default_pii() + else SENSITIVE_DATA_SUBSTITUTE, + }, + ) + else: + transaction = continue_trace( + sentry_headers or {}, + name=task.name, + op=OP.QUEUE_TASK_HUEY, + source=TransactionSource.TASK, + origin=HueyIntegration.origin, + ) + transaction.set_status(SPANSTATUS.OK) + span_ctx = sentry_sdk.start_transaction(transaction) if not getattr(task, "_sentry_is_patched", False): task.execute = _wrap_task_execute(task.execute) task._sentry_is_patched = True - with sentry_sdk.start_transaction(transaction): + with span_ctx: return old_execute(self, task, timestamp) Huey._execute = _sentry_execute diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index e2cc81e755..83a794b00c 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -5,8 +5,13 @@ from huey.api import MemoryHuey, Result from huey.exceptions import RetryTask +import sentry_sdk from sentry_sdk import start_transaction +from sentry_sdk._types import SENSITIVE_DATA_SUBSTITUTE +from sentry_sdk.consts import OP from sentry_sdk.integrations.huey import HueyIntegration +from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.traces import SegmentSource, SpanStatus from sentry_sdk.utils import parse_version HUEY_VERSION = parse_version(HUEY_VERSION) @@ -20,11 +25,12 @@ @pytest.fixture def init_huey(sentry_init): - def inner(): + def inner(has_span_streaming=None, send_default_pii=True): sentry_init( integrations=[HueyIntegration()], traces_sample_rate=1.0, - send_default_pii=True, + send_default_pii=send_default_pii, + _experiments={"trace_lifecycle": "stream"} if has_span_streaming else {}, ) return MemoryHuey(name="sentry_sdk") @@ -69,39 +75,67 @@ def increase(num): @pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"]) -def test_task_transaction(capture_events, init_huey, task_fails): - huey = init_huey() +@pytest.mark.parametrize( + "has_span_streaming", [True, False], ids=["streaming", "no_streaming"] +) +def test_task_transaction_or_segment( + capture_events, capture_items, init_huey, task_fails, has_span_streaming +): + huey = init_huey(has_span_streaming=has_span_streaming) @huey.task() def division(a, b): return a / b - events = capture_events() - execute_huey_task( - huey, division, 1, int(not task_fails), exceptions=(DivisionByZero,) - ) + if has_span_streaming: + items = capture_items("span") + execute_huey_task( + huey, division, 1, int(not task_fails), exceptions=(DivisionByZero,) + ) + sentry_sdk.get_client().flush() + + payloads = [i.payload for i in items] + (execute_span,) = [ + # Searching for this span specifically because this is what has the raised exception + p + for p in payloads + if p["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY + ] + assert execute_span["is_segment"] + assert execute_span["name"] == "division" + assert execute_span["status"] == ( + SpanStatus.ERROR if task_fails else SpanStatus.OK + ) + else: + events = capture_events() + execute_huey_task( + huey, division, 1, int(not task_fails), exceptions=(DivisionByZero,) + ) - if task_fails: - error_event = events.pop(0) - assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - assert error_event["exception"]["values"][0]["mechanism"]["type"] == "huey" + if task_fails: + error_event = events.pop(0) + assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + assert error_event["exception"]["values"][0]["mechanism"]["type"] == "huey" - (event,) = events - assert event["type"] == "transaction" - assert event["transaction"] == "division" - assert event["transaction_info"] == {"source": "task"} + (event,) = events + assert event["type"] == "transaction" + assert event["transaction"] == "division" + assert event["transaction_info"] == {"source": "task"} - if task_fails: - assert event["contexts"]["trace"]["status"] == "internal_error" - else: - assert event["contexts"]["trace"]["status"] == "ok" + if task_fails: + assert event["contexts"]["trace"]["status"] == "internal_error" + else: + assert event["contexts"]["trace"]["status"] == "ok" - assert "huey_task_id" in event["tags"] - assert "huey_task_retry" in event["tags"] + assert "huey_task_id" in event["tags"] + assert "huey_task_retry" in event["tags"] -def test_task_retry(capture_events, init_huey): - huey = init_huey() +@pytest.mark.parametrize( + "has_span_streaming", [True, False], ids=["streaming", "no_streaming"] +) +def test_task_retry(capture_events, capture_items, init_huey, has_span_streaming): + huey = init_huey(has_span_streaming=has_span_streaming) context = {"retry": True} @huey.task() @@ -110,21 +144,38 @@ def retry_task(context): context["retry"] = False raise RetryTask() - events = capture_events() - result = execute_huey_task(huey, retry_task, context) - (event,) = events + if has_span_streaming: + items = capture_items("span") + execute_huey_task(huey, retry_task, context) + sentry_sdk.get_client().flush() + + payloads = [i.payload for i in items] + (execute_span,) = [ + # Searching for this span specifically because this is what has the raised exception + p + for p in payloads + if p["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY + ] + assert execute_span["is_segment"] + assert execute_span["name"] == "retry_task" + assert execute_span["status"] == SpanStatus.OK + assert len(huey) == 1 + else: + events = capture_events() + result = execute_huey_task(huey, retry_task, context) + (event,) = events - assert event["transaction"] == "retry_task" - assert event["tags"]["huey_task_id"] == result.task.id - assert len(huey) == 1 + assert event["transaction"] == "retry_task" + assert event["tags"]["huey_task_id"] == result.task.id + assert len(huey) == 1 - task = huey.dequeue() - huey.execute(task) - (event, _) = events + task = huey.dequeue() + huey.execute(task) + (event, _) = events - assert event["transaction"] == "retry_task" - assert event["tags"]["huey_task_id"] == result.task.id - assert len(huey) == 0 + assert event["transaction"] == "retry_task" + assert event["tags"]["huey_task_id"] == result.task.id + assert len(huey) == 0 @pytest.mark.parametrize("lock_name", ["lock.a", "lock.b"], ids=["locked", "unlocked"]) @@ -230,11 +281,15 @@ def propagated_trace_task(): assert event["contexts"]["trace"]["origin"] == "auto.queue.huey" +@pytest.mark.parametrize("pii_enabled", [True, False]) +@pytest.mark.parametrize("has_span_streaming", [True, False]) @pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="group was added in 3.0") -def test_huey_enqueue_group(init_huey, capture_events): - huey = init_huey() - - events = capture_events() +def test_huey_enqueue_group( + init_huey, capture_events, capture_items, pii_enabled, has_span_streaming +): + huey = init_huey( + has_span_streaming=has_span_streaming, send_default_pii=pii_enabled + ) @huey.task() def task1(): @@ -244,51 +299,166 @@ def task1(): def task2(): pass - with start_transaction() as transaction: + if has_span_streaming: + items = capture_items("span") + huey.enqueue(group([task1.s(), task2.s()])) - for _ in range(2): - task = huey.dequeue() - huey.execute(task) + for _ in range(2): + task = huey.dequeue() + huey.execute(task) - assert len(events) == 3 - - # Assert enqueue spans were successfully recorded - producer_event = events[0] - assert producer_event["type"] == "transaction" - assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert producer_event["contexts"]["trace"]["origin"] == "manual" - - spans = producer_event["spans"] - assert len(spans) == 3 - assert spans[0]["op"] == "queue.submit.huey" - assert spans[0]["description"] == "Huey Task Group" - assert spans[1]["op"] == "queue.submit.huey" - assert spans[1]["description"] == "task1" - assert spans[2]["op"] == "queue.submit.huey" - assert spans[2]["description"] == "task2" - - # Consumer transaction assertions (one per task) - consumer_events = events[1:] - for _, (consumer_event, expected_name) in enumerate( - zip(consumer_events, ["task1", "task2"]) - ): - assert consumer_event["type"] == "transaction" - assert consumer_event["transaction"] == expected_name - assert consumer_event["transaction_info"] == {"source": "task"} - assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey" - assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey" - assert consumer_event["contexts"]["trace"]["status"] == "ok" - assert consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert "huey_task_id" in consumer_event["tags"] - assert consumer_event["tags"]["huey_task_retry"] is False + sentry_sdk.get_client().flush() + assert len(items) == 5 + + ( + task1_enqueue_span, + task2_enqueue_span, + group_span, + task1_execute_span, + task2_execute_span, + ) = [i.payload for i in items] + + assert group_span["is_segment"] + assert not task1_enqueue_span["is_segment"] + assert not task2_enqueue_span["is_segment"] + assert task1_execute_span["is_segment"] + assert task2_execute_span["is_segment"] + + assert group_span["name"] == "Huey Task Group" + assert group_span["status"] == "ok" + assert group_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert group_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task1_enqueue_span["name"] == "task1" + assert task1_enqueue_span["status"] == "ok" + assert task1_enqueue_span["parent_span_id"] == group_span["span_id"] + assert ( + task1_enqueue_span["attributes"]["sentry.segment.name"] == "Huey Task Group" + ) + assert task1_enqueue_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert task1_enqueue_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task2_enqueue_span["name"] == "task2" + assert task2_enqueue_span["status"] == "ok" + assert task2_enqueue_span["parent_span_id"] == group_span["span_id"] + assert ( + task2_enqueue_span["attributes"]["sentry.segment.name"] == "Huey Task Group" + ) + assert task2_enqueue_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert task2_enqueue_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task1_execute_span["name"] == "task1" + assert task1_execute_span["status"] == "ok" + assert task1_execute_span["attributes"]["messaging.message.system"] == "huey" + assert task1_execute_span["parent_span_id"] == task1_enqueue_span["span_id"] + assert task1_execute_span["attributes"]["sentry.op"] == "queue.task.huey" + assert task1_execute_span["attributes"]["sentry.origin"] == "auto.queue.huey" + assert ( + task1_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK + ) + assert task1_execute_span["attributes"]["messaging.message.id"] is not None + assert task1_execute_span["attributes"]["messaging.message.retry.count"] == 0 + + if pii_enabled: + assert ( + task1_execute_span["attributes"]["messaging.message.args"] + != SENSITIVE_DATA_SUBSTITUTE + ) + assert ( + task1_execute_span["attributes"]["messaging.message.kwargs"] + != SENSITIVE_DATA_SUBSTITUTE + ) + else: + assert ( + task1_execute_span["attributes"]["messaging.message.args"] + == SENSITIVE_DATA_SUBSTITUTE + ) + assert ( + task1_execute_span["attributes"]["messaging.message.kwargs"] + == SENSITIVE_DATA_SUBSTITUTE + ) + + assert task2_execute_span["name"] == "task2" + assert task2_execute_span["status"] == "ok" + assert task2_execute_span["parent_span_id"] == task2_enqueue_span["span_id"] + assert task2_execute_span["attributes"]["messaging.message.system"] == "huey" + assert task2_execute_span["attributes"]["sentry.op"] == "queue.task.huey" + assert task2_execute_span["attributes"]["sentry.origin"] == "auto.queue.huey" + assert ( + task2_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK + ) + if pii_enabled: + assert ( + task2_execute_span["attributes"]["messaging.message.args"] + != SENSITIVE_DATA_SUBSTITUTE + ) + assert ( + task2_execute_span["attributes"]["messaging.message.kwargs"] + != SENSITIVE_DATA_SUBSTITUTE + ) + else: + assert ( + task2_execute_span["attributes"]["messaging.message.args"] + == SENSITIVE_DATA_SUBSTITUTE + ) + assert ( + task2_execute_span["attributes"]["messaging.message.kwargs"] + == SENSITIVE_DATA_SUBSTITUTE + ) -@pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="chord was added in 3.0") -def test_huey_enqueue_chord(init_huey, capture_events): - huey = init_huey() + else: + events = capture_events() + with start_transaction() as transaction: + huey.enqueue(group([task1.s(), task2.s()])) - events = capture_events() + for _ in range(2): + task = huey.dequeue() + huey.execute(task) + + assert len(events) == 3 + + # Assert enqueue spans were successfully recorded + producer_event = events[0] + assert producer_event["type"] == "transaction" + assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert producer_event["contexts"]["trace"]["origin"] == "manual" + + spans = producer_event["spans"] + assert len(spans) == 3 + assert spans[0]["op"] == "queue.submit.huey" + assert spans[0]["description"] == "Huey Task Group" + assert spans[1]["op"] == "queue.submit.huey" + assert spans[1]["description"] == "task1" + assert spans[2]["op"] == "queue.submit.huey" + assert spans[2]["description"] == "task2" + + # Consumer transaction assertions (one per task) + consumer_events = events[1:] + for consumer_event, expected_name in zip(consumer_events, ["task1", "task2"]): + assert consumer_event["type"] == "transaction" + assert consumer_event["transaction"] == expected_name + assert consumer_event["transaction_info"] == {"source": "task"} + assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey" + assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey" + assert consumer_event["contexts"]["trace"]["status"] == "ok" + assert ( + consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + ) + assert "huey_task_id" in consumer_event["tags"] + assert consumer_event["tags"]["huey_task_retry"] is False + + +@pytest.mark.parametrize("pii_enabled", [True, False]) +@pytest.mark.parametrize("has_span_streaming", [True, False]) +@pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="chord was added in 3.0") +def test_huey_enqueue_chord( + init_huey, capture_events, capture_items, pii_enabled, has_span_streaming +): + huey = init_huey( + has_span_streaming=has_span_streaming, send_default_pii=pii_enabled + ) @huey.task() def task1(): @@ -298,44 +468,153 @@ def task1(): def task2(results): pass - with start_transaction() as transaction: + if has_span_streaming: + items = capture_items("span") huey.enqueue(chord([task1.s()], task2.s())) - for _ in range(2): - task = huey.dequeue() - huey.execute(task) + for _ in range(2): + task = huey.dequeue() + huey.execute(task) + + sentry_sdk.get_client().flush() + assert len(items) == 5 + + ( + task1_enqueue_span, + chord_span, + task2_enqueue_span, + task1_execute_span, + task2_execute_span, + ) = [i.payload for i in items] + + assert chord_span["is_segment"] + assert not task1_enqueue_span["is_segment"] + assert not task2_enqueue_span["is_segment"] + assert task1_execute_span["is_segment"] + assert task2_execute_span["is_segment"] + + assert chord_span["name"] == "Huey Chord" + assert chord_span["status"] == "ok" + assert chord_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert chord_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task1_enqueue_span["name"] == "task1" + assert task1_enqueue_span["status"] == "ok" + assert task1_enqueue_span["parent_span_id"] == chord_span["span_id"] + assert task1_enqueue_span["attributes"]["sentry.segment.name"] == "Huey Chord" + assert task1_enqueue_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert task1_enqueue_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task1_execute_span["name"] == "task1" + assert task1_execute_span["status"] == "ok" + assert task1_execute_span["attributes"]["messaging.message.system"] == "huey" + assert task1_execute_span["parent_span_id"] == task1_enqueue_span["span_id"] + assert task1_execute_span["attributes"]["sentry.op"] == "queue.task.huey" + assert task1_execute_span["attributes"]["sentry.origin"] == "auto.queue.huey" + assert ( + task1_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK + ) + if pii_enabled: + assert ( + task1_execute_span["attributes"]["messaging.message.args"] + != SENSITIVE_DATA_SUBSTITUTE + ) + assert ( + task1_execute_span["attributes"]["messaging.message.kwargs"] + != SENSITIVE_DATA_SUBSTITUTE + ) + else: + assert ( + task1_execute_span["attributes"]["messaging.message.args"] + == SENSITIVE_DATA_SUBSTITUTE + ) + assert ( + task1_execute_span["attributes"]["messaging.message.kwargs"] + == SENSITIVE_DATA_SUBSTITUTE + ) + + # chord callback (task2) is enqueued during task1's execution + assert task2_enqueue_span["name"] == "task2" + assert task2_enqueue_span["status"] == "ok" + assert task2_enqueue_span["parent_span_id"] == task1_execute_span["span_id"] + assert task2_enqueue_span["attributes"]["sentry.segment.name"] == "task1" + assert task2_enqueue_span["attributes"]["sentry.op"] == "queue.submit.huey" + assert task2_enqueue_span["attributes"]["sentry.origin"] == "auto.queue.huey" + + assert task2_execute_span["name"] == "task2" + assert task2_execute_span["status"] == "ok" + assert task2_execute_span["parent_span_id"] == task2_enqueue_span["span_id"] + assert task2_execute_span["attributes"]["messaging.message.system"] == "huey" + assert task2_execute_span["attributes"]["sentry.op"] == "queue.task.huey" + assert task2_execute_span["attributes"]["sentry.origin"] == "auto.queue.huey" + assert ( + task2_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK + ) + + if pii_enabled: + assert ( + task2_execute_span["attributes"]["messaging.message.args"] + != SENSITIVE_DATA_SUBSTITUTE + ) + assert ( + task2_execute_span["attributes"]["messaging.message.kwargs"] + != SENSITIVE_DATA_SUBSTITUTE + ) + else: + assert ( + task2_execute_span["attributes"]["messaging.message.args"] + == SENSITIVE_DATA_SUBSTITUTE + ) + assert ( + task2_execute_span["attributes"]["messaging.message.kwargs"] + == SENSITIVE_DATA_SUBSTITUTE + ) + + else: + events = capture_events() + with start_transaction() as transaction: + huey.enqueue(chord([task1.s()], task2.s())) + + for _ in range(2): + task = huey.dequeue() + huey.execute(task) - assert len(events) == 3 - - # Enqueue spans - producer_event = events[0] - assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert producer_event["contexts"]["trace"]["origin"] == "manual" - - spans = producer_event["spans"] - assert len(spans) == 2 - assert spans[0]["op"] == "queue.submit.huey" - assert spans[0]["description"] == "Huey Chord" - assert spans[1]["op"] == "queue.submit.huey" - assert spans[1]["description"] == "task1" - - task1_event = events[1] - # Confirm the first task enqueued the chord callback - task1_spans = task1_event["spans"] - assert len(task1_spans) == 1 - assert task1_spans[0]["op"] == "queue.submit.huey" - assert task1_spans[0]["description"] == "task2" - - consumer_events = events[1:] - for _, (consumer_event, expected_name) in enumerate( - zip(consumer_events, ["task1", "task2"]) - ): - assert consumer_event["type"] == "transaction" - assert consumer_event["transaction"] == expected_name - assert consumer_event["transaction_info"] == {"source": "task"} - assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey" - assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey" - assert consumer_event["contexts"]["trace"]["status"] == "ok" - assert consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert "huey_task_id" in consumer_event["tags"] - assert consumer_event["tags"]["huey_task_retry"] is False + assert len(events) == 3 + + # Enqueue spans + producer_event = events[0] + assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert producer_event["contexts"]["trace"]["origin"] == "manual" + + spans = producer_event["spans"] + assert len(spans) == 2 + assert spans[0]["op"] == "queue.submit.huey" + assert spans[0]["description"] == "Huey Chord" + assert spans[1]["op"] == "queue.submit.huey" + assert spans[1]["description"] == "task1" + + task1_event = events[1] + # Confirm the first task enqueued the chord callback + assert len(task1_event["spans"]) == 1 + assert task1_event["spans"][0]["op"] == "queue.submit.huey" + assert task1_event["spans"][0]["description"] == "task2" + assert task1_event["type"] == "transaction" + assert task1_event["transaction"] == "task1" + assert task1_event["transaction_info"] == {"source": "task"} + assert task1_event["contexts"]["trace"]["op"] == "queue.task.huey" + assert task1_event["contexts"]["trace"]["origin"] == "auto.queue.huey" + assert task1_event["contexts"]["trace"]["status"] == "ok" + assert task1_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert "huey_task_id" in task1_event["tags"] + assert task1_event["tags"]["huey_task_retry"] is False + + task2_event = events[2] + assert task2_event["type"] == "transaction" + assert task2_event["transaction"] == "task2" + assert task2_event["transaction_info"] == {"source": "task"} + assert task2_event["contexts"]["trace"]["op"] == "queue.task.huey" + assert task2_event["contexts"]["trace"]["origin"] == "auto.queue.huey" + assert task2_event["contexts"]["trace"]["status"] == "ok" + assert task2_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert "huey_task_id" in task2_event["tags"] + assert task2_event["tags"]["huey_task_retry"] is False From f815ea47310721a41cd483e59f253ca8c51d5f3d Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Mon, 25 May 2026 11:17:00 -0400 Subject: [PATCH 05/11] Remove attributes we're not sure of for now. If needed, we can add them in later --- sentry_sdk/integrations/huey.py | 6 -- tests/integrations/huey/test_huey.py | 95 ++-------------------------- 2 files changed, 6 insertions(+), 95 deletions(-) diff --git a/sentry_sdk/integrations/huey.py b/sentry_sdk/integrations/huey.py index 9eada7bbf3..a3e119a6ad 100644 --- a/sentry_sdk/integrations/huey.py +++ b/sentry_sdk/integrations/huey.py @@ -211,12 +211,6 @@ def _sentry_execute( "messaging.message.system": "huey", "messaging.message.retry.count": (task.default_retries or 0) - task.retries, - "messaging.message.args": task.args - if should_send_default_pii() - else SENSITIVE_DATA_SUBSTITUTE, - "messaging.message.kwargs": task.kwargs - if should_send_default_pii() - else SENSITIVE_DATA_SUBSTITUTE, }, ) else: diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index 83a794b00c..413e84ba05 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -25,11 +25,11 @@ @pytest.fixture def init_huey(sentry_init): - def inner(has_span_streaming=None, send_default_pii=True): + def inner(has_span_streaming=None): sentry_init( integrations=[HueyIntegration()], traces_sample_rate=1.0, - send_default_pii=send_default_pii, + send_default_pii=True, _experiments={"trace_lifecycle": "stream"} if has_span_streaming else {}, ) @@ -281,15 +281,12 @@ def propagated_trace_task(): assert event["contexts"]["trace"]["origin"] == "auto.queue.huey" -@pytest.mark.parametrize("pii_enabled", [True, False]) @pytest.mark.parametrize("has_span_streaming", [True, False]) @pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="group was added in 3.0") def test_huey_enqueue_group( - init_huey, capture_events, capture_items, pii_enabled, has_span_streaming + init_huey, capture_events, capture_items, has_span_streaming ): - huey = init_huey( - has_span_streaming=has_span_streaming, send_default_pii=pii_enabled - ) + huey = init_huey(has_span_streaming=has_span_streaming) @huey.task() def task1(): @@ -360,25 +357,6 @@ def task2(): assert task1_execute_span["attributes"]["messaging.message.id"] is not None assert task1_execute_span["attributes"]["messaging.message.retry.count"] == 0 - if pii_enabled: - assert ( - task1_execute_span["attributes"]["messaging.message.args"] - != SENSITIVE_DATA_SUBSTITUTE - ) - assert ( - task1_execute_span["attributes"]["messaging.message.kwargs"] - != SENSITIVE_DATA_SUBSTITUTE - ) - else: - assert ( - task1_execute_span["attributes"]["messaging.message.args"] - == SENSITIVE_DATA_SUBSTITUTE - ) - assert ( - task1_execute_span["attributes"]["messaging.message.kwargs"] - == SENSITIVE_DATA_SUBSTITUTE - ) - assert task2_execute_span["name"] == "task2" assert task2_execute_span["status"] == "ok" assert task2_execute_span["parent_span_id"] == task2_enqueue_span["span_id"] @@ -389,25 +367,6 @@ def task2(): task2_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK ) - if pii_enabled: - assert ( - task2_execute_span["attributes"]["messaging.message.args"] - != SENSITIVE_DATA_SUBSTITUTE - ) - assert ( - task2_execute_span["attributes"]["messaging.message.kwargs"] - != SENSITIVE_DATA_SUBSTITUTE - ) - else: - assert ( - task2_execute_span["attributes"]["messaging.message.args"] - == SENSITIVE_DATA_SUBSTITUTE - ) - assert ( - task2_execute_span["attributes"]["messaging.message.kwargs"] - == SENSITIVE_DATA_SUBSTITUTE - ) - else: events = capture_events() with start_transaction() as transaction: @@ -450,15 +409,12 @@ def task2(): assert consumer_event["tags"]["huey_task_retry"] is False -@pytest.mark.parametrize("pii_enabled", [True, False]) @pytest.mark.parametrize("has_span_streaming", [True, False]) @pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="chord was added in 3.0") def test_huey_enqueue_chord( - init_huey, capture_events, capture_items, pii_enabled, has_span_streaming + init_huey, capture_events, capture_items, has_span_streaming ): - huey = init_huey( - has_span_streaming=has_span_streaming, send_default_pii=pii_enabled - ) + huey = init_huey(has_span_streaming=has_span_streaming) @huey.task() def task1(): @@ -514,25 +470,6 @@ def task2(results): assert ( task1_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK ) - if pii_enabled: - assert ( - task1_execute_span["attributes"]["messaging.message.args"] - != SENSITIVE_DATA_SUBSTITUTE - ) - assert ( - task1_execute_span["attributes"]["messaging.message.kwargs"] - != SENSITIVE_DATA_SUBSTITUTE - ) - else: - assert ( - task1_execute_span["attributes"]["messaging.message.args"] - == SENSITIVE_DATA_SUBSTITUTE - ) - assert ( - task1_execute_span["attributes"]["messaging.message.kwargs"] - == SENSITIVE_DATA_SUBSTITUTE - ) - # chord callback (task2) is enqueued during task1's execution assert task2_enqueue_span["name"] == "task2" assert task2_enqueue_span["status"] == "ok" @@ -550,26 +487,6 @@ def task2(results): assert ( task2_execute_span["attributes"]["sentry.span.source"] == SegmentSource.TASK ) - - if pii_enabled: - assert ( - task2_execute_span["attributes"]["messaging.message.args"] - != SENSITIVE_DATA_SUBSTITUTE - ) - assert ( - task2_execute_span["attributes"]["messaging.message.kwargs"] - != SENSITIVE_DATA_SUBSTITUTE - ) - else: - assert ( - task2_execute_span["attributes"]["messaging.message.args"] - == SENSITIVE_DATA_SUBSTITUTE - ) - assert ( - task2_execute_span["attributes"]["messaging.message.kwargs"] - == SENSITIVE_DATA_SUBSTITUTE - ) - else: events = capture_events() with start_transaction() as transaction: From e1e73bef94c08093d7fff97de70d79c4a5522c4c Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Mon, 25 May 2026 11:22:41 -0400 Subject: [PATCH 06/11] style(huey): Remove unused imports in test file Co-Authored-By: Claude Sonnet 4.6 --- tests/integrations/huey/test_huey.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index 413e84ba05..304182c4fc 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -7,10 +7,8 @@ import sentry_sdk from sentry_sdk import start_transaction -from sentry_sdk._types import SENSITIVE_DATA_SUBSTITUTE from sentry_sdk.consts import OP from sentry_sdk.integrations.huey import HueyIntegration -from sentry_sdk.scope import should_send_default_pii from sentry_sdk.traces import SegmentSource, SpanStatus from sentry_sdk.utils import parse_version From 60abc3bef2a91c90df1e671dcce6dda623d8514b Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Mon, 25 May 2026 11:30:03 -0400 Subject: [PATCH 07/11] ensure that spans created in the execute method are always segments --- sentry_sdk/integrations/huey.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sentry_sdk/integrations/huey.py b/sentry_sdk/integrations/huey.py index a3e119a6ad..426430e35f 100644 --- a/sentry_sdk/integrations/huey.py +++ b/sentry_sdk/integrations/huey.py @@ -212,6 +212,7 @@ def _sentry_execute( "messaging.message.retry.count": (task.default_retries or 0) - task.retries, }, + parent_span=None, ) else: transaction = continue_trace( From 7d33b81bfef3bedcb1e21334b5ac9b978a9fd8d7 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Mon, 25 May 2026 12:17:05 -0400 Subject: [PATCH 08/11] improve test --- tests/integrations/huey/test_huey.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index 304182c4fc..b479dd4b4f 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -158,6 +158,24 @@ def retry_task(context): assert execute_span["name"] == "retry_task" assert execute_span["status"] == SpanStatus.OK assert len(huey) == 1 + + task = huey.dequeue() + huey.execute(task) + + sentry_sdk.get_client().flush() + + all_payloads = [i.payload for i in items] + task_spans = [ + p + for p in all_payloads + if p["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY + ] + assert len(task_spans) == 2 + retry_span = task_spans[1] + assert retry_span["is_segment"] + assert retry_span["name"] == "retry_task" + assert retry_span["status"] == SpanStatus.OK + assert len(huey) == 0 else: events = capture_events() result = execute_huey_task(huey, retry_task, context) From 97fb86c38b2d0388af92d9e035bcc8df9fcf4517 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Mon, 25 May 2026 14:11:37 -0400 Subject: [PATCH 09/11] fix status overwrite error for control flow exceptions. update tests to test order of spans generated since that's stable --- sentry_sdk/utils.py | 13 +++ tests/integrations/huey/test_huey.py | 126 ++++++++++++++++++++------- 2 files changed, 107 insertions(+), 32 deletions(-) diff --git a/sentry_sdk/utils.py b/sentry_sdk/utils.py index aa13a98e94..e93e27a2f1 100644 --- a/sentry_sdk/utils.py +++ b/sentry_sdk/utils.py @@ -32,6 +32,13 @@ except ImportError: AIOHttpHttpException = None +try: + from huey.exceptions import CancelExecution, RetryTask, TaskLockedException + + HueyControlFlowExceptions = (CancelExecution, RetryTask, TaskLockedException) +except ImportError: + HueyControlFlowExceptions = None + from typing import TYPE_CHECKING import sentry_sdk @@ -1994,6 +2001,12 @@ def should_be_treated_as_error(ty: "Any", value: "Any") -> bool: if AIOHttpHttpException and isinstance(value, AIOHttpHttpException): return False + # Huey also has exceptions that are raised for control flow reasons, not + # because there's an actual error. This check, similar to the aiohttp one above, + # is to prevent accidentally overwriting a status of "ok" with "error" + if HueyControlFlowExceptions and isinstance(value, HueyControlFlowExceptions): + return False + return True diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index b479dd4b4f..c7b9e0469c 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -3,7 +3,7 @@ import pytest from huey import __version__ as HUEY_VERSION from huey.api import MemoryHuey, Result -from huey.exceptions import RetryTask +from huey.exceptions import CancelExecution, RetryTask import sentry_sdk from sentry_sdk import start_transaction @@ -93,13 +93,12 @@ def division(a, b): sentry_sdk.get_client().flush() payloads = [i.payload for i in items] - (execute_span,) = [ - # Searching for this span specifically because this is what has the raised exception - p - for p in payloads - if p["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY - ] + assert len(payloads) == 2 + enqueue_span, execute_span = payloads + + assert enqueue_span["attributes"]["sentry.op"] == OP.QUEUE_SUBMIT_HUEY assert execute_span["is_segment"] + assert execute_span["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY assert execute_span["name"] == "division" assert execute_span["status"] == ( SpanStatus.ERROR if task_fails else SpanStatus.OK @@ -148,15 +147,21 @@ def retry_task(context): sentry_sdk.get_client().flush() payloads = [i.payload for i in items] - (execute_span,) = [ - # Searching for this span specifically because this is what has the raised exception - p - for p in payloads - if p["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY - ] + assert len(payloads) == 3 + + enqueue_span, re_enqueue_span, execute_span = payloads + + assert enqueue_span["attributes"]["sentry.op"] == OP.QUEUE_SUBMIT_HUEY + assert enqueue_span["is_segment"] + + assert re_enqueue_span["attributes"]["sentry.op"] == OP.QUEUE_SUBMIT_HUEY + assert not re_enqueue_span["is_segment"] + + assert execute_span["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY assert execute_span["is_segment"] assert execute_span["name"] == "retry_task" assert execute_span["status"] == SpanStatus.OK + assert len(huey) == 1 task = huey.dequeue() @@ -165,13 +170,10 @@ def retry_task(context): sentry_sdk.get_client().flush() all_payloads = [i.payload for i in items] - task_spans = [ - p - for p in all_payloads - if p["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY - ] - assert len(task_spans) == 2 - retry_span = task_spans[1] + + assert len(all_payloads) == 4 + retry_span = all_payloads[3] + assert retry_span["is_segment"] assert retry_span["name"] == "retry_task" assert retry_span["status"] == SpanStatus.OK @@ -194,10 +196,50 @@ def retry_task(context): assert len(huey) == 0 +@pytest.mark.parametrize( + "has_span_streaming", [True, False], ids=["streaming", "no_streaming"] +) +def test_task_cancel_does_not_override_status( + capture_events, capture_items, init_huey, has_span_streaming +): + huey = init_huey(has_span_streaming=has_span_streaming) + + @huey.task() + def cancel_task(): + raise CancelExecution() + + if has_span_streaming: + items = capture_items("span") + execute_huey_task(huey, cancel_task) + sentry_sdk.get_client().flush() + + payloads = [i.payload for i in items] + assert len(payloads) == 2 + enqueue_span, execute_span = payloads + + assert enqueue_span["attributes"]["sentry.op"] == OP.QUEUE_SUBMIT_HUEY + assert execute_span["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY + assert execute_span["is_segment"] + assert execute_span["name"] == "cancel_task" + assert execute_span["status"] == SpanStatus.OK + else: + events = capture_events() + execute_huey_task(huey, cancel_task) + + (event,) = events + assert event["transaction"] == "cancel_task" + assert event["contexts"]["trace"]["status"] == "aborted" + + @pytest.mark.parametrize("lock_name", ["lock.a", "lock.b"], ids=["locked", "unlocked"]) +@pytest.mark.parametrize( + "has_span_streaming", [True, False], ids=["streaming", "no_streaming"] +) @pytest.mark.skipif(HUEY_VERSION < (2, 5), reason="is_locked was added in 2.5") -def test_task_lock(capture_events, init_huey, lock_name): - huey = init_huey() +def test_task_lock( + capture_events, capture_items, init_huey, lock_name, has_span_streaming +): + huey = init_huey(has_span_streaming=has_span_streaming) task_lock_name = "lock.a" should_be_locked = task_lock_name == lock_name @@ -207,19 +249,39 @@ def test_task_lock(capture_events, init_huey, lock_name): def maybe_locked_task(): pass - events = capture_events() + if has_span_streaming: + items = capture_items("span") + with huey.lock_task(lock_name): + assert huey.is_locked(task_lock_name) == should_be_locked + execute_huey_task(huey, maybe_locked_task) + sentry_sdk.get_client().flush() - with huey.lock_task(lock_name): - assert huey.is_locked(task_lock_name) == should_be_locked - result = execute_huey_task(huey, maybe_locked_task) + payloads = [i.payload for i in items] + assert len(payloads) == 2 + enqueue_span, execute_span = payloads - (event,) = events + assert enqueue_span["attributes"]["sentry.op"] == OP.QUEUE_SUBMIT_HUEY + assert execute_span["attributes"]["sentry.op"] == OP.QUEUE_TASK_HUEY - assert event["transaction"] == "maybe_locked_task" - assert event["tags"]["huey_task_id"] == result.task.id - assert ( - event["contexts"]["trace"]["status"] == "aborted" if should_be_locked else "ok" - ) + assert execute_span["is_segment"] + assert execute_span["name"] == "maybe_locked_task" + assert execute_span["status"] == SpanStatus.OK + else: + events = capture_events() + + with huey.lock_task(lock_name): + assert huey.is_locked(task_lock_name) == should_be_locked + result = execute_huey_task(huey, maybe_locked_task) + + (event,) = events + + assert event["transaction"] == "maybe_locked_task" + assert event["tags"]["huey_task_id"] == result.task.id + assert ( + event["contexts"]["trace"]["status"] == "aborted" + if should_be_locked + else "ok" + ) assert len(huey) == 0 From 099ae1e1a669875716eaaa18d6554f3f485c2bc3 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Tue, 26 May 2026 11:16:26 -0400 Subject: [PATCH 10/11] Push empty commit to trigger workflows From 2814292ed3005930890af3ec21e96dfc81e38cb5 Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Tue, 26 May 2026 11:58:29 -0400 Subject: [PATCH 11/11] Update test for CancelExecution to handle the different behaviour that huey has in earlier versions that results in more events being emitted than a later version) --- tests/integrations/huey/test_huey.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index c7b9e0469c..de1aefe040 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -226,7 +226,10 @@ def cancel_task(): events = capture_events() execute_huey_task(huey, cancel_task) - (event,) = events + if HUEY_VERSION < (3, 0, 1): + (event, _) = events + else: + (event,) = events assert event["transaction"] == "cancel_task" assert event["contexts"]["trace"]["status"] == "aborted"