Skip to content

Commit d982b53

Browse files
author
Andrei Bratu
committed
Decorators support async functions
1 parent daccb9f commit d982b53

File tree

5 files changed

+212
-39
lines changed

5 files changed

+212
-39
lines changed

pyproject.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,17 @@ deepdiff = "^8.2.0"
3939
httpx = ">=0.21.2"
4040
httpx-sse = "0.4.0"
4141
mmh3 = "^5.1.0"
42-
opentelemetry-api = "<=1.27.0"
42+
opentelemetry-api = ">=1.28.0"
4343
opentelemetry-instrumentation-anthropic = ">=0.20"
4444
opentelemetry-instrumentation-bedrock = ">=0.15"
4545
opentelemetry-instrumentation-cohere = ">=0.20"
4646
opentelemetry-instrumentation-groq = ">=0.29"
4747
opentelemetry-instrumentation-openai = ">=0.20"
4848
opentelemetry-instrumentation-replicate = ">=0.20"
49-
opentelemetry-proto = "^1.30.0"
50-
opentelemetry-sdk = "<=1.27.0"
49+
opentelemetry-proto = ">=1.30.0"
50+
opentelemetry-sdk = ">=1.28.0"
5151
parse = ">=1"
52-
protobuf = "^5.29.3"
52+
protobuf = ">=5.29.3"
5353
pydantic = ">= 1.9.2"
5454
pydantic-core = "^2.18.2"
5555
typing_extensions = ">= 4.0.0"

src/humanloop/client.py

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from importlib import metadata
12
import os
23
import typing
34
from typing import Any, List, Optional, Sequence
@@ -9,6 +10,7 @@
910

1011
from humanloop.core.client_wrapper import SyncClientWrapper
1112

13+
from humanloop.error import HumanloopRuntimeError
1214
from humanloop.evals import run_eval
1315
from humanloop.evals.types import Dataset, Evaluator, EvaluatorCheck, File
1416

@@ -68,6 +70,43 @@ def run(
6870
)
6971

7072

