@@ -93,6 +93,7 @@ def __init__(
9393 logger .debug ("Exporter Thread %s started" , thread .ident )
9494 # Flow Log Span ID mapping to children Spans that must be uploaded first
9595 self ._spans_left_in_trace : dict [int , set [int ]] = {}
96+ self ._traces : list [set [str ]] = []
9697
9798 def export (self , spans : Sequence [ReadableSpan ]) -> SpanExportResult :
9899 if self ._shutdown :
@@ -144,7 +145,7 @@ def _do_work(self):
144145 # Do work while the Exporter was not instructed to
145146 # wind down or the queue is not empty
146147 while self ._upload_queue .qsize () > 0 or not self ._shutdown :
147- thread_args : tuple [ReadableSpan , EvaluationContext | None ] # type: ignore
148+ thread_args : tuple [ReadableSpan , Optional [ EvaluationContext ] ] # type: ignore
148149 try :
149150 # Don't block or the thread will never be notified of the shutdown
150151 thread_args = self ._upload_queue .get (
@@ -234,8 +235,7 @@ def _export_prompt_span(self, span: ReadableSpan) -> None:
234235 path : str = file_object ["path" ]
235236 prompt : PromptKernelRequestParams = file_object ["prompt" ]
236237
237- span_parent_id = span .parent .span_id if span .parent else None
238- trace_parent_id = self ._span_to_uploaded_log_id [span_parent_id ] if span_parent_id else None
238+ trace_parent_id = self ._get_parent_in_trace (span )
239239
240240 if "attributes" not in prompt or not prompt ["attributes" ]:
241241 prompt ["attributes" ] = {}
@@ -248,6 +248,8 @@ def _export_prompt_span(self, span: ReadableSpan) -> None:
248248 trace_parent_id = trace_parent_id ,
249249 )
250250 self ._span_to_uploaded_log_id [span .context .span_id ] = log_response .id
251+ if trace_parent_id is not None :
252+ self ._keep_track_of_trace (log_response .id , trace_parent_id )
251253 except HumanloopApiError :
252254 self ._span_to_uploaded_log_id [span .context .span_id ] = None
253255 self ._mark_span_as_uploaded (span_id = span .context .span_id )
@@ -265,9 +267,6 @@ def _export_tool_span(self, span: ReadableSpan) -> None:
265267 path : str = file_object ["path" ]
266268 tool : ToolKernelRequestParams = file_object ["tool" ]
267269
268- span_parent_id = span .parent .span_id if span .parent else None
269- trace_parent_id = self ._span_to_uploaded_log_id [span_parent_id ] if span_parent_id else None
270-
271270 # API expects an empty dictionary if user does not supply attributes
272271 # NOTE: see comment in _export_prompt_span about OTEL conventions
273272 if not tool .get ("attributes" ):
@@ -277,6 +276,7 @@ def _export_tool_span(self, span: ReadableSpan) -> None:
277276 if "parameters" in tool ["function" ] and "properties" not in tool ["function" ]["parameters" ]:
278277 tool ["function" ]["parameters" ]["properties" ] = {}
279278
279+ trace_parent_id = self ._get_parent_in_trace (span )
280280 try :
281281 log_response = self ._client .tools .log (
282282 path = path ,
@@ -285,6 +285,8 @@ def _export_tool_span(self, span: ReadableSpan) -> None:
285285 trace_parent_id = trace_parent_id ,
286286 )
287287 self ._span_to_uploaded_log_id [span .context .span_id ] = log_response .id
288+ if trace_parent_id is not None :
289+ self ._keep_track_of_trace (log_response .id , trace_parent_id )
288290 except HumanloopApiError :
289291 self ._span_to_uploaded_log_id [span .context .span_id ] = None
290292 self ._mark_span_as_uploaded (span_id = span .context .span_id )
@@ -320,8 +322,7 @@ def _export_flow_span(self, span: ReadableSpan) -> None:
320322 else :
321323 flow = file_object ["flow" ]
322324
323- span_parent_id = span .parent .span_id if span .parent else None
324- trace_parent_id = self ._span_to_uploaded_log_id [span_parent_id ] if span_parent_id else None
325+ trace_parent_id = self ._get_parent_in_trace (span )
325326
326327 if "output" not in log_object :
327328 log_object ["output" ] = None
@@ -332,6 +333,13 @@ def _export_flow_span(self, span: ReadableSpan) -> None:
332333 ** log_object ,
333334 trace_parent_id = trace_parent_id ,
334335 )
336+ if trace_parent_id is not None :
337+ self ._keep_track_of_trace (
338+ log_id = log_response .id ,
339+ parent_log_id = trace_parent_id ,
340+ )
341+ # Exporting a flow log creates a new trace
342+ self ._traces .append ({log_response .id })
335343 self ._span_to_uploaded_log_id [span .get_span_context ().span_id ] = log_response .id
336344 except HumanloopApiError as e :
337345 logger .error (str (e ))
@@ -364,3 +372,20 @@ def _mark_trace_complete_if_needed(self, trace_head_span_id: int):
364372 )
365373 else :
366374 self ._client .flows .update_log (log_id = flow_log_id , trace_status = "complete" )
375+
376+ def _keep_track_of_trace (self , log_id : str , parent_log_id : str ):
377+ for trace in self ._traces :
378+ if parent_log_id in trace :
379+ trace .add (log_id )
380+ found = True
381+ if found :
382+ break
383+
384+ def _get_parent_in_trace (self , span : ReadableSpan ) -> Optional [str ]:
385+ if span .parent is None :
386+ return None
387+ parent_log_id = self ._span_to_uploaded_log_id [span .parent .span_id ]
388+ for trace in self ._traces :
389+ if parent_log_id in trace :
390+ return parent_log_id
391+ return None
0 commit comments