Skip to content

feat(client): add custom span exporter support#1618

Merged
hassiebp merged 1 commit intomainfrom
codex/custom-span-exporter
Apr 10, 2026
Merged

feat(client): add custom span exporter support#1618
hassiebp merged 1 commit intomainfrom
codex/custom-span-exporter

Conversation

@hassiebp
Copy link
Copy Markdown
Contributor

@hassiebp hassiebp commented Apr 10, 2026

Summary

Allow callers to pass a custom OpenTelemetry span exporter into Langfuse(...) so the internal Langfuse span processor can use that exporter instead of always constructing its own OTLPSpanExporter.

What Changed

  • added an optional span_exporter parameter to Langfuse
  • threaded that setting through LangfuseResourceManager and get_client()
  • updated LangfuseSpanProcessor to use a caller-provided exporter when present and keep the existing OTLP exporter as the default path
  • documented in the constructor docs that custom exporters must be configured directly for endpoint, headers, auth, and timeout
  • added tests covering custom exporter injection and preservation across get_client()

Why

The span processor currently hard-codes creation of an OTLPSpanExporter, which makes it hard for users to customize export behavior while still using the Langfuse initialization path.

User Impact

Users who need a custom exporter can now pass one directly to Langfuse(span_exporter=...).

When a custom exporter is supplied, Langfuse does not auto-wire exporter transport settings into it, so users must configure those on the exporter instance itself. The constructor docs also call out that for Langfuse v4 or Langfuse Cloud Fast Preview, users should set x-langfuse-ingestion-version=4 on their exporter to get real time processing.

Validation

  • uv run pytest tests/test_additional_headers_simple.py tests/test_resource_manager.py
  • uv run ruff check langfuse/_client/client.py

Disclaimer: Experimental PR review

Greptile Summary

This PR adds an optional span_exporter parameter to Langfuse(...), threads it through LangfuseResourceManager and LangfuseSpanProcessor, and uses it in place of the default OTLPSpanExporter when provided. The change is well-scoped: the default exporter path is completely unchanged, and get_client() correctly preserves the custom exporter when reconstructing a client from a cached resource manager instance.

Confidence Score: 5/5

Safe to merge; all remaining findings are P2 style suggestions that do not affect runtime behavior.

The core implementation is correct — the custom exporter is properly threaded through the stack, the default path is untouched, and get_client() round-trips the setting faithfully. The two open comments are a module-level import rule violation in a test file and a NoOpSpanExporter duplication across test files, neither of which affects production code.

tests/test_additional_headers_simple.py — import inside method body and duplicated helper class.

Important Files Changed

Filename Overview
langfuse/_client/span_processor.py Adds optional span_exporter parameter; skips OTLP exporter construction when a custom exporter is provided and delegates it to BatchSpanProcessor. Logic is correct and the default path is unchanged.
langfuse/_client/resource_manager.py Threads span_exporter through __new__, _initialize_instance, and stores it as self.span_exporter for downstream access. No issues.
langfuse/_client/client.py Adds span_exporter constructor parameter with documentation; forwards it to LangfuseResourceManager. Clean and well-documented.
langfuse/_client/get_client.py Adds span_exporter=instance.span_exporter to _create_client_from_instance so the custom exporter is preserved when get_client() reconstructs a client from a cached resource manager.
tests/test_additional_headers_simple.py Adds NoOpSpanExporter helper and a test verifying custom exporter injection; import inside method body violates the project's module-level import rule, and the helper is duplicated in test_resource_manager.py.
tests/test_resource_manager.py Extends existing settings-preservation tests with span_exporter; NoOpSpanExporter is duplicated from test_additional_headers_simple.py but tests are otherwise correct and complete.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["Langfuse(span_exporter=...)"] -->|"span_exporter"| B["LangfuseResourceManager.__new__"]
    B -->|"stores self.span_exporter"| C["_initialize_instance"]
    C -->|"span_exporter"| D["LangfuseSpanProcessor.__init__"]
    D -->|"span_exporter is None?"| E{Custom exporter?}
    E -->|"No (default path)"| F["Create OTLPSpanExporter\n(endpoint, auth, headers, timeout)"]
    E -->|"Yes"| G["Use caller-provided exporter\n(no auto-wiring of headers/auth)"]
    F --> H["BatchSpanProcessor.super().__init__()"]
    G --> H
    H --> I["Attached to TracerProvider"]
    B -->|"instance cached by public_key"| J["get_client()"]
    J -->|"instance.span_exporter"| K["_create_client_from_instance"]
    K -->|"span_exporter preserved"| A