73+
class HumanloopTracerSingleton:
74+
_instance = None
75+
76+
def __init__(self, hl_client_headers: dict[str, str], hl_client_base_url: str):
77+
if HumanloopTracerSingleton._instance is not None:
78+
raise HumanloopRuntimeError(
79+
"Internal error: HumanloopTracerSingleton already initialized"
80+
)
81+
82+
self.tracer_provider = TracerProvider(
83+
resource=Resource(
84+
attributes={
85+
"service.name": "humanloop-python-sdk",
86+
"service.version": metadata.version("humanloop"),
87+
}
88+
)
89+
)
90+
self.tracer_provider.add_span_processor(
91+
HumanloopSpanProcessor(
92+
exporter=HumanloopSpanExporter(
93+
hl_client_headers=hl_client_headers,
94+
hl_client_base_url=hl_client_base_url,
95+
)
96+
)
97+
)
98+
99+
instrument_provider(provider=self.tracer_provider)
100+
101+
self.tracer = self.tracer_provider.get_tracer("humanloop.sdk")
102+
103+
@classmethod
104+
def get_instance(cls, hl_client_headers: dict[str, str], hl_client_base_url: str):
105+
if cls._instance is None:
106+
cls._instance = cls(hl_client_headers, hl_client_base_url)
107+
return cls._instance
108+
109+
71110
class Humanloop(BaseHumanloop):
72111
"""
73112
See docstring of :class:`BaseHumanloop`.
@@ -117,27 +156,12 @@ def __init__(
117156
self.flows = overload_log(client=self.flows)
118157
self.tools = overload_log(client=self.tools)
119158

120-
if opentelemetry_tracer_provider is not None:
121-
self._tracer_provider = opentelemetry_tracer_provider
122-
else:
123-
self._tracer_provider = TracerProvider(
124-
resource=Resource(
125-
attributes={
126-
"instrumentor": "humanloop.sdk",
127-
}
128-
),
129-
)
130-
instrument_provider(provider=self._tracer_provider)
131-
self._tracer_provider.add_span_processor(
132-
HumanloopSpanProcessor(exporter=HumanloopSpanExporter(client=self)),
159+
# Initialize the tracer singleton
160+
self._tracer_singleton = HumanloopTracerSingleton.get_instance(
161+
hl_client_headers=self._client_wrapper.get_headers(),
162+
hl_client_base_url=self._client_wrapper._base_url,
133163
)
134-
135-
if opentelemetry_tracer is None:
136-
self._opentelemetry_tracer = self._tracer_provider.get_tracer(
137-
"humanloop.sdk"
138-
)
139-
else:
140-
self._opentelemetry_tracer = opentelemetry_tracer
164+
self._opentelemetry_tracer = self._tracer_singleton.tracer
141165

142166
def prompt(
143167
self,

src/humanloop/decorators/flow.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import logging
23
from functools import wraps
34
from typing import Any, Callable, Optional, TypeVar
@@ -121,13 +122,99 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
121122
# Return the output of the decorated function
122123
return func_output # type: ignore [return-value]
123124

125+
@wraps(func)
126+
async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
127+
span: Span
128+
with set_decorator_context(
129+
DecoratorContext(
130+
path=decorator_path,
131+
type="flow",
132+
version=flow_kernel,
133+
)
134+
) as decorator_context:
135+
with opentelemetry_tracer.start_as_current_span(HUMANLOOP_FLOW_SPAN_NAME) as span: # type: ignore
136+
span.set_attribute(HUMANLOOP_FILE_PATH_KEY, decorator_path)
137+
span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, file_type)
138+
trace_id = get_trace_id()
139+
func_args = bind_args(func, args, kwargs)
140+
141+
# Create the trace ahead so we have a parent ID to reference
142+
init_log_inputs = {
143+
"inputs": {k: v for k, v in func_args.items() if k != "messages"},
144+
"messages": func_args.get("messages"),
145+
"trace_parent_id": trace_id,
146+
}
147+
this_flow_log: FlowLogResponse = client.flows._log( # type: ignore [attr-defined]
148+
path=decorator_context.path,
149+
flow=decorator_context.version,
150+
log_status="incomplete",
151+
**init_log_inputs,
152+
)
153+
154+
with set_trace_id(this_flow_log.id):
155+
func_output: Optional[R]
156+
log_output: Optional[str]
157+
log_error: Optional[str]
158+
log_output_message: Optional[ChatMessage]
159+
try:
160+
func_output = await func(*args, **kwargs)
161+
if (
162+
isinstance(func_output, dict)
163+
and len(func_output.keys()) == 2
164+
and "role" in func_output
165+
and "content" in func_output
166+
):
167+
log_output_message = func_output # type: ignore [assignment]
168+
log_output = None
169+
else:
170+
log_output = process_output(func=func, output=func_output)
171+
log_output_message = None
172+
log_error = None
173+
except HumanloopRuntimeError as e:
174+
# Critical error, re-raise
175+
client.logs.delete(id=this_flow_log.id)
176+
span.record_exception(e)
177+
raise e
178+
except Exception as e:
179+
logger.error(f"Error calling {func.__name__}: {e}")
180+
log_output = None
181+
log_output_message = None
182+
log_error = str(e)
183+
func_output = None
184+
185+
updated_flow_log = {
186+
"log_status": "complete",
187+
"output": log_output,
188+
"error": log_error,
189+
"output_message": log_output_message,
190+
"id": this_flow_log.id,
191+
}
192+
# Write the Flow Log to the Span on HL_LOG_OT_KEY
193+
write_to_opentelemetry_span(
194+
span=span, # type: ignore [arg-type]
195+
key=HUMANLOOP_LOG_KEY,
196+
value=updated_flow_log, # type: ignore
197+
)
198+
# Return the output of the decorated function
199+
return func_output # type: ignore [return-value]
200+
201+
# If the decorated function is an async function, return the async wrapper
202+
if asyncio.iscoroutinefunction(func):
203+
async_wrapper.file = File( # type: ignore
204+
path=decorator_path,
205+
type=file_type, # type: ignore [arg-type, typeddict-item]
206+
version=FlowDict(**flow_kernel), # type: ignore
207+
callable=async_wrapper,
208+
)
209+
return async_wrapper
210+
211+
# If the decorated function is a sync function, return the sync wrapper
124212
wrapper.file = File( # type: ignore
125213
path=decorator_path,
126214
type=file_type, # type: ignore [arg-type, typeddict-item]
127215
version=FlowDict(**flow_kernel), # type: ignore
128216
callable=wrapper,
129217
)
130-
131218
return wrapper
132219

133220
return decorator

src/humanloop/decorators/tool.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import builtins
23
import inspect
34
import logging
@@ -112,13 +113,80 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
112113
# Return the output of the decorated function
113114
return func_output
114115

116+
@wraps(func)
117+
async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
118+
evaluation_context = get_evaluation_context()
119+
if evaluation_context is not None:
120+
if evaluation_context.path == path:
121+
raise HumanloopRuntimeError("Tools cannot be evaluated with the `evaluations.run()` utility.")
122+
with opentelemetry_tracer.start_as_current_span("humanloop.tool") as span:
123+
# Write the Tool Kernel to the Span on HL_FILE_OT_KEY
124+
write_to_opentelemetry_span(
125+
span=span, # type: ignore [arg-type]
126+
key=HUMANLOOP_FILE_KEY,
127+
value=tool_kernel, # type: ignore [arg-type]
128+
)
129+
span.set_attribute(HUMANLOOP_FILE_PATH_KEY, path)
130+
span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, file_type)
131+
132+
log_inputs: dict[str, Any] = bind_args(func, args, kwargs)
133+
log_error: Optional[str]
134+
log_output: str
135+
136+
func_output: Optional[R]
137+
try:
138+
func_output = await func(*args, **kwargs)
139+
log_output = process_output(
140+
func=func,
141+
output=func_output,
142+
)
143+
log_error = None
144+
except HumanloopRuntimeError as e:
145+
# Critical error, re-raise
146+
raise e
147+
except Exception as e:
148+
logger.error(f"Error calling {func.__name__}: {e}")
149+
output = None
150+
log_output = process_output(
151+
func=func,
152+
output=output,
153+
)
154+
log_error = str(e)
155+
156+
# Populate Tool Log attributes
157+
tool_log = {
158+
"inputs": log_inputs,
159+
"output": log_output,
160+
"error": log_error,
161+
"trace_parent_id": get_trace_id(),
162+
}
163+
# Write the Tool Log to the Span on HL_LOG_OT_KEY
164+
write_to_opentelemetry_span(
165+
span=span, # type: ignore [arg-type]
166+
key=HUMANLOOP_LOG_KEY,
167+
value=tool_log, # type: ignore [arg-type]
168+
)
169+
170+
# Return the output of the decorated function
171+
return func_output
172+
173+
# If the decorated function is an async function, return the async wrapper
174+
if asyncio.iscoroutinefunction(func):
175+
async_wrapper.file = File( # type: ignore
176+
path=path,
177+
type=file_type, # type: ignore [arg-type, typeddict-item]
178+
version=tool_kernel,
179+
callable=async_wrapper,
180+
)
181+
return async_wrapper
182+
183+
# If the decorated function is a sync function, return the sync wrapper
115184
wrapper.file = File( # type: ignore
116185
path=path,
117186
type=file_type, # type: ignore [arg-type, typeddict-item]
118187
version=tool_kernel,
119188
callable=wrapper,
120189
)
121-
122190
return wrapper
123191

124192
return decorator

src/humanloop/otel/exporter/__init__.py

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
22

33
import time
4-
import typing
54
from queue import Empty as EmptyQueue
65
from queue import Queue
76
from threading import Thread
@@ -26,27 +25,25 @@
2625
)
2726

2827

29-
if typing.TYPE_CHECKING:
30-
from humanloop.client import Humanloop
31-
32-
3328
logger = logging.getLogger("humanloop.sdk")
3429

3530

3631
class HumanloopSpanExporter(SpanExporter):
37-
DEFAULT_NUMBER_THREADS = 1
32+
DEFAULT_NUMBER_THREADS = 4
3833

3934
def __init__(
4035
self,
41-
client: "Humanloop",
36+
hl_client_headers: dict[str, str],
37+
hl_client_base_url: str,
4238
worker_threads: Optional[int] = None,
4339
) -> None:
4440
"""Upload Spans created by SDK decorators to Humanloop.
4541
4642
Spans not created by Humanloop SDK decorators will be ignored.
4743
"""
4844
super().__init__()
49-
self._client = client
45+
self._hl_client_headers = hl_client_headers
46+
self._base_url = hl_client_base_url
5047
# Work queue for the threads uploading the spans
5148
self._upload_queue: Queue = Queue()
5249
# Worker threads to export the spans
@@ -135,12 +132,9 @@ def _do_work(self):
135132
continue
136133

137134
span_to_export, eval_context_callback = thread_args
138-
139135
response = requests.post(
140-
f"{self._client._client_wrapper.get_base_url()}/import/otel/v1/traces",
141-
headers={
142-
**self._client._client_wrapper.get_headers(),
143-
},
136+
f"{self._base_url}/import/otel/v1/traces",
137+
headers=self._hl_client_headers,
144138
data=serialize_span(span_to_export),
145139
)
146140
if response.status_code != 200:

0 commit comments

Comments
 (0)