feat(client): add custom span exporter support #1618
Found 5 test failures on Blacksmith runners: Failures
@claude review
| span_exporter = OTLPSpanExporter( | ||
| endpoint=endpoint, | ||
| headers=headers, | ||
| timeout=timeout, | ||
| ) | ||
|
|
||
| super().__init__( | ||
| span_exporter=langfuse_span_exporter, | ||
| span_exporter=span_exporter, | ||
| export_timeout_millis=timeout * 1_000 if timeout else None, | ||
| max_export_batch_size=flush_at, | ||
| schedule_delay_millis=flush_interval * 1_000 |
🔴 The export_timeout_millis parameter is always passed to BatchSpanProcessor even when a custom span_exporter is provided, silently capping the custom exporter's export() calls at the Langfuse SDK timeout (default 5 seconds). This contradicts the constructor docstring which states 'Langfuse does not wire base_url, exporter headers, exporter auth, or exporter timeout into it'. Users with custom exporters that need more than 5 seconds per batch will have spans silently dropped. Fix by skipping export_timeout_millis when a custom exporter is provided, or update the docs to explicitly warn about this constraint.
Extended reasoning...
What the bug is and how it manifests
In LangfuseSpanProcessor.__init__, the super().__init__() call unconditionally passes export_timeout_millis=timeout * 1_000 if timeout else None to BatchSpanProcessor regardless of whether the span_exporter argument was user-supplied or internally constructed. The BatchSpanProcessor uses this value as the maximum wait time for a single exporter.export() call: if the exporter does not finish within the limit, the batch processor times it out and silently drops the spans.
The specific code path that triggers it
In client.py:272, timeout is resolved with: timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5)). Unless LANGFUSE_TIMEOUT is explicitly set to 0, timeout is therefore a truthy integer (5 by default) by the time it reaches LangfuseSpanProcessor. The conditional timeout * 1_000 if timeout else None then evaluates to a non-None integer, 5000 ms by default. This value flows unconditionally into BatchSpanProcessor even when span_exporter is user-provided.
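The resolution described above can be traced with a small standalone sketch. The helper names resolve_timeout and to_export_timeout_millis are hypothetical, and the environment variable is assumed to be named LANGFUSE_TIMEOUT; only the two expressions themselves are taken from the code quoted in this comment.

```python
from typing import Mapping, Optional


def resolve_timeout(timeout: Optional[int], environ: Mapping[str, str]) -> int:
    # Same expression as quoted from client.py, with the environment passed in
    # explicitly. Assumption: the LANGFUSE_TIMEOUT constant holds the string
    # "LANGFUSE_TIMEOUT".
    return timeout or int(environ.get("LANGFUSE_TIMEOUT", 5))


def to_export_timeout_millis(timeout: Optional[int]) -> Optional[int]:
    # Same conditional as in the super().__init__() call.
    return timeout * 1_000 if timeout else None


# No explicit timeout and no environment override: the fallback of 5 s applies.
default_millis = to_export_timeout_millis(resolve_timeout(None, {}))

# An environment override flows through the same path.
override_millis = to_export_timeout_millis(
    resolve_timeout(None, {"LANGFUSE_TIMEOUT": "12"})
)
```

In both cases the value handed to the batch processor is a concrete number of milliseconds, independent of which exporter is in use.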
Why existing code does not prevent it
The if span_exporter is None: branch added by this PR correctly skips constructing the default OTLPSpanExporter when a custom exporter is provided, but the super().__init__() call after the branch still includes export_timeout_millis unconditionally. There is no equivalent guard for the timeout parameter in the super().__init__() call.
What the impact would be
Any user providing a custom span_exporter whose export() calls take longer than 5 seconds (or whatever LANGFUSE_TIMEOUT is set to) will have spans silently dropped. Because BatchSpanProcessor does not raise an exception or log a warning when it times out an exporter, this failure is invisible. The constructor docstring explicitly promises that Langfuse does not wire 'exporter timeout' into a custom exporter, so users have no reason to expect this constraint.
Step-by-step proof
- User calls Langfuse(span_exporter=MySlowExporter()) with no explicit timeout argument.
- client.py:272 sets timeout = int(os.environ.get(LANGFUSE_TIMEOUT, 5)) giving timeout=5.
- LangfuseSpanProcessor.__init__ receives timeout=5 and span_exporter=MySlowExporter().
- The if span_exporter is None: block is skipped; MySlowExporter is used as-is.
- super().__init__(span_exporter=MySlowExporter(), export_timeout_millis=5000, ...) is called.
- BatchSpanProcessor is now configured to cancel any MySlowExporter.export() call after 5 seconds.
- If MySlowExporter.export() takes 6 seconds, the batch processor cancels it and the spans are lost with no warning.
How to fix it
Either (a) pass export_timeout_millis=None when a user-supplied exporter is detected, since the user is responsible for configuring timeout on their own exporter, or (b) update the constructor docstring to explicitly state that the Langfuse SDK timeout is still applied to the custom exporter via BatchSpanProcessor.export_timeout_millis.
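Option (a) could look roughly like the sketch below. export_timeout_for is a hypothetical helper, not part of the PR; it only decides which value would be handed to super().__init__(), and it assumes that a None value leaves the batch processor on whatever default it applies itself.

```python
from typing import Any, Optional


def export_timeout_for(
    timeout: Optional[int], span_exporter: Optional[Any]
) -> Optional[int]:
    """Pick the export_timeout_millis value for the batch processor.

    Hypothetical helper: the Langfuse timeout is forwarded only when Langfuse
    builds the exporter itself.
    """
    if span_exporter is not None:
        # Caller-provided exporter: the caller owns its timeout configuration.
        return None
    return timeout * 1_000 if timeout else None


# Default path: Langfuse constructs the exporter, so its timeout applies.
default_path = export_timeout_for(5, None)

# Custom path: any object stands in for a user-supplied exporter here.
custom_path = export_timeout_for(5, object())
```

Keeping the decision in one place would also make it easy to cover with a unit test that does not need a real exporter.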
| def should_export_b(span): | ||
| return span.name.startswith("b") | ||
|
|
||
| exporter_a = NoOpSpanExporter() | ||
| exporter_b = NoOpSpanExporter() | ||
|
|
||
| # Settings for client A | ||
| settings_a = { | ||
| "public_key": "pk-comprehensive-a", |
🟡 The test test_get_client_multiple_clients_preserve_different_settings has two related test-quality issues: it omits the LangfuseResourceManager._instances.clear() guard that its sibling test uses, and it wraps all substantive assertions—including the new span_exporter identity checks—in an if retrieved_a._resources and retrieved_b._resources: guard that silently passes if the condition is ever False. Together these make the test fragile and its assertions potentially unexercised.
Extended reasoning...
Issue 1 – Missing singleton cleanup
The existing sibling test test_get_client_preserves_all_settings correctly starts with:
with LangfuseResourceManager._lock:
    LangfuseResourceManager._instances.clear()
The new test test_get_client_multiple_clients_preserve_different_settings has no such cleanup. LangfuseResourceManager.__new__ uses a singleton keyed by public_key, and shutdown() does NOT remove entries from _instances (it only calls atexit.unregister, flush, and _stop_and_join_consumer_threads). Only reset() clears _instances. If this test is retried in the same process (e.g., via pytest-rerunfailures, watch mode, or randomized ordering where test_get_client_preserves_all_settings has not already cleared state), the stale singleton for pk-comprehensive-a or pk-comprehensive-b is returned by Langfuse(**settings_a) instead of a fresh instance. The fresh exporter_a = NoOpSpanExporter() created in the retry is a different object than the one in the stale singleton, so assert retrieved_a._resources.span_exporter is exporter_a fails spuriously.
Concrete proof (retry scenario):
- First run: singleton for pk-comprehensive-a is created, stores exporter_a_run1. Test passes.
- client_a.shutdown() is called; it does NOT remove pk-comprehensive-a from _instances.
- Retry (same process): exporter_a_run2 = NoOpSpanExporter() is a new object.
- Langfuse(**settings_a) returns the cached singleton with exporter_a_run1.
- assert retrieved_a._resources.span_exporter is exporter_a_run2 → AssertionError.
Issue 2 – if-guard silently skips all substantive assertions
All substantive assertions, including the new PR-added span_exporter checks (lines 120-121), are wrapped in:
if retrieved_a._resources and retrieved_b._resources:
    assert retrieved_a._resources.timeout == settings_a["timeout"]
    ...
    assert retrieved_a._resources.span_exporter is exporter_a
    assert retrieved_b._resources.span_exporter is exporter_b
If this condition evaluates to False for any reason, the entire block is silently skipped and the test passes green without verifying anything.
Two refutations argued that _resources is always set unconditionally in Langfuse.__init__ and LangfuseResourceManager has no __bool__/__len__, so the guard always evaluates to True. This is currently correct under normal conditions. However: (a) there is no explicit assertion before the if-block guaranteeing _resources is not None, making the guard a latent silent-failure risk for future refactors; (b) the combination with the missing singleton cleanup above creates a scenario where stale state could cause get_client() to return an unexpected client object, and the if-guard would then mask that. The sibling test avoids this pattern entirely by using unconditional assertions. This inconsistency is itself a test-quality defect.
Recommended fix:
- Add with LangfuseResourceManager._lock: LangfuseResourceManager._instances.clear() at the top of test_get_client_multiple_clients_preserve_different_settings.
- Replace the if guard with assert retrieved_a._resources is not None and assert retrieved_b._resources is not None, followed by unconditional assertions, matching the pattern in the sibling test.
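The retry scenario and the recommended fix can be reproduced with a toy model. FakeResourceManager below is a hypothetical stand-in for a singleton keyed by public_key, not the real LangfuseResourceManager, and run_test_body is an illustrative name for the body of the test under discussion.

```python
import threading


class FakeResourceManager:
    """Stand-in singleton keyed by public_key (not the real class)."""

    _instances = {}
    _lock = threading.RLock()

    def __new__(cls, *, public_key, span_exporter=None):
        with cls._lock:
            if public_key not in cls._instances:
                instance = super().__new__(cls)
                instance.span_exporter = span_exporter
                cls._instances[public_key] = instance
            return cls._instances[public_key]

    def shutdown(self):
        # Mirrors the behaviour described above: the entry stays in _instances.
        pass


def run_test_body(clear_first: bool) -> bool:
    """Return whether the manager holds the exporter created in this run."""
    if clear_first:
        # The guard recommended above: start from a clean singleton registry.
        with FakeResourceManager._lock:
            FakeResourceManager._instances.clear()

    exporter = object()  # fresh exporter for this run
    manager = FakeResourceManager(
        public_key="pk-comprehensive-a", span_exporter=exporter
    )

    # Unconditional precondition instead of an if-guard around the assertions.
    assert manager is not None
    matches = manager.span_exporter is exporter

    manager.shutdown()
    return matches


first_run = run_test_body(clear_first=True)
retry_without_clear = run_test_body(clear_first=False)  # stale singleton reused
retry_with_clear = run_test_body(clear_first=True)
```

In this model the retry without cleanup sees the exporter from the earlier run, which is the spurious identity mismatch described in Issue 1.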
| self.blocked_instrumentation_scopes = blocked_instrumentation_scopes | ||
| self.should_export_span = should_export_span | ||
| self.additional_headers = additional_headers | ||
| self.span_exporter = span_exporter | ||
| self.tracer_provider: Optional[TracerProvider] = None | ||
|
|
||
| # OTEL Tracer |
🟣 This is a pre-existing issue: LangfuseResourceManager.shutdown() never calls tracer_provider.shutdown(), so the full OTel lifecycle chain (TracerProvider.shutdown() → BatchSpanProcessor.shutdown() → SpanExporter.shutdown()) is never triggered. This PR amplifies the impact significantly by introducing custom span_exporter support as a first-class feature — users who implement cleanup logic in their exporter's shutdown() (closing file handles, flushing local buffers, releasing gRPC channels) will silently experience resource leaks when langfuse_client.shutdown() is called. Fix by calling tracer_provider.shutdown() from LangfuseResourceManager.shutdown(), or retain a reference to the LangfuseSpanProcessor and call processor.shutdown() directly.
Extended reasoning...
What the bug is and how it manifests
LangfuseResourceManager.shutdown() (resource_manager.py) calls self.flush(), which in turn calls self.tracer_provider.force_flush(), then calls _stop_and_join_consumer_threads(). Neither path ever calls tracer_provider.shutdown(). Since the OpenTelemetry SDK chains shutdown propagation — TracerProvider.shutdown() → BatchSpanProcessor.shutdown() → SpanExporter.shutdown() — any cleanup logic registered in a SpanExporter.shutdown() implementation is silently skipped.
The specific code path that triggers it
In resource_manager.py lines 181–187, shutdown() is:
def shutdown(self) -> None:
    atexit.unregister(self.shutdown)
    self.flush()
    self._stop_and_join_consumer_threads()
And flush() calls only tracer_provider.force_flush() (line ~440), never tracer_provider.shutdown(). The LangfuseSpanProcessor instance that wraps the exporter is created in _initialize_instance but no reference is retained on self, so there is no direct path to processor.shutdown() either.
Why existing code doesn't prevent it
force_flush() is explicitly not a substitute for shutdown(): it only blocks until queued spans are exported, but does not tear down the exporter or release its resources. The OTel spec defines shutdown() as the mechanism exporters use for cleanup (e.g., closing HTTP sessions, flushing file buffers, terminating gRPC channels). Because this method is never called, any exporter-level resources remain open until process exit, and are never cleaned up properly if the process exits abnormally.
Why this PR increases the impact
Before this PR the only exporter in use was the built-in OTLPSpanExporter, whose shutdown() just closes an HTTP session — a minor leak that the GC or process exit would eventually recover. This PR deliberately promotes custom SpanExporter instances as a first-class feature, with constructor docs telling users to configure the exporter themselves. The implicit contract is that Langfuse manages the exporter lifecycle; the docs say nothing about users needing to call shutdown() on their own exporter. Users following the documented pattern who implement meaningful cleanup in shutdown() (flush-to-disk, close a DB connection, drain a local queue) will silently get resource leaks.
Step-by-step proof
- User creates class FileWritingExporter(SpanExporter) that opens a file in __init__ and calls file.flush(); file.close() in shutdown().
- User passes Langfuse(span_exporter=FileWritingExporter()).
- _initialize_instance stores it as self.span_exporter (line 184) and passes it into LangfuseSpanProcessor.__init__, which hands it to BatchSpanProcessor.__init__.
- User calls langfuse_client.shutdown().
- LangfuseResourceManager.shutdown() → flush() → tracer_provider.force_flush(): spans are exported, but tracer_provider.shutdown() is never called.
- BatchSpanProcessor.shutdown() is never called → FileWritingExporter.shutdown() is never called → the file is never flushed/closed → data loss or file descriptor leak.
How to fix it
Option A (simplest): call tracer_provider.shutdown() from LangfuseResourceManager.shutdown() after flush(). This propagates correctly through the OTel chain. Care should be taken when the tracer_provider is user-supplied (the tracer_provider= parameter), since the user may not expect Langfuse to shut it down entirely — a guard like if self._owns_tracer_provider would make this safe.
Option B: retain a reference to the LangfuseSpanProcessor on self during _initialize_instance and call self._langfuse_processor.shutdown() explicitly in LangfuseResourceManager.shutdown(). This is more surgical and avoids touching user-provided TracerProviders.
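Both options can be combined in one shutdown path, sketched here with stand-in classes so it runs without the OpenTelemetry SDK. RecordingExporter, FakeProcessor, FakeTracerProvider and ResourceManagerSketch are all hypothetical names; only the shutdown chain (provider → processor → exporter) and the _owns_tracer_provider guard follow the description above.

```python
class RecordingExporter:
    """Stand-in exporter that records whether shutdown() was called."""

    def __init__(self):
        self.closed = False

    def shutdown(self):
        self.closed = True


class FakeProcessor:
    """Stand-in for a span processor wrapping one exporter."""

    def __init__(self, exporter):
        self.exporter = exporter

    def force_flush(self):
        return True

    def shutdown(self):
        self.exporter.shutdown()


class FakeTracerProvider:
    """Stand-in provider that propagates shutdown to its processors."""

    def __init__(self):
        self.processors = []

    def add_span_processor(self, processor):
        self.processors.append(processor)

    def force_flush(self):
        return all(p.force_flush() for p in self.processors)

    def shutdown(self):
        for processor in self.processors:
            processor.shutdown()


class ResourceManagerSketch:
    def __init__(self, exporter, tracer_provider=None):
        # Remember whether the provider was created here or supplied by the user.
        self._owns_tracer_provider = tracer_provider is None
        if tracer_provider is None:
            tracer_provider = FakeTracerProvider()
        self.tracer_provider = tracer_provider
        # Option B prerequisite: keep a reference to our own processor.
        self._processor = FakeProcessor(exporter)
        self.tracer_provider.add_span_processor(self._processor)

    def shutdown(self):
        self.tracer_provider.force_flush()
        if self._owns_tracer_provider:
            # Option A: shut down the whole provider we created.
            self.tracer_provider.shutdown()
        else:
            # Option B: shut down only our processor on a user-supplied provider.
            self._processor.shutdown()


# Owned provider: the exporter is closed through the provider.
owned_exporter = RecordingExporter()
ResourceManagerSketch(owned_exporter).shutdown()

# User-supplied provider: only our exporter is closed, the user's is untouched.
shared_provider = FakeTracerProvider()
user_exporter = RecordingExporter()
shared_provider.add_span_processor(FakeProcessor(user_exporter))
our_exporter = RecordingExporter()
ResourceManagerSketch(our_exporter, shared_provider).shutdown()
```

The second scenario is the case Option A alone would get wrong: shutting down a shared provider would also close processors the user registered independently.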
Summary
Allow callers to pass a custom OpenTelemetry span exporter into Langfuse(...) so the internal Langfuse span processor can use that exporter instead of always constructing its own OTLPSpanExporter.
What Changed
- span_exporter parameter to Langfuse
- LangfuseResourceManager and get_client()
- LangfuseSpanProcessor to use a caller-provided exporter when present and keep the existing OTLP exporter as the default path
- get_client()
Why
The span processor currently hard-codes creation of an OTLPSpanExporter, which makes it hard for users to customize export behavior while still using the Langfuse initialization path.
User Impact
Users who need a custom exporter can now pass one directly to Langfuse(span_exporter=...).
When a custom exporter is supplied, Langfuse does not auto-wire exporter transport settings into it, so users must configure those on the exporter instance itself. The constructor docs also call out that for Langfuse v4 or Langfuse Cloud Fast Preview, users should set x-langfuse-ingestion-version=4 on their exporter to get real time processing.
Validation
uv run pytest tests/test_additional_headers_simple.py tests/test_resource_manager.py
uv run ruff check langfuse/_client/client.py
Disclaimer: Experimental PR review
Greptile Summary
This PR adds an optional span_exporter parameter to Langfuse(...), threads it through LangfuseResourceManager and LangfuseSpanProcessor, and uses it in place of the default OTLPSpanExporter when provided. The change is well-scoped: the default exporter path is completely unchanged, and get_client() correctly preserves the custom exporter when reconstructing a client from a cached resource manager instance.
Confidence Score: 5/5
Safe to merge; all remaining findings are P2 style suggestions that do not affect runtime behavior.
The core implementation is correct — the custom exporter is properly threaded through the stack, the default path is untouched, and get_client() round-trips the setting faithfully. The two open comments are a module-level import rule violation in a test file and a NoOpSpanExporter duplication across test files, neither of which affects production code.
tests/test_additional_headers_simple.py — import inside method body and duplicated helper class.
Important Files Changed
- span_exporter parameter; skips OTLP exporter construction when a custom exporter is provided and delegates it to BatchSpanProcessor. Logic is correct and the default path is unchanged.
- span_exporter through __new__, _initialize_instance, and stores it as self.span_exporter for downstream access. No issues.
- span_exporter constructor parameter with documentation; forwards it to LangfuseResourceManager. Clean and well-documented.
- span_exporter=instance.span_exporter to _create_client_from_instance so the custom exporter is preserved when get_client() reconstructs a client from a cached resource manager.
- NoOpSpanExporter helper and a test verifying custom exporter injection; import inside method body violates the project's module-level import rule, and the helper is duplicated in test_resource_manager.py.
- span_exporter; NoOpSpanExporter is duplicated from test_additional_headers_simple.py but tests are otherwise correct and complete.
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["Langfuse(span_exporter=...)"] -->|"span_exporter"| B["LangfuseResourceManager.__new__"]
    B -->|"stores self.span_exporter"| C["_initialize_instance"]
    C -->|"span_exporter"| D["LangfuseSpanProcessor.__init__"]
    D -->|"span_exporter is None?"| E{Custom exporter?}
    E -->|"No (default path)"| F["Create OTLPSpanExporter\n(endpoint, auth, headers, timeout)"]
    E -->|"Yes"| G["Use caller-provided exporter\n(no auto-wiring of headers/auth)"]
    F --> H["BatchSpanProcessor.super().__init__()"]
    G --> H
    H --> I["Attached to TracerProvider"]
    B -->|"instance cached by public_key"| J["get_client()"]
    J -->|"instance.span_exporter"| K["_create_client_from_instance"]
    K -->|"span_exporter preserved"| A
Reviews (1): Last reviewed commit: "add custom span exporter support"