-
Notifications
You must be signed in to change notification settings - Fork 260
feat(client): add custom span exporter support #1618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ | |
| from opentelemetry.context import Context | ||
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | ||
| from opentelemetry.sdk.trace import ReadableSpan, Span | ||
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | ||
| from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter | ||
| from opentelemetry.trace import format_span_id | ||
|
|
||
| from langfuse._client.environment_variables import ( | ||
|
|
@@ -63,6 +63,7 @@ def __init__( | |
| blocked_instrumentation_scopes: Optional[List[str]] = None, | ||
| should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, | ||
| additional_headers: Optional[Dict[str, str]] = None, | ||
| span_exporter: Optional[SpanExporter] = None, | ||
| ): | ||
| self.public_key = public_key | ||
| self.blocked_instrumentation_scopes = ( | ||
|
|
@@ -82,37 +83,38 @@ def __init__( | |
| else None | ||
| ) | ||
|
|
||
| basic_auth_header = "Basic " + base64.b64encode( | ||
| f"{public_key}:{secret_key}".encode("utf-8") | ||
| ).decode("ascii") | ||
| if span_exporter is None: | ||
| basic_auth_header = "Basic " + base64.b64encode( | ||
| f"{public_key}:{secret_key}".encode("utf-8") | ||
| ).decode("ascii") | ||
|
|
||
| # Prepare default headers | ||
| default_headers = { | ||
| "Authorization": basic_auth_header, | ||
| "x-langfuse-sdk-name": "python", | ||
| "x-langfuse-sdk-version": langfuse_version, | ||
| "x-langfuse-public-key": public_key, | ||
| } | ||
| # Prepare default headers | ||
| default_headers = { | ||
| "Authorization": basic_auth_header, | ||
| "x-langfuse-sdk-name": "python", | ||
| "x-langfuse-sdk-version": langfuse_version, | ||
| "x-langfuse-public-key": public_key, | ||
| } | ||
|
|
||
| # Merge additional headers if provided | ||
| headers = {**default_headers, **(additional_headers or {})} | ||
| # Merge additional headers if provided | ||
| headers = {**default_headers, **(additional_headers or {})} | ||
|
|
||
| traces_export_path = os.environ.get(LANGFUSE_OTEL_TRACES_EXPORT_PATH, None) | ||
| traces_export_path = os.environ.get(LANGFUSE_OTEL_TRACES_EXPORT_PATH, None) | ||
|
|
||
| endpoint = ( | ||
| f"{base_url}/{traces_export_path}" | ||
| if traces_export_path | ||
| else f"{base_url}/api/public/otel/v1/traces" | ||
| ) | ||
| endpoint = ( | ||
| f"{base_url}/{traces_export_path}" | ||
| if traces_export_path | ||
| else f"{base_url}/api/public/otel/v1/traces" | ||
| ) | ||
|
|
||
| langfuse_span_exporter = OTLPSpanExporter( | ||
| endpoint=endpoint, | ||
| headers=headers, | ||
| timeout=timeout, | ||
| ) | ||
| span_exporter = OTLPSpanExporter( | ||
| endpoint=endpoint, | ||
| headers=headers, | ||
| timeout=timeout, | ||
| ) | ||
|
|
||
| super().__init__( | ||
| span_exporter=langfuse_span_exporter, | ||
| span_exporter=span_exporter, | ||
| export_timeout_millis=timeout * 1_000 if timeout else None, | ||
| max_export_batch_size=flush_at, | ||
| schedule_delay_millis=flush_interval * 1_000 | ||
|
Comment on lines
+110
to
120
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 The export_timeout_millis parameter is always passed to BatchSpanProcessor even when a custom span_exporter is provided, silently capping the custom exporter's export() calls at the Langfuse SDK timeout (default 5 seconds). This contradicts the constructor docstring which states 'Langfuse does not wire base_url, exporter headers, exporter auth, or exporter timeout into it'. Users with custom exporters that need more than 5 seconds per batch will have spans silently dropped. Fix by skipping export_timeout_millis when a custom exporter is provided, or update the docs to explicitly warn about this constraint. Extended reasoning...What the bug is and how it manifests In LangfuseSpanProcessor.init, the super().init() call unconditionally passes export_timeout_millis=timeout * 1_000 if timeout else None to BatchSpanProcessor regardless of whether the span_exporter argument was user-supplied or internally constructed. The BatchSpanProcessor uses this value as the maximum wait time for a single exporter.export() call — if the exporter does not finish within the limit, the batch processor times it out and silently drops the spans. The specific code path that triggers it In client.py:272, timeout is resolved with: timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5)). This means timeout is always a truthy integer (minimum 5) by the time it reaches LangfuseSpanProcessor. The conditional timeout * 1_000 if timeout else None therefore always evaluates to a non-None integer — at minimum 5000 ms. This value flows unconditionally into BatchSpanProcessor even when span_exporter is user-provided. Why existing code does not prevent it The if span_exporter is None: branch added by this PR correctly skips constructing the default OTLPSpanExporter when a custom exporter is provided, but the super().init() call after the branch still includes export_timeout_millis unconditionally. There is no equivalent guard for the timeout parameter in the super().init() call. What the impact would be Any user providing a custom span_exporter whose export() calls take longer than 5 seconds (or whatever LANGFUSE_TIMEOUT is set to) will have spans silently dropped. Because BatchSpanProcessor does not raise an exception or log a warning when it times out an exporter, this failure is invisible. The constructor docstring explicitly promises that Langfuse does not wire 'exporter timeout' into a custom exporter, so users have no reason to expect this constraint. Step-by-step proof
How to fix it Either (a) pass export_timeout_millis=None when a user-supplied exporter is detected, since the user is responsible for configuring timeout on their own exporter, or (b) update the constructor docstring to explicitly state that the Langfuse SDK timeout is still applied to the custom exporter via BatchSpanProcessor.export_timeout_millis. |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,25 @@ | ||
| """Test the LangfuseResourceManager and get_client() function.""" | ||
|
|
||
| from typing import Sequence | ||
|
|
||
| from opentelemetry.sdk.trace import ReadableSpan | ||
| from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult | ||
|
|
||
| from langfuse import Langfuse | ||
| from langfuse._client.get_client import get_client | ||
| from langfuse._client.resource_manager import LangfuseResourceManager | ||
|
|
||
|
|
||
| class NoOpSpanExporter(SpanExporter): | ||
| """Minimal exporter used to verify configuration propagation.""" | ||
|
|
||
| def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: | ||
| return SpanExportResult.SUCCESS | ||
|
|
||
| def shutdown(self) -> None: | ||
| pass | ||
|
|
||
|
|
||
| def test_get_client_preserves_all_settings(): | ||
| """Test that get_client() preserves environment and all client settings.""" | ||
| with LangfuseResourceManager._lock: | ||
|
|
@@ -13,14 +28,19 @@ def test_get_client_preserves_all_settings(): | |
| def should_export(span): | ||
| return span.name != "drop" | ||
|
|
||
| span_exporter = NoOpSpanExporter() | ||
|
|
||
| settings = { | ||
| "public_key": "pk-comprehensive", | ||
| "secret_key": "sk-comprehensive", | ||
| "environment": "test-env", | ||
| "release": "v1.2.3", | ||
| "timeout": 30, | ||
| "flush_at": 100, | ||
| "sample_rate": 0.8, | ||
| "should_export_span": should_export, | ||
| "additional_headers": {"X-Custom": "value"}, | ||
| "span_exporter": span_exporter, | ||
| } | ||
|
|
||
| original_client = Langfuse(**settings) | ||
|
|
@@ -36,6 +56,7 @@ def should_export(span): | |
| assert rm.sample_rate == settings["sample_rate"] | ||
| assert rm.should_export_span is should_export | ||
| assert rm.additional_headers == settings["additional_headers"] | ||
| assert rm.span_exporter is span_exporter | ||
|
|
||
| original_client.shutdown() | ||
|
|
||
|
|
@@ -49,6 +70,9 @@ def should_export_a(span): | |
| def should_export_b(span): | ||
| return span.name.startswith("b") | ||
|
|
||
| exporter_a = NoOpSpanExporter() | ||
| exporter_b = NoOpSpanExporter() | ||
|
|
||
| # Settings for client A | ||
| settings_a = { | ||
| "public_key": "pk-comprehensive-a", | ||
|
Comment on lines
70
to
78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 The test Extended reasoning...Issue 1 – Missing singleton cleanup The existing sibling test with LangfuseResourceManager._lock:
LangfuseResourceManager._instances.clear()The new test Concrete proof (retry scenario):
Issue 2 – if-guard silently skips all substantive assertions All substantive assertions, including the new PR-added if retrieved_a._resources and retrieved_b._resources:
assert retrieved_a._resources.timeout == settings_a["timeout"]
...
assert retrieved_a._resources.span_exporter is exporter_a
assert retrieved_b._resources.span_exporter is exporter_bIf this condition evaluates to False for any reason, the entire block is silently skipped and the test passes green without verifying anything. Two refutations argued that Recommended fix:
|
||
|
|
@@ -58,6 +82,7 @@ def should_export_b(span): | |
| "timeout": 10, | ||
| "sample_rate": 0.5, | ||
| "should_export_span": should_export_a, | ||
| "span_exporter": exporter_a, | ||
| } | ||
|
|
||
| # Settings for client B | ||
|
|
@@ -69,6 +94,7 @@ def should_export_b(span): | |
| "timeout": 20, | ||
| "sample_rate": 0.9, | ||
| "should_export_span": should_export_b, | ||
| "span_exporter": exporter_b, | ||
| } | ||
|
|
||
| client_a = Langfuse(**settings_a) | ||
|
|
@@ -91,6 +117,8 @@ def should_export_b(span): | |
| assert retrieved_b._resources.release == settings_b["release"] | ||
| assert retrieved_a._resources.should_export_span is should_export_a | ||
| assert retrieved_b._resources.should_export_span is should_export_b | ||
| assert retrieved_a._resources.span_exporter is exporter_a | ||
| assert retrieved_b._resources.span_exporter is exporter_b | ||
|
|
||
| client_a.shutdown() | ||
| client_b.shutdown() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟣 This is a pre-existing issue:
LangfuseResourceManager.shutdown()never callstracer_provider.shutdown(), so the full OTel lifecycle chain (TracerProvider.shutdown()→BatchSpanProcessor.shutdown()→SpanExporter.shutdown()) is never triggered. This PR amplifies the impact significantly by introducing customspan_exportersupport as a first-class feature — users who implement cleanup logic in their exporter'sshutdown()(closing file handles, flushing local buffers, releasing gRPC channels) will silently experience resource leaks whenlangfuse_client.shutdown()is called. Fix by callingtracer_provider.shutdown()fromLangfuseResourceManager.shutdown(), or retain a reference to theLangfuseSpanProcessorand callprocessor.shutdown()directly.Extended reasoning...
What the bug is and how it manifests
LangfuseResourceManager.shutdown()(resource_manager.py) callsself.flush(), which in turn callsself.tracer_provider.force_flush(), then calls_stop_and_join_consumer_threads(). Neither path ever callstracer_provider.shutdown(). Since the OpenTelemetry SDK chains shutdown propagation —TracerProvider.shutdown()→BatchSpanProcessor.shutdown()→SpanExporter.shutdown()— any cleanup logic registered in aSpanExporter.shutdown()implementation is silently skipped.The specific code path that triggers it
In
resource_manager.pylines 181–187,shutdown()is:And
flush()calls onlytracer_provider.force_flush()(line ~440), nevertracer_provider.shutdown(). TheLangfuseSpanProcessorinstance that wraps the exporter is created in_initialize_instancebut no reference is retained onself— so there is no direct path toprocessor.shutdown()either.Why existing code doesn't prevent it
force_flush()is explicitly not a substitute forshutdown(): it only blocks until queued spans are exported, but does not tear down the exporter or release its resources. The OTel spec definesshutdown()as the mechanism exporters use for cleanup (e.g., closing HTTP sessions, flushing file buffers, terminating gRPC channels). Because this method is never called, any exporter-level resources remain open until process exit — or not at all if the process exits abnormally.Why this PR increases the impact
Before this PR the only exporter in use was the built-in
OTLPSpanExporter, whoseshutdown()just closes an HTTP session — a minor leak that the GC or process exit would eventually recover. This PR deliberately promotes customSpanExporterinstances as a first-class feature, with constructor docs telling users to configure the exporter themselves. The implicit contract is that Langfuse manages the exporter lifecycle; the docs say nothing about users needing to callshutdown()on their own exporter. Users following the documented pattern who implement meaningful cleanup inshutdown()(flush-to-disk, close a DB connection, drain a local queue) will silently get resource leaks.Step-by-step proof
class FileWritingExporter(SpanExporter)that opens a file in__init__and callsfile.flush(); file.close()inshutdown().Langfuse(span_exporter=FileWritingExporter())._initialize_instancestores it asself.span_exporter(line 184) and passes it intoLangfuseSpanProcessor.__init__, which hands it toBatchSpanProcessor.__init__.langfuse_client.shutdown().LangfuseResourceManager.shutdown()→flush()→tracer_provider.force_flush()— spans are exported, buttracer_provider.shutdown()is never called.BatchSpanProcessor.shutdown()is never called →FileWritingExporter.shutdown()is never called → the file is never flushed/closed → data loss or file descriptor leak.How to fix it
Option A (simplest): call
tracer_provider.shutdown()fromLangfuseResourceManager.shutdown()afterflush(). This propagates correctly through the OTel chain. Care should be taken when thetracer_provideris user-supplied (thetracer_provider=parameter), since the user may not expect Langfuse to shut it down entirely — a guard likeif self._owns_tracer_providerwould make this safe.Option B: retain a reference to the
LangfuseSpanProcessoronselfduring_initialize_instanceand callself._langfuse_processor.shutdown()explicitly inLangfuseResourceManager.shutdown(). This is more surgical and avoids touching user-provided TracerProviders.