From 87c5258fc5968fd4b8c00d17401c24dfa842ab1f Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Fri, 10 Apr 2026 12:51:51 +0200 Subject: [PATCH] add custom span exporter support --- langfuse/_client/client.py | 6 ++- langfuse/_client/get_client.py | 1 + langfuse/_client/resource_manager.py | 6 +++ langfuse/_client/span_processor.py | 52 +++++++++++++------------ tests/test_additional_headers_simple.py | 30 ++++++++++++++ tests/test_resource_manager.py | 28 +++++++++++++ 6 files changed, 97 insertions(+), 26 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 04d8fae2c..f0b469320 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -29,6 +29,7 @@ import httpx from opentelemetry import trace as otel_trace_api from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace.export import SpanExporter from opentelemetry.sdk.trace.id_generator import RandomIdGenerator from opentelemetry.util._decorator import ( _AgnosticContextManager, @@ -179,8 +180,9 @@ class Langfuse: ) ``` should_export_span (Optional[Callable[[ReadableSpan], bool]]): Callback to decide whether to export a span. If omitted, Langfuse uses the default filter (Langfuse SDK spans, spans with `gen_ai.*` attributes, and known LLM instrumentation scopes). - additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well. + additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and in the default OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well. If `span_exporter` is provided, these headers are not wired into that exporter and must be configured on the exporter instance directly. tracer_provider(Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. This can be useful to set to have disconnected tracing between Langfuse and other OpenTelemetry-span emitting libraries. Note: To track active spans, the context is still shared between TracerProviders. This may lead to broken trace trees. + span_exporter (Optional[SpanExporter]): Custom OpenTelemetry span exporter for the Langfuse span processor. If omitted, Langfuse creates an OTLPSpanExporter pointed at the Langfuse OTLP endpoint. If provided, Langfuse does not wire `base_url`, exporter headers, exporter auth, or exporter timeout into it. Configure endpoint, headers, and timeout on the exporter instance directly. If you are sending spans to Langfuse v4 or using Langfuse Cloud Fast Preview, include `x-langfuse-ingestion-version=4` on the exporter to enable real time processing of exported spans. Example: ```python @@ -244,6 +246,7 @@ def __init__( should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, additional_headers: Optional[Dict[str, str]] = None, tracer_provider: Optional[TracerProvider] = None, + span_exporter: Optional[SpanExporter] = None, ): self._base_url = ( base_url @@ -340,6 +343,7 @@ def __init__( should_export_span=should_export_span, additional_headers=additional_headers, tracer_provider=tracer_provider, + span_exporter=span_exporter, ) self._mask = self._resources.mask diff --git a/langfuse/_client/get_client.py b/langfuse/_client/get_client.py index dd2ee4a29..e2ebab0ed 100644 --- a/langfuse/_client/get_client.py +++ b/langfuse/_client/get_client.py @@ -54,6 +54,7 @@ def _create_client_from_instance( should_export_span=instance.should_export_span, additional_headers=instance.additional_headers, tracer_provider=instance.tracer_provider, + span_exporter=instance.span_exporter, httpx_client=instance.httpx_client, ) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 55f23a782..6e277e0db 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -24,6 +24,7 @@ from opentelemetry import trace as otel_trace_api from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace.export import SpanExporter from opentelemetry.sdk.trace.sampling import Decision, TraceIdRatioBased from opentelemetry.trace import Tracer @@ -98,6 +99,7 @@ def __new__( should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, additional_headers: Optional[Dict[str, str]] = None, tracer_provider: Optional[TracerProvider] = None, + span_exporter: Optional[SpanExporter] = None, ) -> "LangfuseResourceManager": if public_key in cls._instances: return cls._instances[public_key] @@ -133,6 +135,7 @@ def __new__( should_export_span=should_export_span, additional_headers=additional_headers, tracer_provider=tracer_provider, + span_exporter=span_exporter, ) cls._instances[public_key] = instance @@ -159,6 +162,7 @@ def _initialize_instance( should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, additional_headers: Optional[Dict[str, str]] = None, tracer_provider: Optional[TracerProvider] = None, + span_exporter: Optional[SpanExporter] = None, ) -> None: self.public_key = public_key self.secret_key = secret_key @@ -177,6 +181,7 @@ def _initialize_instance( self.blocked_instrumentation_scopes = blocked_instrumentation_scopes self.should_export_span = should_export_span self.additional_headers = additional_headers + self.span_exporter = span_exporter self.tracer_provider: Optional[TracerProvider] = None # OTEL Tracer @@ -196,6 +201,7 @@ def _initialize_instance( blocked_instrumentation_scopes=blocked_instrumentation_scopes, should_export_span=should_export_span, additional_headers=additional_headers, + span_exporter=span_exporter, ) tracer_provider.add_span_processor(langfuse_processor) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index 1057137af..a684a8813 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -19,7 +19,7 @@ from opentelemetry.context import Context from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace import ReadableSpan, Span -from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter from opentelemetry.trace import format_span_id from langfuse._client.environment_variables import ( @@ -63,6 +63,7 @@ def __init__( blocked_instrumentation_scopes: Optional[List[str]] = None, should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, additional_headers: Optional[Dict[str, str]] = None, + span_exporter: Optional[SpanExporter] = None, ): self.public_key = public_key self.blocked_instrumentation_scopes = ( @@ -82,37 +83,38 @@ def __init__( else None ) - basic_auth_header = "Basic " + base64.b64encode( - f"{public_key}:{secret_key}".encode("utf-8") - ).decode("ascii") + if span_exporter is None: + basic_auth_header = "Basic " + base64.b64encode( + f"{public_key}:{secret_key}".encode("utf-8") + ).decode("ascii") - # Prepare default headers - default_headers = { - "Authorization": basic_auth_header, - "x-langfuse-sdk-name": "python", - "x-langfuse-sdk-version": langfuse_version, - "x-langfuse-public-key": public_key, - } + # Prepare default headers + default_headers = { + "Authorization": basic_auth_header, + "x-langfuse-sdk-name": "python", + "x-langfuse-sdk-version": langfuse_version, + "x-langfuse-public-key": public_key, + } - # Merge additional headers if provided - headers = {**default_headers, **(additional_headers or {})} + # Merge additional headers if provided + headers = {**default_headers, **(additional_headers or {})} - traces_export_path = os.environ.get(LANGFUSE_OTEL_TRACES_EXPORT_PATH, None) + traces_export_path = os.environ.get(LANGFUSE_OTEL_TRACES_EXPORT_PATH, None) - endpoint = ( - f"{base_url}/{traces_export_path}" - if traces_export_path - else f"{base_url}/api/public/otel/v1/traces" - ) + endpoint = ( + f"{base_url}/{traces_export_path}" + if traces_export_path + else f"{base_url}/api/public/otel/v1/traces" + ) - langfuse_span_exporter = OTLPSpanExporter( - endpoint=endpoint, - headers=headers, - timeout=timeout, - ) + span_exporter = OTLPSpanExporter( + endpoint=endpoint, + headers=headers, + timeout=timeout, + ) super().__init__( - span_exporter=langfuse_span_exporter, + span_exporter=span_exporter, export_timeout_millis=timeout * 1_000 if timeout else None, max_export_batch_size=flush_at, schedule_delay_millis=flush_interval * 1_000 diff --git a/tests/test_additional_headers_simple.py b/tests/test_additional_headers_simple.py index 47cc765bd..dd843b35a 100644 --- a/tests/test_additional_headers_simple.py +++ b/tests/test_additional_headers_simple.py @@ -3,11 +3,25 @@ This module tests that additional headers are properly configured in the HTTP clients. """ +from typing import Sequence + import httpx +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from langfuse._client.client import Langfuse +class NoOpSpanExporter(SpanExporter): + """Minimal exporter used to verify custom exporter injection.""" + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + pass + + class TestAdditionalHeadersSimple: """Simple test suite for additional_headers functionality.""" @@ -196,3 +210,19 @@ def test_span_processor_none_additional_headers_works(self): assert "Authorization" in exporter._headers assert "x-langfuse-sdk-name" in exporter._headers assert "x-langfuse-public-key" in exporter._headers + + def test_span_processor_uses_custom_span_exporter_when_provided(self): + """Test that a custom exporter bypasses the default OTLP exporter construction.""" + from langfuse._client.span_processor import LangfuseSpanProcessor + + custom_exporter = NoOpSpanExporter() + + processor = LangfuseSpanProcessor( + public_key="test-public-key", + secret_key="test-secret-key", + base_url="https://mock-host.com", + additional_headers={"X-Custom-Trace-Header": "trace-value"}, + span_exporter=custom_exporter, + ) + + assert processor.span_exporter is custom_exporter diff --git a/tests/test_resource_manager.py b/tests/test_resource_manager.py index 72f9f7d7e..cce6446e9 100644 --- a/tests/test_resource_manager.py +++ b/tests/test_resource_manager.py @@ -1,10 +1,25 @@ """Test the LangfuseResourceManager and get_client() function.""" +from typing import Sequence + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult + from langfuse import Langfuse from langfuse._client.get_client import get_client from langfuse._client.resource_manager import LangfuseResourceManager +class NoOpSpanExporter(SpanExporter): + """Minimal exporter used to verify configuration propagation.""" + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + pass + + def test_get_client_preserves_all_settings(): """Test that get_client() preserves environment and all client settings.""" with LangfuseResourceManager._lock: @@ -13,7 +28,11 @@ def test_get_client_preserves_all_settings(): def should_export(span): return span.name != "drop" + span_exporter = NoOpSpanExporter() + settings = { + "public_key": "pk-comprehensive", + "secret_key": "sk-comprehensive", "environment": "test-env", "release": "v1.2.3", "timeout": 30, @@ -21,6 +40,7 @@ def should_export(span): "sample_rate": 0.8, "should_export_span": should_export, "additional_headers": {"X-Custom": "value"}, + "span_exporter": span_exporter, } original_client = Langfuse(**settings) @@ -36,6 +56,7 @@ def should_export(span): assert rm.sample_rate == settings["sample_rate"] assert rm.should_export_span is should_export assert rm.additional_headers == settings["additional_headers"] + assert rm.span_exporter is span_exporter original_client.shutdown() @@ -49,6 +70,9 @@ def should_export_a(span): def should_export_b(span): return span.name.startswith("b") + exporter_a = NoOpSpanExporter() + exporter_b = NoOpSpanExporter() + # Settings for client A settings_a = { "public_key": "pk-comprehensive-a", @@ -58,6 +82,7 @@ def should_export_b(span): "timeout": 10, "sample_rate": 0.5, "should_export_span": should_export_a, + "span_exporter": exporter_a, } # Settings for client B @@ -69,6 +94,7 @@ def should_export_b(span): "timeout": 20, "sample_rate": 0.9, "should_export_span": should_export_b, + "span_exporter": exporter_b, } client_a = Langfuse(**settings_a) @@ -91,6 +117,8 @@ def should_export_b(span): assert retrieved_b._resources.release == settings_b["release"] assert retrieved_a._resources.should_export_span is should_export_a assert retrieved_b._resources.should_export_span is should_export_b + assert retrieved_a._resources.span_exporter is exporter_a + assert retrieved_b._resources.span_exporter is exporter_b client_a.shutdown() client_b.shutdown()