diff --git a/sentry_sdk/integrations/langchain.py b/sentry_sdk/integrations/langchain.py index bcf9221165..054a953a01 100644 --- a/sentry_sdk/integrations/langchain.py +++ b/sentry_sdk/integrations/langchain.py @@ -18,7 +18,12 @@ from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii -from sentry_sdk.tracing_utils import _get_value +from sentry_sdk.traces import StreamedSpan +from sentry_sdk.tracing_utils import ( + _get_value, + has_span_streaming_enabled, + should_truncate_gen_ai_input, +) from sentry_sdk.utils import capture_internal_exceptions, logger if TYPE_CHECKING: @@ -246,7 +251,7 @@ class SentryLangchainCallback(BaseCallbackHandler): # type: ignore[misc] def __init__( self, max_span_map_size: "Optional[int]", include_prompts: bool ) -> None: - self.span_map: "OrderedDict[UUID, sentry_sdk.tracing.Span]" = OrderedDict() + self.span_map: "OrderedDict[UUID, Union[sentry_sdk.tracing.Span, StreamedSpan]]" = OrderedDict() self.max_span_map_size = max_span_map_size self.include_prompts = include_prompts @@ -263,7 +268,9 @@ def _handle_error(self, run_id: "UUID", error: "Any") -> None: span = self.span_map[run_id] - sentry_sdk.capture_exception(error, span.scope) + sentry_sdk.capture_exception( + error, span._scope if isinstance(span, StreamedSpan) else span.scope + ) span.__exit__(type(error), error, error.__traceback__) del self.span_map[run_id] @@ -279,18 +286,42 @@ def _create_span( self: "SentryLangchainCallback", run_id: "UUID", parent_id: "Optional[Any]", - **kwargs: "Any", - ) -> "sentry_sdk.tracing.Span": + op: str, + name: str, + origin: str, + ) -> "Union[sentry_sdk.tracing.Span, StreamedSpan]": span = None if parent_id: - parent_span: "Optional[sentry_sdk.tracing.Span]" = self.span_map.get( - parent_id + parent_span: "Optional[Union[sentry_sdk.tracing.Span, StreamedSpan]]" = ( + self.span_map.get(parent_id) ) if parent_span: - span = parent_span.start_child(**kwargs) + span = ( + sentry_sdk.traces.start_span( + parent_span=parent_span, + name=name, + attributes={ + "sentry.op": op, + "sentry.origin": origin, + }, + ) + if isinstance(parent_span, StreamedSpan) + else parent_span.start_child(op=op, name=name, origin=origin) + ) if span is None: - span = sentry_sdk.start_span(**kwargs) + span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options) + span = ( + sentry_sdk.traces.start_span( + name=name, + attributes={ + "sentry.op": op, + "sentry.origin": origin, + }, + ) + if span_streaming + else sentry_sdk.start_span(op=op, name=name, origin=origin) + ) span.__enter__() self.span_map[run_id] = span @@ -298,7 +329,9 @@ def _create_span( return span def _exit_span( - self: "SentryLangchainCallback", span: "sentry_sdk.tracing.Span", run_id: "UUID" + self: "SentryLangchainCallback", + span: "Union[sentry_sdk.tracing.Span, StreamedSpan]", + run_id: "UUID", ) -> None: span.__exit__(None, None, None) del self.span_map[run_id] @@ -336,21 +369,24 @@ def on_llm_start( origin=LangchainIntegration.origin, ) - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "text_completion") + set_on_span = ( + span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + ) + set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "text_completion") run_name = kwargs.get("name") if run_name: - span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) + set_on_span(SPANDATA.GEN_AI_FUNCTION_ID, run_name) if model: - span.set_data( + set_on_span( SPANDATA.GEN_AI_REQUEST_MODEL, model, ) ai_system = _get_ai_system(all_params) if ai_system: - span.set_data(SPANDATA.GEN_AI_SYSTEM, ai_system) + set_on_span(SPANDATA.GEN_AI_SYSTEM, ai_system) for key, attribute in DATA_FIELDS.items(): if key in all_params and all_params[key] is not None: @@ -370,11 +406,9 @@ def on_llm_start( client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( - normalized_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( - normalized_messages, span, scope - ) + truncate_and_annotate_messages(normalized_messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else normalized_messages ) if messages_data is not None: set_data_normalized( @@ -415,23 +449,24 @@ def on_chat_model_start( origin=LangchainIntegration.origin, ) - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat") + set_on_span = ( + span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + ) + set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "chat") if model: - span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model) + set_on_span(SPANDATA.GEN_AI_REQUEST_MODEL, model) ai_system = _get_ai_system(all_params) if ai_system: - span.set_data(SPANDATA.GEN_AI_SYSTEM, ai_system) + set_on_span(SPANDATA.GEN_AI_SYSTEM, ai_system) agent_metadata = kwargs.get("metadata") if isinstance(agent_metadata, dict) and "lc_agent_name" in agent_metadata: - span.set_data( - SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"] - ) + set_on_span(SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"]) run_name = kwargs.get("name") if run_name: - span.set_data( + set_on_span( SPANDATA.GEN_AI_FUNCTION_ID, run_name, ) @@ -445,7 +480,7 @@ def on_chat_model_start( if should_send_default_pii() and self.include_prompts: system_instructions = _get_system_instructions(messages) if len(system_instructions) > 0: - span.set_data( + set_on_span( SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS, json.dumps(_transform_system_instructions(system_instructions)), ) @@ -464,11 +499,9 @@ def on_chat_model_start( client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( - normalized_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( - normalized_messages, span, scope - ) + truncate_and_annotate_messages(normalized_messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else normalized_messages ) if messages_data is not None: set_data_normalized( @@ -522,19 +555,25 @@ def on_llm_end( generation = None if generation is not None: + set_on_span = ( + span.set_attribute + if isinstance(span, StreamedSpan) + else span.set_data + ) + try: response_model = generation.message.response_metadata.get( "model_name" ) if response_model is not None: - span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) + set_on_span(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) except AttributeError: pass try: finish_reason = generation.generation_info.get("finish_reason") if finish_reason is not None: - span.set_data( + set_on_span( SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS, [finish_reason], ) @@ -627,22 +666,24 @@ def on_tool_start( origin=LangchainIntegration.origin, ) - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "execute_tool") - span.set_data(SPANDATA.GEN_AI_TOOL_NAME, tool_name) + set_on_span = ( + span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + ) + + set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "execute_tool") + set_on_span(SPANDATA.GEN_AI_TOOL_NAME, tool_name) tool_description = serialized.get("description") if tool_description is not None: - span.set_data(SPANDATA.GEN_AI_TOOL_DESCRIPTION, tool_description) + set_on_span(SPANDATA.GEN_AI_TOOL_DESCRIPTION, tool_description) agent_metadata = kwargs.get("metadata") if isinstance(agent_metadata, dict) and "lc_agent_name" in agent_metadata: - span.set_data( - SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"] - ) + set_on_span(SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"]) run_name = kwargs.get("name") if run_name: - span.set_data( + set_on_span( SPANDATA.GEN_AI_FUNCTION_ID, run_name, ) @@ -751,7 +792,7 @@ def _get_token_usage(obj: "Any") -> "Optional[Dict[str, Any]]": return None -def _record_token_usage(span: "Span", response: "Any") -> None: +def _record_token_usage(span: "Union[Span, StreamedSpan]", response: "Any") -> None: token_usage = _get_token_usage(response) if token_usage: input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage) @@ -760,14 +801,18 @@ def _record_token_usage(span: "Span", response: "Any") -> None: response.generations ) + set_on_span = ( + span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + ) + if input_tokens is not None: - span.set_data(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) + set_on_span(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) if output_tokens is not None: - span.set_data(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) + set_on_span(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) if total_tokens is not None: - span.set_data(SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens) + set_on_span(SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens) def _get_request_data( @@ -864,7 +909,7 @@ def _simplify_langchain_tools(tools: "Any") -> "Optional[List[Any]]": return simplified_tools if simplified_tools else None -def _set_tools_on_span(span: "Span", tools: "Any") -> None: +def _set_tools_on_span(span: "Union[Span, StreamedSpan]", tools: "Any") -> None: """Set available tools data on a span if tools are provided.""" if tools is not None: simplified_tools = _simplify_langchain_tools(tools) @@ -960,63 +1005,114 @@ def new_configure( def _wrap_agent_executor_invoke(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LangchainIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) run_name, tools = _get_request_data(self, args, kwargs) - start_span_function = get_start_span_function() - with start_span_function( - op=OP.GEN_AI_INVOKE_AGENT, - name=f"invoke_agent {run_name}" if run_name else "invoke_agent", - origin=LangchainIntegration.origin, - ) as span: - if run_name: - span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=f"invoke_agent {run_name}" if run_name else "invoke_agent", + attributes={ + "sentry.op": OP.GEN_AI_INVOKE_AGENT, + "sentry.origin": LangchainIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", + SPANDATA.GEN_AI_RESPONSE_STREAMING: False, + }, + ) as span: + if run_name: + span.set_attribute(SPANDATA.GEN_AI_FUNCTION_ID, run_name) + + _set_tools_on_span(span, tools) + + # Run the agent + result = f(self, *args, **kwargs) + + input = result.get("input") + if ( + input is not None + and should_send_default_pii() + and integration.include_prompts + ): + normalized_messages = normalize_message_roles([input]) + + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + truncate_and_annotate_messages(normalized_messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else normalized_messages + ) + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") - span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False) + output = result.get("output") + if ( + output is not None + and should_send_default_pii() + and integration.include_prompts + ): + set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) - _set_tools_on_span(span, tools) + return result + else: + start_span_function = get_start_span_function() - # Run the agent - result = f(self, *args, **kwargs) + with start_span_function( + op=OP.GEN_AI_INVOKE_AGENT, + name=f"invoke_agent {run_name}" if run_name else "invoke_agent", + origin=LangchainIntegration.origin, + ) as span: + if run_name: + span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) - input = result.get("input") - if ( - input is not None - and should_send_default_pii() - and integration.include_prompts - ): - normalized_messages = normalize_message_roles([input]) + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False) - client = sentry_sdk.get_client() - scope = sentry_sdk.get_current_scope() - messages_data = ( - normalized_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( - normalized_messages, span, scope - ) - ) - if messages_data is not None: - set_data_normalized( - span, - SPANDATA.GEN_AI_REQUEST_MESSAGES, - messages_data, - unpack=False, + _set_tools_on_span(span, tools) + + # Run the agent + result = f(self, *args, **kwargs) + + input = result.get("input") + if ( + input is not None + and should_send_default_pii() + and integration.include_prompts + ): + normalized_messages = normalize_message_roles([input]) + + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + truncate_and_annotate_messages(normalized_messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else normalized_messages ) + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) - output = result.get("output") - if ( - output is not None - and should_send_default_pii() - and integration.include_prompts - ): - set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) + output = result.get("output") + if ( + output is not None + and should_send_default_pii() + and integration.include_prompts + ): + set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) - return result + return result return new_invoke @@ -1024,25 +1120,41 @@ def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": def _wrap_agent_executor_stream(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_stream(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LangchainIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) run_name, tools = _get_request_data(self, args, kwargs) - start_span_function = get_start_span_function() - span = start_span_function( - op=OP.GEN_AI_INVOKE_AGENT, - name=f"invoke_agent {run_name}" if run_name else "invoke_agent", - origin=LangchainIntegration.origin, - ) - span.__enter__() + if has_span_streaming_enabled(client.options): + span = sentry_sdk.traces.start_span( + name=f"invoke_agent {run_name}" if run_name else "invoke_agent", + attributes={ + "sentry.op": OP.GEN_AI_INVOKE_AGENT, + "sentry.origin": LangchainIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", + SPANDATA.GEN_AI_RESPONSE_STREAMING: True, + }, + ) + + if run_name: + span.set_attribute(SPANDATA.GEN_AI_FUNCTION_ID, run_name) + else: + start_span_function = get_start_span_function() - if run_name: - span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) + span = start_span_function( + op=OP.GEN_AI_INVOKE_AGENT, + name=f"invoke_agent {run_name}" if run_name else "invoke_agent", + origin=LangchainIntegration.origin, + ) + span.__enter__() - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") - span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + + if run_name: + span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) _set_tools_on_span(span, tools) @@ -1057,9 +1169,9 @@ def new_stream(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( - normalized_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages(normalized_messages, span, scope) + truncate_and_annotate_messages(normalized_messages, span, scope) + if should_truncate_gen_ai_input(client.options) + else normalized_messages ) if messages_data is not None: set_data_normalized( @@ -1160,35 +1272,65 @@ def _wrap_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_embedding_method(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LangchainIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) model_name = getattr(self, "model", None) or getattr(self, "model_name", None) - with sentry_sdk.start_span( - op=OP.GEN_AI_EMBEDDINGS, - name=f"embeddings {model_name}" if model_name else "embeddings", - origin=LangchainIntegration.origin, - ) as span: - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") - if model_name: - span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) - - # Capture input if PII is allowed - if ( - should_send_default_pii() - and integration.include_prompts - and len(args) > 0 - ): - input_data = args[0] - # Normalize to list format - texts = input_data if isinstance(input_data, list) else [input_data] - set_data_normalized( - span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False - ) - result = f(self, *args, **kwargs) - return result + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=f"embeddings {model_name}" if model_name else "embeddings", + attributes={ + "sentry.op": OP.GEN_AI_EMBEDDINGS, + "sentry.origin": LangchainIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "embeddings", + }, + ) as span: + if model_name: + span.set_attribute(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) + + # Capture input if PII is allowed + if ( + should_send_default_pii() + and integration.include_prompts + and len(args) > 0 + ): + input_data = args[0] + # Normalize to list format + texts = input_data if isinstance(input_data, list) else [input_data] + set_data_normalized( + span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False + ) + + result = f(self, *args, **kwargs) + return result + else: + with sentry_sdk.start_span( + op=OP.GEN_AI_EMBEDDINGS, + name=f"embeddings {model_name}" if model_name else "embeddings", + origin=LangchainIntegration.origin, + ) as span: + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") + if model_name: + span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) + + # Capture input if PII is allowed + if ( + should_send_default_pii() + and integration.include_prompts + and len(args) > 0 + ): + input_data = args[0] + # Normalize to list format + texts = input_data if isinstance(input_data, list) else [input_data] + set_data_normalized( + span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False + ) + + result = f(self, *args, **kwargs) + return result return new_embedding_method @@ -1200,34 +1342,64 @@ def _wrap_async_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any] async def new_async_embedding_method( self: "Any", *args: "Any", **kwargs: "Any" ) -> "Any": - integration = sentry_sdk.get_client().get_integration(LangchainIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LangchainIntegration) if integration is None: return await f(self, *args, **kwargs) model_name = getattr(self, "model", None) or getattr(self, "model_name", None) - with sentry_sdk.start_span( - op=OP.GEN_AI_EMBEDDINGS, - name=f"embeddings {model_name}" if model_name else "embeddings", - origin=LangchainIntegration.origin, - ) as span: - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") - if model_name: - span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) - - # Capture input if PII is allowed - if ( - should_send_default_pii() - and integration.include_prompts - and len(args) > 0 - ): - input_data = args[0] - # Normalize to list format - texts = input_data if isinstance(input_data, list) else [input_data] - set_data_normalized( - span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False - ) - result = await f(self, *args, **kwargs) - return result + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=f"embeddings {model_name}" if model_name else "embeddings", + attributes={ + "sentry.op": OP.GEN_AI_EMBEDDINGS, + "sentry.origin": LangchainIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "embeddings", + }, + ) as span: + if model_name: + span.set_attribute(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) + + # Capture input if PII is allowed + if ( + should_send_default_pii() + and integration.include_prompts + and len(args) > 0 + ): + input_data = args[0] + # Normalize to list format + texts = input_data if isinstance(input_data, list) else [input_data] + set_data_normalized( + span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False + ) + + result = await f(self, *args, **kwargs) + return result + else: + with sentry_sdk.start_span( + op=OP.GEN_AI_EMBEDDINGS, + name=f"embeddings {model_name}" if model_name else "embeddings", + origin=LangchainIntegration.origin, + ) as span: + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") + if model_name: + span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) + + # Capture input if PII is allowed + if ( + should_send_default_pii() + and integration.include_prompts + and len(args) > 0 + ): + input_data = args[0] + # Normalize to list format + texts = input_data if isinstance(input_data, list) else [input_data] + set_data_normalized( + span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False + ) + + result = await f(self, *args, **kwargs) + return result return new_async_embedding_method diff --git a/sentry_sdk/integrations/langgraph.py b/sentry_sdk/integrations/langgraph.py index acf630add2..3d408d7b03 100644 --- a/sentry_sdk/integrations/langgraph.py +++ b/sentry_sdk/integrations/langgraph.py @@ -11,6 +11,11 @@ from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.traces import StreamedSpan +from sentry_sdk.tracing_utils import ( + has_span_streaming_enabled, + should_truncate_gen_ai_input, +) from sentry_sdk.utils import safe_serialize try: @@ -151,7 +156,8 @@ def new_compile(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": def _wrap_pregel_invoke(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LanggraphIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LanggraphIntegration) if integration is None: return f(self, *args, **kwargs) @@ -160,50 +166,101 @@ def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": f"invoke_agent {graph_name}".strip() if graph_name else "invoke_agent" ) - with get_start_span_function()( - op=OP.GEN_AI_INVOKE_AGENT, - name=span_name, - origin=LanggraphIntegration.origin, - ) as span: - if graph_name: - span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) - span.set_data(SPANDATA.GEN_AI_AGENT_NAME, graph_name) - - span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") - - # Store input messages to later compare with output - input_messages = None - if ( - len(args) > 0 - and should_send_default_pii() - and integration.include_prompts - ): - input_messages = _parse_langgraph_messages(args[0]) - if input_messages: - normalized_input_messages = normalize_message_roles(input_messages) + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=span_name, + attributes={ + "sentry.op": OP.GEN_AI_INVOKE_AGENT, + "sentry.origin": LanggraphIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", + }, + ) as span: + if graph_name: + span.set_attribute(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) + span.set_attribute(SPANDATA.GEN_AI_AGENT_NAME, graph_name) + + # Store input messages to later compare with output + input_messages = None + if ( + len(args) > 0 + and should_send_default_pii() + and integration.include_prompts + ): + input_messages = _parse_langgraph_messages(args[0]) + if input_messages: + normalized_input_messages = normalize_message_roles( + input_messages + ) - client = sentry_sdk.get_client() - scope = sentry_sdk.get_current_scope() - messages_data = ( - normalized_input_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( - normalized_input_messages, span, scope + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + truncate_and_annotate_messages( + normalized_input_messages, span, scope + ) + if should_truncate_gen_ai_input(client.options) + else normalized_input_messages ) - ) - if messages_data is not None: - set_data_normalized( - span, - SPANDATA.GEN_AI_REQUEST_MESSAGES, - messages_data, - unpack=False, + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) + + result = f(self, *args, **kwargs) + + _set_response_attributes(span, input_messages, result, integration) + + return result + else: + with get_start_span_function()( + op=OP.GEN_AI_INVOKE_AGENT, + name=span_name, + origin=LanggraphIntegration.origin, + ) as span: + if graph_name: + span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) + span.set_data(SPANDATA.GEN_AI_AGENT_NAME, graph_name) + + span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") + + # Store input messages to later compare with output + input_messages = None + if ( + len(args) > 0 + and should_send_default_pii() + and integration.include_prompts + ): + input_messages = _parse_langgraph_messages(args[0]) + if input_messages: + normalized_input_messages = normalize_message_roles( + input_messages + ) + + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + normalized_input_messages + if client.options.get("stream_gen_ai_spans", False) + else truncate_and_annotate_messages( + normalized_input_messages, span, scope + ) ) + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) - result = f(self, *args, **kwargs) + result = f(self, *args, **kwargs) - _set_response_attributes(span, input_messages, result, integration) + _set_response_attributes(span, input_messages, result, integration) - return result + return result return new_invoke @@ -333,14 +390,18 @@ def _set_usage_data(span: "sentry_sdk.tracing.Span", messages: "Any") -> None: output_tokens += int(token_usage.get("completion_tokens", 0)) total_tokens += int(token_usage.get("total_tokens", 0)) + set_on_span = ( + span.set_attribute if isinstance(span, StreamedSpan) else span.set_data + ) + if input_tokens > 0: - span.set_data(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) + set_on_span(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) if output_tokens > 0: - span.set_data(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) + set_on_span(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) if total_tokens > 0: - span.set_data( + set_on_span( SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens, ) diff --git a/tests/integrations/langchain/test_langchain.py b/tests/integrations/langchain/test_langchain.py index 69c58f5409..aa13ce63ad 100644 --- a/tests/integrations/langchain/test_langchain.py +++ b/tests/integrations/langchain/test_langchain.py @@ -236,6 +236,7 @@ def get_word_length(word: str) -> int: return len(word) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_text_completion( sentry_init, @@ -243,6 +244,7 @@ def test_langchain_text_completion( capture_items, get_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -253,6 +255,7 @@ def test_langchain_text_completion( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) model_response = get_model_response( @@ -284,7 +287,7 @@ def test_langchain_text_completion( openai_api_key="badkey", ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -298,6 +301,7 @@ def test_langchain_text_completion( tx = next(item.payload for item in items if item.type == "transaction") assert tx["type"] == "transaction" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] llm_spans = [ span @@ -353,6 +357,7 @@ def test_langchain_text_completion( assert llm_span["data"]["gen_ai.usage.output_tokens"] == 15 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_chat_with_run_name( sentry_init, @@ -361,6 +366,7 @@ def test_langchain_chat_with_run_name( get_model_response, nonstreaming_chat_completions_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -371,6 +377,7 @@ def test_langchain_chat_with_run_name( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) request_headers = {} @@ -400,7 +407,7 @@ def test_langchain_chat_with_run_name( openai_api_key="badkey", ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with patch.object( @@ -413,6 +420,7 @@ def test_langchain_chat_with_run_name( config={"run_name": "my-snazzy-pipeline"}, ) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.chat" @@ -444,12 +452,14 @@ def test_langchain_chat_with_run_name( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_tool_call_with_run_name( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -460,8 +470,9 @@ def test_langchain_tool_call_with_run_name( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with start_transaction(): @@ -470,6 +481,7 @@ def test_langchain_tool_call_with_run_name( config={"run_name": "my-snazzy-pipeline"}, ) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] tool_spans = list( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.execute_tool" @@ -496,6 +508,7 @@ def test_langchain_tool_call_with_run_name( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.skipif( LANGCHAIN_VERSION < (1,), @@ -532,6 +545,7 @@ def test_langchain_create_agent( get_model_response, nonstreaming_responses_model_response, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -542,6 +556,7 @@ def test_langchain_create_agent( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) model_response = get_model_response( @@ -565,7 +580,7 @@ def test_langchain_create_agent( name="word_length_agent", ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -588,6 +603,7 @@ def test_langchain_create_agent( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.chat" @@ -723,6 +739,7 @@ def test_langchain_create_agent( assert SPANDATA.GEN_AI_RESPONSE_TEXT not in chat_spans[0].get("data", {}) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.skipif( LANGCHAIN_VERSION < (1,), @@ -746,6 +763,7 @@ def test_tool_execution_span( get_model_response, responses_tool_call_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -756,6 +774,7 @@ def test_tool_execution_span( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) responses = responses_tool_call_model_responses( @@ -818,7 +837,7 @@ def test_tool_execution_span( name="word_length_agent", ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -838,6 +857,7 @@ def test_tool_execution_span( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] chat_spans = list( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.chat" @@ -1013,6 +1033,7 @@ def test_tool_execution_span( assert "get_word_length" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -1032,6 +1053,7 @@ def test_langchain_openai_tools_agent_no_prompts( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -1042,6 +1064,7 @@ def test_langchain_openai_tools_agent_no_prompts( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -1080,7 +1103,7 @@ def test_langchain_openai_tools_agent_no_prompts( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -1099,6 +1122,7 @@ def test_langchain_openai_tools_agent_no_prompts( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -1279,6 +1303,7 @@ def test_langchain_openai_tools_agent_no_prompts( assert "get_word_length" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "system_instructions_content", @@ -1302,6 +1327,7 @@ def test_langchain_openai_tools_agent( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -1312,6 +1338,7 @@ def test_langchain_openai_tools_agent( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -1350,7 +1377,7 @@ def test_langchain_openai_tools_agent( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -1373,6 +1400,7 @@ def test_langchain_openai_tools_agent( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -1586,6 +1614,7 @@ def test_langchain_openai_tools_agent( assert "get_word_length" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_openai_tools_agent_with_config( sentry_init, @@ -1595,6 +1624,7 @@ def test_langchain_openai_tools_agent_with_config( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -1605,6 +1635,7 @@ def test_langchain_openai_tools_agent_with_config( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -1645,7 +1676,7 @@ def test_langchain_openai_tools_agent_with_config( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -1663,6 +1694,7 @@ def test_langchain_openai_tools_agent_with_config( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -1695,6 +1727,7 @@ def test_langchain_openai_tools_agent_with_config( assert invoke_agent_span["data"]["gen_ai.function_id"] == "my-snazzy-pipeline" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -1714,6 +1747,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -1724,6 +1758,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -1762,7 +1797,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -1781,6 +1816,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -1804,6 +1840,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( == "my-snazzy-pipeline" ) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # We can't guarantee anything about the "shape" of the langchain execution graph assert ( @@ -1963,6 +2000,7 @@ def test_langchain_openai_tools_agent_stream_no_prompts( assert "get_word_length" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "system_instructions_content", @@ -1986,6 +2024,7 @@ def test_langchain_openai_tools_agent_stream( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -1996,6 +2035,7 @@ def test_langchain_openai_tools_agent_stream( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -2034,7 +2074,7 @@ def test_langchain_openai_tools_agent_stream( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -2058,6 +2098,7 @@ def test_langchain_openai_tools_agent_stream( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -2282,6 +2323,7 @@ def test_langchain_openai_tools_agent_stream( assert "get_word_length" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_openai_tools_agent_stream_with_config( sentry_init, @@ -2291,6 +2333,7 @@ def test_langchain_openai_tools_agent_stream_with_config( server_side_event_chunks, streaming_chat_completions_model_responses, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[ @@ -2301,6 +2344,7 @@ def test_langchain_openai_tools_agent_stream_with_config( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -2341,7 +2385,7 @@ def test_langchain_openai_tools_agent_stream_with_config( agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with patch.object( @@ -2359,6 +2403,7 @@ def test_langchain_openai_tools_agent_stream_with_config( assert tx["type"] == "transaction" assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_agent_span = next( x for x in spans if x["attributes"]["sentry.op"] == "gen_ai.invoke_agent" @@ -2391,12 +2436,14 @@ def test_langchain_openai_tools_agent_stream_with_config( assert invoke_agent_span["data"]["gen_ai.function_id"] == "my-snazzy-pipeline" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_error( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): class MockOpenAI(ChatOpenAI): def _stream( @@ -2420,6 +2467,7 @@ def _llm_type(self) -> str: traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -2441,7 +2489,7 @@ def _llm_type(self) -> str: agent_executor = AgentExecutor(agent=agent, tools=[get_word_length], verbose=True) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("event") with start_transaction(), pytest.raises(ValueError): @@ -2458,12 +2506,14 @@ def _llm_type(self) -> str: assert error["level"] == "error" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_span_status_error( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): class MockOpenAI(ChatOpenAI): def _stream( @@ -2486,8 +2536,9 @@ def _llm_type(self) -> str: integrations=[LangchainIntegration(include_prompts=True)], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("event", "transaction", "span") with start_transaction(name="test"): @@ -2521,6 +2572,7 @@ def _llm_type(self) -> str: (error,) = (item.payload for item in items if item.type == "event") assert error["level"] == "error" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] assert spans[0]["status"] == "error" (transaction,) = (item.payload for item in items if item.type == "transaction") @@ -2773,12 +2825,14 @@ def test_langchain_callback_list_existing_callback(sentry_init): assert handler is sentry_callback +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_message_role_mapping( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that message roles are properly normalized in langchain integration.""" @@ -2813,6 +2867,7 @@ def _llm_type(self) -> str: traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) prompt = ChatPromptTemplate.from_messages( @@ -2835,12 +2890,13 @@ def _llm_type(self) -> str: test_input = "Hello, how are you?" message_data_found = False - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with start_transaction(): list(agent_executor.stream({"input": test_input})) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find spans with gen_ai operation that should have message data gen_ai_spans = [ @@ -3066,6 +3122,7 @@ def test_langchain_message_truncation(sentry_init, capture_events): assert tx["_meta"]["spans"]["0"]["data"]["gen_ai.request.messages"][""]["len"] == 5 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -3083,6 +3140,7 @@ def test_langchain_embeddings_sync( send_default_pii, include_prompts, stream_gen_ai_spans, + span_streaming, ): """Test that sync embedding methods (embed_documents, embed_query) are properly traced.""" try: @@ -3095,8 +3153,9 @@ def test_langchain_embeddings_sync( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API call @@ -3119,6 +3178,7 @@ def test_langchain_embeddings_sync( assert len(result) == 2 mock_embed_documents.assert_called_once() + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings span embeddings_spans = [ @@ -3214,6 +3274,7 @@ def test_langchain_embeddings_sync( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -3229,6 +3290,7 @@ def test_langchain_embeddings_embed_query( send_default_pii, include_prompts, stream_gen_ai_spans, + span_streaming, ): """Test that embed_query method is properly traced.""" try: @@ -3241,8 +3303,9 @@ def test_langchain_embeddings_embed_query( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API call @@ -3264,6 +3327,7 @@ def test_langchain_embeddings_embed_query( assert len(result) == 3 mock_embed_query.assert_called_once() + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings span embeddings_spans = [ @@ -3350,6 +3414,7 @@ def test_langchain_embeddings_embed_query( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -3366,6 +3431,7 @@ async def test_langchain_embeddings_async( send_default_pii, include_prompts, stream_gen_ai_spans, + span_streaming, ): """Test that async embedding methods (aembed_documents, aembed_query) are properly traced.""" try: @@ -3378,12 +3444,13 @@ async def test_langchain_embeddings_async( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) async def mock_aembed_documents(self, texts): return [[0.1, 0.2, 0.3] for _ in texts] - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API call @@ -3407,6 +3474,7 @@ async def mock_aembed_documents(self, texts): assert len(result) == 2 mock_aembed.assert_called_once() + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings span embeddings_spans = [ @@ -3508,6 +3576,7 @@ async def mock_aembed_documents(self, texts): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.asyncio async def test_langchain_embeddings_aembed_query( @@ -3515,6 +3584,7 @@ async def test_langchain_embeddings_aembed_query( capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that aembed_query method is properly traced.""" try: @@ -3527,12 +3597,13 @@ async def test_langchain_embeddings_aembed_query( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) async def mock_aembed_query(self, text): return [0.1, 0.2, 0.3] - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API call @@ -3554,6 +3625,7 @@ async def mock_aembed_query(self, text): assert len(result) == 3 mock_aembed.assert_called_once() + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings span embeddings_spans = [ @@ -3625,12 +3697,14 @@ async def mock_aembed_query(self, text): assert "Async query test" in input_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_no_model_name( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test embeddings when model name is not available.""" try: @@ -3642,8 +3716,9 @@ def test_langchain_embeddings_no_model_name( integrations=[LangchainIntegration(include_prompts=False)], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API call and remove model attribute @@ -3664,6 +3739,7 @@ def test_langchain_embeddings_no_model_name( with start_transaction(name="test_embeddings_no_model"): embeddings.embed_documents(["Test"]) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings span embeddings_spans = [ @@ -3725,12 +3801,14 @@ def test_langchain_embeddings_no_model_name( ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_integration_disabled( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that embeddings are not traced when integration is disabled.""" try: @@ -3741,10 +3819,11 @@ def test_langchain_embeddings_integration_disabled( sentry_init( traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) # Initialize without LangchainIntegration - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with mock.patch.object( @@ -3760,6 +3839,7 @@ def test_langchain_embeddings_integration_disabled( embeddings.embed_documents(["Test"]) # Check that no embeddings spans were created + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] embeddings_spans = [ span @@ -3795,12 +3875,14 @@ def test_langchain_embeddings_integration_disabled( assert len(embeddings_spans) == 0 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_multiple_providers( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that embeddings work with different providers.""" try: @@ -3813,8 +3895,9 @@ def test_langchain_embeddings_multiple_providers( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock both providers @@ -3843,6 +3926,7 @@ def test_langchain_embeddings_multiple_providers( openai_embeddings.embed_documents(["OpenAI test"]) azure_embeddings.embed_documents(["Azure test"]) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings spans embeddings_spans = [ @@ -3948,12 +4032,14 @@ def test_langchain_embeddings_error_handling(sentry_init, capture_events): # but the span should still be created +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_multiple_calls( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that multiple embeddings calls within a transaction are all traced.""" try: @@ -3966,8 +4052,9 @@ def test_langchain_embeddings_multiple_calls( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API calls @@ -3995,6 +4082,7 @@ def test_langchain_embeddings_multiple_calls( # Call embed_documents again embeddings.embed_documents(["Third batch"]) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings spans - should have 3 (2 embed_documents + 1 embed_query) embeddings_spans = [ @@ -4072,12 +4160,14 @@ def test_langchain_embeddings_multiple_calls( assert len(set(str(data) for data in input_data_list)) == 3 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_span_hierarchy( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that embeddings spans are properly nested within parent spans.""" try: @@ -4090,8 +4180,56 @@ def test_langchain_embeddings_span_hierarchy( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming: + items = capture_items("span") + + # Mock the actual API call + with mock.patch.object( + OpenAIEmbeddings, + "embed_documents", + wraps=lambda self, texts: [[0.1, 0.2, 0.3] for _ in texts], + ): + embeddings = OpenAIEmbeddings( + model="text-embedding-ada-002", openai_api_key="test-key" + ) + + # Force setup to re-run + LangchainIntegration.setup_once() + + with sentry_sdk.traces.start_span( + name="test_span_hierarchy" + ), sentry_sdk.traces.start_span( + name="custom operation", + attributes={ + "sentry.op": "custom", + }, + ): + embeddings.embed_documents(["Test within custom span"]) + + sentry_sdk.flush() + spans = [item.payload for item in items] + # Find all spans + embeddings_spans = [ + span + for span in spans + if span["attributes"].get("sentry.op") == "gen_ai.embeddings" + ] + custom_spans = [ + span for span in spans if span["attributes"].get("sentry.op") == "custom" + ] + + assert len(embeddings_spans) == 1 + assert len(custom_spans) == 1 + + # Both spans should exist + embeddings_span = embeddings_spans[0] + custom_span = custom_spans[0] + + assert embeddings_span["attributes"]["gen_ai.operation.name"] == "embeddings" + assert custom_span["name"] == "custom operation" + elif stream_gen_ai_spans: items = capture_items("transaction", "span") # Mock the actual API call @@ -4132,6 +4270,7 @@ def test_langchain_embeddings_span_hierarchy( custom_span = custom_spans[0] assert embeddings_span["attributes"]["gen_ai.operation.name"] == "embeddings" + assert custom_span["description"] == "custom operation" else: events = capture_events() @@ -4176,15 +4315,17 @@ def test_langchain_embeddings_span_hierarchy( custom_span = custom_spans[0] assert embeddings_span["data"]["gen_ai.operation.name"] == "embeddings" - assert custom_span["description"] == "custom operation" + assert custom_span["description"] == "custom operation" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langchain_embeddings_with_list_and_string_inputs( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that embeddings correctly handle both list and string inputs.""" try: @@ -4197,8 +4338,9 @@ def test_langchain_embeddings_with_list_and_string_inputs( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") # Mock the actual API calls @@ -4226,6 +4368,7 @@ def test_langchain_embeddings_with_list_and_string_inputs( # embed_query takes a string embeddings.embed_query("Single string query") + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] # Find embeddings spans embeddings_spans = [ @@ -4298,6 +4441,7 @@ def test_langchain_embeddings_with_list_and_string_inputs( ), f"Expected input text in serialized data: {input_data}" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "response_metadata_model,expected_model", @@ -4313,12 +4457,14 @@ def test_langchain_response_model_extraction( response_metadata_model, expected_model, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LangchainIntegration(include_prompts=True)], traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) callback = SentryLangchainCallback(max_span_map_size=100, include_prompts=True) @@ -4327,7 +4473,7 @@ def test_langchain_response_model_extraction( serialized = {"_type": "openai-chat", "model_name": "gpt-3.5-turbo"} prompts = ["Test prompt"] - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with start_transaction(): @@ -4347,6 +4493,7 @@ def test_langchain_response_model_extraction( response = Mock(generations=[[generation]]) callback.on_llm_end(response=response, run_id=run_id) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] llm_spans = [ span @@ -4617,6 +4764,7 @@ def test_transform_google_file_data(self): } +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "ai_type,expected_system", @@ -4669,11 +4817,13 @@ def test_langchain_ai_system_detection( ai_type, expected_system, stream_gen_ai_spans, + span_streaming, ): sentry_init( integrations=[LangchainIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) callback = SentryLangchainCallback(max_span_map_size=100, include_prompts=True) @@ -4682,7 +4832,7 @@ def test_langchain_ai_system_detection( serialized = {"_type": ai_type} if ai_type is not None else {} prompts = ["Test prompt"] - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("span") with start_transaction(): @@ -4697,6 +4847,7 @@ def test_langchain_ai_system_detection( response = Mock(generations=[[generation]]) callback.on_llm_end(response=response, run_id=run_id) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] llm_spans = [ span