Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import httpx
from opentelemetry import trace as otel_trace_api
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import SpanExporter
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
from opentelemetry.util._decorator import (
_AgnosticContextManager,
Expand Down Expand Up @@ -179,8 +180,9 @@ class Langfuse:
)
```
should_export_span (Optional[Callable[[ReadableSpan], bool]]): Callback to decide whether to export a span. If omitted, Langfuse uses the default filter (Langfuse SDK spans, spans with `gen_ai.*` attributes, and known LLM instrumentation scopes).
additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well.
additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and in the default OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well. If `span_exporter` is provided, these headers are not wired into that exporter and must be configured on the exporter instance directly.
tracer_provider(Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. This can be useful to set to have disconnected tracing between Langfuse and other OpenTelemetry-span emitting libraries. Note: To track active spans, the context is still shared between TracerProviders. This may lead to broken trace trees.
span_exporter (Optional[SpanExporter]): Custom OpenTelemetry span exporter for the Langfuse span processor. If omitted, Langfuse creates an OTLPSpanExporter pointed at the Langfuse OTLP endpoint. If provided, Langfuse does not wire `base_url`, exporter headers, exporter auth, or exporter timeout into it. Configure endpoint, headers, and timeout on the exporter instance directly. If you are sending spans to Langfuse v4 or using Langfuse Cloud Fast Preview, include `x-langfuse-ingestion-version=4` on the exporter to enable real time processing of exported spans.

Example:
```python
Expand Down Expand Up @@ -244,6 +246,7 @@ def __init__(
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
additional_headers: Optional[Dict[str, str]] = None,
tracer_provider: Optional[TracerProvider] = None,
span_exporter: Optional[SpanExporter] = None,
):
self._base_url = (
base_url
Expand Down Expand Up @@ -340,6 +343,7 @@ def __init__(
should_export_span=should_export_span,
additional_headers=additional_headers,
tracer_provider=tracer_provider,
span_exporter=span_exporter,
)
self._mask = self._resources.mask

Expand Down
1 change: 1 addition & 0 deletions langfuse/_client/get_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def _create_client_from_instance(
should_export_span=instance.should_export_span,
additional_headers=instance.additional_headers,
tracer_provider=instance.tracer_provider,
span_exporter=instance.span_exporter,
httpx_client=instance.httpx_client,
)

Expand Down
6 changes: 6 additions & 0 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from opentelemetry import trace as otel_trace_api
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import SpanExporter
from opentelemetry.sdk.trace.sampling import Decision, TraceIdRatioBased
from opentelemetry.trace import Tracer

Expand Down Expand Up @@ -98,6 +99,7 @@ def __new__(
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
additional_headers: Optional[Dict[str, str]] = None,
tracer_provider: Optional[TracerProvider] = None,
span_exporter: Optional[SpanExporter] = None,
) -> "LangfuseResourceManager":
if public_key in cls._instances:
return cls._instances[public_key]
Expand Down Expand Up @@ -133,6 +135,7 @@ def __new__(
should_export_span=should_export_span,
additional_headers=additional_headers,
tracer_provider=tracer_provider,
span_exporter=span_exporter,
)

cls._instances[public_key] = instance
Expand All @@ -159,6 +162,7 @@ def _initialize_instance(
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
additional_headers: Optional[Dict[str, str]] = None,
tracer_provider: Optional[TracerProvider] = None,
span_exporter: Optional[SpanExporter] = None,
) -> None:
self.public_key = public_key
self.secret_key = secret_key
Expand All @@ -177,6 +181,7 @@ def _initialize_instance(
self.blocked_instrumentation_scopes = blocked_instrumentation_scopes
self.should_export_span = should_export_span
self.additional_headers = additional_headers
self.span_exporter = span_exporter
self.tracer_provider: Optional[TracerProvider] = None

# OTEL Tracer
Comment on lines 181 to 187
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟣 This is a pre-existing issue: LangfuseResourceManager.shutdown() never calls tracer_provider.shutdown(), so the full OTel lifecycle chain (TracerProvider.shutdown()BatchSpanProcessor.shutdown()SpanExporter.shutdown()) is never triggered. This PR amplifies the impact significantly by introducing custom span_exporter support as a first-class feature — users who implement cleanup logic in their exporter's shutdown() (closing file handles, flushing local buffers, releasing gRPC channels) will silently experience resource leaks when langfuse_client.shutdown() is called. Fix by calling tracer_provider.shutdown() from LangfuseResourceManager.shutdown(), or retain a reference to the LangfuseSpanProcessor and call processor.shutdown() directly.

Extended reasoning...

What the bug is and how it manifests

LangfuseResourceManager.shutdown() (resource_manager.py) calls self.flush(), which in turn calls self.tracer_provider.force_flush(), then calls _stop_and_join_consumer_threads(). Neither path ever calls tracer_provider.shutdown(). Since the OpenTelemetry SDK chains shutdown propagation — TracerProvider.shutdown()BatchSpanProcessor.shutdown()SpanExporter.shutdown() — any cleanup logic registered in a SpanExporter.shutdown() implementation is silently skipped.

The specific code path that triggers it

In resource_manager.py lines 181–187, shutdown() is:

def shutdown(self) -> None:
    atexit.unregister(self.shutdown)
    self.flush()
    self._stop_and_join_consumer_threads()

And flush() calls only tracer_provider.force_flush() (line ~440), never tracer_provider.shutdown(). The LangfuseSpanProcessor instance that wraps the exporter is created in _initialize_instance but no reference is retained on self — so there is no direct path to processor.shutdown() either.

Why existing code doesn't prevent it

force_flush() is explicitly not a substitute for shutdown(): it only blocks until queued spans are exported, but does not tear down the exporter or release its resources. The OTel spec defines shutdown() as the mechanism exporters use for cleanup (e.g., closing HTTP sessions, flushing file buffers, terminating gRPC channels). Because this method is never called, any exporter-level resources remain open until process exit — or not at all if the process exits abnormally.

Why this PR increases the impact

Before this PR the only exporter in use was the built-in OTLPSpanExporter, whose shutdown() just closes an HTTP session — a minor leak that the GC or process exit would eventually recover. This PR deliberately promotes custom SpanExporter instances as a first-class feature, with constructor docs telling users to configure the exporter themselves. The implicit contract is that Langfuse manages the exporter lifecycle; the docs say nothing about users needing to call shutdown() on their own exporter. Users following the documented pattern who implement meaningful cleanup in shutdown() (flush-to-disk, close a DB connection, drain a local queue) will silently get resource leaks.

Step-by-step proof

  1. User creates class FileWritingExporter(SpanExporter) that opens a file in __init__ and calls file.flush(); file.close() in shutdown().
  2. User passes Langfuse(span_exporter=FileWritingExporter()).
  3. _initialize_instance stores it as self.span_exporter (line 184) and passes it into LangfuseSpanProcessor.__init__, which hands it to BatchSpanProcessor.__init__.
  4. User calls langfuse_client.shutdown().
  5. LangfuseResourceManager.shutdown()flush()tracer_provider.force_flush() — spans are exported, but tracer_provider.shutdown() is never called.
  6. BatchSpanProcessor.shutdown() is never called → FileWritingExporter.shutdown() is never called → the file is never flushed/closed → data loss or file descriptor leak.

How to fix it

Option A (simplest): call tracer_provider.shutdown() from LangfuseResourceManager.shutdown() after flush(). This propagates correctly through the OTel chain. Care should be taken when the tracer_provider is user-supplied (the tracer_provider= parameter), since the user may not expect Langfuse to shut it down entirely — a guard like if self._owns_tracer_provider would make this safe.

Option B: retain a reference to the LangfuseSpanProcessor on self during _initialize_instance and call self._langfuse_processor.shutdown() explicitly in LangfuseResourceManager.shutdown(). This is more surgical and avoids touching user-provided TracerProviders.

Expand All @@ -196,6 +201,7 @@ def _initialize_instance(
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
should_export_span=should_export_span,
additional_headers=additional_headers,
span_exporter=span_exporter,
)
tracer_provider.add_span_processor(langfuse_processor)

Expand Down
52 changes: 27 additions & 25 deletions langfuse/_client/span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from opentelemetry.context import Context
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import ReadableSpan, Span
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
from opentelemetry.trace import format_span_id

from langfuse._client.environment_variables import (
Expand Down Expand Up @@ -63,6 +63,7 @@ def __init__(
blocked_instrumentation_scopes: Optional[List[str]] = None,
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
additional_headers: Optional[Dict[str, str]] = None,
span_exporter: Optional[SpanExporter] = None,
):
self.public_key = public_key
self.blocked_instrumentation_scopes = (
Expand All @@ -82,37 +83,38 @@ def __init__(
else None
)

basic_auth_header = "Basic " + base64.b64encode(
f"{public_key}:{secret_key}".encode("utf-8")
).decode("ascii")
if span_exporter is None:
basic_auth_header = "Basic " + base64.b64encode(
f"{public_key}:{secret_key}".encode("utf-8")
).decode("ascii")

# Prepare default headers
default_headers = {
"Authorization": basic_auth_header,
"x-langfuse-sdk-name": "python",
"x-langfuse-sdk-version": langfuse_version,
"x-langfuse-public-key": public_key,
}
# Prepare default headers
default_headers = {
"Authorization": basic_auth_header,
"x-langfuse-sdk-name": "python",
"x-langfuse-sdk-version": langfuse_version,
"x-langfuse-public-key": public_key,
}

# Merge additional headers if provided
headers = {**default_headers, **(additional_headers or {})}
# Merge additional headers if provided
headers = {**default_headers, **(additional_headers or {})}

traces_export_path = os.environ.get(LANGFUSE_OTEL_TRACES_EXPORT_PATH, None)
traces_export_path = os.environ.get(LANGFUSE_OTEL_TRACES_EXPORT_PATH, None)

endpoint = (
f"{base_url}/{traces_export_path}"
if traces_export_path
else f"{base_url}/api/public/otel/v1/traces"
)
endpoint = (
f"{base_url}/{traces_export_path}"
if traces_export_path
else f"{base_url}/api/public/otel/v1/traces"
)

langfuse_span_exporter = OTLPSpanExporter(
endpoint=endpoint,
headers=headers,
timeout=timeout,
)
span_exporter = OTLPSpanExporter(
endpoint=endpoint,
headers=headers,
timeout=timeout,
)

super().__init__(
span_exporter=langfuse_span_exporter,
span_exporter=span_exporter,
export_timeout_millis=timeout * 1_000 if timeout else None,
max_export_batch_size=flush_at,
schedule_delay_millis=flush_interval * 1_000
Comment on lines +110 to 120
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The export_timeout_millis parameter is always passed to BatchSpanProcessor even when a custom span_exporter is provided, silently capping the custom exporter's export() calls at the Langfuse SDK timeout (default 5 seconds). This contradicts the constructor docstring which states 'Langfuse does not wire base_url, exporter headers, exporter auth, or exporter timeout into it'. Users with custom exporters that need more than 5 seconds per batch will have spans silently dropped. Fix by skipping export_timeout_millis when a custom exporter is provided, or update the docs to explicitly warn about this constraint.

Extended reasoning...

What the bug is and how it manifests

In LangfuseSpanProcessor.init, the super().init() call unconditionally passes export_timeout_millis=timeout * 1_000 if timeout else None to BatchSpanProcessor regardless of whether the span_exporter argument was user-supplied or internally constructed. The BatchSpanProcessor uses this value as the maximum wait time for a single exporter.export() call — if the exporter does not finish within the limit, the batch processor times it out and silently drops the spans.

The specific code path that triggers it

In client.py:272, timeout is resolved with: timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5)). This means timeout is always a truthy integer (minimum 5) by the time it reaches LangfuseSpanProcessor. The conditional timeout * 1_000 if timeout else None therefore always evaluates to a non-None integer — at minimum 5000 ms. This value flows unconditionally into BatchSpanProcessor even when span_exporter is user-provided.

Why existing code does not prevent it

The if span_exporter is None: branch added by this PR correctly skips constructing the default OTLPSpanExporter when a custom exporter is provided, but the super().init() call after the branch still includes export_timeout_millis unconditionally. There is no equivalent guard for the timeout parameter in the super().init() call.

What the impact would be

Any user providing a custom span_exporter whose export() calls take longer than 5 seconds (or whatever LANGFUSE_TIMEOUT is set to) will have spans silently dropped. Because BatchSpanProcessor does not raise an exception or log a warning when it times out an exporter, this failure is invisible. The constructor docstring explicitly promises that Langfuse does not wire 'exporter timeout' into a custom exporter, so users have no reason to expect this constraint.

Step-by-step proof

  1. User calls Langfuse(span_exporter=MySlowExporter()) with no explicit timeout argument.
  2. client.py:272 sets timeout = int(os.environ.get(LANGFUSE_TIMEOUT, 5)) giving timeout=5.
  3. LangfuseSpanProcessor.init receives timeout=5 and span_exporter=MySlowExporter().
  4. The if span_exporter is None: block is skipped; MySlowExporter is used as-is.
  5. super().init(span_exporter=MySlowExporter(), export_timeout_millis=5000, ...) is called.
  6. BatchSpanProcessor is now configured to cancel any MySlowExporter.export() call after 5 seconds.
  7. If MySlowExporter.export() takes 6 seconds, the batch processor cancels it and the spans are lost with no warning.

How to fix it

Either (a) pass export_timeout_millis=None when a user-supplied exporter is detected, since the user is responsible for configuring timeout on their own exporter, or (b) update the constructor docstring to explicitly state that the Langfuse SDK timeout is still applied to the custom exporter via BatchSpanProcessor.export_timeout_millis.

Expand Down
30 changes: 30 additions & 0 deletions tests/test_additional_headers_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,25 @@
This module tests that additional headers are properly configured in the HTTP clients.
"""

from typing import Sequence

import httpx
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

from langfuse._client.client import Langfuse


class NoOpSpanExporter(SpanExporter):
"""Minimal exporter used to verify custom exporter injection."""

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return SpanExportResult.SUCCESS

def shutdown(self) -> None:
pass


class TestAdditionalHeadersSimple:
"""Simple test suite for additional_headers functionality."""

Expand Down Expand Up @@ -196,3 +210,19 @@ def test_span_processor_none_additional_headers_works(self):
assert "Authorization" in exporter._headers
assert "x-langfuse-sdk-name" in exporter._headers
assert "x-langfuse-public-key" in exporter._headers

def test_span_processor_uses_custom_span_exporter_when_provided(self):
"""Test that a custom exporter bypasses the default OTLP exporter construction."""
from langfuse._client.span_processor import LangfuseSpanProcessor

custom_exporter = NoOpSpanExporter()

processor = LangfuseSpanProcessor(
public_key="test-public-key",
secret_key="test-secret-key",
base_url="https://mock-host.com",
additional_headers={"X-Custom-Trace-Header": "trace-value"},
span_exporter=custom_exporter,
)

assert processor.span_exporter is custom_exporter
28 changes: 28 additions & 0 deletions tests/test_resource_manager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
"""Test the LangfuseResourceManager and get_client() function."""

from typing import Sequence

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

from langfuse import Langfuse
from langfuse._client.get_client import get_client
from langfuse._client.resource_manager import LangfuseResourceManager


class NoOpSpanExporter(SpanExporter):
"""Minimal exporter used to verify configuration propagation."""

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return SpanExportResult.SUCCESS

def shutdown(self) -> None:
pass


def test_get_client_preserves_all_settings():
"""Test that get_client() preserves environment and all client settings."""
with LangfuseResourceManager._lock:
Expand All @@ -13,14 +28,19 @@ def test_get_client_preserves_all_settings():
def should_export(span):
return span.name != "drop"

span_exporter = NoOpSpanExporter()

settings = {
"public_key": "pk-comprehensive",
"secret_key": "sk-comprehensive",
"environment": "test-env",
"release": "v1.2.3",
"timeout": 30,
"flush_at": 100,
"sample_rate": 0.8,
"should_export_span": should_export,
"additional_headers": {"X-Custom": "value"},
"span_exporter": span_exporter,
}

original_client = Langfuse(**settings)
Expand All @@ -36,6 +56,7 @@ def should_export(span):
assert rm.sample_rate == settings["sample_rate"]
assert rm.should_export_span is should_export
assert rm.additional_headers == settings["additional_headers"]
assert rm.span_exporter is span_exporter

original_client.shutdown()

Expand All @@ -49,6 +70,9 @@ def should_export_a(span):
def should_export_b(span):
return span.name.startswith("b")

exporter_a = NoOpSpanExporter()
exporter_b = NoOpSpanExporter()

# Settings for client A
settings_a = {
"public_key": "pk-comprehensive-a",
Comment on lines 70 to 78
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 The test test_get_client_multiple_clients_preserve_different_settings has two related test-quality issues: it omits the LangfuseResourceManager._instances.clear() guard that its sibling test uses, and it wraps all substantive assertions—including the new span_exporter identity checks—in an if retrieved_a._resources and retrieved_b._resources: guard that silently passes if the condition is ever False. Together these make the test fragile and its assertions potentially unexercised.

Extended reasoning...

Issue 1 – Missing singleton cleanup

The existing sibling test test_get_client_preserves_all_settings correctly starts with:

with LangfuseResourceManager._lock:
    LangfuseResourceManager._instances.clear()

The new test test_get_client_multiple_clients_preserve_different_settings has no such cleanup. LangfuseResourceManager.__new__ uses a singleton keyed by public_key, and shutdown() does NOT remove entries from _instances (it only calls atexit.unregister, flush, and _stop_and_join_consumer_threads). Only reset() clears _instances. If this test is retried in the same process (e.g., via pytest-rerunfailures, watch mode, or randomized ordering where test_get_client_preserves_all_settings has not already cleared state), the stale singleton for pk-comprehensive-a or pk-comprehensive-b is returned by Langfuse(**settings_a) instead of a fresh instance. The fresh exporter_a = NoOpSpanExporter() created in the retry is a different object than the one in the stale singleton, so assert retrieved_a._resources.span_exporter is exporter_a fails spuriously.

Concrete proof (retry scenario):

  1. First run: singleton for pk-comprehensive-a is created, stores exporter_a_run1. Test passes.
  2. client_a.shutdown() is called — does NOT remove pk-comprehensive-a from _instances.
  3. Retry (same process): exporter_a_run2 = NoOpSpanExporter() is a new object. Langfuse(**settings_a) returns the cached singleton with exporter_a_run1. assert retrieved_a._resources.span_exporter is exporter_a_run2 → AssertionError.

Issue 2 – if-guard silently skips all substantive assertions

All substantive assertions, including the new PR-added span_exporter checks (lines 120-121), are wrapped in:

if retrieved_a._resources and retrieved_b._resources:
    assert retrieved_a._resources.timeout == settings_a["timeout"]
    ...
    assert retrieved_a._resources.span_exporter is exporter_a
    assert retrieved_b._resources.span_exporter is exporter_b

If this condition evaluates to False for any reason, the entire block is silently skipped and the test passes green without verifying anything.

Two refutations argued that _resources is always set unconditionally in Langfuse.__init__ and LangfuseResourceManager has no __bool__/__len__, so the guard always evaluates to True. This is currently correct under normal conditions. However: (a) there is no explicit assertion before the if-block guaranteeing _resources is not None, making the guard a latent silent-failure risk for future refactors; (b) the combination with the missing singleton cleanup above creates a scenario where stale state could cause get_client() to return an unexpected client object, and the if-guard would then mask that. The sibling test avoids this pattern entirely by using unconditional assertions. This inconsistency is itself a test-quality defect.

Recommended fix:

  1. Add with LangfuseResourceManager._lock: LangfuseResourceManager._instances.clear() at the top of test_get_client_multiple_clients_preserve_different_settings.
  2. Replace the if guard with assert retrieved_a._resources is not None and assert retrieved_b._resources is not None, followed by unconditional assertions — matching the pattern in the sibling test.

Expand All @@ -58,6 +82,7 @@ def should_export_b(span):
"timeout": 10,
"sample_rate": 0.5,
"should_export_span": should_export_a,
"span_exporter": exporter_a,
}

# Settings for client B
Expand All @@ -69,6 +94,7 @@ def should_export_b(span):
"timeout": 20,
"sample_rate": 0.9,
"should_export_span": should_export_b,
"span_exporter": exporter_b,
}

client_a = Langfuse(**settings_a)
Expand All @@ -91,6 +117,8 @@ def should_export_b(span):
assert retrieved_b._resources.release == settings_b["release"]
assert retrieved_a._resources.should_export_span is should_export_a
assert retrieved_b._resources.should_export_span is should_export_b
assert retrieved_a._resources.span_exporter is exporter_a
assert retrieved_b._resources.span_exporter is exporter_b

client_a.shutdown()
client_b.shutdown()
Loading