diff --git a/crates/pu-cli/src/commands/spawn.rs b/crates/pu-cli/src/commands/spawn.rs index 1752bfd..0c9bc49 100644 --- a/crates/pu-cli/src/commands/spawn.rs +++ b/crates/pu-cli/src/commands/spawn.rs @@ -25,6 +25,7 @@ pub async fn run( agent_args: Option, plan_mode: bool, no_trigger: bool, + trigger: Option, json: bool, ) -> Result<(), CliError> { daemon_ctrl::ensure_daemon(socket).await?; @@ -73,6 +74,7 @@ pub async fn run( extra_args, plan_mode, no_trigger, + trigger, }, ) .await?; diff --git a/crates/pu-cli/src/commands/trigger.rs b/crates/pu-cli/src/commands/trigger.rs index f089aeb..096ab1a 100644 --- a/crates/pu-cli/src/commands/trigger.rs +++ b/crates/pu-cli/src/commands/trigger.rs @@ -111,3 +111,25 @@ pub async fn run_delete( output::print_response(&resp, json)?; Ok(()) } + +pub async fn run_assign( + socket: &Path, + agent_id: &str, + trigger_name: &str, + json: bool, +) -> Result<(), CliError> { + daemon_ctrl::ensure_daemon(socket).await?; + let project_root = commands::cwd_string()?; + let resp = client::send_request( + socket, + &Request::AssignTrigger { + project_root, + agent_id: agent_id.to_string(), + trigger_name: trigger_name.to_string(), + }, + ) + .await?; + let resp = output::check_response(resp, json)?; + output::print_response(&resp, json)?; + Ok(()) +} diff --git a/crates/pu-cli/src/main.rs b/crates/pu-cli/src/main.rs index 23221ac..5e78e88 100644 --- a/crates/pu-cli/src/main.rs +++ b/crates/pu-cli/src/main.rs @@ -65,6 +65,9 @@ enum Commands { /// Disable event triggers for this agent #[arg(long)] no_trigger: bool, + /// Bind an idle trigger to this agent (name of trigger in .pu/triggers/) + #[arg(long, conflicts_with = "no_trigger")] + trigger: Option, /// Output as JSON #[arg(long)] json: bool, @@ -575,6 +578,16 @@ enum TriggerAction { #[arg(long)] json: bool, }, + /// Assign a trigger to an idle agent + Assign { + /// Agent ID + agent_id: String, + /// Trigger name + trigger_name: String, + /// Output as JSON + #[arg(long)] + json: bool, + }, } #[tokio::main] @@ -608,11 +621,12 @@ async fn main() { agent_args, plan, no_trigger, + trigger, json, } => { commands::spawn::run( &socket, prompt, agent, name, base, root, worktree, template, file, command, vars, - no_auto, agent_args, plan, no_trigger, json, + no_auto, agent_args, plan, no_trigger, trigger, json, ) .await } @@ -771,6 +785,11 @@ async fn main() { TriggerAction::Delete { name, scope, json } => { commands::trigger::run_delete(&socket, &name, &scope, json).await } + TriggerAction::Assign { + agent_id, + trigger_name, + json, + } => commands::trigger::run_assign(&socket, &agent_id, &trigger_name, json).await, }, Commands::Gate { event, diff --git a/crates/pu-cli/src/output.rs b/crates/pu-cli/src/output.rs index 4ecdd1f..b527f98 100644 --- a/crates/pu-cli/src/output.rs +++ b/crates/pu-cli/src/output.rs @@ -253,6 +253,18 @@ pub fn print_response(response: &Response, json_mode: bool) -> Result<(), CliErr Response::RenameResult { agent_id, name } => { println!("Renamed agent {} to {}", agent_id.bold(), name.green()); } + Response::AssignTriggerResult { + agent_id, + trigger_name, + sequence_len, + } => { + println!( + "Assigned trigger {} to agent {} ({} steps)", + trigger_name.green(), + agent_id.bold(), + sequence_len + ); + } Response::CreateWorktreeResult { worktree_id } => { println!("Created worktree {}", worktree_id.bold()); } diff --git a/crates/pu-core/src/protocol.rs b/crates/pu-core/src/protocol.rs index a3f84df..8046f09 100644 --- a/crates/pu-core/src/protocol.rs +++ b/crates/pu-core/src/protocol.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use crate::types::{AgentStatus, WorktreeEntry}; -pub const PROTOCOL_VERSION: u32 = 4; +pub const PROTOCOL_VERSION: u32 = 5; /// Serde helper: encode `Vec` as hex in JSON for binary PTY data. mod hex_bytes { @@ -66,6 +66,8 @@ pub enum Request { plan_mode: bool, #[serde(default)] no_trigger: bool, + #[serde(default)] + trigger: Option, }, Status { project_root: String, @@ -127,6 +129,11 @@ pub enum Request { agent_id: String, name: String, }, + AssignTrigger { + project_root: String, + agent_id: String, + trigger_name: String, + }, CreateWorktree { project_root: String, #[serde(default)] @@ -598,6 +605,11 @@ pub enum Response { agent_id: String, name: String, }, + AssignTriggerResult { + agent_id: String, + trigger_name: String, + sequence_len: u32, + }, CreateWorktreeResult { worktree_id: String, }, @@ -836,6 +848,7 @@ mod tests { extra_args: vec![], plan_mode: false, no_trigger: false, + trigger: None, }; let json = serde_json::to_string(&req).unwrap(); let parsed: Request = serde_json::from_str(&json).unwrap(); @@ -886,6 +899,7 @@ mod tests { extra_args: vec![], plan_mode: false, no_trigger: false, + trigger: None, }; let json = serde_json::to_string(&req).unwrap(); let parsed: Request = serde_json::from_str(&json).unwrap(); @@ -920,6 +934,7 @@ mod tests { extra_args: vec!["--model".into(), "opus".into()], plan_mode: false, no_trigger: false, + trigger: None, }; let json = serde_json::to_string(&req).unwrap(); let parsed: Request = serde_json::from_str(&json).unwrap(); @@ -946,6 +961,7 @@ mod tests { extra_args: vec![], plan_mode: true, no_trigger: false, + trigger: None, }; let json = serde_json::to_string(&req).unwrap(); let parsed: Request = serde_json::from_str(&json).unwrap(); @@ -955,6 +971,64 @@ mod tests { } } + #[test] + fn given_spawn_request_without_trigger_should_default_to_none() { + let json = r#"{"type":"spawn","project_root":"/test","prompt":"fix bug"}"#; + let req: Request = serde_json::from_str(json).unwrap(); + match req { + Request::Spawn { trigger, .. } => assert!(trigger.is_none()), + _ => panic!("expected Spawn"), + } + } + + #[test] + fn given_spawn_request_with_trigger_should_round_trip() { + let req = Request::Spawn { + project_root: "/test".into(), + prompt: "fix".into(), + agent: "claude".into(), + name: None, + base: None, + root: true, + worktree: None, + command: None, + no_auto: false, + extra_args: vec![], + plan_mode: false, + no_trigger: false, + trigger: Some("review-bot".into()), + }; + let json = serde_json::to_string(&req).unwrap(); + let parsed: Request = serde_json::from_str(&json).unwrap(); + match parsed { + Request::Spawn { trigger, .. } => assert_eq!(trigger.unwrap(), "review-bot"), + _ => panic!("expected Spawn"), + } + } + + #[test] + fn given_assign_trigger_request_should_round_trip() { + let req = Request::AssignTrigger { + project_root: "/test".into(), + agent_id: "ag-abc123".into(), + trigger_name: "review-bot".into(), + }; + let json = serde_json::to_string(&req).unwrap(); + let parsed: Request = serde_json::from_str(&json).unwrap(); + match parsed { + Request::AssignTrigger { + project_root, + agent_id, + trigger_name, + } => { + assert_eq!(project_root, "/test"); + assert_eq!(agent_id, "ag-abc123"); + assert_eq!(trigger_name, "review-bot"); + } + _ => panic!("expected AssignTrigger"), + } + } + #[test] fn given_status_request_with_agent_id_should_round_trip() { let req = Request::Status { @@ -1262,7 +1336,7 @@ mod tests { #[test] fn given_protocol_version_should_be_current() { - assert_eq!(PROTOCOL_VERSION, 4); + assert_eq!(PROTOCOL_VERSION, 5); } // --- GridCommand round-trips --- @@ -1528,6 +1602,29 @@ mod tests { } } + #[test] + fn given_assign_trigger_result_should_round_trip() { + let resp = Response::AssignTriggerResult { + agent_id: "ag-abc".into(), + trigger_name: "review-bot".into(), + sequence_len: 3, + }; + let json = serde_json::to_string(&resp).unwrap(); + let parsed: Response = serde_json::from_str(&json).unwrap(); + match parsed { + Response::AssignTriggerResult { + agent_id, + trigger_name, + sequence_len, + } => { + assert_eq!(agent_id, "ag-abc"); + assert_eq!(trigger_name, "review-bot"); + assert_eq!(sequence_len, 3); + } + _ => panic!("expected AssignTriggerResult"), + } + } + // --- DeleteWorktree round-trips --- #[test] diff --git a/crates/pu-engine/src/engine.rs b/crates/pu-engine/src/engine.rs index 11759a2..c908f3a 100644 --- a/crates/pu-engine/src/engine.rs +++ b/crates/pu-engine/src/engine.rs @@ -43,6 +43,8 @@ struct SpawnParams { extra_args: Vec, plan_mode: bool, no_trigger: bool, + /// Name of trigger to bind (from --trigger flag) + trigger: Option, } struct SaveTriggerParams { @@ -227,7 +229,8 @@ impl Engine { | Request::Diff { project_root, .. } | Request::GetConfig { project_root } | Request::UpdateAgentConfig { project_root, .. } - | Request::Pulse { project_root, .. } => { + | Request::Pulse { project_root, .. } + | Request::AssignTrigger { project_root, .. } => { self.register_project(project_root); } _ => {} @@ -241,6 +244,14 @@ impl Engine { agent_id, name, } => self.handle_rename(&project_root, &agent_id, &name).await, + Request::AssignTrigger { + project_root, + agent_id, + trigger_name, + } => { + self.handle_assign_trigger(&project_root, &agent_id, &trigger_name) + .await + } Request::GetConfig { project_root } => self.handle_get_config(&project_root).await, Request::UpdateAgentConfig { project_root, @@ -269,6 +280,7 @@ impl Engine { extra_args, plan_mode, no_trigger, + trigger, } => { self.handle_spawn(SpawnParams { project_root, @@ -283,6 +295,7 @@ impl Engine { extra_args, plan_mode, no_trigger, + trigger, }) .await } @@ -836,6 +849,7 @@ impl Engine { extra_args, plan_mode, no_trigger, + trigger: trigger_param, } = params; let root_path = Path::new(&project_root); @@ -1150,34 +1164,44 @@ impl Engine { // tries to attach — the session must already be in the map. self.sessions.lock().await.insert(agent_id.clone(), handle); - // Find the first idle trigger and bind this agent to it + // Bind trigger if explicitly specified via --trigger let (trigger_name, trigger_total) = if no_trigger { (None, None) - } else { + } else if let Some(ref name) = trigger_param { let pr = project_root.to_string(); + let name_clone = name.clone(); let found = tokio::task::spawn_blocking(move || { let triggers = pu_core::trigger_def::triggers_for_event( Path::new(&pr), &pu_core::trigger_def::TriggerEvent::AgentIdle, ); - if triggers.len() > 1 { - tracing::warn!( - "multiple agent_idle triggers found ({}), using first: {}", - triggers.len(), - triggers[0].name - ); - } - triggers.into_iter().next().map(|t| { - let len = t.sequence.len() as u32; - (t.name, len) - }) + triggers + .into_iter() + .find(|t| t.name == name_clone) + .map(|t| { + let len = t.sequence.len() as u32; + (t.name, len) + }) }) .await .unwrap_or(None); match found { - Some((name, total)) if total > 0 => (Some(name), Some(total)), - _ => (None, None), + Some((tname, total)) if total > 0 => (Some(tname), Some(total)), + Some(_) => { + return Response::Error { + code: "INVALID_TRIGGER".into(), + message: format!("trigger '{name}' has empty sequence"), + }; + } + None => { + return Response::Error { + code: "NOT_FOUND".into(), + message: format!("trigger '{name}' not found"), + }; + } } + } else { + (None, None) }; // Update manifest @@ -1545,6 +1569,108 @@ impl Engine { } } + async fn handle_assign_trigger( + &self, + project_root: &str, + agent_id: &str, + trigger_name: &str, + ) -> Response { + let pr = project_root.to_string(); + let tn = trigger_name.to_string(); + + let trigger = tokio::task::spawn_blocking(move || { + pu_core::trigger_def::triggers_for_event( + Path::new(&pr), + &pu_core::trigger_def::TriggerEvent::AgentIdle, + ) + .into_iter() + .find(|t| t.name == tn) + }) + .await; + + let trigger = match trigger { + Ok(Some(t)) => t, + Ok(None) => { + return Response::Error { + code: "NOT_FOUND".into(), + message: format!("trigger '{trigger_name}' not found"), + }; + } + Err(e) => { + return Response::Error { + code: "INTERNAL_ERROR".into(), + message: format!("trigger lookup failed: {e}"), + }; + } + }; + + let sequence_len = trigger.sequence.len() as u32; + if sequence_len == 0 { + return Response::Error { + code: "INVALID_TRIGGER".into(), + message: format!("trigger '{trigger_name}' has empty sequence"), + }; + } + + // Verify the agent exists in the manifest before assigning + let pr_check = project_root.to_string(); + let aid_check = agent_id.to_string(); + let agent_exists = tokio::task::spawn_blocking(move || { + manifest::read_manifest(Path::new(&pr_check)) + .map(|m| m.find_agent(&aid_check).is_some()) + }) + .await; + + match agent_exists { + Ok(Ok(false)) | Ok(Err(_)) => { + return Response::Error { + code: "NOT_FOUND".into(), + message: format!("agent '{agent_id}' not found"), + }; + } + Err(e) => { + return Response::Error { + code: "INTERNAL_ERROR".into(), + message: format!("agent lookup failed: {e}"), + }; + } + Ok(Ok(true)) => {} // proceed + } + + let pr2 = project_root.to_string(); + let aid2 = agent_id.to_string(); + let tn2 = trigger_name.to_string(); + let result = tokio::task::spawn_blocking(move || { + manifest::update_manifest(Path::new(&pr2), |mut m| { + if let Some(agent) = m.find_agent_mut(&aid2) { + agent.trigger_name = Some(tn2.clone()); + agent.trigger_state = Some(pu_core::types::TriggerState::Active); + agent.trigger_seq_index = Some(0); + agent.trigger_total = Some(sequence_len); + agent.gate_attempts = Some(0); + } + m + }) + }) + .await; + + match result { + Ok(Ok(_)) => { + self.notify_status_change(project_root).await; + Response::AssignTriggerResult { + agent_id: agent_id.to_string(), + trigger_name: trigger_name.to_string(), + sequence_len, + } + } + Ok(Err(e)) => Self::error_response(&e), + Err(e) => Response::Error { + code: "INTERNAL_ERROR".into(), + message: format!("assign trigger task failed: {e}"), + }, + } + } + async fn handle_suspend(&self, project_root: &str, target: SuspendTarget) -> Response { let m = match self.read_manifest_async(project_root).await { Ok(m) => m, @@ -2926,6 +3052,7 @@ impl Engine { extra_args: vec![], plan_mode: false, no_trigger: false, + trigger: None, }) .await; @@ -2970,6 +3097,7 @@ impl Engine { extra_args: vec![], plan_mode: false, no_trigger: false, + trigger: None, }) .await; if let Response::SpawnResult { agent_id, .. } = resp { @@ -3718,8 +3846,7 @@ impl Engine { if let Some(ref inject_text) = action.inject { let resolved = pu_core::trigger_def::substitute_variables(inject_text, &trigger.variables); - let text_with_newline = format!("{resolved}\n"); - match self.inject_text(agent_id, &text_with_newline).await { + match self.inject_text(agent_id, &resolved).await { Ok(true) => {} // success, proceed to advance Ok(false) => { tracing::warn!(agent_id, "inject_text: session not found, marking failed"); @@ -3760,9 +3887,8 @@ impl Engine { } } - /// Inject text into an agent's PTY. Clones the fd under the lock, then drops - /// the lock before the potentially-blocking write. Returns `Ok(true)` on - /// success, `Ok(false)` if the session was not found. + /// Inject text into an agent's PTY using chunked typing + Enter submission. + /// Returns `Ok(true)` on success, `Ok(false)` if the session was not found. async fn inject_text(&self, agent_id: &str, text: &str) -> Result { let fd = { let sessions = self.sessions.lock().await; @@ -3770,7 +3896,9 @@ impl Engine { }; match fd { Some(fd) => { - self.pty_host.write_to_fd(&fd, text.as_bytes()).await?; + self.pty_host + .write_chunked_submit(&fd, text.as_bytes()) + .await?; Ok(true) } None => Ok(false), @@ -3853,6 +3981,7 @@ impl Engine { extra_args: vec![], plan_mode: false, no_trigger: false, + trigger: None, }) .await } else { @@ -3884,6 +4013,7 @@ impl Engine { extra_args: vec![], plan_mode: false, no_trigger: false, + trigger: None, }) .await } @@ -4367,6 +4497,7 @@ mod tests { extra_args: vec![], plan_mode: false, no_trigger: false, + trigger: None, }) .await; diff --git a/crates/pu-engine/src/test_helpers.rs b/crates/pu-engine/src/test_helpers.rs index 9925643..cb2d614 100644 --- a/crates/pu-engine/src/test_helpers.rs +++ b/crates/pu-engine/src/test_helpers.rs @@ -35,6 +35,7 @@ pub(crate) async fn init_and_spawn() -> (Engine, String, TempDir) { extra_args: vec![], plan_mode: false, no_trigger: false, + trigger: None, }) .await;