diff --git a/src/strands/multiagent/__init__.py b/src/strands/multiagent/__init__.py index ad99944a8..dfad61903 100644 --- a/src/strands/multiagent/__init__.py +++ b/src/strands/multiagent/__init__.py @@ -8,11 +8,12 @@ standardized communication between agents. """ -from .base import MultiAgentBase, MultiAgentResult, Status +from .base import EdgeExecutionMode, MultiAgentBase, MultiAgentResult, Status from .graph import GraphBuilder, GraphResult from .swarm import Swarm, SwarmResult __all__ = [ + "EdgeExecutionMode", "GraphBuilder", "GraphResult", "MultiAgentBase", diff --git a/src/strands/multiagent/base.py b/src/strands/multiagent/base.py index dc3258f68..fa7aec3c1 100644 --- a/src/strands/multiagent/base.py +++ b/src/strands/multiagent/base.py @@ -39,6 +39,24 @@ class Status(Enum): INTERRUPTED = "interrupted" +class EdgeExecutionMode(Enum): + """Edge execution mode for graph traversal. + + Controls how the graph determines when a node is ready to execute + based on its incoming edges. + + Attributes: + OR: Node executes when ANY incoming edge's source node completes (default). + This is the current behavior - a node becomes ready as soon as any + of its predecessors finish. + AND: Node executes when ALL incoming edges' source nodes complete. + The node waits for every predecessor to finish before executing. + """ + + OR = "or" + AND = "and" + + @dataclass class NodeResult: """Unified result from node execution - handles both Agent and nested MultiAgentBase results.""" diff --git a/src/strands/multiagent/graph.py b/src/strands/multiagent/graph.py index d296753c0..4543efa0a 100644 --- a/src/strands/multiagent/graph.py +++ b/src/strands/multiagent/graph.py @@ -51,7 +51,7 @@ from ..types.event_loop import Metrics, Usage from ..types.multiagent import MultiAgentInput from ..types.traces import AttributeValue -from .base import MultiAgentBase, MultiAgentResult, NodeResult, Status +from .base import EdgeExecutionMode, MultiAgentBase, MultiAgentResult, NodeResult, Status logger = logging.getLogger(__name__) @@ -241,6 +241,7 @@ def __init__(self) -> None: self._execution_timeout: float | None = None self._node_timeout: float | None = None self._reset_on_revisit: bool = False + self._edge_execution_mode: EdgeExecutionMode = EdgeExecutionMode.OR self._id: str = _DEFAULT_GRAPH_ID self._session_manager: SessionManager | None = None self._hooks: list[HookProvider] | None = None @@ -307,6 +308,25 @@ def reset_on_revisit(self, enabled: bool = True) -> "GraphBuilder": self._reset_on_revisit = enabled return self + def set_edge_execution_mode(self, mode: EdgeExecutionMode) -> "GraphBuilder": + """Set the edge execution mode for determining node readiness. + + Controls how the graph determines when a node is ready to execute + based on its incoming edges. + + Args: + mode: EdgeExecutionMode.OR (default) - node executes when ANY predecessor completes. + EdgeExecutionMode.AND - node executes when ALL predecessors complete. + + Example: + For a graph where nodes A, B, C all connect to node Z: + + - OR mode (default): Z executes as soon as A, B, or C completes + - AND mode: Z waits until A AND B AND C have all completed + """ + self._edge_execution_mode = mode + return self + def set_max_node_executions(self, max_executions: int) -> "GraphBuilder": """Set maximum number of node executions allowed. @@ -389,6 +409,7 @@ def build(self) -> "Graph": session_manager=self._session_manager, hooks=self._hooks, id=self._id, + edge_execution_mode=self._edge_execution_mode, ) def _validate_graph(self) -> None: @@ -420,6 +441,7 @@ def __init__( hooks: list[HookProvider] | None = None, id: str = _DEFAULT_GRAPH_ID, trace_attributes: Mapping[str, AttributeValue] | None = None, + edge_execution_mode: EdgeExecutionMode = EdgeExecutionMode.OR, ) -> None: """Initialize Graph with execution limits and reset behavior. @@ -435,6 +457,9 @@ def __init__( hooks: List of hook providers for monitoring and extending graph execution behavior (default: None) id: Unique graph id (default: None) trace_attributes: Custom trace attributes to apply to the agent's trace span (default: None) + edge_execution_mode: Controls when nodes execute based on incoming edges (default: OR). + OR - node executes when ANY predecessor completes. + AND - node executes when ALL predecessors complete. """ super().__init__() @@ -448,6 +473,7 @@ def __init__( self.execution_timeout = execution_timeout self.node_timeout = node_timeout self.reset_on_revisit = reset_on_revisit + self.edge_execution_mode = edge_execution_mode self.state = GraphState() self._interrupt_state = _InterruptState() self.tracer = get_tracer() @@ -824,23 +850,65 @@ def _find_newly_ready_nodes(self, completed_batch: list["GraphNode"]) -> list["G return newly_ready def _is_node_ready_with_conditions(self, node: GraphNode, completed_batch: list["GraphNode"]) -> bool: - """Check if a node is ready considering conditional edges.""" + """Check if a node is ready considering conditional edges and execution mode. + + For OR mode (default): Node is ready when ANY incoming edge's source has completed. + For AND mode: Node is ready when ALL incoming edges' sources have completed. + """ # Get incoming edges to this node incoming_edges = [edge for edge in self.edges if edge.to_node == node] - # Check if at least one incoming edge condition is satisfied - for edge in incoming_edges: - if edge.from_node in completed_batch: - if edge.should_traverse(self.state): + if not incoming_edges: + return False + + if self.edge_execution_mode == EdgeExecutionMode.AND: + # AND mode: ALL incoming edges must have their source nodes completed + # and all conditions must be satisfied + all_completed_nodes = self.state.completed_nodes | set(completed_batch) + has_new_completion = False + + for edge in incoming_edges: + # Check if source node has completed (either previously or in current batch) + if edge.from_node not in all_completed_nodes: logger.debug( - "from=<%s>, to=<%s> | edge ready via satisfied condition", edge.from_node.node_id, node.node_id + "from=<%s>, to=<%s> | AND mode: source not completed", edge.from_node.node_id, node.node_id ) - return True - else: + return False + # Check if condition is satisfied + if not edge.should_traverse(self.state): logger.debug( - "from=<%s>, to=<%s> | edge condition not satisfied", edge.from_node.node_id, node.node_id + "from=<%s>, to=<%s> | AND mode: edge condition not satisfied", + edge.from_node.node_id, + node.node_id, ) - return False + return False + # Track if at least one edge is from the current batch + if edge.from_node in completed_batch: + has_new_completion = True + + # Only ready if at least one edge is newly completed (prevents re-triggering) + if has_new_completion: + logger.debug("node=<%s> | AND mode: all dependencies satisfied", node.node_id) + return True + return False + else: + # OR mode (default): ANY incoming edge condition being satisfied is enough + for edge in incoming_edges: + if edge.from_node in completed_batch: + if edge.should_traverse(self.state): + logger.debug( + "from=<%s>, to=<%s> | OR mode: edge ready via satisfied condition", + edge.from_node.node_id, + node.node_id, + ) + return True + else: + logger.debug( + "from=<%s>, to=<%s> | OR mode: edge condition not satisfied", + edge.from_node.node_id, + node.node_id, + ) + return False async def _execute_node(self, node: GraphNode, invocation_state: dict[str, Any]) -> AsyncIterator[Any]: """Execute a single node and yield TypedEvent objects.""" @@ -1172,6 +1240,12 @@ def deserialize_state(self, payload: dict[str, Any]) -> None: self._resume_from_session = True def _compute_ready_nodes_for_resume(self) -> list[GraphNode]: + """Compute which nodes should be ready to execute when resuming. + + Respects the edge_execution_mode setting: + - OR mode: Node is ready if ANY incoming edge's source has completed + - AND mode: Node is ready if ALL incoming edges' sources have completed + """ if self.state.status == Status.PENDING: return [] ready_nodes: list[GraphNode] = [] @@ -1182,8 +1256,14 @@ def _compute_ready_nodes_for_resume(self) -> list[GraphNode]: incoming = [e for e in self.edges if e.to_node is node] if not incoming: ready_nodes.append(node) - elif all(e.from_node in completed_nodes and e.should_traverse(self.state) for e in incoming): - ready_nodes.append(node) + elif self.edge_execution_mode == EdgeExecutionMode.AND: + # AND mode: ALL incoming edges must have completed sources with satisfied conditions + if all(e.from_node in completed_nodes and e.should_traverse(self.state) for e in incoming): + ready_nodes.append(node) + else: + # OR mode: ANY incoming edge with completed source and satisfied condition + if any(e.from_node in completed_nodes and e.should_traverse(self.state) for e in incoming): + ready_nodes.append(node) return ready_nodes diff --git a/tests/strands/multiagent/test_graph.py b/tests/strands/multiagent/test_graph.py index c511328d4..9ea7145ec 100644 --- a/tests/strands/multiagent/test_graph.py +++ b/tests/strands/multiagent/test_graph.py @@ -2166,6 +2166,541 @@ def test_graph_interrupt_on_before_node_call_event(interrupt_hook): assert multiagent_result.execution_time >= first_execution_time +# ============================================================================= +# Edge Execution Mode Tests +# ============================================================================= + + +def test_edge_execution_mode_enum(): + """Test EdgeExecutionMode enum values.""" + from strands.multiagent.base import EdgeExecutionMode + + assert EdgeExecutionMode.OR.value == "or" + assert EdgeExecutionMode.AND.value == "and" + + +def test_graph_builder_set_edge_execution_mode(): + """Test GraphBuilder.set_edge_execution_mode method.""" + from strands.multiagent.base import EdgeExecutionMode + + agent = create_mock_agent("test_agent") + + # Test default is OR + builder = GraphBuilder() + builder.add_node(agent, "node") + graph = builder.build() + assert graph.edge_execution_mode == EdgeExecutionMode.OR + + # Test setting AND mode + builder = GraphBuilder() + builder.add_node(create_mock_agent("test_agent_2"), "node") + builder.set_edge_execution_mode(EdgeExecutionMode.AND) + graph = builder.build() + assert graph.edge_execution_mode == EdgeExecutionMode.AND + + # Test method chaining + builder = GraphBuilder() + result = builder.set_edge_execution_mode(EdgeExecutionMode.AND) + assert result is builder # Returns self for chaining + + +@pytest.mark.asyncio +async def test_edge_execution_mode_or_default(mock_strands_tracer, mock_use_span): + """Test OR mode (default) - node executes when ANY predecessor completes.""" + from strands.multiagent.base import EdgeExecutionMode + + # Create agents + agent_a = create_mock_agent("agent_a", "Response A") + agent_b = create_mock_agent("agent_b", "Response B") + agent_c = create_mock_agent("agent_c", "Response C") + agent_z = create_mock_agent("agent_z", "Response Z") + + # Track execution order + execution_order = [] + + async def create_tracking_stream(agent, name): + async def tracking_stream(*args, **kwargs): + execution_order.append(name) + yield {"agent_start": True} + yield {"result": agent.return_value} + + return tracking_stream + + agent_a.stream_async = Mock(side_effect=await create_tracking_stream(agent_a, "A")) + agent_b.stream_async = Mock(side_effect=await create_tracking_stream(agent_b, "B")) + agent_c.stream_async = Mock(side_effect=await create_tracking_stream(agent_c, "C")) + agent_z.stream_async = Mock(side_effect=await create_tracking_stream(agent_z, "Z")) + + # Build graph: A, B, C all connect to Z + # In OR mode, Z should execute after ANY of A, B, C completes + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_c, "C") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.add_edge("C", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + builder.set_entry_point("C") + # Default is OR mode + + graph = builder.build() + assert graph.edge_execution_mode == EdgeExecutionMode.OR + + result = await graph.invoke_async("Test OR mode") + + # Verify successful execution + assert result.status == Status.COMPLETED + + # In OR mode with parallel entry points, Z should execute once + # after A, B, C all complete (since they're parallel) + assert agent_z.stream_async.call_count == 1 + + # Z should appear in execution order + z_executions = sum(1 for node in result.execution_order if node.node_id == "Z") + assert z_executions == 1 + + +@pytest.mark.asyncio +async def test_edge_execution_mode_and(mock_strands_tracer, mock_use_span): + """Test AND mode - node executes only when ALL predecessors complete.""" + from strands.multiagent.base import EdgeExecutionMode + + # Create agents with small delays to ensure deterministic order + agent_a = create_mock_agent("agent_a", "Response A") + agent_b = create_mock_agent("agent_b", "Response B") + agent_c = create_mock_agent("agent_c", "Response C") + agent_z = create_mock_agent("agent_z", "Response Z") + + # Track when Z executes relative to A, B, C + completed_before_z = [] + + async def stream_a(*args, **kwargs): + yield {"agent_start": True} + await asyncio.sleep(0.02) + yield {"result": agent_a.return_value} + + async def stream_b(*args, **kwargs): + yield {"agent_start": True} + await asyncio.sleep(0.04) + yield {"result": agent_b.return_value} + + async def stream_c(*args, **kwargs): + yield {"agent_start": True} + await asyncio.sleep(0.06) + yield {"result": agent_c.return_value} + + async def stream_z(*args, **kwargs): + # Record which nodes completed before Z started + completed_before_z.extend(["A", "B", "C"]) # All should be complete + yield {"agent_start": True} + yield {"result": agent_z.return_value} + + agent_a.stream_async = Mock(side_effect=stream_a) + agent_b.stream_async = Mock(side_effect=stream_b) + agent_c.stream_async = Mock(side_effect=stream_c) + agent_z.stream_async = Mock(side_effect=stream_z) + + # Build graph with AND mode: A, B, C all must complete before Z + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_c, "C") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.add_edge("C", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + builder.set_entry_point("C") + builder.set_edge_execution_mode(EdgeExecutionMode.AND) + + graph = builder.build() + assert graph.edge_execution_mode == EdgeExecutionMode.AND + + result = await graph.invoke_async("Test AND mode") + + # Verify successful execution + assert result.status == Status.COMPLETED + + # Z should execute exactly once after all predecessors + assert agent_z.stream_async.call_count == 1 + + # Verify execution order: A, B, C should all be before Z + execution_ids = [node.node_id for node in result.execution_order] + z_index = execution_ids.index("Z") + assert "A" in execution_ids[:z_index], "A should complete before Z in AND mode" + assert "B" in execution_ids[:z_index], "B should complete before Z in AND mode" + assert "C" in execution_ids[:z_index], "C should complete before Z in AND mode" + + +@pytest.mark.asyncio +async def test_edge_execution_mode_and_waits_for_all(mock_strands_tracer, mock_use_span): + """Test AND mode explicitly waits for all predecessors, not just any.""" + from strands.multiagent.base import EdgeExecutionMode + + # Create agents where A completes much faster than B and C + agent_a = create_mock_agent("agent_a", "Response A") + agent_b = create_mock_agent("agent_b", "Response B") + agent_z = create_mock_agent("agent_z", "Response Z") + + z_start_time = None + a_end_time = None + b_end_time = None + + async def stream_a(*args, **kwargs): + nonlocal a_end_time + yield {"agent_start": True} + await asyncio.sleep(0.01) # A completes quickly + a_end_time = time.time() + yield {"result": agent_a.return_value} + + async def stream_b(*args, **kwargs): + nonlocal b_end_time + yield {"agent_start": True} + await asyncio.sleep(0.1) # B takes much longer + b_end_time = time.time() + yield {"result": agent_b.return_value} + + async def stream_z(*args, **kwargs): + nonlocal z_start_time + z_start_time = time.time() + yield {"agent_start": True} + yield {"result": agent_z.return_value} + + agent_a.stream_async = Mock(side_effect=stream_a) + agent_b.stream_async = Mock(side_effect=stream_b) + agent_z.stream_async = Mock(side_effect=stream_z) + + # Build graph with AND mode + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + builder.set_edge_execution_mode(EdgeExecutionMode.AND) + + graph = builder.build() + result = await graph.invoke_async("Test AND mode waits") + + assert result.status == Status.COMPLETED + + # In AND mode, Z should start after BOTH A and B complete + # Z should not start right after A (the faster one) + assert z_start_time is not None + assert b_end_time is not None + assert z_start_time >= b_end_time - 0.01, "Z should wait for B (the slower one) in AND mode" + + +@pytest.mark.asyncio +async def test_edge_execution_mode_and_with_conditions(mock_strands_tracer, mock_use_span): + """Test AND mode respects conditional edges.""" + from strands.multiagent.base import EdgeExecutionMode + + agent_a = create_mock_agent("agent_a", "Response A") + agent_b = create_mock_agent("agent_b", "Response B") + agent_z = create_mock_agent("agent_z", "Response Z") + + # Condition that always returns False for B->Z edge + def block_b_to_z(state: GraphState) -> bool: + return False + + # Build graph with AND mode and a blocked edge + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z", condition=block_b_to_z) # This edge is blocked + builder.set_entry_point("A") + builder.set_entry_point("B") + builder.set_edge_execution_mode(EdgeExecutionMode.AND) + + graph = builder.build() + result = await graph.invoke_async("Test AND mode with conditions") + + # Z should NOT execute because the B->Z edge condition is not satisfied + # even though both A and B complete + assert result.status == Status.COMPLETED + assert agent_z.stream_async.call_count == 0 + + # Only A and B should be in the execution order + execution_ids = [node.node_id for node in result.execution_order] + assert "A" in execution_ids + assert "B" in execution_ids + assert "Z" not in execution_ids + + +@pytest.mark.asyncio +async def test_edge_execution_mode_or_vs_and_behavior(mock_strands_tracer, mock_use_span): + """Directly compare OR vs AND behavior with the same graph structure.""" + from strands.multiagent.base import EdgeExecutionMode + + async def run_graph_with_mode(mode: EdgeExecutionMode): + # Fresh agents for each run + agent_a = create_mock_agent("agent_a", "Response A") + agent_b = create_mock_agent("agent_b", "Response B") + agent_z = create_mock_agent("agent_z", "Response Z") + + z_execution_count = 0 + + async def stream_a(*args, **kwargs): + yield {"agent_start": True} + await asyncio.sleep(0.01) + yield {"result": agent_a.return_value} + + async def stream_b(*args, **kwargs): + yield {"agent_start": True} + await asyncio.sleep(0.05) + yield {"result": agent_b.return_value} + + async def stream_z(*args, **kwargs): + nonlocal z_execution_count + z_execution_count += 1 + yield {"agent_start": True} + yield {"result": agent_z.return_value} + + agent_a.stream_async = Mock(side_effect=stream_a) + agent_b.stream_async = Mock(side_effect=stream_b) + agent_z.stream_async = Mock(side_effect=stream_z) + + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + builder.set_edge_execution_mode(mode) + + graph = builder.build() + result = await graph.invoke_async(f"Test {mode.value} mode") + + return result, z_execution_count + + # Run with OR mode + or_result, or_z_count = await run_graph_with_mode(EdgeExecutionMode.OR) + + # Run with AND mode + and_result, and_z_count = await run_graph_with_mode(EdgeExecutionMode.AND) + + # Both should complete successfully + assert or_result.status == Status.COMPLETED + assert and_result.status == Status.COMPLETED + + # Z should execute once in both modes (but timing differs) + assert or_z_count == 1 + assert and_z_count == 1 + + +def test_edge_execution_mode_compute_ready_nodes_for_resume(): + """Test _compute_ready_nodes_for_resume respects edge_execution_mode.""" + from strands.multiagent.base import EdgeExecutionMode + + agent_a = create_mock_agent("agent_a") + agent_b = create_mock_agent("agent_b") + agent_z = create_mock_agent("agent_z") + + # Build graph with AND mode + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + builder.set_edge_execution_mode(EdgeExecutionMode.AND) + + graph = builder.build() + graph.state.status = Status.EXECUTING + + # Simulate only A completed + node_a = graph.nodes["A"] + node_a.execution_status = Status.COMPLETED + graph.state.completed_nodes.add(node_a) + + # In AND mode, Z should NOT be ready (B not completed) + ready_nodes = graph._compute_ready_nodes_for_resume() + ready_node_ids = [n.node_id for n in ready_nodes] + assert "Z" not in ready_node_ids, "Z should not be ready in AND mode when only A completed" + + # Now complete B as well + node_b = graph.nodes["B"] + node_b.execution_status = Status.COMPLETED + graph.state.completed_nodes.add(node_b) + + # Now Z should be ready + ready_nodes = graph._compute_ready_nodes_for_resume() + ready_node_ids = [n.node_id for n in ready_nodes] + assert "Z" in ready_node_ids, "Z should be ready in AND mode when both A and B completed" + + # Test OR mode - rebuild graph + builder = GraphBuilder() + builder.add_node(create_mock_agent("agent_a_2"), "A") + builder.add_node(create_mock_agent("agent_b_2"), "B") + builder.add_node(create_mock_agent("agent_z_2"), "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + # Default OR mode + + graph_or = builder.build() + graph_or.state.status = Status.EXECUTING + + # Simulate only A completed + node_a_or = graph_or.nodes["A"] + node_a_or.execution_status = Status.COMPLETED + graph_or.state.completed_nodes.add(node_a_or) + + # In OR mode, Z SHOULD be ready (A completed is enough) + ready_nodes_or = graph_or._compute_ready_nodes_for_resume() + ready_node_ids_or = [n.node_id for n in ready_nodes_or] + assert "Z" in ready_node_ids_or, "Z should be ready in OR mode when A completed" + + +def test_edge_execution_mode_exported(): + """Test EdgeExecutionMode is exported from multiagent module.""" + from strands.multiagent import EdgeExecutionMode + + assert EdgeExecutionMode.OR.value == "or" + assert EdgeExecutionMode.AND.value == "and" + + +def test_is_node_ready_with_conditions_and_mode_partial_completion(): + """Test _is_node_ready_with_conditions returns False when not all predecessors completed in AND mode.""" + from strands.multiagent.base import EdgeExecutionMode + + agent_a = create_mock_agent("agent_a") + agent_b = create_mock_agent("agent_b") + agent_z = create_mock_agent("agent_z") + + # Build graph with AND mode + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + builder.set_edge_execution_mode(EdgeExecutionMode.AND) + + graph = builder.build() + + node_a = graph.nodes["A"] + node_b = graph.nodes["B"] + node_z = graph.nodes["Z"] + + # Only A completed in this batch, B not completed at all + completed_batch = [node_a] + # state.completed_nodes is empty (nothing completed previously) + + # In AND mode, Z should NOT be ready because B hasn't completed + is_ready = graph._is_node_ready_with_conditions(node_z, completed_batch) + assert is_ready is False, "Z should not be ready when B hasn't completed in AND mode" + + # Now B completes in the next batch + graph.state.completed_nodes.add(node_a) # A completed previously + completed_batch_2 = [node_b] + + # Now Z should be ready (A completed previously, B just completed) + is_ready = graph._is_node_ready_with_conditions(node_z, completed_batch_2) + assert is_ready is True, "Z should be ready when all predecessors have completed" + + +def test_is_node_ready_with_conditions_or_mode(): + """Test _is_node_ready_with_conditions in OR mode.""" + from strands.multiagent.base import EdgeExecutionMode + + agent_a = create_mock_agent("agent_a") + agent_b = create_mock_agent("agent_b") + agent_z = create_mock_agent("agent_z") + + # Build graph with default OR mode + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + + graph = builder.build() + assert graph.edge_execution_mode == EdgeExecutionMode.OR + + node_a = graph.nodes["A"] + node_z = graph.nodes["Z"] + + # Only A completed in this batch + completed_batch = [node_a] + + # In OR mode, Z SHOULD be ready because A completed (any predecessor is enough) + is_ready = graph._is_node_ready_with_conditions(node_z, completed_batch) + assert is_ready is True, "Z should be ready when any predecessor completes in OR mode" + + +def test_is_node_ready_with_conditions_no_incoming_edges(): + """Test _is_node_ready_with_conditions returns False for nodes with no incoming edges.""" + agent_a = create_mock_agent("agent_a") + + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.set_entry_point("A") + + graph = builder.build() + node_a = graph.nodes["A"] + + # Node with no incoming edges should return False + is_ready = graph._is_node_ready_with_conditions(node_a, []) + assert is_ready is False + + +def test_is_node_ready_with_conditions_and_mode_no_new_completion(): + """Test AND mode returns False when all predecessors completed previously but none in current batch.""" + from strands.multiagent.base import EdgeExecutionMode + + agent_a = create_mock_agent("agent_a") + agent_b = create_mock_agent("agent_b") + agent_z = create_mock_agent("agent_z") + + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + builder.set_edge_execution_mode(EdgeExecutionMode.AND) + + graph = builder.build() + + node_a = graph.nodes["A"] + node_b = graph.nodes["B"] + node_z = graph.nodes["Z"] + + # Both A and B completed in previous batches + graph.state.completed_nodes.add(node_a) + graph.state.completed_nodes.add(node_b) + + # Current batch has some other node (not A or B) + # This simulates checking Z's readiness when neither of its predecessors + # just completed (they completed earlier) + completed_batch = [] # Empty batch - no new completions + + # Should return False because has_new_completion is False + # (prevents re-triggering Z multiple times) + is_ready = graph._is_node_ready_with_conditions(node_z, completed_batch) + assert is_ready is False, "Z should not be ready when no predecessor completed in current batch" def test_graph_interrupt_on_agent(agenerator): exp_interrupts = [ Interrupt( diff --git a/tests_integ/test_multiagent_graph.py b/tests_integ/test_multiagent_graph.py index b80a0f82d..9099b99e2 100644 --- a/tests_integ/test_multiagent_graph.py +++ b/tests_integ/test_multiagent_graph.py @@ -586,3 +586,173 @@ async def failing_after_two(*args, **kwargs): assert result.status == Status.COMPLETED assert len(result.execution_order) == 5 assert all(node.node_id == "loop_node" for node in result.execution_order) + + +@pytest.mark.asyncio +async def test_edge_execution_mode_and(): + """Test AND edge execution mode - node waits for ALL predecessors to complete.""" + from strands.multiagent import EdgeExecutionMode + + # Create three parallel agents and one downstream agent + agent_a = Agent( + name="agent_a", + model="us.amazon.nova-lite-v1:0", + system_prompt="You are Agent A. Respond briefly with 'A completed'.", + ) + agent_b = Agent( + name="agent_b", + model="us.amazon.nova-lite-v1:0", + system_prompt="You are Agent B. Respond briefly with 'B completed'.", + ) + agent_c = Agent( + name="agent_c", + model="us.amazon.nova-lite-v1:0", + system_prompt="You are Agent C. Respond briefly with 'C completed'.", + ) + agent_z = Agent( + name="agent_z", + model="us.amazon.nova-lite-v1:0", + system_prompt="You are Agent Z. Summarize the inputs you received from previous agents.", + ) + + # Build graph with AND mode: A, B, C all must complete before Z + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_c, "C") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.add_edge("C", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + builder.set_entry_point("C") + builder.set_edge_execution_mode(EdgeExecutionMode.AND) + + graph = builder.build() + assert graph.edge_execution_mode == EdgeExecutionMode.AND + + result = await graph.invoke_async("Execute all agents") + + # Verify successful execution + assert result.status == Status.COMPLETED + assert result.completed_nodes == 4 # A, B, C, Z all completed + + # Verify execution order: A, B, C should all be before Z + execution_ids = [node.node_id for node in result.execution_order] + z_index = execution_ids.index("Z") + + # All of A, B, C must appear before Z in the execution order + assert "A" in execution_ids[:z_index], "A should complete before Z in AND mode" + assert "B" in execution_ids[:z_index], "B should complete before Z in AND mode" + assert "C" in execution_ids[:z_index], "C should complete before Z in AND mode" + + # Verify Z only executed once + z_executions = sum(1 for node in result.execution_order if node.node_id == "Z") + assert z_executions == 1, "Z should execute exactly once" + + # Verify Z received inputs from all predecessors + z_result = result.results["Z"] + assert z_result.status == Status.COMPLETED + + +@pytest.mark.asyncio +async def test_edge_execution_mode_or_default(): + """Test OR edge execution mode (default) - node executes when ANY predecessor completes.""" + from strands.multiagent import EdgeExecutionMode + + # Create two parallel agents and one downstream agent + agent_a = Agent( + name="agent_a", + model="us.amazon.nova-lite-v1:0", + system_prompt="You are Agent A. Respond briefly with 'A completed'.", + ) + agent_b = Agent( + name="agent_b", + model="us.amazon.nova-lite-v1:0", + system_prompt="You are Agent B. Respond briefly with 'B completed'.", + ) + agent_z = Agent( + name="agent_z", + model="us.amazon.nova-lite-v1:0", + system_prompt="You are Agent Z. Respond briefly acknowledging the input.", + ) + + # Build graph with default OR mode + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z") + builder.set_entry_point("A") + builder.set_entry_point("B") + # Default is OR mode - don't set explicitly + + graph = builder.build() + assert graph.edge_execution_mode == EdgeExecutionMode.OR + + result = await graph.invoke_async("Execute all agents") + + # Verify successful execution + assert result.status == Status.COMPLETED + assert result.completed_nodes == 3 # A, B, Z all completed + + # In OR mode, Z executes once after A and B complete (since they run in parallel) + z_executions = sum(1 for node in result.execution_order if node.node_id == "Z") + assert z_executions == 1, "Z should execute exactly once" + + # Verify all nodes are in results + assert "A" in result.results + assert "B" in result.results + assert "Z" in result.results + + +@pytest.mark.asyncio +async def test_edge_execution_mode_and_with_conditions(): + """Test AND edge execution mode with conditional edges.""" + from strands.multiagent import EdgeExecutionMode + + agent_a = Agent( + name="agent_a", + model="us.amazon.nova-lite-v1:0", + system_prompt="You are Agent A. Respond briefly.", + ) + agent_b = Agent( + name="agent_b", + model="us.amazon.nova-lite-v1:0", + system_prompt="You are Agent B. Respond briefly.", + ) + agent_z = Agent( + name="agent_z", + model="us.amazon.nova-lite-v1:0", + system_prompt="You are Agent Z. Respond briefly.", + ) + + # Condition that blocks B->Z edge + def block_b_to_z(state): + return False + + # Build graph with AND mode and a blocked edge + builder = GraphBuilder() + builder.add_node(agent_a, "A") + builder.add_node(agent_b, "B") + builder.add_node(agent_z, "Z") + builder.add_edge("A", "Z") + builder.add_edge("B", "Z", condition=block_b_to_z) # This edge is blocked + builder.set_entry_point("A") + builder.set_entry_point("B") + builder.set_edge_execution_mode(EdgeExecutionMode.AND) + + graph = builder.build() + result = await graph.invoke_async("Test AND mode with blocked condition") + + # Z should NOT execute because in AND mode ALL conditions must be satisfied + assert result.status == Status.COMPLETED + assert result.completed_nodes == 2 # Only A and B completed + + # Verify Z is not in execution order + execution_ids = [node.node_id for node in result.execution_order] + assert "A" in execution_ids + assert "B" in execution_ids + assert "Z" not in execution_ids