From a23db4b8dc0f31bfdc4963f1fee32f34219aa729 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 26 May 2026 08:13:42 +0200 Subject: [PATCH 01/10] feat(langchain): Support span streaming --- sentry_sdk/integrations/langchain.py | 470 ++++++++++++------ .../integrations/langchain/test_langchain.py | 205 +++++++- 2 files changed, 501 insertions(+), 174 deletions(-) diff --git a/sentry_sdk/integrations/langchain.py b/sentry_sdk/integrations/langchain.py index dfd46649e9..3858a0f00d 100644 --- a/sentry_sdk/integrations/langchain.py +++ b/sentry_sdk/integrations/langchain.py @@ -19,7 +19,12 @@ from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii -from sentry_sdk.tracing_utils import _get_value +from sentry_sdk.traces import StreamedSpan +from sentry_sdk.tracing_utils import ( + _get_value, + has_span_streaming_enabled, + should_truncate_gen_ai_input, +) from sentry_sdk.utils import capture_internal_exceptions, logger if TYPE_CHECKING: @@ -274,7 +279,9 @@ def _handle_error(self, run_id: "UUID", error: "Any") -> None: span_data = self.span_map[run_id] span = span_data.span - sentry_sdk.capture_exception(error, span.scope) + sentry_sdk.capture_exception( + error, span._scope if isinstance(span, StreamedSpan) else span.scope + ) span.__exit__(type(error), error, error.__traceback__) del self.span_map[run_id] @@ -290,17 +297,47 @@ def _create_span( self: "SentryLangchainCallback", run_id: "UUID", parent_id: "Optional[Any]", - **kwargs: "Any", + op: str, + name: str, + origin: str, ) -> "WatchedSpan": watched_span: "Optional[WatchedSpan]" = None + span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options) if parent_id: parent_span: "Optional[WatchedSpan]" = self.span_map.get(parent_id) if parent_span: - watched_span = WatchedSpan(parent_span.span.start_child(**kwargs)) + watched_span = ( + WatchedSpan( + sentry_sdk.traces.start_span( + parent_span=parent_span, + name=name, + attributes={ + "sentry.op": op, + "sentry.origin": origin, + }, + ) + ) + if span_streaming + else WatchedSpan( + parent_span.span.start_child(op=op, name=name, origin=origin) + ) + ) parent_span.children.append(watched_span) if watched_span is None: - watched_span = WatchedSpan(sentry_sdk.start_span(**kwargs)) + watched_span = ( + WatchedSpan( + sentry_sdk.traces.start_span( + name=name, + attributes={ + "sentry.op": op, + "sentry.origin": origin, + }, + ) + ) + if span_streaming + else WatchedSpan(sentry_sdk.start_span(op=op, name=name, origin=origin)) + ) watched_span.span.__enter__() self.span_map[run_id] = watched_span @@ -350,21 +387,24 @@ def on_llm_start( ) span = watched_span.span - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "text_completion") + set_on_span = ( + span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + ) + set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "text_completion") run_name = kwargs.get("name") if run_name: - span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) + set_on_span(SPANDATA.GEN_AI_FUNCTION_ID, run_name) if model: - span.set_data( + set_on_span( SPANDATA.GEN_AI_REQUEST_MODEL, model, ) ai_system = _get_ai_system(all_params) if ai_system: - span.set_data(SPANDATA.GEN_AI_SYSTEM, ai_system) + set_on_span(SPANDATA.GEN_AI_SYSTEM, ai_system) for key, attribute in DATA_FIELDS.items(): if key in all_params and all_params[key] is not None: @@ -384,11 +424,9 @@ def on_llm_start( client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( - normalized_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( - normalized_messages, span, scope - ) + truncate_and_annotate_messages(normalized_messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else normalized_messages ) if messages_data is not None: set_data_normalized( @@ -430,23 +468,24 @@ def on_chat_model_start( ) span = watched_span.span - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat") + set_on_span = ( + span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + ) + set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "chat") if model: - span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model) + set_on_span(SPANDATA.GEN_AI_REQUEST_MODEL, model) ai_system = _get_ai_system(all_params) if ai_system: - span.set_data(SPANDATA.GEN_AI_SYSTEM, ai_system) + set_on_span(SPANDATA.GEN_AI_SYSTEM, ai_system) agent_metadata = kwargs.get("metadata") if isinstance(agent_metadata, dict) and "lc_agent_name" in agent_metadata: - span.set_data( - SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"] - ) + set_on_span(SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"]) run_name = kwargs.get("name") if run_name: - span.set_data( + set_on_span( SPANDATA.GEN_AI_FUNCTION_ID, run_name, ) @@ -460,7 +499,7 @@ def on_chat_model_start( if should_send_default_pii() and self.include_prompts: system_instructions = _get_system_instructions(messages) if len(system_instructions) > 0: - span.set_data( + set_on_span( SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS, json.dumps(_transform_system_instructions(system_instructions)), ) @@ -479,11 +518,9 @@ def on_chat_model_start( client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( - normalized_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( - normalized_messages, span, scope - ) + truncate_and_annotate_messages(normalized_messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else normalized_messages ) if messages_data is not None: set_data_normalized( @@ -539,19 +576,25 @@ def on_llm_end( generation = None if generation is not None: + set_on_span = ( + span.set_attribute + if isinstance(span, StreamedSpan) + else span.set_data + ) + try: response_model = generation.message.response_metadata.get( "model_name" ) if response_model is not None: - span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) + set_on_span(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) except AttributeError: pass try: finish_reason = generation.generation_info.get("finish_reason") if finish_reason is not None: - span.set_data( + set_on_span( SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS, [finish_reason], ) @@ -646,22 +689,24 @@ def on_tool_start( ) span = watched_span.span - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "execute_tool") - span.set_data(SPANDATA.GEN_AI_TOOL_NAME, tool_name) + set_on_span = ( + span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + ) + + set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "execute_tool") + set_on_span(SPANDATA.GEN_AI_TOOL_NAME, tool_name) tool_description = serialized.get("description") if tool_description is not None: - span.set_data(SPANDATA.GEN_AI_TOOL_DESCRIPTION, tool_description) + set_on_span(SPANDATA.GEN_AI_TOOL_DESCRIPTION, tool_description) agent_metadata = kwargs.get("metadata") if isinstance(agent_metadata, dict) and "lc_agent_name" in agent_metadata: - span.set_data( - SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"] - ) + set_on_span(SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"]) run_name = kwargs.get("name") if run_name: - span.set_data( + set_on_span( SPANDATA.GEN_AI_FUNCTION_ID, run_name, ) @@ -771,7 +816,7 @@ def _get_token_usage(obj: "Any") -> "Optional[Dict[str, Any]]": return None -def _record_token_usage(span: "Span", response: "Any") -> None: +def _record_token_usage(span: "Union[Span, StreamedSpan]", response: "Any") -> None: token_usage = _get_token_usage(response) if token_usage: input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage) @@ -780,14 +825,18 @@ def _record_token_usage(span: "Span", response: "Any") -> None: response.generations ) + set_on_span = ( + span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + ) + if input_tokens is not None: - span.set_data(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) + set_on_span(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) if output_tokens is not None: - span.set_data(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) + set_on_span(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) if total_tokens is not None: - span.set_data(SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens) + set_on_span(SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens) def _get_request_data( @@ -980,63 +1029,114 @@ def new_configure( def _wrap_agent_executor_invoke(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LangchainIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) run_name, tools = _get_request_data(self, args, kwargs) - start_span_function = get_start_span_function() - with start_span_function( - op=OP.GEN_AI_INVOKE_AGENT, - name=f"invoke_agent {run_name}" if run_name else "invoke_agent", - origin=LangchainIntegration.origin, - ) as span: - if run_name: - span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=f"invoke_agent {run_name}" if run_name else "invoke_agent", + attributes={ + "sentry.op": OP.GEN_AI_INVOKE_AGENT, + "sentry.origin": LangchainIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", + SPANDATA.GEN_AI_RESPONSE_STREAMING: False, + }, + ) as span: + if run_name: + span.set_attribute(SPANDATA.GEN_AI_FUNCTION_ID, run_name) + + _set_tools_on_span(span, tools) + + # Run the agent + result = f(self, *args, **kwargs) + + input = result.get("input") + if ( + input is not None + and should_send_default_pii() + and integration.include_prompts + ): + normalized_messages = normalize_message_roles([input]) + + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + truncate_and_annotate_messages(normalized_messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else normalized_messages + ) + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") - span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False) + output = result.get("output") + if ( + output is not None + and should_send_default_pii() + and integration.include_prompts + ): + set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) - _set_tools_on_span(span, tools) + return result + else: + start_span_function = get_start_span_function() - # Run the agent - result = f(self, *args, **kwargs) + with start_span_function( + op=OP.GEN_AI_INVOKE_AGENT, + name=f"invoke_agent {run_name}" if run_name else "invoke_agent", + origin=LangchainIntegration.origin, + ) as span: + if run_name: + span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) - input = result.get("input") - if ( - input is not None - and should_send_default_pii() - and integration.include_prompts - ): - normalized_messages = normalize_message_roles([input]) + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False) - client = sentry_sdk.get_client() - scope = sentry_sdk.get_current_scope() - messages_data = ( - normalized_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( - normalized_messages, span, scope - ) - ) - if messages_data is not None: - set_data_normalized( - span, - SPANDATA.GEN_AI_REQUEST_MESSAGES, - messages_data, - unpack=False, + _set_tools_on_span(span, tools) + + # Run the agent + result = f(self, *args, **kwargs) + + input = result.get("input") + if ( + input is not None + and should_send_default_pii() + and integration.include_prompts + ): + normalized_messages = normalize_message_roles([input]) + + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + truncate_and_annotate_messages(normalized_messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else normalized_messages ) + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) - output = result.get("output") - if ( - output is not None - and should_send_default_pii() - and integration.include_prompts - ): - set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) + output = result.get("output") + if ( + output is not None + and should_send_default_pii() + and integration.include_prompts + ): + set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) - return result + return result return new_invoke @@ -1044,25 +1144,41 @@ def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": def _wrap_agent_executor_stream(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_stream(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LangchainIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) run_name, tools = _get_request_data(self, args, kwargs) - start_span_function = get_start_span_function() - span = start_span_function( - op=OP.GEN_AI_INVOKE_AGENT, - name=f"invoke_agent {run_name}" if run_name else "invoke_agent", - origin=LangchainIntegration.origin, - ) - span.__enter__() + if has_span_streaming_enabled(client.options): + span = sentry_sdk.traces.start_span( + name=f"invoke_agent {run_name}" if run_name else "invoke_agent", + attributes={ + "sentry.op": OP.GEN_AI_INVOKE_AGENT, + "sentry.origin": LangchainIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", + SPANDATA.GEN_AI_RESPONSE_STREAMING: True, + }, + ) + + if run_name: + span.set_attribute(SPANDATA.GEN_AI_FUNCTION_ID, run_name) + else: + start_span_function = get_start_span_function() - if run_name: - span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) + span = start_span_function( + op=OP.GEN_AI_INVOKE_AGENT, + name=f"invoke_agent {run_name}" if run_name else "invoke_agent", + origin=LangchainIntegration.origin, + ) + span.__enter__() - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") - span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + + if run_name: + span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) _set_tools_on_span(span, tools) @@ -1077,9 +1193,9 @@ def new_stream(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( - normalized_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages(normalized_messages, span, scope) + truncate_and_annotate_messages(normalized_messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else normalized_messages ) if messages_data is not None: set_data_normalized( @@ -1180,35 +1296,65 @@ def _wrap_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_embedding_method(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LangchainIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) model_name = getattr(self, "model", None) or getattr(self, "model_name", None) - with sentry_sdk.start_span( - op=OP.GEN_AI_EMBEDDINGS, - name=f"embeddings {model_name}" if model_name else "embeddings", - origin=LangchainIntegration.origin, - ) as span: - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") - if model_name: - span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) - - # Capture input if PII is allowed - if ( - should_send_default_pii() - and integration.include_prompts - and len(args) > 0 - ): - input_data = args[0] - # Normalize to list format - texts = input_data if isinstance(input_data, list) else [input_data] - set_data_normalized( - span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False - ) - result = f(self, *args, **kwargs) - return result + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=f"embeddings {model_name}" if model_name else "embeddings", + attributes={ + "sentry.op": OP.GEN_AI_EMBEDDINGS, + "sentry.origin": LangchainIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "embeddings", + }, + ) as span: + if model_name: + span.set_attribute(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) + + # Capture input if PII is allowed + if ( + should_send_default_pii() + and integration.include_prompts + and len(args) > 0 + ): + input_data = args[0] + # Normalize to list format + texts = input_data if isinstance(input_data, list) else [input_data] + set_data_normalized( + span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False + ) + + result = f(self, *args, **kwargs) + return result + else: + with sentry_sdk.start_span( + op=OP.GEN_AI_EMBEDDINGS, + name=f"embeddings {model_name}" if model_name else "embeddings", + origin=LangchainIntegration.origin, + ) as span: + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") + if model_name: + span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) + + # Capture input if PII is allowed + if ( + should_send_default_pii() + and integration.include_prompts + and len(args) > 0 + ): + input_data = args[0] + # Normalize to list format + texts = input_data if isinstance(input_data, list) else [input_data] + set_data_normalized( + span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False + ) + + result = f(self, *args, **kwargs) + return result return new_embedding_method @@ -1220,34 +1366,64 @@ def _wrap_async_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any] async def new_async_embedding_method( self: "Any", *args: "Any", **kwargs: "Any" ) -> "Any": - integration = sentry_sdk.get_client().get_integration(LangchainIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LangchainIntegration) if integration is None: return await f(self, *args, **kwargs) model_name = getattr(self, "model", None) or getattr(self, "model_name", None) - with sentry_sdk.start_span( - op=OP.GEN_AI_EMBEDDINGS, - name=f"embeddings {model_name}" if model_name else "embeddings", - origin=LangchainIntegration.origin, - ) as span: - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") - if model_name: - span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) - - # Capture input if PII is allowed - if ( - should_send_default_pii() - and integration.include_prompts - and len(args) > 0 - ): - input_data = args[0] - # Normalize to list format - texts = input_data if isinstance(input_data, list) else [input_data] - set_data_normalized( - span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False - ) - result = await f(self, *args, **kwargs) - return result + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=f"embeddings {model_name}" if model_name else "embeddings", + attributes={ + "sentry.op": OP.GEN_AI_EMBEDDINGS, + "sentry.origin": LangchainIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "embeddings", + }, + ) as span: + if model_name: + span.set_attribute(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) + + # Capture input if PII is allowed + if ( + should_send_default_pii() + and integration.include_prompts + and len(args) > 0 + ): + input_data = args[0] + # Normalize to list format + texts = input_data if isinstance(input_data, list) else [input_data] + set_data_normalized( + span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False + ) + + result = await f(self, *args, **kwargs) + return result + else: + with sentry_sdk.start_span( + op=OP.GEN_AI_EMBEDDINGS, + name=f"embeddings {model_name}" if model_name else "embeddings", + origin=LangchainIntegration.origin, + ) as span: + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") + if model_name: + span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) + + # Capture input if PII is allowed + if ( + should_send_default_pii() + and integration.include_prompts + and len(args) > 0 + ): + input_data = args[0] + # Normalize to list format + texts = input_data if isinstance(input_data, list) else [input_data] + set_data_normalized( + span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False + ) + + result = await f(self, *args, **kwargs) + return result return new_async_embedding_method diff --git a/tests/integrations/langchain/test_langchain.py b/tests/integrations/langchain/test_langchain.py index 69c58f5409..aa13ce63ad 100644 --- a/tests/integrations/langchain/test_langchain.py +++ b/tests/integrations/langchain/test_langchain.py @@ -236,6 +236,7 @@ def get_word_length(word: str) -> int: return len(word) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_text_completion( sentry_init, @@ -243,6 +244,7 @@ def test_langchain_text_completion( capture_items, get_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -253,6 +255,7 @@ def test_langchain_text_completion( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) model_response = get_model_response( @@ -284,7 +287,7 @@ def test_langchain_text_completion( openai_api_key="badkey", ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -298,6 +301,7 @@ def test_langchain_text_completion( tx = next(item.payload for item in items if item.type == "transaction") assert tx["type"] == "transaction" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] llm_spans = [ span @@ -353,6 +357,7 @@ def test_langchain_text_completion( assert llm_span["data"]["gen_ai.usage.output_tokens"] == 15 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_chat_with_run_name( sentry_init, @@ -361,6 +366,7 @@ def test_langchain_chat_with_run_name( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -371,6 +377,7 @@ def test_langchain_chat_with_run_name( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) request_headers = {} @@ -400,7 +407,7 @@ def test_langchain_chat_with_run_name( openai_api_key="badkey", ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with patch.object( @@ -413,6 +420,7 @@ def test_langchain_chat_with_run_name( config={"run_name": "my-snazzy-pipeline"}, ) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.chat" @@ -444,12 +452,14 @@ def test_langchain_chat_with_run_name( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_tool_call_with_run_name( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -460,8 +470,9 @@ def test_langchain_tool_call_with_run_name( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with start_transaction(): @@ -470,6 +481,7 @@ def test_langchain_tool_call_with_run_name( config={"run_name": "my-snazzy-pipeline"}, ) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] tool_spans = list( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.execute_tool" @@ -496,6 +508,7 @@ def test_langchain_tool_call_with_run_name( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.skipif( LANGCHAIN_VERSION < (1,), @@ -532,6 +545,7 @@ def test_langchain_create_agent( get_model_response, nonstreaming_responses_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -542,6 +556,7 @@ def test_langchain_create_agent( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) model_response = get_model_response( @@ -565,7 +580,7 @@ def test_langchain_create_agent( name="word_length_agent", ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -588,6 +603,7 @@ def test_langchain_create_agent( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.chat" @@ -723,6 +739,7 @@ def test_langchain_create_agent( assert SPANDATA.GEN_AI_RESPONSE_TEXT not in chat_spans[0].get("data", {}) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.skipif( LANGCHAIN_VERSION < (1,), @@ -746,6 +763,7 @@ def test_tool_execution_span( get_model_response, responses_tool_call_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -756,6 +774,7 @@ def test_tool_execution_span( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) responses = responses_tool_call_model_responses( @@ -818,7 +837,7 @@ def test_tool_execution_span( name="word_length_agent", ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -838,6 +857,7 @@ def test_tool_execution_span( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.chat" @@ -1013,6 +1033,7 @@ def test_tool_execution_span( assert "get_word_length" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -1032,6 +1053,7 @@ def test_langchain_openai_tools_agent_no_prompts( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -1042,6 +1064,7 @@ def test_langchain_openai_tools_agent_no_prompts( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -1080,7 +1103,7 @@ def test_langchain_openai_tools_agent_no_prompts( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -1099,6 +1122,7 @@ def test_langchain_openai_tools_agent_no_prompts( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -1279,6 +1303,7 @@ def test_langchain_openai_tools_agent_no_prompts( assert "get_word_length" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "system_instructions_content", @@ -1302,6 +1327,7 @@ def test_langchain_openai_tools_agent( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -1312,6 +1338,7 @@ def test_langchain_openai_tools_agent( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -1350,7 +1377,7 @@ def test_langchain_openai_tools_agent( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -1373,6 +1400,7 @@ def test_langchain_openai_tools_agent( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -1586,6 +1614,7 @@ def test_langchain_openai_tools_agent( assert "get_word_length" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_openai_tools_agent_with_config( sentry_init, @@ -1595,6 +1624,7 @@ def test_langchain_openai_tools_agent_with_config( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -1605,6 +1635,7 @@ def test_langchain_openai_tools_agent_with_config( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -1645,7 +1676,7 @@ def test_langchain_openai_tools_agent_with_config( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -1663,6 +1694,7 @@ def test_langchain_openai_tools_agent_with_config( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -1695,6 +1727,7 @@ def test_langchain_openai_tools_agent_with_config( assert invoke_agent_span["data"]["gen_ai.function_id"] == "my-snazzy-pipeline" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -1714,6 +1747,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -1724,6 +1758,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -1762,7 +1797,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -1781,6 +1816,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -1804,6 +1840,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( == "my-snazzy-pipeline" ) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # We can't guarantee anything about the "shape" of the langchain execution graph assert ( @@ -1963,6 +2000,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( assert "get_word_length" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "system_instructions_content", @@ -1986,6 +2024,7 @@ def test_langchain_openai_tools_agent_stream( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -1996,6 +2035,7 @@ def test_langchain_openai_tools_agent_stream( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -2034,7 +2074,7 @@ def test_langchain_openai_tools_agent_stream( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -2058,6 +2098,7 @@ def test_langchain_openai_tools_agent_stream( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -2282,6 +2323,7 @@ def test_langchain_openai_tools_agent_stream( assert "get_word_length" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_openai_tools_agent_stream_with_config( sentry_init, @@ -2291,6 +2333,7 @@ def test_langchain_openai_tools_agent_stream_with_config( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -2301,6 +2344,7 @@ def test_langchain_openai_tools_agent_stream_with_config( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -2341,7 +2385,7 @@ def test_langchain_openai_tools_agent_stream_with_config( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -2359,6 +2403,7 @@ def test_langchain_openai_tools_agent_stream_with_config( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -2391,12 +2436,14 @@ def test_langchain_openai_tools_agent_stream_with_config( assert invoke_agent_span["data"]["gen_ai.function_id"] == "my-snazzy-pipeline" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_error( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): class MockOpenAI(ChatOpenAI): def _stream( @@ -2420,6 +2467,7 @@ def _llm_type(self) -> str: traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -2441,7 +2489,7 @@ def _llm_type(self) -> str: agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("event") with start_transaction(), pytest.raises(ValueError): @@ -2458,12 +2506,14 @@ def _llm_type(self) -> str: assert error["level"] == "error" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_span_status_error( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): class MockOpenAI(ChatOpenAI): def _stream( @@ -2486,8 +2536,9 @@ def _llm_type(self) -> str: integrations=[LangchainIntegration(include_prompts=True)], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("event", "transaction", "span") with start_transaction(name="test"): @@ -2521,6 +2572,7 @@ def _llm_type(self) -> str: (error,) = (item.payload for item in items if item.type == "event") assert error["level"] == "error" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] assert spans[0]["status"] == "error" (transaction,) = (item.payload for item in items if item.type == "transaction") @@ -2773,12 +2825,14 @@ def test_langchain_callback_list_existing_callback(sentry_init): assert handler is sentry_callback +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_message_role_mapping( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that message roles are properly normalized in langchain integration.""" @@ -2813,6 +2867,7 @@ def _llm_type(self) -> str: traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -2835,12 +2890,13 @@ def _llm_type(self) -> str: test_input = "Hello, how are you?" message_data_found = False - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with start_transaction(): list(agent_executor.stream({"input": test_input})) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find spans with gen_ai operation that should have message data gen_ai_spans = [ @@ -3066,6 +3122,7 @@ def test_langchain_message_truncation(sentry_init, capture_events): assert tx["_meta"]["spans"]["0"]["data"]["gen_ai.request.messages"][""]["len"] == 5 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -3083,6 +3140,7 @@ def test_langchain_embeddings_sync( send_default_pii, include_prompts, stream_gen_ai_spans, + span_streaming, ): """Test that sync embedding methods (embed_documents, embed_query) are properly traced.""" try: @@ -3095,8 +3153,9 @@ def test_langchain_embeddings_sync( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API call @@ -3119,6 +3178,7 @@ def test_langchain_embeddings_sync( assert len(result) == 2 mock_embed_documents.assert_called_once() + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings span embeddings_spans = [ @@ -3214,6 +3274,7 @@ def test_langchain_embeddings_sync( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -3229,6 +3290,7 @@ def test_langchain_embeddings_embed_query( send_default_pii, include_prompts, stream_gen_ai_spans, + span_streaming, ): """Test that embed_query method is properly traced.""" try: @@ -3241,8 +3303,9 @@ def test_langchain_embeddings_embed_query( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API call @@ -3264,6 +3327,7 @@ def test_langchain_embeddings_embed_query( assert len(result) == 3 mock_embed_query.assert_called_once() + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings span embeddings_spans = [ @@ -3350,6 +3414,7 @@ def test_langchain_embeddings_embed_query( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -3366,6 +3431,7 @@ async def test_langchain_embeddings_async( send_default_pii, include_prompts, stream_gen_ai_spans, + span_streaming, ): """Test that async embedding methods (aembed_documents, aembed_query) are properly traced.""" try: @@ -3378,12 +3444,13 @@ async def test_langchain_embeddings_async( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) async def mock_aembed_documents(self, texts): return [[0.1, 0.2, 0.3] for _ in texts] - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API call @@ -3407,6 +3474,7 @@ async def mock_aembed_documents(self, texts): assert len(result) == 2 mock_aembed.assert_called_once() + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings span embeddings_spans = [ @@ -3508,6 +3576,7 @@ async def mock_aembed_documents(self, texts): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio async def test_langchain_embeddings_aembed_query( @@ -3515,6 +3584,7 @@ async def test_langchain_embeddings_aembed_query( capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that aembed_query method is properly traced.""" try: @@ -3527,12 +3597,13 @@ async def test_langchain_embeddings_aembed_query( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) async def mock_aembed_query(self, text): return [0.1, 0.2, 0.3] - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API call @@ -3554,6 +3625,7 @@ async def mock_aembed_query(self, text): assert len(result) == 3 mock_aembed.assert_called_once() + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings span embeddings_spans = [ @@ -3625,12 +3697,14 @@ async def mock_aembed_query(self, text): assert "Async query test" in input_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_no_model_name( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test embeddings when model name is not available.""" try: @@ -3642,8 +3716,9 @@ def test_langchain_embeddings_no_model_name( integrations=[LangchainIntegration(include_prompts=False)], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API call and remove model attribute @@ -3664,6 +3739,7 @@ def test_langchain_embeddings_no_model_name( with start_transaction(name="test_embeddings_no_model"): embeddings.embed_documents(["Test"]) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings span embeddings_spans = [ @@ -3725,12 +3801,14 @@ def test_langchain_embeddings_no_model_name( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_integration_disabled( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that embeddings are not traced when integration is disabled.""" try: @@ -3741,10 +3819,11 @@ def test_langchain_embeddings_integration_disabled( sentry_init( traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) # Initialize without LangchainIntegration - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with mock.patch.object( @@ -3760,6 +3839,7 @@ def test_langchain_embeddings_integration_disabled( embeddings.embed_documents(["Test"]) # Check that no embeddings spans were created + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] embeddings_spans = [ span @@ -3795,12 +3875,14 @@ def test_langchain_embeddings_integration_disabled( assert len(embeddings_spans) == 0 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_multiple_providers( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that embeddings work with different providers.""" try: @@ -3813,8 +3895,9 @@ def test_langchain_embeddings_multiple_providers( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock both providers @@ -3843,6 +3926,7 @@ def test_langchain_embeddings_multiple_providers( openai_embeddings.embed_documents(["OpenAI test"]) azure_embeddings.embed_documents(["Azure test"]) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings spans embeddings_spans = [ @@ -3948,12 +4032,14 @@ def test_langchain_embeddings_error_handling(sentry_init, capture_events): # but the span should still be created +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_multiple_calls( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that multiple embeddings calls within a transaction are all traced.""" try: @@ -3966,8 +4052,9 @@ def test_langchain_embeddings_multiple_calls( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API calls @@ -3995,6 +4082,7 @@ def test_langchain_embeddings_multiple_calls( # Call embed_documents again embeddings.embed_documents(["Third batch"]) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings spans - should have 3 (2 embed_documents + 1 embed_query) embeddings_spans = [ @@ -4072,12 +4160,14 @@ def test_langchain_embeddings_multiple_calls( assert len(set(str(data) for data in input_data_list)) == 3 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_span_hierarchy( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that embeddings spans are properly nested within parent spans.""" try: @@ -4090,8 +4180,56 @@ def test_langchain_embeddings_span_hierarchy( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming: + items = capture_items("span") + + # Mock the actual API call + with mock.patch.object( + OpenAIEmbeddings, + "embed_documents", + wraps=lambda self, texts: [[0.1, 0.2, 0.3] for _ in texts], + ): + embeddings = OpenAIEmbeddings( + model="text-embedding-ada-002", openai_api_key="test-key" + ) + + # Force setup to re-run + LangchainIntegration.setup_once() + + with sentry_sdk.traces.start_span( + name="test_span_hierarchy" + ), sentry_sdk.traces.start_span( + name="custom operation", + attributes={ + "sentry.op": "custom", + }, + ): + embeddings.embed_documents(["Test within custom span"]) + + sentry_sdk.flush() + spans = [item.payload for item in items] + # Find all spans + embeddings_spans = [ + span + for span in spans + if span["attributes"].get("sentry.op") == "gen_ai.embeddings" + ] + custom_spans = [ + span for span in spans if span["attributes"].get("sentry.op") == "custom" + ] + + assert len(embeddings_spans) == 1 + assert len(custom_spans) == 1 + + # Both spans should exist + embeddings_span = embeddings_spans[0] + custom_span = custom_spans[0] + + assert embeddings_span["attributes"]["gen_ai.operation.name"] == "embeddings" + assert custom_span["name"] == "custom operation" + elif stream_gen_ai_spans: items = capture_items("transaction", "span") # Mock the actual API call @@ -4132,6 +4270,7 @@ def test_langchain_embeddings_span_hierarchy( custom_span = custom_spans[0] assert embeddings_span["attributes"]["gen_ai.operation.name"] == "embeddings" + assert custom_span["description"] == "custom operation" else: events = capture_events() @@ -4176,15 +4315,17 @@ def test_langchain_embeddings_span_hierarchy( custom_span = custom_spans[0] assert embeddings_span["data"]["gen_ai.operation.name"] == "embeddings" - assert custom_span["description"] == "custom operation" + assert custom_span["description"] == "custom operation" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_with_list_and_string_inputs( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that embeddings correctly handle both list and string inputs.""" try: @@ -4197,8 +4338,9 @@ def test_langchain_embeddings_with_list_and_string_inputs( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API calls @@ -4226,6 +4368,7 @@ def test_langchain_embeddings_with_list_and_string_inputs( # embed_query takes a string embeddings.embed_query("Single string query") + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings spans embeddings_spans = [ @@ -4298,6 +4441,7 @@ def test_langchain_embeddings_with_list_and_string_inputs( ), f"Expected input text in serialized data: {input_data}" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "response_metadata_model,expected_model", @@ -4313,12 +4457,14 @@ def test_langchain_response_model_extraction( response_metadata_model, expected_model, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LangchainIntegration(include_prompts=True)], traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) callback = SentryLangchainCallback(max_span_map_size=100, include_prompts=True) @@ -4327,7 +4473,7 @@ def test_langchain_response_model_extraction( serialized = {"_type": "openai-chat", "model_name": "gpt-3.5-turbo"} prompts = ["Test prompt"] - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with start_transaction(): @@ -4347,6 +4493,7 @@ def test_langchain_response_model_extraction( response = Mock(generations=[[generation]]) callback.on_llm_end(response=response, run_id=run_id) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] llm_spans = [ span @@ -4617,6 +4764,7 @@ def test_transform_google_file_data(self): } +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "ai_type,expected_system", @@ -4669,11 +4817,13 @@ def test_langchain_ai_system_detection( ai_type, expected_system, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LangchainIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) callback = SentryLangchainCallback(max_span_map_size=100, include_prompts=True) @@ -4682,7 +4832,7 @@ def test_langchain_ai_system_detection( serialized = {"_type": ai_type} if ai_type is not None else {} prompts = ["Test prompt"] - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with start_transaction(): @@ -4697,6 +4847,7 @@ def test_langchain_ai_system_detection( response = Mock(generations=[[generation]]) callback.on_llm_end(response=response, run_id=run_id) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] llm_spans = [ span From 6ce1144bfa56c2d854c106a68d024815e771339e Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 26 May 2026 08:15:11 +0200 Subject: [PATCH 02/10] feat(langgraph): Support span streaming --- sentry_sdk/integrations/langgraph.py | 208 ++++++++++++++---- .../integrations/langgraph/test_langgraph.py | 97 ++++++-- 2 files changed, 241 insertions(+), 64 deletions(-) diff --git a/sentry_sdk/integrations/langgraph.py b/sentry_sdk/integrations/langgraph.py index acf630add2..8039aec6a9 100644 --- a/sentry_sdk/integrations/langgraph.py +++ b/sentry_sdk/integrations/langgraph.py @@ -11,6 +11,11 @@ from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.traces import StreamedSpan +from sentry_sdk.tracing_utils import ( + has_span_streaming_enabled, + should_truncate_gen_ai_input, +) from sentry_sdk.utils import safe_serialize try: @@ -107,9 +112,11 @@ def _parse_langgraph_messages(state: "Any") -> "Optional[List[Any]]": def _wrap_state_graph_compile(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_compile(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LanggraphIntegration) - if integration is None: + client = sentry_sdk.get_client() + integration = client.get_integration(LanggraphIntegration) + if integration is None or has_span_streaming_enabled(client.options): return f(self, *args, **kwargs) + with sentry_sdk.start_span( op=OP.GEN_AI_CREATE_AGENT, origin=LanggraphIntegration.origin, @@ -151,7 +158,8 @@ def new_compile(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": def _wrap_pregel_invoke(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LanggraphIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LanggraphIntegration) if integration is None: return f(self, *args, **kwargs) @@ -160,50 +168,101 @@ def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": f"invoke_agent {graph_name}".strip() if graph_name else "invoke_agent" ) - with get_start_span_function()( - op=OP.GEN_AI_INVOKE_AGENT, - name=span_name, - origin=LanggraphIntegration.origin, - ) as span: - if graph_name: - span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) - span.set_data(SPANDATA.GEN_AI_AGENT_NAME, graph_name) - - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") - - # Store input messages to later compare with output - input_messages = None - if ( - len(args) > 0 - and should_send_default_pii() - and integration.include_prompts - ): - input_messages = _parse_langgraph_messages(args[0]) - if input_messages: - normalized_input_messages = normalize_message_roles(input_messages) + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=span_name, + attributes={ + "sentry.op": OP.GEN_AI_INVOKE_AGENT, + "sentry.origin": LanggraphIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", + }, + ) as span: + if graph_name: + span.set_attribute(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) + span.set_attribute(SPANDATA.GEN_AI_AGENT_NAME, graph_name) + + # Store input messages to later compare with output + input_messages = None + if ( + len(args) > 0 + and should_send_default_pii() + and integration.include_prompts + ): + input_messages = _parse_langgraph_messages(args[0]) + if input_messages: + normalized_input_messages = normalize_message_roles( + input_messages + ) - client = sentry_sdk.get_client() - scope = sentry_sdk.get_current_scope() - messages_data = ( - normalized_input_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( - normalized_input_messages, span, scope + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + truncate_and_annotate_messages( + normalized_input_messages, span, scope + ) + if should_truncate_gen_ai_input(client.options) + else normalized_input_messages ) - ) - if messages_data is not None: - set_data_normalized( - span, - SPANDATA.GEN_AI_REQUEST_MESSAGES, - messages_data, - unpack=False, + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) + + result = f(self, *args, **kwargs) + + _set_response_attributes(span, input_messages, result, integration) + + return result + else: + with get_start_span_function()( + op=OP.GEN_AI_INVOKE_AGENT, + name=span_name, + origin=LanggraphIntegration.origin, + ) as span: + if graph_name: + span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) + span.set_data(SPANDATA.GEN_AI_AGENT_NAME, graph_name) + + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") + + # Store input messages to later compare with output + input_messages = None + if ( + len(args) > 0 + and should_send_default_pii() + and integration.include_prompts + ): + input_messages = _parse_langgraph_messages(args[0]) + if input_messages: + normalized_input_messages = normalize_message_roles( + input_messages + ) + + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + truncate_and_annotate_messages( + normalized_input_messages, span, scope + ) + if should_truncate_gen_ai_input(client.options) + else normalized_input_messages ) + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) - result = f(self, *args, **kwargs) + result = f(self, *args, **kwargs) - _set_response_attributes(span, input_messages, result, integration) + _set_response_attributes(span, input_messages, result, integration) - return result + return result return new_invoke @@ -211,7 +270,8 @@ def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": def _wrap_pregel_ainvoke(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) async def new_ainvoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LanggraphIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LanggraphIntegration) if integration is None: return await f(self, *args, **kwargs) @@ -220,6 +280,54 @@ async def new_ainvoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": f"invoke_agent {graph_name}".strip() if graph_name else "invoke_agent" ) + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=span_name, + attributes={ + "sentry.op": OP.GEN_AI_INVOKE_AGENT, + "sentry.origin": LanggraphIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", + }, + ) as span: + if graph_name: + span.set_attribute(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) + span.set_attribute(SPANDATA.GEN_AI_AGENT_NAME, graph_name) + + input_messages = None + if ( + len(args) > 0 + and should_send_default_pii() + and integration.include_prompts + ): + input_messages = _parse_langgraph_messages(args[0]) + if input_messages: + normalized_input_messages = normalize_message_roles( + input_messages + ) + + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + truncate_and_annotate_messages( + normalized_input_messages, span, scope + ) + if should_truncate_gen_ai_input(client.options) + else normalized_input_messages + ) + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) + + result = await f(self, *args, **kwargs) + + _set_response_attributes(span, input_messages, result, integration) + + return result + with get_start_span_function()( op=OP.GEN_AI_INVOKE_AGENT, name=span_name, @@ -244,11 +352,11 @@ async def new_ainvoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( - normalized_input_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( + truncate_and_annotate_messages( normalized_input_messages, span, scope ) + if should_truncate_gen_ai_input(client.options) + else normalized_input_messages ) if messages_data is not None: set_data_normalized( @@ -333,14 +441,18 @@ def _set_usage_data(span: "sentry_sdk.tracing.Span", messages: "Any") -> None: output_tokens += int(token_usage.get("completion_tokens", 0)) total_tokens += int(token_usage.get("total_tokens", 0)) + set_on_span = ( + span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + ) + if input_tokens > 0: - span.set_data(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) + set_on_span(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) if output_tokens > 0: - span.set_data(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) + set_on_span(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) if total_tokens > 0: - span.set_data( + set_on_span( SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens, ) diff --git a/tests/integrations/langgraph/test_langgraph.py b/tests/integrations/langgraph/test_langgraph.py index 42da770870..26c7bfbc27 100644 --- a/tests/integrations/langgraph/test_langgraph.py +++ b/tests/integrations/langgraph/test_langgraph.py @@ -4,6 +4,7 @@ import pytest +import sentry_sdk from sentry_sdk import start_transaction from sentry_sdk.consts import OP, SPANDATA @@ -239,6 +240,7 @@ def original_compile(self, *args, **kwargs): assert "calculator" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -256,6 +258,7 @@ def test_pregel_invoke( send_default_pii, include_prompts, stream_gen_ai_spans, + span_streaming, ): """Test Pregel.invoke() wrapper creates proper invoke_agent span.""" sentry_init( @@ -263,6 +266,7 @@ def test_pregel_invoke( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -294,7 +298,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -303,6 +307,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -421,6 +426,7 @@ def original_invoke(self, *args, **kwargs): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -438,6 +444,7 @@ def test_pregel_ainvoke( send_default_pii, include_prompts, stream_gen_ai_spans, + span_streaming, ): """Test Pregel.ainvoke() async wrapper creates proper invoke_agent span.""" sentry_init( @@ -445,6 +452,7 @@ def test_pregel_ainvoke( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = {"messages": [MockMessage("What's the weather like?", name="user")]} @@ -476,12 +484,13 @@ async def run_test(): result = await wrapped_ainvoke(pregel, test_state) return result - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") result = asyncio.run(run_test()) assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -576,12 +585,14 @@ async def run_test(): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_invoke_error( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test error handling during graph execution.""" sentry_init( @@ -589,6 +600,7 @@ def test_pregel_invoke_error( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = {"messages": [MockMessage("This will fail")]} @@ -597,7 +609,7 @@ def test_pregel_invoke_error( def original_invoke(self, *args, **kwargs): raise Exception("Graph execution failed") - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(), pytest.raises( @@ -606,6 +618,7 @@ def original_invoke(self, *args, **kwargs): wrapped_invoke = _wrap_pregel_invoke(original_invoke) wrapped_invoke(pregel, test_state) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -636,12 +649,14 @@ def original_invoke(self, *args, **kwargs): assert invoke_span.get("tags", {}).get("status") == "internal_error" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_ainvoke_error( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test error handling during async graph execution.""" sentry_init( @@ -649,6 +664,7 @@ def test_pregel_ainvoke_error( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = {"messages": [MockMessage("This will fail async")]} @@ -664,11 +680,12 @@ async def run_error_test(): wrapped_ainvoke = _wrap_pregel_ainvoke(original_ainvoke) await wrapped_ainvoke(pregel, test_state) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") asyncio.run(run_error_test()) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -695,18 +712,21 @@ async def run_error_test(): assert invoke_span.get("tags", {}).get("status") == "internal_error" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_span_origin( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that span origins are correctly set.""" sentry_init( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) graph = MockStateGraph() @@ -714,7 +734,7 @@ def test_span_origin( def original_compile(self, *args, **kwargs): return MockCompiledGraph(self.name) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -726,6 +746,7 @@ def original_compile(self, *args, **kwargs): tx = next(item.payload for item in items if item.type == "transaction") assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] for span in spans: assert span["attributes"]["sentry.origin"] == "auto.ai.langgraph" @@ -745,6 +766,7 @@ def original_compile(self, *args, **kwargs): assert span["origin"] == "auto.ai.langgraph" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize("graph_name", ["my_graph", None, ""]) def test_pregel_invoke_with_different_graph_names( @@ -753,6 +775,7 @@ def test_pregel_invoke_with_different_graph_names( capture_items, graph_name, stream_gen_ai_spans, + span_streaming, ): """Test Pregel.invoke() with different graph name scenarios.""" sentry_init( @@ -760,6 +783,7 @@ def test_pregel_invoke_with_different_graph_names( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) pregel = MockPregelInstance(graph_name) if graph_name else MockPregelInstance() @@ -770,13 +794,14 @@ def test_pregel_invoke_with_different_graph_names( def original_invoke(self, *args, **kwargs): return {"result": "test"} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): wrapped_invoke = _wrap_pregel_invoke(original_invoke) wrapped_invoke(pregel, {"messages": []}) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -824,12 +849,14 @@ def original_invoke(self, *args, **kwargs): assert SPANDATA.GEN_AI_AGENT_NAME not in invoke_span.get("data", {}) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_invoke_span_includes_usage_data( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans include aggregated usage data from context_wrapper. @@ -839,6 +866,7 @@ def test_pregel_invoke_span_includes_usage_data( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -878,7 +906,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -887,6 +915,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -938,12 +967,14 @@ def original_invoke(self, *args, **kwargs): assert invoke_agent_span["data"]["gen_ai.usage.total_tokens"] == 30 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_ainvoke_span_includes_usage_data( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans include aggregated usage data from context_wrapper. @@ -953,6 +984,7 @@ def test_pregel_ainvoke_span_includes_usage_data( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -998,12 +1030,13 @@ async def run_test(): result = await wrapped_ainvoke(pregel, test_state) return result - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") result = asyncio.run(run_test()) assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1052,12 +1085,14 @@ async def run_test(): assert invoke_agent_span["data"]["gen_ai.usage.total_tokens"] == 30 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_invoke_multiple_llm_calls_aggregate_usage( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans show aggregated usage across multiple LLM calls @@ -1067,6 +1102,7 @@ def test_pregel_invoke_multiple_llm_calls_aggregate_usage( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1117,7 +1153,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -1126,6 +1162,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1165,12 +1202,14 @@ def original_invoke(self, *args, **kwargs): assert invoke_agent_span["data"]["gen_ai.usage.total_tokens"] == 50 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_ainvoke_multiple_llm_calls_aggregate_usage( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans show aggregated usage across multiple LLM calls @@ -1180,6 +1219,7 @@ def test_pregel_ainvoke_multiple_llm_calls_aggregate_usage( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1236,12 +1276,13 @@ async def run_test(): result = await wrapped_ainvoke(pregel, test_state) return result - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") result = asyncio.run(run_test()) assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1278,12 +1319,14 @@ async def run_test(): assert invoke_agent_span["data"]["gen_ai.usage.total_tokens"] == 50 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_invoke_span_includes_response_model( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans include the response model. @@ -1293,6 +1336,7 @@ def test_pregel_invoke_span_includes_response_model( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1332,7 +1376,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -1341,6 +1385,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1385,12 +1430,14 @@ def original_invoke(self, *args, **kwargs): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_ainvoke_span_includes_response_model( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans include the response model. @@ -1400,6 +1447,7 @@ def test_pregel_ainvoke_span_includes_response_model( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1445,12 +1493,13 @@ async def run_test(): result = await wrapped_ainvoke(pregel, test_state) return result - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") result = asyncio.run(run_test()) assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1492,12 +1541,14 @@ async def run_test(): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_invoke_span_uses_last_response_model( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that when an agent makes multiple LLM calls (e.g., with tools), @@ -1507,6 +1558,7 @@ def test_pregel_invoke_span_uses_last_response_model( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1559,7 +1611,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -1568,6 +1620,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1610,12 +1663,14 @@ def original_invoke(self, *args, **kwargs): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_ainvoke_span_uses_last_response_model( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that when an agent makes multiple LLM calls (e.g., with tools), @@ -1625,6 +1680,7 @@ def test_pregel_ainvoke_span_uses_last_response_model( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1683,12 +1739,13 @@ async def run_test(): result = await wrapped_ainvoke(pregel, test_state) return result - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") result = asyncio.run(run_test()) assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1775,12 +1832,14 @@ def test_complex_message_parsing(): assert result[2]["function_call"]["name"] == "search" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_extraction_functions_complex_scenario( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test extraction functions with complex scenarios including multiple messages and edge cases.""" sentry_init( @@ -1788,6 +1847,7 @@ def test_extraction_functions_complex_scenario( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) pregel = MockPregelInstance("complex_graph") @@ -1823,7 +1883,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -1832,6 +1892,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1884,12 +1945,14 @@ def original_invoke(self, *args, **kwargs): assert tool_calls_data[1]["function"]["name"] == "calculate" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langgraph_message_role_mapping( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that Langgraph integration properly maps message roles like 'ai' to 'assistant'""" sentry_init( @@ -1897,6 +1960,7 @@ def test_langgraph_message_role_mapping( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) # Mock a langgraph message with mixed roles @@ -1918,7 +1982,7 @@ def __init__(self, content, message_type="human"): compiled_graph = MockCompiledGraph("test_graph") pregel = MockPregelInstance(compiled_graph) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(name="langgraph tx"): @@ -1930,6 +1994,7 @@ def __init__(self, content, message_type="human"): ) wrapped_invoke(pregel, state_data) + sentry_sdk.flush() span = next(item.payload for item in items if item.type == "span") # Verify that the span was created correctly From 1d0577122733ce37fb9973b6e9e9dd360d4f7b45 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 26 May 2026 08:30:43 +0200 Subject: [PATCH 03/10] . --- sentry_sdk/integrations/langchain.py | 4 ++-- sentry_sdk/tracing_utils.py | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/langchain.py b/sentry_sdk/integrations/langchain.py index 3858a0f00d..d91b9baebf 100644 --- a/sentry_sdk/integrations/langchain.py +++ b/sentry_sdk/integrations/langchain.py @@ -247,11 +247,11 @@ def setup_once() -> None: class WatchedSpan: - span: "Span" = None # type: ignore[assignment] + span: "Union[Span, StreamedSpan]" = None # type: ignore[assignment] children: "List[WatchedSpan]" = [] is_pipeline: bool = False - def __init__(self, span: "Span") -> None: + def __init__(self, span: "Union[Span, StreamedSpan]") -> None: self.span = span diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index e6fc8770d6..822114628a 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -116,6 +116,15 @@ def has_span_streaming_enabled(options: "Optional[dict[str, Any]]") -> bool: return (options.get("_experiments") or {}).get("trace_lifecycle") == "stream" +def should_truncate_gen_ai_input(options: "Optional[dict[str, Any]]") -> bool: + if options is None: + return True + + return not options.get( + "stream_gen_ai_spans", False + ) and not has_span_streaming_enabled(options) + + @contextlib.contextmanager def record_sql_queries( cursor: "Any", From 0d87458bdbbe49edd653238e3e2156f3fbdb479d Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 26 May 2026 08:43:44 +0200 Subject: [PATCH 04/10] . --- sentry_sdk/integrations/langchain.py | 6 +- sentry_sdk/integrations/langgraph.py | 131 +++++++++++++++++++-------- 2 files changed, 95 insertions(+), 42 deletions(-) diff --git a/sentry_sdk/integrations/langchain.py b/sentry_sdk/integrations/langchain.py index d91b9baebf..4bf774c898 100644 --- a/sentry_sdk/integrations/langchain.py +++ b/sentry_sdk/integrations/langchain.py @@ -302,7 +302,6 @@ def _create_span( origin: str, ) -> "WatchedSpan": watched_span: "Optional[WatchedSpan]" = None - span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options) if parent_id: parent_span: "Optional[WatchedSpan]" = self.span_map.get(parent_id) if parent_span: @@ -317,7 +316,7 @@ def _create_span( }, ) ) - if span_streaming + if isinstance(parent_span, StreamedSpan) else WatchedSpan( parent_span.span.start_child(op=op, name=name, origin=origin) ) @@ -325,6 +324,7 @@ def _create_span( parent_span.children.append(watched_span) if watched_span is None: + span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options) watched_span = ( WatchedSpan( sentry_sdk.traces.start_span( @@ -933,7 +933,7 @@ def _simplify_langchain_tools(tools: "Any") -> "Optional[List[Any]]": return simplified_tools if simplified_tools else None -def _set_tools_on_span(span: "Span", tools: "Any") -> None: +def _set_tools_on_span(span: "Union[Span, StreamedSpan]", tools: "Any") -> None: """Set available tools data on a span if tools are provided.""" if tools is not None: simplified_tools = _simplify_langchain_tools(tools) diff --git a/sentry_sdk/integrations/langgraph.py b/sentry_sdk/integrations/langgraph.py index acf630add2..0c466b1730 100644 --- a/sentry_sdk/integrations/langgraph.py +++ b/sentry_sdk/integrations/langgraph.py @@ -11,6 +11,7 @@ from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import safe_serialize try: @@ -151,7 +152,8 @@ def new_compile(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": def _wrap_pregel_invoke(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LanggraphIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LanggraphIntegration) if integration is None: return f(self, *args, **kwargs) @@ -160,50 +162,101 @@ def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": f"invoke_agent {graph_name}".strip() if graph_name else "invoke_agent" ) - with get_start_span_function()( - op=OP.GEN_AI_INVOKE_AGENT, - name=span_name, - origin=LanggraphIntegration.origin, - ) as span: - if graph_name: - span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) - span.set_data(SPANDATA.GEN_AI_AGENT_NAME, graph_name) - - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") - - # Store input messages to later compare with output - input_messages = None - if ( - len(args) > 0 - and should_send_default_pii() - and integration.include_prompts - ): - input_messages = _parse_langgraph_messages(args[0]) - if input_messages: - normalized_input_messages = normalize_message_roles(input_messages) + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=span_name, + attributes={ + "sentry.op": OP.GEN_AI_INVOKE_AGENT, + "sentry.origin": LanggraphIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", + }, + ) as span: + if graph_name: + span.set_attribute(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) + span.set_attribute(SPANDATA.GEN_AI_AGENT_NAME, graph_name) + + # Store input messages to later compare with output + input_messages = None + if ( + len(args) > 0 + and should_send_default_pii() + and integration.include_prompts + ): + input_messages = _parse_langgraph_messages(args[0]) + if input_messages: + normalized_input_messages = normalize_message_roles( + input_messages + ) - client = sentry_sdk.get_client() - scope = sentry_sdk.get_current_scope() - messages_data = ( - normalized_input_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( - normalized_input_messages, span, scope + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + normalized_input_messages + if client.options.get("stream_gen_ai_spans", False) + else truncate_and_annotate_messages( + normalized_input_messages, span, scope + ) ) - ) - if messages_data is not None: - set_data_normalized( - span, - SPANDATA.GEN_AI_REQUEST_MESSAGES, - messages_data, - unpack=False, + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) + + result = f(self, *args, **kwargs) + + _set_response_attributes(span, input_messages, result, integration) + + return result + else: + with get_start_span_function()( + op=OP.GEN_AI_INVOKE_AGENT, + name=span_name, + origin=LanggraphIntegration.origin, + ) as span: + if graph_name: + span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) + span.set_data(SPANDATA.GEN_AI_AGENT_NAME, graph_name) + + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") + + # Store input messages to later compare with output + input_messages = None + if ( + len(args) > 0 + and should_send_default_pii() + and integration.include_prompts + ): + input_messages = _parse_langgraph_messages(args[0]) + if input_messages: + normalized_input_messages = normalize_message_roles( + input_messages ) - result = f(self, *args, **kwargs) + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + normalized_input_messages + if client.options.get("stream_gen_ai_spans", False) + else truncate_and_annotate_messages( + normalized_input_messages, span, scope + ) + ) + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) - _set_response_attributes(span, input_messages, result, integration) + result = f(self, *args, **kwargs) - return result + _set_response_attributes(span, input_messages, result, integration) + + return result return new_invoke From d69069db3bbaedf02feeb888ce24f823aa7b502b Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 26 May 2026 08:58:40 +0200 Subject: [PATCH 05/10] ref(langchain): Remove WatchedSpan class --- sentry_sdk/integrations/langchain.py | 72 ++++++++++------------------ 1 file changed, 26 insertions(+), 46 deletions(-) diff --git a/sentry_sdk/integrations/langchain.py b/sentry_sdk/integrations/langchain.py index dfd46649e9..8d20bdb616 100644 --- a/sentry_sdk/integrations/langchain.py +++ b/sentry_sdk/integrations/langchain.py @@ -7,7 +7,6 @@ from typing import TYPE_CHECKING import sentry_sdk -from sentry_sdk.ai.monitoring import set_ai_pipeline_name from sentry_sdk.ai.utils import ( GEN_AI_ALLOWED_MESSAGE_ROLES, get_start_span_function, @@ -241,22 +240,13 @@ def setup_once() -> None: _patch_embeddings_provider(OllamaEmbeddings) -class WatchedSpan: - span: "Span" = None # type: ignore[assignment] - children: "List[WatchedSpan]" = [] - is_pipeline: bool = False - - def __init__(self, span: "Span") -> None: - self.span = span - - class SentryLangchainCallback(BaseCallbackHandler): # type: ignore[misc] """Callback handler that creates Sentry spans.""" def __init__( self, max_span_map_size: "Optional[int]", include_prompts: bool ) -> None: - self.span_map: "OrderedDict[UUID, WatchedSpan]" = OrderedDict() + self.span_map: "OrderedDict[UUID, sentry_sdk.tracing.Span]" = OrderedDict() self.max_span_map_size = max_span_map_size self.include_prompts = include_prompts @@ -271,8 +261,7 @@ def _handle_error(self, run_id: "UUID", error: "Any") -> None: if not run_id or run_id not in self.span_map: return - span_data = self.span_map[run_id] - span = span_data.span + span = self.span_map[run_id] sentry_sdk.capture_exception(error, span.scope) @@ -291,29 +280,27 @@ def _create_span( run_id: "UUID", parent_id: "Optional[Any]", **kwargs: "Any", - ) -> "WatchedSpan": - watched_span: "Optional[WatchedSpan]" = None + ) -> "sentry_sdk.tracing.Span": + span = None if parent_id: - parent_span: "Optional[WatchedSpan]" = self.span_map.get(parent_id) + parent_span: "Optional[sentry_sdk.tracing.Span]" = self.span_map.get( + parent_id + ) if parent_span: - watched_span = WatchedSpan(parent_span.span.start_child(**kwargs)) - parent_span.children.append(watched_span) + span = parent_span.span.start_child(**kwargs) - if watched_span is None: - watched_span = WatchedSpan(sentry_sdk.start_span(**kwargs)) + if span is None: + span = sentry_sdk.start_span(**kwargs) - watched_span.span.__enter__() - self.span_map[run_id] = watched_span + span.__enter__() + self.span_map[run_id] = span self.gc_span_map() - return watched_span + return span def _exit_span( - self: "SentryLangchainCallback", span_data: "WatchedSpan", run_id: "UUID" + self: "SentryLangchainCallback", span: "sentry_sdk.tracing.Span", run_id: "UUID" ) -> None: - if span_data.is_pipeline: - set_ai_pipeline_name(None) - - span_data.span.__exit__(None, None, None) + span.__exit__(None, None, None) del self.span_map[run_id] def on_llm_start( @@ -341,14 +328,13 @@ def on_llm_start( or "" ) - watched_span = self._create_span( + span = self._create_span( run_id, parent_run_id, op=OP.GEN_AI_TEXT_COMPLETION, name=f"text_completion {model}".strip(), origin=LangchainIntegration.origin, ) - span = watched_span.span span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "text_completion") @@ -421,14 +407,13 @@ def on_chat_model_start( or "" ) - watched_span = self._create_span( + span = self._create_span( run_id, kwargs.get("parent_run_id"), op=OP.GEN_AI_CHAT, name=f"chat {model}".strip(), origin=LangchainIntegration.origin, ) - span = watched_span.span span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat") if model: @@ -505,8 +490,7 @@ def on_chat_model_end( if not run_id or run_id not in self.span_map: return - span_data = self.span_map[run_id] - span = span_data.span + span = self.span_map[run_id] if should_send_default_pii() and self.include_prompts: set_data_normalized( @@ -516,7 +500,7 @@ def on_chat_model_end( ) _record_token_usage(span, response) - self._exit_span(span_data, run_id) + self._exit_span(span, run_id) def on_llm_end( self: "SentryLangchainCallback", @@ -530,8 +514,7 @@ def on_llm_end( if not run_id or run_id not in self.span_map: return - span_data = self.span_map[run_id] - span = span_data.span + span = self.span_map[run_id] try: generation = response.generations[0][0] @@ -579,7 +562,7 @@ def on_llm_end( ) _record_token_usage(span, response) - self._exit_span(span_data, run_id) + self._exit_span(span, run_id) def on_llm_error( self: "SentryLangchainCallback", @@ -612,15 +595,14 @@ def on_agent_finish( if not run_id or run_id not in self.span_map: return - span_data = self.span_map[run_id] - span = span_data.span + span = self.span_map[run_id] if should_send_default_pii() and self.include_prompts: set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_TEXT, finish.return_values.items() ) - self._exit_span(span_data, run_id) + self._exit_span(span, run_id) def on_tool_start( self: "SentryLangchainCallback", @@ -637,14 +619,13 @@ def on_tool_start( tool_name = serialized.get("name") or kwargs.get("name") or "" - watched_span = self._create_span( + span = self._create_span( run_id, kwargs.get("parent_run_id"), op=OP.GEN_AI_EXECUTE_TOOL, name=f"execute_tool {tool_name}".strip(), origin=LangchainIntegration.origin, ) - span = watched_span.span span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "execute_tool") span.set_data(SPANDATA.GEN_AI_TOOL_NAME, tool_name) @@ -681,13 +662,12 @@ def on_tool_end( if not run_id or run_id not in self.span_map: return - span_data = self.span_map[run_id] - span = span_data.span + span = self.span_map[run_id] if should_send_default_pii() and self.include_prompts: set_data_normalized(span, SPANDATA.GEN_AI_TOOL_OUTPUT, output) - self._exit_span(span_data, run_id) + self._exit_span(span, run_id) def on_tool_error( self, From ea62340507a9c02ed21ea1ffdd6adfb66f7915ea Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 26 May 2026 09:31:56 +0200 Subject: [PATCH 06/10] . --- sentry_sdk/integrations/langchain.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/langchain.py b/sentry_sdk/integrations/langchain.py index 8d20bdb616..af427fe6b1 100644 --- a/sentry_sdk/integrations/langchain.py +++ b/sentry_sdk/integrations/langchain.py @@ -287,7 +287,7 @@ def _create_span( parent_id ) if parent_span: - span = parent_span.span.start_child(**kwargs) + span = parent_span.start_child(**kwargs) if span is None: span = sentry_sdk.start_span(**kwargs) From 41cceeb7a1581f229379228a30aabd6ceb5bd42c Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 26 May 2026 14:09:53 +0200 Subject: [PATCH 07/10] rename variable --- sentry_sdk/integrations/langchain.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/langchain.py b/sentry_sdk/integrations/langchain.py index af427fe6b1..bcf9221165 100644 --- a/sentry_sdk/integrations/langchain.py +++ b/sentry_sdk/integrations/langchain.py @@ -253,8 +253,8 @@ def __init__( def gc_span_map(self) -> None: if self.max_span_map_size is not None: while len(self.span_map) > self.max_span_map_size: - run_id, watched_span = self.span_map.popitem(last=False) - self._exit_span(watched_span, run_id) + run_id, span = self.span_map.popitem(last=False) + self._exit_span(span, run_id) def _handle_error(self, run_id: "UUID", error: "Any") -> None: with capture_internal_exceptions(): From 888806b462f6166c62f5d1090f1bac74fb886159 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 26 May 2026 14:32:18 +0200 Subject: [PATCH 08/10] use should_truncate_gen_ai_inputs in langgraph --- sentry_sdk/integrations/langgraph.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/langgraph.py b/sentry_sdk/integrations/langgraph.py index 0c466b1730..d66cc8971f 100644 --- a/sentry_sdk/integrations/langgraph.py +++ b/sentry_sdk/integrations/langgraph.py @@ -11,7 +11,10 @@ from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii -from sentry_sdk.tracing_utils import has_span_streaming_enabled +from sentry_sdk.tracing_utils import ( + has_span_streaming_enabled, + should_truncate_gen_ai_input, +) from sentry_sdk.utils import safe_serialize try: @@ -191,11 +194,11 @@ def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( - normalized_input_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( + truncate_and_annotate_messages( normalized_input_messages, span, scope ) + if should_truncate_gen_ai_input(client.options) + else normalized_input_messages ) if messages_data is not None: set_data_normalized( From c2353ff2c035eda69f14d3efb01ea80a3c30ebf4 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Wed, 27 May 2026 13:46:50 +0200 Subject: [PATCH 09/10] unparametrize one test --- tests/integrations/langgraph/test_langgraph.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/integrations/langgraph/test_langgraph.py b/tests/integrations/langgraph/test_langgraph.py index 26c7bfbc27..db41e0e8d7 100644 --- a/tests/integrations/langgraph/test_langgraph.py +++ b/tests/integrations/langgraph/test_langgraph.py @@ -712,21 +712,18 @@ async def run_error_test(): assert invoke_span.get("tags", {}).get("status") == "internal_error" -@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_span_origin( sentry_init, capture_events, capture_items, stream_gen_ai_spans, - span_streaming, ): """Test that span origins are correctly set.""" sentry_init( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, - _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) graph = MockStateGraph() @@ -734,7 +731,7 @@ def test_span_origin( def original_compile(self, *args, **kwargs): return MockCompiledGraph(self.name) - if span_streaming or stream_gen_ai_spans: + if stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): From ea62a4640e507959fd0f7749deff296cb7131897 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Wed, 27 May 2026 14:06:08 +0200 Subject: [PATCH 10/10] empty commit