Commit 7becdeb

Author: Andrei Bratu (committed)

Decorators support async functions
1 parent 1e2bdaa commit 7becdeb

File tree: 4 files changed (+206, -35 lines)


src/humanloop/client.py

Lines changed: 42 additions & 20 deletions
@@ -1,3 +1,4 @@
+from importlib import metadata
 import os
 import typing
 from typing import Any, List, Optional, Sequence
@@ -9,6 +10,7 @@
 
 from humanloop.core.client_wrapper import SyncClientWrapper
 
+from humanloop.error import HumanloopRuntimeError
 from humanloop.evals import run_eval
 from humanloop.evals.types import Dataset, Evaluator, EvaluatorCheck, File
 
@@ -68,6 +70,41 @@ def run(
         )
 
 
+class HumanloopTracerSingleton:
+    _instance = None
+
+    def __init__(self, hl_client_headers: dict[str, str], hl_client_base_url: str):
+        if HumanloopTracerSingleton._instance is not None:
+            raise HumanloopRuntimeError("Internal error: HumanloopTracerSingleton already initialized")
+
+        self.tracer_provider = TracerProvider(
+            resource=Resource(
+                attributes={
+                    "service.name": "humanloop-python-sdk",
+                    "service.version": metadata.version("humanloop"),
+                }
+            )
+        )
+        self.tracer_provider.add_span_processor(
+            HumanloopSpanProcessor(
+                exporter=HumanloopSpanExporter(
+                    hl_client_headers=hl_client_headers,
+                    hl_client_base_url=hl_client_base_url,
+                )
+            )
+        )
+
+        instrument_provider(provider=self.tracer_provider)
+
+        self.tracer = self.tracer_provider.get_tracer("humanloop.sdk")
+
+    @classmethod
+    def get_instance(cls, hl_client_headers: dict[str, str], hl_client_base_url: str):
+        if cls._instance is None:
+            cls._instance = cls(hl_client_headers, hl_client_base_url)
+        return cls._instance
+
+
 class Humanloop(BaseHumanloop):
     """
     See docstring of :class:`BaseHumanloop`.
@@ -117,27 +154,12 @@ def __init__(
         self.flows = overload_log(client=self.flows)
         self.tools = overload_log(client=self.tools)
 
-        if opentelemetry_tracer_provider is not None:
-            self._tracer_provider = opentelemetry_tracer_provider
-        else:
-            self._tracer_provider = TracerProvider(
-                resource=Resource(
-                    attributes={
-                        "instrumentor": "humanloop.sdk",
-                    }
-                ),
-            )
-            instrument_provider(provider=self._tracer_provider)
-        self._tracer_provider.add_span_processor(
-            HumanloopSpanProcessor(exporter=HumanloopSpanExporter(client=self)),
+        # Initialize the tracer singleton
+        self._tracer_singleton = HumanloopTracerSingleton.get_instance(
+            hl_client_headers=self._client_wrapper.get_headers(),
+            hl_client_base_url=self._client_wrapper._base_url,
         )
-
-        if opentelemetry_tracer is None:
-            self._opentelemetry_tracer = self._tracer_provider.get_tracer(
-                "humanloop.sdk"
-            )
-        else:
-            self._opentelemetry_tracer = opentelemetry_tracer
+        self._opentelemetry_tracer = self._tracer_singleton.tracer
 
     def prompt(
         self,
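
The singleton ensures a single TracerProvider, span processor, and exporter per process even if several Humanloop clients are constructed; note that later get_instance() calls reuse the instance built from the first client's headers and base URL and ignore their own arguments. A minimal, self-contained sketch of the pattern in isolation (illustrative names, no OpenTelemetry wiring):

class TracerSingletonSketch:
    """Simplified stand-in for the class-level singleton used above."""

    _instance = None

    def __init__(self, headers: dict[str, str], base_url: str):
        if TracerSingletonSketch._instance is not None:
            raise RuntimeError("already initialized")
        self.headers = headers
        self.base_url = base_url

    @classmethod
    def get_instance(cls, headers: dict[str, str], base_url: str):
        # First call constructs the instance; later calls return the same object
        # and silently drop their arguments, mirroring HumanloopTracerSingleton.
        if cls._instance is None:
            cls._instance = cls(headers, base_url)
        return cls._instance


first = TracerSingletonSketch.get_instance({"X-Key": "a"}, "https://example.invalid")
second = TracerSingletonSketch.get_instance({"X-Key": "b"}, "https://other.invalid")
assert first is second  # one tracer set-up shared by every client in the process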

src/humanloop/decorators/flow.py

Lines changed: 88 additions & 1 deletion
@@ -1,3 +1,4 @@
+import asyncio
 import logging
 from functools import wraps
 from typing import Any, Callable, Optional, TypeVar
@@ -121,13 +122,99 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
                        # Return the output of the decorated function
                        return func_output  # type: ignore [return-value]
 
+        @wraps(func)
+        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
+            span: Span
+            with set_decorator_context(
+                DecoratorContext(
+                    path=decorator_path,
+                    type="flow",
+                    version=flow_kernel,
+                )
+            ) as decorator_context:
+                with opentelemetry_tracer.start_as_current_span(HUMANLOOP_FLOW_SPAN_NAME) as span:  # type: ignore
+                    span.set_attribute(HUMANLOOP_FILE_PATH_KEY, decorator_path)
+                    span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, file_type)
+                    trace_id = get_trace_id()
+                    func_args = bind_args(func, args, kwargs)
+
+                    # Create the trace ahead so we have a parent ID to reference
+                    init_log_inputs = {
+                        "inputs": {k: v for k, v in func_args.items() if k != "messages"},
+                        "messages": func_args.get("messages"),
+                        "trace_parent_id": trace_id,
+                    }
+                    this_flow_log: FlowLogResponse = client.flows._log(  # type: ignore [attr-defined]
+                        path=decorator_context.path,
+                        flow=decorator_context.version,
+                        log_status="incomplete",
+                        **init_log_inputs,
+                    )
+
+                    with set_trace_id(this_flow_log.id):
+                        func_output: Optional[R]
+                        log_output: Optional[str]
+                        log_error: Optional[str]
+                        log_output_message: Optional[ChatMessage]
+                        try:
+                            func_output = await func(*args, **kwargs)
+                            if (
+                                isinstance(func_output, dict)
+                                and len(func_output.keys()) == 2
+                                and "role" in func_output
+                                and "content" in func_output
+                            ):
+                                log_output_message = func_output  # type: ignore [assignment]
+                                log_output = None
+                            else:
+                                log_output = process_output(func=func, output=func_output)
+                                log_output_message = None
+                            log_error = None
+                        except HumanloopRuntimeError as e:
+                            # Critical error, re-raise
+                            client.logs.delete(id=this_flow_log.id)
+                            span.record_exception(e)
+                            raise e
+                        except Exception as e:
+                            logger.error(f"Error calling {func.__name__}: {e}")
+                            log_output = None
+                            log_output_message = None
+                            log_error = str(e)
+                            func_output = None
+
+                        updated_flow_log = {
+                            "log_status": "complete",
+                            "output": log_output,
+                            "error": log_error,
+                            "output_message": log_output_message,
+                            "id": this_flow_log.id,
+                        }
+                        # Write the Flow Log to the Span on HL_LOG_OT_KEY
+                        write_to_opentelemetry_span(
+                            span=span,  # type: ignore [arg-type]
+                            key=HUMANLOOP_LOG_KEY,
+                            value=updated_flow_log,  # type: ignore
+                        )
+                        # Return the output of the decorated function
+                        return func_output  # type: ignore [return-value]
+
+        # If the decorated function is an async function, return the async wrapper
+        if asyncio.iscoroutinefunction(func):
+            async_wrapper.file = File(  # type: ignore
+                path=decorator_path,
+                type=file_type,  # type: ignore [arg-type, typeddict-item]
+                version=FlowDict(**flow_kernel),  # type: ignore
+                callable=async_wrapper,
+            )
+            return async_wrapper
+
+        # If the decorated function is a sync function, return the sync wrapper
         wrapper.file = File(  # type: ignore
             path=decorator_path,
             type=file_type,  # type: ignore [arg-type, typeddict-item]
             version=FlowDict(**flow_kernel),  # type: ignore
             callable=wrapper,
         )
-
         return wrapper
 
     return decorator
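
A hedged usage sketch of what this change enables: decorating an async function as a Flow. The entry point hl.flow(path=...) and the api_key constructor argument are assumptions for illustration; neither is shown in this diff.

import asyncio

from humanloop import Humanloop

hl = Humanloop(api_key="...")  # assumption: api_key constructor argument


@hl.flow(path="demo/async-flow")  # assumption: decorator exposed on the client as hl.flow
async def answer(question: str) -> str:
    # Awaited work (LLM calls, I/O, ...) now runs inside the traced Flow;
    # the decorator detects the coroutine function and returns async_wrapper.
    await asyncio.sleep(0)
    return f"echo: {question}"


print(asyncio.run(answer("is async supported now?")))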

src/humanloop/decorators/tool.py

Lines changed: 69 additions & 1 deletion
@@ -1,3 +1,4 @@
+import asyncio
 import builtins
 import inspect
 import logging
@@ -112,13 +113,80 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
                # Return the output of the decorated function
                return func_output
 
+        @wraps(func)
+        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
+            evaluation_context = get_evaluation_context()
+            if evaluation_context is not None:
+                if evaluation_context.path == path:
+                    raise HumanloopRuntimeError("Tools cannot be evaluated with the `evaluations.run()` utility.")
+            with opentelemetry_tracer.start_as_current_span("humanloop.tool") as span:
+                # Write the Tool Kernel to the Span on HL_FILE_OT_KEY
+                write_to_opentelemetry_span(
+                    span=span,  # type: ignore [arg-type]
+                    key=HUMANLOOP_FILE_KEY,
+                    value=tool_kernel,  # type: ignore [arg-type]
+                )
+                span.set_attribute(HUMANLOOP_FILE_PATH_KEY, path)
+                span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, file_type)
+
+                log_inputs: dict[str, Any] = bind_args(func, args, kwargs)
+                log_error: Optional[str]
+                log_output: str
+
+                func_output: Optional[R]
+                try:
+                    func_output = await func(*args, **kwargs)
+                    log_output = process_output(
+                        func=func,
+                        output=func_output,
+                    )
+                    log_error = None
+                except HumanloopRuntimeError as e:
+                    # Critical error, re-raise
+                    raise e
+                except Exception as e:
+                    logger.error(f"Error calling {func.__name__}: {e}")
+                    output = None
+                    log_output = process_output(
+                        func=func,
+                        output=output,
+                    )
+                    log_error = str(e)
+
+                # Populate Tool Log attributes
+                tool_log = {
+                    "inputs": log_inputs,
+                    "output": log_output,
+                    "error": log_error,
+                    "trace_parent_id": get_trace_id(),
+                }
+                # Write the Tool Log to the Span on HL_LOG_OT_KEY
+                write_to_opentelemetry_span(
+                    span=span,  # type: ignore [arg-type]
+                    key=HUMANLOOP_LOG_KEY,
+                    value=tool_log,  # type: ignore [arg-type]
+                )
+
+                # Return the output of the decorated function
+                return func_output
+
+        # If the decorated function is an async function, return the async wrapper
+        if asyncio.iscoroutinefunction(func):
+            async_wrapper.file = File(  # type: ignore
+                path=path,
+                type=file_type,  # type: ignore [arg-type, typeddict-item]
+                version=tool_kernel,
+                callable=async_wrapper,
+            )
+            return async_wrapper
+
+        # If the decorated function is a sync function, return the sync wrapper
         wrapper.file = File(  # type: ignore
             path=path,
             type=file_type,  # type: ignore [arg-type, typeddict-item]
             version=tool_kernel,
             callable=wrapper,
         )
-
         return wrapper
 
     return decorator
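
Both decorators now pick the wrapper at decoration time with asyncio.iscoroutinefunction. A minimal, standard-library-only sketch of that dispatch pattern (traced below is a stand-in, not the SDK decorator):

import asyncio
from functools import wraps


def traced(func):
    """Return an async wrapper for coroutine functions, a sync wrapper otherwise."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        print(f"[sync] calling {func.__name__}")
        return func(*args, **kwargs)

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        print(f"[async] calling {func.__name__}")
        return await func(*args, **kwargs)

    # Same check the commit adds in flow.py and tool.py.
    return async_wrapper if asyncio.iscoroutinefunction(func) else wrapper


@traced
def add(a, b):
    return a + b


@traced
async def add_async(a, b):
    await asyncio.sleep(0)
    return a + b


print(add(1, 2))                     # [sync] calling add -> 3
print(asyncio.run(add_async(1, 2)))  # [async] calling add_async -> 3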

src/humanloop/otel/exporter/__init__.py

Lines changed: 7 additions & 13 deletions
@@ -1,7 +1,6 @@
 import logging
 
 import time
-import typing
 from queue import Empty as EmptyQueue
 from queue import Queue
 from threading import Thread
@@ -26,27 +25,25 @@
 )
 
 
-if typing.TYPE_CHECKING:
-    from humanloop.client import Humanloop
-
-
 logger = logging.getLogger("humanloop.sdk")
 
 
 class HumanloopSpanExporter(SpanExporter):
-    DEFAULT_NUMBER_THREADS = 1
+    DEFAULT_NUMBER_THREADS = 4
 
     def __init__(
         self,
-        client: "Humanloop",
+        hl_client_headers: dict[str, str],
+        hl_client_base_url: str,
         worker_threads: Optional[int] = None,
     ) -> None:
        """Upload Spans created by SDK decorators to Humanloop.

        Spans not created by Humanloop SDK decorators will be ignored.
        """
        super().__init__()
-        self._client = client
+        self._hl_client_headers = hl_client_headers
+        self._base_url = hl_client_base_url
        # Work queue for the threads uploading the spans
        self._upload_queue: Queue = Queue()
        # Worker threads to export the spans
@@ -135,12 +132,9 @@ def _do_work(self):
                continue

            span_to_export, eval_context_callback = thread_args
-
            response = requests.post(
-                f"{self._client._client_wrapper.get_base_url()}/import/otel/v1/traces",
-                headers={
-                    **self._client._client_wrapper.get_headers(),
-                },
+                f"{self._base_url}/import/otel/v1/traces",
+                headers=self._hl_client_headers,
                data=serialize_span(span_to_export),
            )
            if response.status_code != 200:
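
The exporter is now decoupled from the Humanloop client object: it only needs the request headers and base URL, which HumanloopTracerSingleton passes through. A sketch of the new construction path (header and URL values below are placeholders, not real credentials or endpoints):

from humanloop.otel.exporter import HumanloopSpanExporter

# Placeholders: in the SDK these come from the client wrapper,
# i.e. client._client_wrapper.get_headers() and client._client_wrapper._base_url.
headers = {"X-Placeholder-Header": "value"}
base_url = "https://api.example.invalid"

exporter = HumanloopSpanExporter(
    hl_client_headers=headers,
    hl_client_base_url=base_url,
    worker_threads=4,  # matches the new DEFAULT_NUMBER_THREADS
)

# Each worker thread then POSTs serialized spans to f"{base_url}/import/otel/v1/traces"
# with exactly these headers (see _do_work above).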
