Skip to content

Commit a44a987

Browse files
author
Andrei Bratu
committed
Bug fixing
1 parent 58a1b95 commit a44a987

File tree

2 files changed

+115
-30
lines changed

2 files changed

+115
-30
lines changed

src/humanloop/otel/exporter.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,9 @@ def is_evaluated_file(
108108
),
109109
)
110110
logger.debug(
111-
"Span %s with EvaluationContext %s added to upload queue",
112-
span.attributes,
111+
"[HumanloopSpanExporter] Span %s %s with EvaluationContext %s added to upload queue",
112+
span.context.span_id,
113+
span.name,
113114
evaluation_context_copy,
114115
)
115116
# Reset the EvaluationContext so run eval does not
@@ -119,24 +120,24 @@ def is_evaluated_file(
119120
evaluation_context,
120121
):
121122
logger.debug(
122-
"EvaluationContext %s marked as exhausted for Log in Span %s",
123+
"[HumanloopSpanExporter] EvaluationContext %s marked as exhausted for Log in Span %s",
123124
evaluation_context,
124125
spans[0].attributes,
125126
)
126127
# Mark the EvaluationContext as used
127128
self._client.evaluation_context_variable.set(None)
128129
return SpanExportResult.SUCCESS
129130
else:
130-
logger.warning("HumanloopSpanExporter is shutting down, not accepting new spans")
131+
logger.warning("[HumanloopSpanExporter] Shutting down, not accepting new spans")
131132
return SpanExportResult.FAILURE
132133

133134
def shutdown(self) -> None:
134135
self._shutdown = True
135136
for thread in self._threads:
136137
thread.join()
137-
logger.debug("Exporter Thread %s joined", thread.ident)
138+
logger.debug("[HumanloopSpanExporter] Exporter Thread %s joined", thread.ident)
138139

139-
def force_flush(self, timeout_millis: int = 3000) -> bool:
140+
def force_flush(self, timeout_millis: int = 10000) -> bool:
140141
self._shutdown = True
141142
for thread in self._threads:
142143
thread.join(timeout=timeout_millis)
@@ -179,14 +180,20 @@ def _do_work(self):
179180
# Span is not part of a Flow Log
180181
self._export_span_dispatch(span_to_export)
181182
logger.debug(
182-
"_do_work on Thread %s: Starting to upload span %s",
183+
"[HumanloopSpanExporter] _do_work on Thread %s: Dispatching span %s %s",
183184
threading.get_ident(),
184-
span_to_export,
185+
span_to_export.context.span_id,
186+
span_to_export.name,
185187
)
186188
elif span_to_export.parent.span_id in self._span_id_to_uploaded_log_id:
187189
# Span is part of a Flow and its parent has been uploaded
188190
self._export_span_dispatch(span_to_export)
189-
logger.debug("_do_work on Thread %s: Starting to upload span %s", threading.get_ident(), span_to_export)
191+
logger.debug(
192+
"[HumanloopSpanExporter] _do_work on Thread %s: Dispatching span %s %s",
193+
threading.get_ident(),
194+
span_to_export.context.span_id,
195+
span_to_export.name,
196+
)
190197
else:
191198
# Requeue the Span and upload after its parent
192199
self._upload_queue.put((span_to_export, evaluation_context))
@@ -200,16 +207,28 @@ def _complete_flow_log(self, span_id: int) -> None:
200207
# All logs in the Trace have been uploaded, mark the Flow Log as complete
201208
flow_log_id = self._span_id_to_uploaded_log_id[flow_log_span_id]
202209
self._client.flows.update_log(log_id=flow_log_id, trace_status="complete")
203-
break
210+
break
204211

205212
def _export_span_dispatch(self, span: ReadableSpan) -> None:
206213
hl_file = read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY)
207214
file_type = span._attributes.get(HUMANLOOP_FILE_TYPE_KEY) # type: ignore
208215
parent_span_id = span.parent.span_id if span.parent else None
209216

