Skip to content

Commit fb9def2

Browse files
ericapisaniclaude
andauthored
feat(huey): Migrate Huey integration to spans-first tracing (#6399)
Builds on a bug fix (PY-2426/ #6392 ) Migrates the Huey integration to support the new spans-first tracing lifecycle (`trace_lifecycle: "stream"`), while keeping full backwards compatibility with the existing transaction-based path. Fixes PY-2331 and #6029 --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent a17274f commit fb9def2

3 files changed

Lines changed: 488 additions & 154 deletions

File tree

sentry_sdk/integrations/huey.py

Lines changed: 74 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@
77
from sentry_sdk.consts import OP, SPANSTATUS
88
from sentry_sdk.integrations import DidNotEnable, Integration
99
from sentry_sdk.scope import should_send_default_pii
10+
from sentry_sdk.traces import SegmentSource, SpanStatus, StreamedSpan
1011
from sentry_sdk.tracing import (
1112
BAGGAGE_HEADER_NAME,
1213
SENTRY_TRACE_HEADER_NAME,
1314
TransactionSource,
1415
)
16+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
1517
from sentry_sdk.utils import (
1618
SENSITIVE_DATA_SUBSTITUTE,
19+
_register_control_flow_exception,
1720
capture_internal_exceptions,
1821
ensure_integration_enabled,
1922
event_from_exception,
@@ -53,14 +56,17 @@ class HueyIntegration(Integration):
5356
def setup_once() -> None:
5457
patch_enqueue()
5558
patch_execute()
59+
_register_control_flow_exception(
60+
[CancelExecution, RetryTask, TaskLockedException]
61+
)
5662

5763

5864
def patch_enqueue() -> None:
5965
old_enqueue = Huey.enqueue
6066

6167
@ensure_integration_enabled(HueyIntegration, old_enqueue)
6268
def _sentry_enqueue(
63-
self: "Huey", item: "Union[Task, HueyGroup, HueyChord]"
69+
self: "Huey", item: "Any"
6470
) -> "Optional[Union[Result, ResultGroup]]":
6571
if HueyChord is not None and isinstance(item, HueyChord):
6672
span_name = "Huey Chord"
@@ -69,16 +75,31 @@ def _sentry_enqueue(
6975
else:
7076
span_name = item.name
7177

72-
with sentry_sdk.start_span(
73-
op=OP.QUEUE_SUBMIT_HUEY,
74-
name=span_name,
75-
origin=HueyIntegration.origin,
76-
):
77-
if (
78-
not isinstance(item, PeriodicTask)
79-
and not (HueyGroup is not None and isinstance(item, HueyGroup))
80-
and not (HueyChord is not None and isinstance(item, HueyChord))
81-
):
78+
is_span_streaming_enabled = has_span_streaming_enabled(
79+
sentry_sdk.get_client().options
80+
)
81+
82+
span_ctx = None
83+
if is_span_streaming_enabled:
84+
span_ctx = sentry_sdk.traces.start_span(
85+
name=span_name,
86+
attributes={
87+
"sentry.op": OP.QUEUE_SUBMIT_HUEY,
88+
"sentry.origin": HueyIntegration.origin,
89+
},
90+
)
91+
else:
92+
span_ctx = sentry_sdk.start_span(
93+
op=OP.QUEUE_SUBMIT_HUEY,
94+
name=span_name,
95+
origin=HueyIntegration.origin,
96+
)
97+
98+
no_headers_types = (PeriodicTask,) + tuple(
99+
t for t in [HueyGroup, HueyChord] if t is not None
100+
)
101+
with span_ctx:
102+
if not isinstance(item, no_headers_types):
82103
# Attach trace propagation data to task kwargs. We do
83104
# not do this for periodic tasks, as these don't
84105
# really have an originating transaction.
@@ -124,12 +145,22 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]":
124145

125146
def _capture_exception(exc_info: "ExcInfo") -> None:
126147
scope = sentry_sdk.get_current_scope()
148+
is_span_streaming_enabled = has_span_streaming_enabled(
149+
sentry_sdk.get_client().options
150+
)
127151

128152
if exc_info[0] in HUEY_CONTROL_FLOW_EXCEPTIONS:
129-
scope.transaction.set_status(SPANSTATUS.ABORTED)
153+
if not is_span_streaming_enabled:
154+
scope.transaction.set_status(SPANSTATUS.ABORTED)
155+
elif type(scope._span) is StreamedSpan:
156+
scope._span._segment.status = SpanStatus.OK
130157
return
131158

132-
scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
159+
if not is_span_streaming_enabled:
160+
scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
161+
elif type(scope._span) is StreamedSpan:
162+
scope._span._segment.status = SpanStatus.ERROR
163+
133164
event, hint = event_from_exception(
134165
exc_info,
135166
client_options=sentry_sdk.get_client().options,
@@ -167,21 +198,42 @@ def _sentry_execute(
167198
scope.add_event_processor(_make_event_processor(task))
168199

169200
sentry_headers = task.kwargs.pop("sentry_headers", None)
170-
171-
transaction = continue_trace(
172-
sentry_headers or {},
173-
name=task.name,
174-
op=OP.QUEUE_TASK_HUEY,
175-
source=TransactionSource.TASK,
176-
origin=HueyIntegration.origin,
201+
is_span_streaming_enabled = has_span_streaming_enabled(
202+
sentry_sdk.get_client().options
177203
)
178-
transaction.set_status(SPANSTATUS.OK)
204+
205+
if is_span_streaming_enabled:
206+
headers = sentry_headers or {}
207+
sentry_sdk.traces.continue_trace(headers)
208+
span_ctx = sentry_sdk.traces.start_span(
209+
name=task.name,
210+
attributes={
211+
"sentry.op": OP.QUEUE_TASK_HUEY,
212+
"sentry.origin": HueyIntegration.origin,
213+
"sentry.span.source": SegmentSource.TASK,
214+
"messaging.message.id": task.id,
215+
"messaging.message.system": "huey",
216+
"messaging.message.retry.count": (task.default_retries or 0)
217+
- task.retries,
218+
},
219+
parent_span=None,
220+
)
221+
else:
222+
transaction = continue_trace(
223+
sentry_headers or {},
224+
name=task.name,
225+
op=OP.QUEUE_TASK_HUEY,
226+
source=TransactionSource.TASK,
227+
origin=HueyIntegration.origin,
228+
)
229+
transaction.set_status(SPANSTATUS.OK)
230+
span_ctx = sentry_sdk.start_transaction(transaction)
179231

180232
if not getattr(task, "_sentry_is_patched", False):
181233
task.execute = _wrap_task_execute(task.execute)
182234
task._sentry_is_patched = True
183235

184-
with sentry_sdk.start_transaction(transaction):
236+
with span_ctx:
185237
return old_execute(self, task, timestamp)
186238

187239
Huey._execute = _sentry_execute

sentry_sdk/utils.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090

9191
# These exceptions won't set the span status to error if they occur. Use
9292
# register_control_flow_exception to add to this list
93-
_control_flow_exception_classes: "list[type]" = []
93+
_control_flow_exception_classes: "set[type]" = set()
9494

9595

9696
def is_internal_task() -> bool:
@@ -1982,8 +1982,13 @@ def get_current_thread_meta(
19821982
return None, None
19831983

19841984

1985-
def _register_control_flow_exception(exc_type: type) -> None:
1986-
_control_flow_exception_classes.append(exc_type)
1985+
def _register_control_flow_exception(
1986+
exc_type: "Union[type, list[type], tuple[type], set[type]]",
1987+
) -> None:
1988+
if isinstance(exc_type, (list, tuple, set)):
1989+
_control_flow_exception_classes.update(exc_type)
1990+
else:
1991+
_control_flow_exception_classes.add(exc_type)
19871992

19881993

19891994
def should_be_treated_as_error(ty: "Any", value: "Any") -> bool:

0 commit comments

Comments
 (0)