Loading

Reviews (1): Last reviewed commit: "add custom span exporter support" | Re-trigger Greptile

@hassiebp hassiebp changed the title [codex] add custom span exporter support feat(client): add custom span exporter support Apr 10, 2026
@blacksmith-sh
Copy link
Copy Markdown

blacksmith-sh bot commented Apr 10, 2026

Found 5 test failures on Blacksmith runners:

Failures

Test View Logs
test_decorators/test_scoring_observations View Logs
test_decorators/test_scoring_observations View Logs
test_decorators/test_scoring_observations View Logs
test_decorators/test_scoring_observations View Logs
test_decorators/test_scoring_observations View Logs

Fix in Cursor

@hassiebp hassiebp marked this pull request as ready for review April 10, 2026 11:50
@github-actions
Copy link
Copy Markdown

@claude review

@hassiebp hassiebp merged commit fe09fd5 into main Apr 10, 2026
9 of 15 checks passed
@hassiebp hassiebp deleted the codex/custom-span-exporter branch April 10, 2026 11:54
Comment on lines +110 to 120
span_exporter = OTLPSpanExporter(
endpoint=endpoint,
headers=headers,
timeout=timeout,
)

super().__init__(
span_exporter=langfuse_span_exporter,
span_exporter=span_exporter,
export_timeout_millis=timeout * 1_000 if timeout else None,
max_export_batch_size=flush_at,
schedule_delay_millis=flush_interval * 1_000
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The export_timeout_millis parameter is always passed to BatchSpanProcessor even when a custom span_exporter is provided, silently capping the custom exporter's export() calls at the Langfuse SDK timeout (default 5 seconds). This contradicts the constructor docstring which states 'Langfuse does not wire base_url, exporter headers, exporter auth, or exporter timeout into it'. Users with custom exporters that need more than 5 seconds per batch will have spans silently dropped. Fix by skipping export_timeout_millis when a custom exporter is provided, or update the docs to explicitly warn about this constraint.

Extended reasoning...

What the bug is and how it manifests

In LangfuseSpanProcessor.init, the super().init() call unconditionally passes export_timeout_millis=timeout * 1_000 if timeout else None to BatchSpanProcessor regardless of whether the span_exporter argument was user-supplied or internally constructed. The BatchSpanProcessor uses this value as the maximum wait time for a single exporter.export() call — if the exporter does not finish within the limit, the batch processor times it out and silently drops the spans.

The specific code path that triggers it

In client.py:272, timeout is resolved with: timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5)). This means timeout is always a truthy integer (minimum 5) by the time it reaches LangfuseSpanProcessor. The conditional timeout * 1_000 if timeout else None therefore always evaluates to a non-None integer — at minimum 5000 ms. This value flows unconditionally into BatchSpanProcessor even when span_exporter is user-provided.

Why existing code does not prevent it

The if span_exporter is None: branch added by this PR correctly skips constructing the default OTLPSpanExporter when a custom exporter is provided, but the super().init() call after the branch still includes export_timeout_millis unconditionally. There is no equivalent guard for the timeout parameter in the super().init() call.

What the impact would be

Any user providing a custom span_exporter whose export() calls take longer than 5 seconds (or whatever LANGFUSE_TIMEOUT is set to) will have spans silently dropped. Because BatchSpanProcessor does not raise an exception or log a warning when it times out an exporter, this failure is invisible. The constructor docstring explicitly promises that Langfuse does not wire 'exporter timeout' into a custom exporter, so users have no reason to expect this constraint.

Step-by-step proof

  1. User calls Langfuse(span_exporter=MySlowExporter()) with no explicit timeout argument.
  2. client.py:272 sets timeout = int(os.environ.get(LANGFUSE_TIMEOUT, 5)) giving timeout=5.
  3. LangfuseSpanProcessor.init receives timeout=5 and span_exporter=MySlowExporter().
  4. The if span_exporter is None: block is skipped; MySlowExporter is used as-is.
  5. super().init(span_exporter=MySlowExporter(), export_timeout_millis=5000, ...) is called.
  6. BatchSpanProcessor is now configured to cancel any MySlowExporter.export() call after 5 seconds.
  7. If MySlowExporter.export() takes 6 seconds, the batch processor cancels it and the spans are lost with no warning.