210217
while parent_span_id and self._span_id_to_uploaded_log_id.get(parent_span_id) is None:
218+
logger.debug(
219+
"[HumanloopSpanExporter] Span %s %s waiting for parent %s to be uploaded",
220+
span.context.span_id,
221+
span.name,
222+
parent_span_id,
223+
)
211224
time.sleep(0.1)
212225

226+
logger.debug(
227+
"[HumanloopSpanExporter] Exporting span %s with file type %s",
228+
span,
229+
file_type,
230+
)
231+
213232
if file_type == "prompt":
214233
export_func = self._export_prompt
215234
elif file_type == "tool":

src/humanloop/otel/processor.py

Lines changed: 86 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
from concurrent.futures import ThreadPoolExecutor
12
import logging
23
from collections import defaultdict
3-
from typing import Any
4+
import time
5+
from typing import Any, TypedDict
46

57
from opentelemetry.sdk.trace import ReadableSpan
68
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter
@@ -23,6 +25,11 @@
2325
logger = logging.getLogger("humanloop.sdk")
2426

2527

28+
class CompletableSpan(TypedDict):
29+
span: ReadableSpan
30+
complete: bool
31+
32+
2633
class HumanloopSpanProcessor(SimpleSpanProcessor):
2734
"""Enrich Humanloop spans with data from their children spans.
2835
@@ -42,43 +49,93 @@ class HumanloopSpanProcessor(SimpleSpanProcessor):
4249

4350
def __init__(self, exporter: SpanExporter) -> None:
4451
super().__init__(exporter)
45-
# Span parent to Span children map
46-
self._children: dict[int, list] = defaultdict(list)
52+
# span parent to span children map
53+
self._children: dict[int, list[CompletableSpan]] = defaultdict(list)
54+
# List of all span IDs that are contained in a Flow trace
55+
# They are passed to the Exporter as a span attribute
56+
# so the Exporter knows when to complete a trace
4757
self._prerequisites: dict[int, list[int]] = {}
58+
self._executor = ThreadPoolExecutor(max_workers=4)
59+
60+
def shutdown(self):
61+
self._executor.shutdown()
62+
return super().shutdown()
4863

4964
def on_start(self, span, parent_context=None):
5065
span_id = span.context.span_id
66+
parent_span_id = span.parent.span_id if span.parent else None
5167
if span.name == "humanloop.flow":
5268
self._prerequisites[span_id] = []
53-
if span.parent and is_humanloop_span(span):
54-
parent_span_id = span.parent.span_id
69+
if parent_span_id and is_humanloop_span(span):
5570
for trace_head, all_trace_nodes in self._prerequisites.items():
5671
if parent_span_id == trace_head or parent_span_id in all_trace_nodes:
5772
all_trace_nodes.append(span_id)
73+
break
74+
# Handle stream case: when Prompt instrumented function calls a provider with streaming: true
75+
# The instrumentor span will end only when the ChunksResponse is consumed, which can happen
76+
# after the span created by the Prompt utility finishes. To handle this, we register all instrumentor
77+
# spans belonging to a Humanloop span, and their parent will wait for them to complete in onEnd before
78+
# exporting the Humanloop span.
79+
if parent_span_id and _is_instrumentor_span(span):
80+
if parent_span_id not in self._children:
81+
self._children[parent_span_id] = []
82+
self._children[parent_span_id].append(
83+
{
84+
"span": span,
85+
"complete": False,
86+
}
87+
)
5888

5989
def on_end(self, span: ReadableSpan) -> None:
6090
if is_humanloop_span(span=span):
61-
_process_span_dispatch(span, self._children[span.context.span_id])
62-
# Release the reference to the Spans as they've already
63-
# been sent to the Exporter
64-
del self._children[span.context.span_id]
91+
# Wait for children to complete asynchronously
92+
self._executor.submit(self._wait_for_children, span=span)
93+
elif span.parent is not None and _is_instrumentor_span(span):
94+
# If this is one of the children spans waited upon, update its completion status
95+
96+
# Updating the child span status
97+
self._children[span.parent.span_id] = [
98+
child if child["span"].context.span_id != span.context.span_id else {"span": span, "complete": True}
99+
for child in self._children[span.parent.span_id]
100+
]
101+
102+
# Export the instrumentor span
103+
self.span_exporter.export([span])
65104
else:
66-
if span.parent is not None and _is_instrumentor_span(span):
67-
# Copy the Span and keep it until the Humanloop Span
68-
# arrives in order to enrich it
69-
self._children[span.parent.span_id].append(span)
70-
# Pass the Span to the Exporter
105+
# Unknown span, pass it to the Exporter
106+
self.span_exporter.export([span])
107+
108+
def _wait_for_children(self, span: ReadableSpan):
109+
"""Wait for all children spans to complete before processing the Humanloop span."""
110+
span_id = span.context.span_id
111+
while not all(child["complete"] for child in self._children[span_id]):
112+
logger.debug(
113+
"[HumanloopSpanProcessor] Span %s %s waiting for children to complete: %s",
114+
span_id,
115+
span.name,
116+
self._children[span_id],
117+
)
118+
time.sleep(0.1)
119+
# All instrumentor spans have arrived, we can process the
120+
# Humanloop parent span owning them
71121
if span.name == "humanloop.flow":
72122
write_to_opentelemetry_span(
73123
span=span,
74124
key=HUMANLOOP_FLOW_PREREQUISITES_KEY,
75-
value=self._prerequisites[span.context.span_id],
125+
value=self._prerequisites[span_id],
76126
)
127+
del self._prerequisites[span_id]
128+
logger.debug("[HumanloopSpanProcessor] Dispatching span %s %s", span_id, span.name)
129+
_process_span_dispatch(span, [child["span"] for child in self._children[span_id]])
130+
# Release references
131+
del self._children[span_id]
132+
# Pass Humanloop span to Exporter
133+
logger.debug("[HumanloopSpanProcessor] Sending span %s %s to exporter", span_id, span.name)
77134
self.span_exporter.export([span])
78135

79136

80137
def _is_instrumentor_span(span: ReadableSpan) -> bool:
81-
"""Determine if the Span contains information of interest for Spans created by Humanloop decorators."""
138+
"""Determine if the span contains information of interest for Spans created by Humanloop decorators."""
82139
# At the moment we only enrich Spans created by the Prompt decorators
83140
# As we add Instrumentors for other libraries, this function must
84141
# be expanded
@@ -104,7 +161,11 @@ def _process_span_dispatch(span: ReadableSpan, children_spans: list[ReadableSpan
104161
elif file_type == "flow":
105162
pass
106163
else:
107-
logger.error("Unknown Humanloop File Span %s", span)
164+
logger.error(
165+
"[HumanloopSpanProcessor] Unknown Humanloop File span %s %s",
166+
span.context.span_id,
167+
span.name,
168+
)
108169

109170

110171
def _process_prompt(prompt_span: ReadableSpan, children_spans: list[ReadableSpan]):
@@ -150,9 +211,14 @@ def _enrich_prompt_kernel(prompt_span: ReadableSpan, llm_provider_call_span: Rea
150211
# Validate the Prompt Kernel
151212
PromptKernelRequest.model_validate(obj=prompt)
152213
except PydanticValidationError as e:
153-
logger.error("Could not validate Prompt Kernel extracted from Span: %s", e)
154-
155-
# Write the enriched Prompt Kernel back to the Span
214+
logger.error(
215+
"[HumanloopSpanProcessor] Could not validate Prompt Kernel extracted from span: %s %s. Error: %s",
216+
prompt_span.context.span_id,
217+
prompt_span.name,
218+
e,
219+
)
220+
221+
# Write the enriched Prompt Kernel back to the span
156222
hl_file["prompt"] = prompt
157223
write_to_opentelemetry_span(
158224
span=prompt_span,

0 commit comments

Comments
 (0)