11import contextvars
2- import json
32import logging
43import threading
4+ import time
55import typing
66from queue import Empty as EmptyQueue
77from queue import Queue
1414
1515from humanloop .core import ApiError as HumanloopApiError
1616from humanloop .eval_utils .context import EVALUATION_CONTEXT_VARIABLE_NAME , EvaluationContext
17- from humanloop .otel import TRACE_FLOW_CONTEXT , FlowContext
17+ from humanloop .otel import TRACE_FLOW_CONTEXT
1818from humanloop .otel .constants import (
1919 HUMANLOOP_FILE_KEY ,
2020 HUMANLOOP_FILE_TYPE_KEY ,
21+ HUMANLOOP_FLOW_PREREQUISITES_KEY ,
2122 HUMANLOOP_LOG_KEY ,
2223 HUMANLOOP_PATH_KEY ,
2324)
2425from humanloop .otel .helpers import is_humanloop_span , read_from_opentelemetry_span
2526from humanloop .requests .flow_kernel_request import FlowKernelRequestParams
2627from humanloop .requests .prompt_kernel_request import PromptKernelRequestParams
28+ from humanloop .requests .tool_kernel_request import ToolKernelRequestParams
2729
2830if typing .TYPE_CHECKING :
2931 from humanloop .client import Humanloop
@@ -69,7 +71,8 @@ def __init__(
6971 for thread in self ._threads :
7072 thread .start ()
7173 logger .debug ("Exporter Thread %s started" , thread .ident )
72- self ._flow_logs_to_complete : list [str ] = []
74+ # Flow Log Span ID mapping to children Spans that must be uploaded first
75+ self ._flow_log_prerequisites : dict [int , set [int ]] = {}
7376
7477 def export (self , spans : trace .Sequence [ReadableSpan ]) -> SpanExportResult :
7578 def is_evaluated_file (
@@ -133,11 +136,6 @@ def shutdown(self) -> None:
133136 for thread in self ._threads :
134137 thread .join ()
135138 logger .debug ("Exporter Thread %s joined" , thread .ident )
136- for log_id in self ._flow_logs_to_complete :
137- self ._client .flows .update_log (
138- log_id = log_id ,
139- trace_status = "complete" ,
140- )
141139
142140 def force_flush (self , timeout_millis : int = 3000 ) -> bool :
143141 self ._shutdown = True
@@ -211,9 +209,22 @@ def _do_work(self):
211209 self ._upload_queue .put ((span_to_export , evaluation_context ))
212210 self ._upload_queue .task_done ()
213211
212+ def _complete_flow_log (self , span_id : int ) -> None :
213+ for flow_log_span_id , flow_children_span_ids in self ._flow_log_prerequisites .items ():
214+ if span_id in flow_children_span_ids :
215+ flow_children_span_ids .remove (span_id )
216+ if len (flow_children_span_ids ) == 0 :
217+ flow_log_id = self ._span_id_to_uploaded_log_id [flow_log_span_id ]
218+ self ._client .flows .update_log (log_id = flow_log_id , trace_status = "complete" )
219+ break
220+
214221 def _export_span_dispatch (self , span : ReadableSpan ) -> None :
215222 hl_file = read_from_opentelemetry_span (span , key = HUMANLOOP_FILE_KEY )
216223 file_type = span ._attributes .get (HUMANLOOP_FILE_TYPE_KEY ) # type: ignore
224+ parent_span_id = span .parent .span_id if span .parent else None
225+
226+ while parent_span_id and self ._span_id_to_uploaded_log_id .get (parent_span_id ) is None :
227+ time .sleep (0.1 )
217228
218229 if file_type == "prompt" :
219230 export_func = self ._export_prompt
@@ -242,25 +253,16 @@ def _export_prompt(self, span: ReadableSpan) -> None:
242253 log_object ["messages" ] = []
243254 if "tools" not in file_object ["prompt" ]:
244255 file_object ["prompt" ]["tools" ] = []
245- trace_metadata = TRACE_FLOW_CONTEXT .get (span .get_span_context ().span_id )
246- if trace_metadata and "trace_parent_id" in trace_metadata and trace_metadata ["trace_parent_id" ]:
247- trace_parent_id = self ._span_id_to_uploaded_log_id [trace_metadata ["trace_parent_id" ]]
248- if trace_parent_id is None :
249- # Parent Log in Trace upload failed
250- file_path = read_from_opentelemetry_span (span , key = HUMANLOOP_PATH_KEY )
251- logger .error (f"Skipping log for { file_path } : parent Log upload failed" )
252- return
253- else :
254- trace_parent_id = None
255- prompt : PromptKernelRequestParams = file_object ["prompt" ]
256+
256257 path : str = file_object ["path" ]
257- if "output" in log_object :
258- if not isinstance ( log_object [ "output" ], str ):
259- # Output expected to be a string, if decorated function
260- # does not return one, jsonify it
261- log_object [ "output" ] = json . dumps ( log_object [ "output" ])
258+ prompt : PromptKernelRequestParams = file_object [ "prompt" ]
259+
260+ span_parent_id = span . parent . span_id if span . parent else None
261+ trace_parent_id = self . _span_id_to_uploaded_log_id [ span_parent_id ] if span_parent_id else None
262+
262263 if "attributes" not in prompt or not prompt ["attributes" ]:
263264 prompt ["attributes" ] = {}
265+
264266 try :
265267 log_response = self ._client .prompts .log (
266268 path = path ,
@@ -271,34 +273,32 @@ def _export_prompt(self, span: ReadableSpan) -> None:
271273 self ._span_id_to_uploaded_log_id [span .context .span_id ] = log_response .id
272274 except HumanloopApiError :
273275 self ._span_id_to_uploaded_log_id [span .context .span_id ] = None
276+ self ._complete_flow_log (span_id = span .context .span_id )
274277
275278 def _export_tool (self , span : ReadableSpan ) -> None :
276- file_object : dict [str , Any ] = read_from_opentelemetry_span (span , key = HUMANLOOP_FILE_KEY )
277- log_object : dict [str , Any ] = read_from_opentelemetry_span (span , key = HUMANLOOP_LOG_KEY )
278- trace_metadata : FlowContext = TRACE_FLOW_CONTEXT .get (span .get_span_context ().span_id , {})
279- if "trace_parent_id" in trace_metadata and trace_metadata ["trace_parent_id" ]:
280- trace_parent_id = self ._span_id_to_uploaded_log_id .get (
281- trace_metadata ["trace_parent_id" ],
282- )
283- if trace_parent_id is None :
284- # Parent Log in Trace upload failed
285- file_path = read_from_opentelemetry_span (span , key = HUMANLOOP_PATH_KEY )
286- logger .error (f"Skipping log for { file_path } : parent Log upload failed" )
287- return
288- else :
289- trace_parent_id = None
290- tool = file_object ["tool" ]
279+ file_object : dict [str , Any ] = read_from_opentelemetry_span (
280+ span ,
281+ key = HUMANLOOP_FILE_KEY ,
282+ )
283+ log_object : dict [str , Any ] = read_from_opentelemetry_span (
284+ span ,
285+ key = HUMANLOOP_LOG_KEY ,
286+ )
287+
288+ path : str = file_object ["path" ]
289+ tool : ToolKernelRequestParams = file_object ["tool" ]
290+
291+ span_parent_id = span .parent .span_id if span .parent else None
292+ trace_parent_id = self ._span_id_to_uploaded_log_id [span_parent_id ] if span_parent_id else None
293+
294+ # API expects an empty dictionary if user does not supply attributes
291295 if not tool .get ("attributes" ):
292296 tool ["attributes" ] = {}
293297 if not tool .get ("setup_values" ):
294298 tool ["setup_values" ] = {}
295- path : str = file_object ["path" ]
296299 if "parameters" in tool ["function" ] and "properties" not in tool ["function" ]["parameters" ]:
297300 tool ["function" ]["parameters" ]["properties" ] = {}
298- if not isinstance (log_object ["output" ], str ):
299- # Output expected to be a string, if decorated function
300- # does not return one, jsonify it
301- log_object ["output" ] = json .dumps (log_object ["output" ])
301+
302302 try :
303303 log_response = self ._client .tools .log (
304304 path = path ,
@@ -309,33 +309,34 @@ def _export_tool(self, span: ReadableSpan) -> None:
309309 self ._span_id_to_uploaded_log_id [span .context .span_id ] = log_response .id
310310 except HumanloopApiError :
311311 self ._span_id_to_uploaded_log_id [span .context .span_id ] = None
312+ self ._complete_flow_log (span_id = span .context .span_id )
312313
313314 def _export_flow (self , span : ReadableSpan ) -> None :
314- file_object : dict [str , Any ] = read_from_opentelemetry_span (span , key = HUMANLOOP_FILE_KEY )
315- log_object : dict [str , Any ] = read_from_opentelemetry_span (span , key = HUMANLOOP_LOG_KEY )
316- trace_metadata : FlowContext = TRACE_FLOW_CONTEXT .get (
317- span .get_span_context ().span_id ,
318- {},
315+ file_object : dict [str , Any ] = read_from_opentelemetry_span (
316+ span ,
317+ key = HUMANLOOP_FILE_KEY ,
319318 )
320- if "trace_parent_id" in trace_metadata :
321- trace_parent_id = self ._span_id_to_uploaded_log_id .get (
322- trace_metadata ["trace_parent_id" ], # type: ignore
323- )
324- if trace_parent_id is None and trace_metadata ["trace_id" ] != span .get_span_context ().span_id :
325- # Parent Log in Trace upload failed
326- # NOTE: Check if the trace_id metadata field points to the
327- # span itself. This signifies the span is the head of the Trace
328- file_path = read_from_opentelemetry_span (span , key = HUMANLOOP_PATH_KEY )
329- logger .error (f"Skipping log for { file_path } : parent Log upload failed" )
330- return
331- else :
332- trace_parent_id = None
319+ log_object : dict [str , Any ] = read_from_opentelemetry_span (
320+ span ,
321+ key = HUMANLOOP_LOG_KEY ,
322+ )
323+ # Spans that must be uploaded before the Flow Span is completed
324+ prerequisites = read_from_opentelemetry_span (
325+ span = span ,
326+ key = HUMANLOOP_FLOW_PREREQUISITES_KEY ,
327+ )
328+ self ._flow_log_prerequisites [span .context .span_id ] = set (prerequisites )
329+
330+ path : str = file_object ["path" ]
333331 flow : FlowKernelRequestParams
334332 if not file_object .get ("flow" ):
335333 flow = {"attributes" : {}}
336334 else :
337335 flow = file_object ["flow" ]
338- path : str = file_object ["path" ]
336+
337+ span_parent_id = span .parent .span_id if span .parent else None
338+ trace_parent_id = self ._span_id_to_uploaded_log_id [span_parent_id ] if span_parent_id else None
339+
339340 if "output" not in log_object :
340341 log_object ["output" ] = None
341342 try :
@@ -350,3 +351,4 @@ def _export_flow(self, span: ReadableSpan) -> None:
350351 except HumanloopApiError as e :
351352 logger .error (str (e ))
352353 self ._span_id_to_uploaded_log_id [span .context .span_id ] = None
354+ self ._complete_flow_log (span_id = span .context .span_id )
0 commit comments