How to fix it

Either (a) pass export_timeout_millis=None when a user-supplied exporter is detected, since the user is responsible for configuring timeout on their own exporter, or (b) update the constructor docstring to explicitly state that the Langfuse SDK timeout is still applied to the custom exporter via BatchSpanProcessor.export_timeout_millis.

Comment on lines 70 to 78
def should_export_b(span):
return span.name.startswith("b")

exporter_a = NoOpSpanExporter()
exporter_b = NoOpSpanExporter()

# Settings for client A
settings_a = {
"public_key": "pk-comprehensive-a",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 The test test_get_client_multiple_clients_preserve_different_settings has two related test-quality issues: it omits the LangfuseResourceManager._instances.clear() guard that its sibling test uses, and it wraps all substantive assertions—including the new span_exporter identity checks—in an if retrieved_a._resources and retrieved_b._resources: guard that silently passes if the condition is ever False. Together these make the test fragile and its assertions potentially unexercised.

Extended reasoning...

Issue 1 – Missing singleton cleanup

The existing sibling test test_get_client_preserves_all_settings correctly starts with:

with LangfuseResourceManager._lock:
    LangfuseResourceManager._instances.clear()

The new test test_get_client_multiple_clients_preserve_different_settings has no such cleanup. LangfuseResourceManager.__new__ uses a singleton keyed by public_key, and shutdown() does NOT remove entries from _instances (it only calls atexit.unregister, flush, and _stop_and_join_consumer_threads). Only reset() clears _instances. If this test is retried in the same process (e.g., via pytest-rerunfailures, watch mode, or randomized ordering where test_get_client_preserves_all_settings has not already cleared state), the stale singleton for pk-comprehensive-a or pk-comprehensive-b is returned by Langfuse(**settings_a) instead of a fresh instance. The fresh exporter_a = NoOpSpanExporter() created in the retry is a different object than the one in the stale singleton, so assert retrieved_a._resources.span_exporter is exporter_a fails spuriously.

Concrete proof (retry scenario):

  1. First run: singleton for pk-comprehensive-a is created, stores exporter_a_run1. Test passes.
  2. client_a.shutdown() is called — does NOT remove pk-comprehensive-a from _instances.
  3. Retry (same process): exporter_a_run2 = NoOpSpanExporter() is a new object. Langfuse(**settings_a) returns the cached singleton with exporter_a_run1. assert retrieved_a._resources.span_exporter is exporter_a_run2 → AssertionError.

Issue 2 – if-guard silently skips all substantive assertions

All substantive assertions, including the new PR-added span_exporter checks (lines 120-121), are wrapped in:

if retrieved_a._resources and retrieved_b._resources:
    assert retrieved_a._resources.timeout == settings_a["timeout"]
    ...
    assert retrieved_a._resources.span_exporter is exporter_a
    assert retrieved_b._resources.span_exporter is exporter_b

If this condition evaluates to False for any reason, the entire block is silently skipped and the test passes green without verifying anything.

Two refutations argued that _resources is always set unconditionally in Langfuse.__init__ and LangfuseResourceManager has no __bool__/__len__, so the guard always evaluates to True. This is currently correct under normal conditions. However: (a) there is no explicit assertion before the if-block guaranteeing _resources is not None, making the guard a latent silent-failure risk for future refactors; (b) the combination with the missing singleton cleanup above creates a scenario where stale state could cause get_client() to return an unexpected client object, and the if-guard would then mask that. The sibling test avoids this pattern entirely by using unconditional assertions. This inconsistency is itself a test-quality defect.

Recommended fix:

  1. Add with LangfuseResourceManager._lock: LangfuseResourceManager._instances.clear() at the top of test_get_client_multiple_clients_preserve_different_settings.
  2. Replace the if guard with assert retrieved_a._resources is not None and assert retrieved_b._resources is not None, followed by unconditional assertions — matching the pattern in the sibling test.

Comment on lines 181 to 187
self.blocked_instrumentation_scopes = blocked_instrumentation_scopes
self.should_export_span = should_export_span
self.additional_headers = additional_headers
self.span_exporter = span_exporter
self.tracer_provider: Optional[TracerProvider] = None

# OTEL Tracer
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟣 This is a pre-existing issue: LangfuseResourceManager.shutdown() never calls tracer_provider.shutdown(), so the full OTel lifecycle chain (TracerProvider.shutdown()BatchSpanProcessor.shutdown()SpanExporter.shutdown()) is never triggered. This PR amplifies the impact significantly by introducing custom span_exporter support as a first-class feature — users who implement cleanup logic in their exporter's shutdown() (closing file handles, flushing local buffers, releasing gRPC channels) will silently experience resource leaks when langfuse_client.shutdown() is called. Fix by calling tracer_provider.shutdown() from LangfuseResourceManager.shutdown(), or retain a reference to the LangfuseSpanProcessor and call processor.shutdown() directly.

Extended reasoning...

What the bug is and how it manifests

LangfuseResourceManager.shutdown() (resource_manager.py) calls self.flush(), which in turn calls self.tracer_provider.force_flush(), then calls _stop_and_join_consumer_threads(). Neither path ever calls tracer_provider.shutdown(). Since the OpenTelemetry SDK chains shutdown propagation — TracerProvider.shutdown()BatchSpanProcessor.shutdown()SpanExporter.shutdown() — any cleanup logic registered in a SpanExporter.shutdown() implementation is silently skipped.

The specific code path that triggers it

In resource_manager.py lines 181–187, shutdown() is:

def shutdown(self) -> None:
    atexit.unregister(self.shutdown)
    self.flush()
    self._stop_and_join_consumer_threads()

And flush() calls only tracer_provider.force_flush() (line ~440), never tracer_provider.shutdown(). The LangfuseSpanProcessor instance that wraps the exporter is created in _initialize_instance but no reference is retained on self — so there is no direct path to processor.shutdown() either.

Why existing code doesn't prevent it

force_flush() is explicitly not a substitute for shutdown(): it only blocks until queued spans are exported, but does not tear down the exporter or release its resources. The OTel spec defines shutdown() as the mechanism exporters use for cleanup (e.g., closing HTTP sessions, flushing file buffers, terminating gRPC channels). Because this method is never called, any exporter-level resources remain open until process exit — or not at all if the process exits abnormally.

Why this PR increases the impact

Before this PR the only exporter in use was the built-in OTLPSpanExporter, whose shutdown() just closes an HTTP session — a minor leak that the GC or process exit would eventually recover. This PR deliberately promotes custom SpanExporter instances as a first-class feature, with constructor docs telling users to configure the exporter themselves. The implicit contract is that Langfuse manages the exporter lifecycle; the docs say nothing about users needing to call shutdown() on their own exporter. Users following the documented pattern who implement meaningful cleanup in shutdown() (flush-to-disk, close a DB connection, drain a local queue) will silently get resource leaks.

Step-by-step proof

  1. User creates class FileWritingExporter(SpanExporter) that opens a file in __init__ and calls file.flush(); file.close() in shutdown().
  2. User passes Langfuse(span_exporter=FileWritingExporter()).
  3. _initialize_instance stores it as self.span_exporter (line 184) and passes it into LangfuseSpanProcessor.__init__, which hands it to BatchSpanProcessor.__init__.
  4. User calls langfuse_client.shutdown().
  5. LangfuseResourceManager.shutdown()flush()tracer_provider.force_flush() — spans are exported, but tracer_provider.shutdown() is never called.
  6. BatchSpanProcessor.shutdown() is never called → FileWritingExporter.shutdown() is never called → the file is never flushed/closed → data loss or file descriptor leak.

How to fix it

Option A (simplest): call tracer_provider.shutdown() from LangfuseResourceManager.shutdown() after flush(). This propagates correctly through the OTel chain. Care should be taken when the tracer_provider is user-supplied (the tracer_provider= parameter), since the user may not expect Langfuse to shut it down entirely — a guard like if self._owns_tracer_provider would make this safe.

Option B: retain a reference to the LangfuseSpanProcessor on self during _initialize_instance and call self._langfuse_processor.shutdown() explicitly in LangfuseResourceManager.shutdown(). This is more surgical and avoids touching user-provided TracerProviders.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant