diff --git a/sentry_sdk/integrations/langgraph.py b/sentry_sdk/integrations/langgraph.py index 3d408d7b03..8039aec6a9 100644 --- a/sentry_sdk/integrations/langgraph.py +++ b/sentry_sdk/integrations/langgraph.py @@ -112,9 +112,11 @@ def _parse_langgraph_messages(state: "Any") -> "Optional[List[Any]]": def _wrap_state_graph_compile(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_compile(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LanggraphIntegration) - if integration is None: + client = sentry_sdk.get_client() + integration = client.get_integration(LanggraphIntegration) + if integration is None or has_span_streaming_enabled(client.options): return f(self, *args, **kwargs) + with sentry_sdk.start_span( op=OP.GEN_AI_CREATE_AGENT, origin=LanggraphIntegration.origin, @@ -242,11 +244,11 @@ def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( - normalized_input_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( + truncate_and_annotate_messages( normalized_input_messages, span, scope ) + if should_truncate_gen_ai_input(client.options) + else normalized_input_messages ) if messages_data is not None: set_data_normalized( @@ -268,7 +270,8 @@ def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": def _wrap_pregel_ainvoke(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) async def new_ainvoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": - integration = sentry_sdk.get_client().get_integration(LanggraphIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(LanggraphIntegration) if integration is None: return await f(self, *args, **kwargs) @@ -277,6 +280,54 @@ async def new_ainvoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": f"invoke_agent {graph_name}".strip() if graph_name else "invoke_agent" ) + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=span_name, + attributes={ + "sentry.op": OP.GEN_AI_INVOKE_AGENT, + "sentry.origin": LanggraphIntegration.origin, + SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", + }, + ) as span: + if graph_name: + span.set_attribute(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) + span.set_attribute(SPANDATA.GEN_AI_AGENT_NAME, graph_name) + + input_messages = None + if ( + len(args) > 0 + and should_send_default_pii() + and integration.include_prompts + ): + input_messages = _parse_langgraph_messages(args[0]) + if input_messages: + normalized_input_messages = normalize_message_roles( + input_messages + ) + + client = sentry_sdk.get_client() + scope = sentry_sdk.get_current_scope() + messages_data = ( + truncate_and_annotate_messages( + normalized_input_messages, span, scope + ) + if should_truncate_gen_ai_input(client.options) + else normalized_input_messages + ) + if messages_data is not None: + set_data_normalized( + span, + SPANDATA.GEN_AI_REQUEST_MESSAGES, + messages_data, + unpack=False, + ) + + result = await f(self, *args, **kwargs) + + _set_response_attributes(span, input_messages, result, integration) + + return result + with get_start_span_function()( op=OP.GEN_AI_INVOKE_AGENT, name=span_name, @@ -301,11 +352,11 @@ async def new_ainvoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( - normalized_input_messages - if client.options.get("stream_gen_ai_spans", False) - else truncate_and_annotate_messages( + truncate_and_annotate_messages( normalized_input_messages, span, scope ) + if should_truncate_gen_ai_input(client.options) + else normalized_input_messages ) if messages_data is not None: set_data_normalized( diff --git a/tests/integrations/langgraph/test_langgraph.py b/tests/integrations/langgraph/test_langgraph.py index 42da770870..db41e0e8d7 100644 --- a/tests/integrations/langgraph/test_langgraph.py +++ b/tests/integrations/langgraph/test_langgraph.py @@ -4,6 +4,7 @@ import pytest +import sentry_sdk from sentry_sdk import start_transaction from sentry_sdk.consts import OP, SPANDATA @@ -239,6 +240,7 @@ def original_compile(self, *args, **kwargs): assert "calculator" in tools_data +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -256,6 +258,7 @@ def test_pregel_invoke( send_default_pii, include_prompts, stream_gen_ai_spans, + span_streaming, ): """Test Pregel.invoke() wrapper creates proper invoke_agent span.""" sentry_init( @@ -263,6 +266,7 @@ def test_pregel_invoke( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -294,7 +298,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -303,6 +307,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -421,6 +426,7 @@ def original_invoke(self, *args, **kwargs): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -438,6 +444,7 @@ def test_pregel_ainvoke( send_default_pii, include_prompts, stream_gen_ai_spans, + span_streaming, ): """Test Pregel.ainvoke() async wrapper creates proper invoke_agent span.""" sentry_init( @@ -445,6 +452,7 @@ def test_pregel_ainvoke( traces_sample_rate=1.0, send_default_pii=send_default_pii, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = {"messages": [MockMessage("What's the weather like?", name="user")]} @@ -476,12 +484,13 @@ async def run_test(): result = await wrapped_ainvoke(pregel, test_state) return result - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") result = asyncio.run(run_test()) assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -576,12 +585,14 @@ async def run_test(): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_invoke_error( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test error handling during graph execution.""" sentry_init( @@ -589,6 +600,7 @@ def test_pregel_invoke_error( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = {"messages": [MockMessage("This will fail")]} @@ -597,7 +609,7 @@ def test_pregel_invoke_error( def original_invoke(self, *args, **kwargs): raise Exception("Graph execution failed") - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(), pytest.raises( @@ -606,6 +618,7 @@ def original_invoke(self, *args, **kwargs): wrapped_invoke = _wrap_pregel_invoke(original_invoke) wrapped_invoke(pregel, test_state) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -636,12 +649,14 @@ def original_invoke(self, *args, **kwargs): assert invoke_span.get("tags", {}).get("status") == "internal_error" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_ainvoke_error( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test error handling during async graph execution.""" sentry_init( @@ -649,6 +664,7 @@ def test_pregel_ainvoke_error( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = {"messages": [MockMessage("This will fail async")]} @@ -664,11 +680,12 @@ async def run_error_test(): wrapped_ainvoke = _wrap_pregel_ainvoke(original_ainvoke) await wrapped_ainvoke(pregel, test_state) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") asyncio.run(run_error_test()) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -726,6 +743,7 @@ def original_compile(self, *args, **kwargs): tx = next(item.payload for item in items if item.type == "transaction") assert tx["contexts"]["trace"]["origin"] == "manual" + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] for span in spans: assert span["attributes"]["sentry.origin"] == "auto.ai.langgraph" @@ -745,6 +763,7 @@ def original_compile(self, *args, **kwargs): assert span["origin"] == "auto.ai.langgraph" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) @pytest.mark.parametrize("graph_name", ["my_graph", None, ""]) def test_pregel_invoke_with_different_graph_names( @@ -753,6 +772,7 @@ def test_pregel_invoke_with_different_graph_names( capture_items, graph_name, stream_gen_ai_spans, + span_streaming, ): """Test Pregel.invoke() with different graph name scenarios.""" sentry_init( @@ -760,6 +780,7 @@ def test_pregel_invoke_with_different_graph_names( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) pregel = MockPregelInstance(graph_name) if graph_name else MockPregelInstance() @@ -770,13 +791,14 @@ def test_pregel_invoke_with_different_graph_names( def original_invoke(self, *args, **kwargs): return {"result": "test"} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): wrapped_invoke = _wrap_pregel_invoke(original_invoke) wrapped_invoke(pregel, {"messages": []}) + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -824,12 +846,14 @@ def original_invoke(self, *args, **kwargs): assert SPANDATA.GEN_AI_AGENT_NAME not in invoke_span.get("data", {}) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_invoke_span_includes_usage_data( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans include aggregated usage data from context_wrapper. @@ -839,6 +863,7 @@ def test_pregel_invoke_span_includes_usage_data( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -878,7 +903,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -887,6 +912,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -938,12 +964,14 @@ def original_invoke(self, *args, **kwargs): assert invoke_agent_span["data"]["gen_ai.usage.total_tokens"] == 30 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_ainvoke_span_includes_usage_data( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans include aggregated usage data from context_wrapper. @@ -953,6 +981,7 @@ def test_pregel_ainvoke_span_includes_usage_data( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -998,12 +1027,13 @@ async def run_test(): result = await wrapped_ainvoke(pregel, test_state) return result - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") result = asyncio.run(run_test()) assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1052,12 +1082,14 @@ async def run_test(): assert invoke_agent_span["data"]["gen_ai.usage.total_tokens"] == 30 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_invoke_multiple_llm_calls_aggregate_usage( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans show aggregated usage across multiple LLM calls @@ -1067,6 +1099,7 @@ def test_pregel_invoke_multiple_llm_calls_aggregate_usage( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1117,7 +1150,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -1126,6 +1159,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1165,12 +1199,14 @@ def original_invoke(self, *args, **kwargs): assert invoke_agent_span["data"]["gen_ai.usage.total_tokens"] == 50 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_ainvoke_multiple_llm_calls_aggregate_usage( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans show aggregated usage across multiple LLM calls @@ -1180,6 +1216,7 @@ def test_pregel_ainvoke_multiple_llm_calls_aggregate_usage( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1236,12 +1273,13 @@ async def run_test(): result = await wrapped_ainvoke(pregel, test_state) return result - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") result = asyncio.run(run_test()) assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1278,12 +1316,14 @@ async def run_test(): assert invoke_agent_span["data"]["gen_ai.usage.total_tokens"] == 50 +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_invoke_span_includes_response_model( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans include the response model. @@ -1293,6 +1333,7 @@ def test_pregel_invoke_span_includes_response_model( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1332,7 +1373,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -1341,6 +1382,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1385,12 +1427,14 @@ def original_invoke(self, *args, **kwargs): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_ainvoke_span_includes_response_model( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that invoke_agent spans include the response model. @@ -1400,6 +1444,7 @@ def test_pregel_ainvoke_span_includes_response_model( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1445,12 +1490,13 @@ async def run_test(): result = await wrapped_ainvoke(pregel, test_state) return result - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") result = asyncio.run(run_test()) assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1492,12 +1538,14 @@ async def run_test(): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_invoke_span_uses_last_response_model( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that when an agent makes multiple LLM calls (e.g., with tools), @@ -1507,6 +1555,7 @@ def test_pregel_invoke_span_uses_last_response_model( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1559,7 +1608,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -1568,6 +1617,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1610,12 +1660,14 @@ def original_invoke(self, *args, **kwargs): ) +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_pregel_ainvoke_span_uses_last_response_model( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """ Test that when an agent makes multiple LLM calls (e.g., with tools), @@ -1625,6 +1677,7 @@ def test_pregel_ainvoke_span_uses_last_response_model( integrations=[LanggraphIntegration()], traces_sample_rate=1.0, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) test_state = { @@ -1683,12 +1736,13 @@ async def run_test(): result = await wrapped_ainvoke(pregel, test_state) return result - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") result = asyncio.run(run_test()) assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1775,12 +1829,14 @@ def test_complex_message_parsing(): assert result[2]["function_call"]["name"] == "search" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_extraction_functions_complex_scenario( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test extraction functions with complex scenarios including multiple messages and edge cases.""" sentry_init( @@ -1788,6 +1844,7 @@ def test_extraction_functions_complex_scenario( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) pregel = MockPregelInstance("complex_graph") @@ -1823,7 +1880,7 @@ def original_invoke(self, *args, **kwargs): ] return {"messages": new_messages} - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(): @@ -1832,6 +1889,7 @@ def original_invoke(self, *args, **kwargs): assert result is not None + sentry_sdk.flush() spans = [item.payload for item in items if item.type == "span"] invoke_spans = [ span @@ -1884,12 +1942,14 @@ def original_invoke(self, *args, **kwargs): assert tool_calls_data[1]["function"]["name"] == "calculate" +@pytest.mark.parametrize("span_streaming", [True, False]) @pytest.mark.parametrize("stream_gen_ai_spans", [True, False]) def test_langgraph_message_role_mapping( sentry_init, capture_events, capture_items, stream_gen_ai_spans, + span_streaming, ): """Test that Langgraph integration properly maps message roles like 'ai' to 'assistant'""" sentry_init( @@ -1897,6 +1957,7 @@ def test_langgraph_message_role_mapping( traces_sample_rate=1.0, send_default_pii=True, stream_gen_ai_spans=stream_gen_ai_spans, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) # Mock a langgraph message with mixed roles @@ -1918,7 +1979,7 @@ def __init__(self, content, message_type="human"): compiled_graph = MockCompiledGraph("test_graph") pregel = MockPregelInstance(compiled_graph) - if stream_gen_ai_spans: + if span_streaming or stream_gen_ai_spans: items = capture_items("transaction", "span") with start_transaction(name="langgraph tx"): @@ -1930,6 +1991,7 @@ def __init__(self, content, message_type="human"): ) wrapped_invoke(pregel, state_data) + sentry_sdk.flush() span = next(item.payload for item in items if item.type == "span") # Verify that the span was created correctly