diff --git a/crates/buzz-acp/src/acp.rs b/crates/buzz-acp/src/acp.rs index 0856c77b7..6cd705d6c 100644 --- a/crates/buzz-acp/src/acp.rs +++ b/crates/buzz-acp/src/acp.rs @@ -280,8 +280,10 @@ impl AcpClient { /// Must be called exactly once, before any other ACP method. /// The caller may inspect `agentCapabilities` in the returned value. pub async fn initialize(&mut self) -> Result { + // Requesting version 2 is an intentional temporary pin — we are squatting + // on ACP v2 ahead of the upstream ACP RFD. Revisit when that RFD merges. let params = serde_json::json!({ - "protocolVersion": 1, + "protocolVersion": 2, "clientCapabilities": {}, "clientInfo": { "name": "buzz-acp", @@ -296,17 +298,23 @@ impl AcpClient { /// Send `session/new` and return the full response alongside the session ID. /// /// `cwd` must be an absolute path. `mcp_servers` may be empty. + /// `system_prompt` is included in the request when `Some` — agents that + /// support the field will use it; others ignore unknown fields per JSON-RPC. /// Callers use [`extract_model_config_options`] and [`extract_model_state`] /// to pull model info from the raw result. pub async fn session_new_full( &mut self, cwd: &str, mcp_servers: Vec, + system_prompt: Option<&str>, ) -> Result { - let params = serde_json::json!({ + let mut params = serde_json::json!({ "cwd": cwd, "mcpServers": mcp_servers, }); + if let Some(sp) = system_prompt { + params["systemPrompt"] = serde_json::Value::String(sp.to_owned()); + } let result = self.send_request("session/new", params).await?; let session_id = result["sessionId"] .as_str() @@ -327,8 +335,12 @@ impl AcpClient { &mut self, cwd: &str, mcp_servers: Vec, + system_prompt: Option<&str>, ) -> Result { - Ok(self.session_new_full(cwd, mcp_servers).await?.session_id) + Ok(self + .session_new_full(cwd, mcp_servers, system_prompt) + .await? + .session_id) } /// Send `session/set_config_option` (stable ACP path). @@ -1401,7 +1413,7 @@ mod tests { "id": 0u64, "method": "initialize", "params": { - "protocolVersion": 1, + "protocolVersion": 2, "clientCapabilities": {}, "clientInfo": { "name": "buzz-acp", @@ -1409,7 +1421,7 @@ mod tests { } } }); - assert_eq!(msg["params"]["protocolVersion"].as_u64(), Some(1)); + assert_eq!(msg["params"]["protocolVersion"].as_u64(), Some(2)); assert_eq!( msg["params"]["clientInfo"]["name"].as_str(), Some("buzz-acp") @@ -2031,4 +2043,65 @@ mod tests { "expected IdleTimeout after silence, got {result:?}" ); } + + // ── session_new_full systemPrompt serialization ────────────────────── + + #[tokio::test] + async fn session_new_full_includes_system_prompt_when_some() { + // Script: respond to initialize, then echo back the session/new request. + let script = r#" + read -t 2 _init + echo '{"jsonrpc":"2.0","id":0,"result":{"protocolVersion":1,"agentCapabilities":{}}}' + read -t 2 REQ + echo '{"jsonrpc":"2.0","id":1,"result":{"sessionId":"ses_test","_receivedRequest":'"$REQ"'}}' + sleep 1 + "#; + let mut client = spawn_script(script).await; + client + .initialize() + .await + .expect("initialize should succeed"); + + let resp = client + .session_new_full("/tmp", vec![], Some("Custom system prompt")) + .await + .expect("session_new_full should succeed"); + + assert_eq!(resp.session_id, "ses_test"); + let received = &resp.raw["_receivedRequest"]; + assert_eq!( + received["params"]["systemPrompt"].as_str(), + Some("Custom system prompt"), + "systemPrompt should be included in params when Some" + ); + } + + #[tokio::test] + async fn session_new_full_omits_system_prompt_when_none() { + // When system_prompt is None, the field should not appear in params. + let script = r#" + read -t 2 _init + echo '{"jsonrpc":"2.0","id":0,"result":{"protocolVersion":1,"agentCapabilities":{}}}' + read -t 2 REQ + echo '{"jsonrpc":"2.0","id":1,"result":{"sessionId":"ses_test","_receivedRequest":'"$REQ"'}}' + sleep 1 + "#; + let mut client = spawn_script(script).await; + client + .initialize() + .await + .expect("initialize should succeed"); + + let resp = client + .session_new_full("/tmp", vec![], None) + .await + .expect("session_new_full should succeed"); + + assert_eq!(resp.session_id, "ses_test"); + let received = &resp.raw["_receivedRequest"]; + assert!( + received["params"]["systemPrompt"].is_null(), + "systemPrompt should NOT be in params when value is None" + ); + } } diff --git a/crates/buzz-acp/src/lib.rs b/crates/buzz-acp/src/lib.rs index 7640f35a1..7a0e7f3af 100644 --- a/crates/buzz-acp/src/lib.rs +++ b/crates/buzz-acp/src/lib.rs @@ -31,7 +31,7 @@ use pool::{ AgentPool, ControlSignal, OwnedAgent, PromptContext, PromptOutcome, PromptResult, PromptSource, SessionState, }; -use queue::{prepend_base_prompt, EventQueue, QueuedEvent, ThreadTags}; +use queue::{EventQueue, QueuedEvent, ThreadTags}; use relay::{HarnessRelay, RelayEventPublisher}; use tokio::sync::{mpsc, watch}; use tracing_subscriber::EnvFilter; @@ -695,7 +695,7 @@ fn any_respawn_in_flight(crash_history: &[SlotCircuit]) -> bool { /// Result of a background respawn task. struct RespawnResult { index: usize, - result: Result, + result: Result<(AcpClient, u32)>, } /// RAII guard that ensures a `RespawnResult` is sent even if the task panics. @@ -719,7 +719,7 @@ impl RespawnGuard { /// Send the result and disarm the guard. Uses `try_send` (sync) so there /// is no await boundary between marking `sent` and actually enqueueing — /// cancellation cannot slip between the two. - fn send(mut self, result: Result) { + fn send(mut self, result: Result<(AcpClient, u32)>) { // Invariant: try_send succeeds because the channel capacity equals the // slot count, and respawn_in_flight guarantees at most one outstanding // result per slot. If this ever fails, the channel sizing or the @@ -839,6 +839,8 @@ async fn tokio_main() -> Result<()> { match tokio::time::timeout(Duration::from_secs(60), acp.initialize()).await { Ok(Ok(init_result)) => { tracing::info!(agent = i, "agent initialized: {init_result}"); + let protocol_version = + init_result["protocolVersion"].as_u64().unwrap_or(1) as u32; acp.observe( "agent_initialized", serde_json::json!({ @@ -852,6 +854,7 @@ async fn tokio_main() -> Result<()> { state: SessionState::default(), model_capabilities: None, desired_model: config.model.clone(), + protocol_version, })); } Ok(Err(e)) => { @@ -1279,13 +1282,14 @@ async fn tokio_main() -> Result<()> { while let Ok(rr) = respawn_rx.try_recv() { crash_history[rr.index].respawn_in_flight = false; match rr.result { - Ok(acp) => { + Ok((acp, protocol_version)) => { let agent = OwnedAgent { index: rr.index, acp, state: SessionState::default(), model_capabilities: None, desired_model: config.model.clone(), + protocol_version, }; pool.return_agent(agent); tracing::info!(agent = rr.index, "respawn complete"); @@ -1887,7 +1891,7 @@ async fn tokio_main() -> Result<()> { // Drain any respawn results that completed before the abort. Explicitly // shut down returned agents instead of relying on AcpClient::Drop. while let Ok(rr) = respawn_rx.try_recv() { - if let Ok(mut acp) = rr.result { + if let Ok((mut acp, _)) = rr.result { acp.shutdown().await; tracing::debug!(agent = rr.index, "reaped respawned agent on shutdown"); } @@ -2429,10 +2433,10 @@ fn dispatch_heartbeat( .heartbeat_prompt .clone() .unwrap_or_else(default_heartbeat_prompt); - let prompt_text = match ctx.base_prompt { - Some(bp) => prepend_base_prompt(bp, &prompt_text), - None => prompt_text, - }; + // For legacy agents (protocol_version < 2), prepend base_prompt to the + // heartbeat user message since they don't receive it via session/new. + let prompt_text = + pool::prepend_base_for_legacy(agent.protocol_version, ctx.base_prompt, &prompt_text); let result_tx = pool.result_tx(); let ctx_clone = Arc::clone(ctx); let agent_index = agent.index; @@ -2546,7 +2550,7 @@ async fn spawn_and_init( extra_env: &[(String, String)], agent_index: usize, observer: Option, -) -> Result { +) -> Result<(AcpClient, u32)> { let mut acp = AcpClient::spawn(command, args, extra_env) .await .map_err(|e| anyhow::anyhow!("failed to spawn agent: {e}"))?; @@ -2555,6 +2559,7 @@ async fn spawn_and_init( match acp.initialize().await { Ok(init_result) => { tracing::info!("agent initialized: {init_result}"); + let protocol_version = init_result["protocolVersion"].as_u64().unwrap_or(1) as u32; acp.observe( "agent_initialized", serde_json::json!({ @@ -2562,7 +2567,7 @@ async fn spawn_and_init( "initializeResult": init_result, }), ); - Ok(acp) + Ok((acp, protocol_version)) } Err(e) => { // Explicitly shut down the spawned child to prevent zombie/leak. @@ -2605,7 +2610,7 @@ async fn run_models(args: ModelsArgs) -> Result<()> { // so shutdown() runs on all paths (success, error, timeout). let protocol_result = tokio::time::timeout(MODELS_TIMEOUT, async { let init = client.initialize().await?; - let session = client.session_new_full(&cwd, vec![]).await?; + let session = client.session_new_full(&cwd, vec![], None).await?; Ok::<_, acp::AcpError>((init, session)) }) .await; @@ -2764,6 +2769,38 @@ fn build_mcp_servers(config: &Config) -> Vec { // ── Tests ───────────────────────────────────────────────────────────────────── +#[cfg(test)] +mod heartbeat_base_prompt_tests { + use super::*; + + // Pins the heartbeat dispatch path (dispatch_heartbeat, ~line 2359): a + // legacy agent WITH a base_prompt must get [Base] prepended to the + // heartbeat user message, composed as `[Base]\n{bp}\n\n{prompt}`. This is + // the second half of the round-2 regression (the first being initial_message). + + #[test] + fn test_heartbeat_legacy_agent_gets_base_prepended() { + // protocol_version 1 + Some(base_prompt): heartbeat prompt is prefixed + // with the [Base] section exactly as the legacy session/new path would. + let prompt = "[System: Heartbeat]\nrun feed get"; + let composed = pool::prepend_base_for_legacy(1, Some("you are a helpful agent"), prompt); + assert_eq!( + composed, + "[Base]\nyou are a helpful agent\n\n[System: Heartbeat]\nrun feed get" + ); + assert!(composed.starts_with("[Base]\nyou are a helpful agent\n\n")); + } + + #[test] + fn test_heartbeat_modern_agent_omits_base() { + // protocol_version 2 gets base_prompt via session/new; the heartbeat + // prompt is sent verbatim. + let prompt = "[System: Heartbeat]\nrun feed get"; + let composed = pool::prepend_base_for_legacy(2, Some("you are a helpful agent"), prompt); + assert_eq!(composed, prompt); + } +} + #[cfg(test)] mod owner_control_command_tests { use super::*; diff --git a/crates/buzz-acp/src/pool.rs b/crates/buzz-acp/src/pool.rs index 33c31d623..543fd32ef 100644 --- a/crates/buzz-acp/src/pool.rs +++ b/crates/buzz-acp/src/pool.rs @@ -35,8 +35,8 @@ use crate::acp::{ use crate::config::{DedupMode, PermissionMode}; use crate::observer; use crate::queue::{ - prepend_base_prompt, ContextMessage, ConversationContext, FlushBatch, PromptChannelInfo, - PromptProfile, PromptProfileLookup, + ContextMessage, ConversationContext, FlushBatch, PromptChannelInfo, PromptProfile, + PromptProfileLookup, }; use crate::relay::{ChannelInfo, RestClient}; @@ -135,6 +135,9 @@ pub struct OwnedAgent { pub model_capabilities: Option, /// Desired model ID (from `Config.model`). Applied after every `session_new_full()`. pub desired_model: Option, + /// Protocol version reported by the agent in its initialize response. + /// Agents declaring >= 2 support `systemPrompt` in session/new. + pub protocol_version: u32, } /// Pool of agents with take-and-return ownership semantics. @@ -424,9 +427,27 @@ async fn create_session_and_apply_model( agent: &mut OwnedAgent, ctx: &PromptContext, ) -> Result { + // Combine base_prompt + system_prompt into a single systemPrompt value + // for the session/new request. Only sent when the agent declares protocol + // version >= 2 (supports systemPrompt); legacy agents ignore it. + let combined_system_prompt: Option = if agent.protocol_version >= 2 { + match (ctx.base_prompt, ctx.system_prompt.as_deref()) { + (Some(bp), Some(sp)) => Some(format!("{}\n\n{sp}", bp.trim_end())), + (Some(bp), None) => Some(bp.trim_end().to_string()), + (None, Some(sp)) => Some(sp.to_string()), + (None, None) => None, + } + } else { + None + }; + let resp = agent .acp - .session_new_full(&ctx.cwd, ctx.mcp_servers.clone()) + .session_new_full( + &ctx.cwd, + ctx.mcp_servers.clone(), + combined_system_prompt.as_deref(), + ) .await?; // Populate model capabilities on first session creation. @@ -615,6 +636,26 @@ async fn apply_permission_mode( Ok(()) } +/// Prepend the `[Base]` section to a user-message body for legacy agents. +/// +/// Legacy agents (`protocol_version < 2`) don't receive `base_prompt` via the +/// system role in `session/new`, so it must ride along in the user message. +/// Agents with `protocol_version >= 2`, or any agent without a `base_prompt`, +/// get `body` unchanged. The gate lives here so the heartbeat and +/// initial-message dispatch paths can't drift apart again. +pub(crate) fn prepend_base_for_legacy( + protocol_version: u32, + base_prompt: Option<&str>, + body: &str, +) -> String { + match base_prompt { + Some(bp) if protocol_version < 2 => { + format!("{}\n\n{body}", crate::queue::base_section(bp)) + } + _ => body.to_string(), + } +} + /// Core async function spawned for each prompt. /// /// Lifecycle: @@ -845,11 +886,11 @@ pub async fn run_prompt_task( target: "pool::session", "sending initial_message to session {session_id} for channel {cid}" ); - // Prepend base prompt to initial_message for platform orientation. - let init_msg = match ctx.base_prompt { - Some(bp) => prepend_base_prompt(bp, initial_msg), - None => initial_msg.to_string(), - }; + // For agents with systemPrompt support (protocol_version >= 2), + // base_prompt is delivered via the system role in session/new. + // Legacy agents receive it via [Base] in the user message instead. + let init_msg = + prepend_base_for_legacy(agent.protocol_version, ctx.base_prompt, initial_msg); let init_result = agent .acp .session_prompt_with_idle_timeout( @@ -1001,12 +1042,13 @@ pub async fn run_prompt_task( crate::queue::format_prompt( b, &crate::queue::FormatPromptArgs { - base_prompt: ctx.base_prompt, - system_prompt: ctx.system_prompt.as_deref(), agent_core: agent_core_section.as_deref(), channel_info: channel_info.as_ref(), conversation_context: conversation_context.as_ref(), profile_lookup: profile_lookup.as_ref(), + has_system_prompt_support: agent.protocol_version >= 2, + base_prompt: ctx.base_prompt, + system_prompt: ctx.system_prompt.as_deref(), }, ) } else { @@ -2199,6 +2241,35 @@ mod tests { use nostr::{EventBuilder, Keys, Kind, Tag}; use serde_json::json; + // ── prepend_base_for_legacy regression tests ───────────────────────────── + // These pin the initial_message dispatch path (run_prompt_task, ~line 855): + // a legacy agent WITH a base_prompt must get [Base] prepended to the user + // message. This is the exact regression that shipped in the round-2 bug. + + #[test] + fn test_initial_message_legacy_agent_gets_base_prepended() { + // protocol_version 1 + Some(base_prompt): [Base] rides along in the + // user message, composed as `[Base]\n{bp}\n\n{initial_msg}`. + let composed = prepend_base_for_legacy(1, Some("you are a helpful agent"), "hello channel"); + assert_eq!(composed, "[Base]\nyou are a helpful agent\n\nhello channel"); + assert!(composed.starts_with("[Base]\nyou are a helpful agent\n\n")); + } + + #[test] + fn test_initial_message_modern_agent_omits_base() { + // protocol_version 2 receives base_prompt via session/new, so the user + // message is left untouched even when a base_prompt is present. + let composed = prepend_base_for_legacy(2, Some("you are a helpful agent"), "hello channel"); + assert_eq!(composed, "hello channel"); + } + + #[test] + fn test_initial_message_legacy_agent_without_base_is_unchanged() { + // No base_prompt configured: nothing to prepend regardless of version. + let composed = prepend_base_for_legacy(1, None, "hello channel"); + assert_eq!(composed, "hello channel"); + } + // ── parse_thread_response tests ────────────────────────────────────────── #[test] diff --git a/crates/buzz-acp/src/queue.rs b/crates/buzz-acp/src/queue.rs index ee776f830..a906ca39c 100644 --- a/crates/buzz-acp/src/queue.rs +++ b/crates/buzz-acp/src/queue.rs @@ -1009,31 +1009,41 @@ fn format_conversation_context( /// Arguments for [`format_prompt`] beyond the required [`FlushBatch`]. #[derive(Default)] pub struct FormatPromptArgs<'a> { - pub base_prompt: Option<&'a str>, - pub system_prompt: Option<&'a str>, pub agent_core: Option<&'a str>, pub channel_info: Option<&'a PromptChannelInfo>, pub conversation_context: Option<&'a ConversationContext>, pub profile_lookup: Option<&'a PromptProfileLookup>, + /// When true, base_prompt and system_prompt are delivered via the system + /// role (session/new) and omitted from the user message. When false + /// (legacy agents), they are injected as `[Base]` and `[System]` sections. + pub has_system_prompt_support: bool, + /// Base prompt content for legacy agents (protocol_version < 2). + pub base_prompt: Option<&'a str>, + /// System prompt content for legacy agents (protocol_version < 2). + pub system_prompt: Option<&'a str>, } -/// Prepend the `[Base]` platform-context section to a prompt body. +/// Format the `[Base]` section for the base prompt. /// -/// Used by the heartbeat and initial-message paths so the `[Base]` format -/// is defined in exactly one place. (`format_prompt` uses a sections-vec -/// approach instead, but the resulting `[Base]\n{content}` format is identical.) -pub fn prepend_base_prompt(base: &str, body: &str) -> String { - format!("[Base]\n{}\n\n{body}", base.trim_end()) +/// Single source of truth for the `[Base]` framing so the format is defined in +/// exactly one place across all dispatch paths (batch flush, heartbeat, +/// initial message). +pub(crate) fn base_section(base_prompt: &str) -> String { + format!("[Base]\n{}", base_prompt.trim_end()) } /// Format a [`FlushBatch`] into a prompt string for the agent. /// /// Produces a stable prompt with these sections (in order): -/// 0. `[Base]\n{base_prompt}` — platform orientation (if configured) -/// 1. `[System]\n{system_prompt}` — if system prompt is set -/// 2. `[Context]` — scope, channel name, and contextual hints for the agent -/// 3. `[Thread Context]` or `[Conversation Context]` — if fetched -/// 4. `[Event]` / `[Buzz events]` — the triggering event(s) +/// 0. `[Base]` — base prompt (only for legacy agents without systemPrompt support) +/// 1. `[System]` — system prompt (only for legacy agents without systemPrompt support) +/// 2. `[Agent Memory — core]` — if agent core memory is set +/// 3. `[Context]` — scope, channel name, and contextual hints for the agent +/// 4. `[Thread Context]` or `[Conversation Context]` — if fetched +/// 5. `[Event]` / `[Buzz events]` — the triggering event(s) +/// +/// For agents with `protocol_version >= 2`, base_prompt and system_prompt are +/// delivered via the system role in `session/new` and omitted from this message. pub fn format_prompt(batch: &FlushBatch, args: &FormatPromptArgs<'_>) -> String { // Scope is always derived from the LAST event in the batch — that's the // one the agent is responding to. Thread/DM context is supplementary info @@ -1054,17 +1064,22 @@ pub fn format_prompt(batch: &FlushBatch, args: &FormatPromptArgs<'_>) -> String let mut sections: Vec = Vec::with_capacity(7); - // 0. Base prompt (platform-level, always first). - if let Some(bp) = args.base_prompt { - sections.push(format!("[Base]\n{}", bp.trim_end())); - } - - // 1. System prompt. - if let Some(sp) = args.system_prompt { - sections.push(format!("[System]\n{sp}")); + // For legacy agents (protocol_version < 2), inject base_prompt and + // system_prompt as user-message sections. Modern agents receive these + // via the system role in session/new. + if !args.has_system_prompt_support { + if let Some(bp) = args.base_prompt { + sections.push(base_section(bp)); + } + if let Some(sp) = args.system_prompt { + sections.push(format!("[System]\n{sp}")); + } } - // 1b. NIP-AE agent core memory (rendered by `engram_fetch::build_core_section`). + // NIP-AE agent core memory (rendered by `engram_fetch::build_core_section`). + // agent_core is always in user messages because it is resolved per-channel + // after session creation. A future session/update mechanism could move it + // to the system role. if let Some(core) = args.agent_core { sections.push(core.to_string()); } @@ -1200,6 +1215,16 @@ mod tests { !q.in_flight_channels.is_empty() } + #[test] + fn test_base_section_prepends_header_and_trims_trailing_whitespace() { + // Trailing whitespace/newlines are stripped; the [Base] header is + // prepended exactly once with a single newline separator. + assert_eq!(base_section("hello \n\n"), "[Base]\nhello"); + assert_eq!(base_section("hello"), "[Base]\nhello"); + // Internal newlines and leading whitespace are preserved verbatim. + assert_eq!(base_section(" line1\nline2 "), "[Base]\n line1\nline2"); + } + // ── Test 1: push + flush_next basic ────────────────────────────────────── #[test] @@ -1474,10 +1499,10 @@ mod tests { assert!(prompt.contains("Content: third message")); } - // ── Test 11: system prompt prepended ───────────────────────────────────── + // ── Test 11: system prompt NOT in user message (delivered via system role) ── #[test] - fn test_format_prompt_with_system_prompt() { + fn test_format_prompt_no_system_prompt_in_user_message() { let ch = Uuid::new_v4(); let event = make_event("hello"); @@ -1491,17 +1516,15 @@ mod tests { cancelled_events: vec![], }; - let prompt = format_prompt( - &batch, - &FormatPromptArgs { - system_prompt: Some("You are a triage bot."), - ..Default::default() - }, - ); - assert!(prompt.starts_with("[System]\nYou are a triage bot.\n\n[Context]")); + let prompt = format_prompt(&batch, &FormatPromptArgs::default()); + // system_prompt and base_prompt are delivered via session/new system role, + // so they must NOT appear in the user message. + assert!(!prompt.contains("[System]")); + assert!(!prompt.contains("[Base]")); + assert!(prompt.starts_with("[Context]")); } - // ── Test 11b: agent_core section is injected after [System] ────────────── + // ── Test 11b: agent_core section is first in user message ────────────── #[test] fn test_format_prompt_with_agent_core() { @@ -1520,14 +1543,13 @@ mod tests { let prompt = format_prompt( &batch, &FormatPromptArgs { - system_prompt: Some("sys"), agent_core: Some(core), ..Default::default() }, ); assert!( - prompt.contains("[System]\nsys\n\n[Agent Memory — core]\nbe helpful"), - "expected core block after [System]; got: {prompt}" + prompt.starts_with("[Agent Memory — core]\nbe helpful\n\n[Context]"), + "expected core block first, then [Context]; got: {prompt}" ); } @@ -1555,10 +1577,10 @@ mod tests { assert!(prompt.starts_with("[Agent Memory — core]\nbe helpful\n\n[Context]")); } - // ── Test 11c: base prompt prepended before system prompt ───────────────── + // ── Test 11c: base_prompt and system_prompt NOT in user message ──────────── #[test] - fn test_format_prompt_with_base_prompt() { + fn test_format_prompt_no_base_or_system_sections() { let ch = Uuid::new_v4(); let event = make_event("hello"); @@ -1572,35 +1594,111 @@ mod tests { cancelled_events: vec![], }; - // Both base_prompt and system_prompt: [Base] comes first, then [System]. + // format_prompt no longer accepts or emits base_prompt/system_prompt. + // They are delivered via session/new system role instead. + let prompt = format_prompt(&batch, &FormatPromptArgs::default()); + assert!(!prompt.contains("[Base]")); + assert!(!prompt.contains("[System]")); + assert!(prompt.starts_with("[Context]")); + } + + // ── Test 11d: legacy agents receive [Base]/[System] in user message ─────── + + #[test] + fn test_format_prompt_legacy_agent_emits_base_and_system() { + let ch = Uuid::new_v4(); + let event = make_event("hello"); + + let batch = FlushBatch { + channel_id: ch, + events: vec![BatchEvent { + event, + prompt_tag: "test".into(), + received_at: Instant::now(), + }], + cancelled_events: vec![], + }; + + let core = "[Agent Memory — core]\nremember this"; let prompt = format_prompt( &batch, &FormatPromptArgs { - base_prompt: Some("Platform base."), - system_prompt: Some("Role prompt."), + has_system_prompt_support: false, + base_prompt: Some("test base prompt"), + system_prompt: Some("test system prompt"), + agent_core: Some(core), ..Default::default() }, ); - assert!(prompt.starts_with("[Base]\nPlatform base.\n\n[System]\nRole prompt.")); - // Only base_prompt (no system_prompt): [Base] comes first, then [Context]. + // Both sections must be present + assert!( + prompt.contains("[Base]\ntest base prompt"), + "missing [Base] section" + ); + assert!( + prompt.contains("[System]\ntest system prompt"), + "missing [System] section" + ); + + // [Base] and [System] must appear BEFORE [Agent Memory] and [Context] + let base_pos = prompt.find("[Base]").unwrap(); + let system_pos = prompt.find("[System]").unwrap(); + let core_pos = prompt.find("[Agent Memory").unwrap(); + let context_pos = prompt.find("[Context]").unwrap(); + + assert!(base_pos < system_pos, "[Base] should come before [System]"); + assert!( + system_pos < core_pos, + "[System] should come before [Agent Memory]" + ); + assert!( + core_pos < context_pos, + "[Agent Memory] should come before [Context]" + ); + } + + // ── Test 11e: modern agents suppress [Base]/[System] from user message ──── + + #[test] + fn test_format_prompt_modern_agent_suppresses_base_and_system() { + let ch = Uuid::new_v4(); + let event = make_event("hello"); + + let batch = FlushBatch { + channel_id: ch, + events: vec![BatchEvent { + event, + prompt_tag: "test".into(), + received_at: Instant::now(), + }], + cancelled_events: vec![], + }; + let prompt = format_prompt( &batch, &FormatPromptArgs { - base_prompt: Some("Platform base."), + has_system_prompt_support: true, + base_prompt: Some("test base prompt"), + system_prompt: Some("test system prompt"), ..Default::default() }, ); - assert!(prompt.starts_with("[Base]\nPlatform base.\n\n[Context]")); - // No base_prompt: no [Base] section emitted. - let prompt = format_prompt(&batch, &FormatPromptArgs::default()); - assert!(!prompt.contains("[Base]")); + // Neither section should appear — they are delivered via session/new + assert!( + !prompt.contains("[Base]"), + "[Base] should be suppressed for modern agents" + ); + assert!( + !prompt.contains("[System]"), + "[System] should be suppressed for modern agents" + ); assert!(prompt.starts_with("[Context]")); } #[test] - fn test_format_prompt_base_prompt_ordering_with_full_context() { + fn test_format_prompt_ordering_with_full_context() { let ch = Uuid::new_v4(); let event = make_event("hello"); let batch = FlushBatch { @@ -1623,34 +1721,38 @@ mod tests { truncated: false, }; + let core = "[Agent Memory — core]\nbe helpful"; let prompt = format_prompt( &batch, &FormatPromptArgs { - base_prompt: Some("Platform base."), - system_prompt: Some("Role prompt."), + agent_core: Some(core), conversation_context: Some(&ctx), ..Default::default() }, ); - // Verify section ordering: [Base] < [System] < [Context] < [Thread Context] - let base_pos = prompt.find("[Base]").expect("[Base] missing"); - let system_pos = prompt.find("[System]").expect("[System] missing"); + // Verify section ordering: [Agent Memory] < [Context] < [Thread Context] + let core_pos = prompt + .find("[Agent Memory") + .expect("[Agent Memory] missing"); let context_pos = prompt.find("[Context]").expect("[Context] missing"); let thread_pos = prompt .find("[Thread Context") .expect("[Thread Context] missing"); - assert!(base_pos < system_pos, "[Base] must come before [System]"); assert!( - system_pos < context_pos, - "[System] must come before [Context]" + core_pos < context_pos, + "[Agent Memory] must come before [Context]" ); assert!( context_pos < thread_pos, "[Context] must come before [Thread Context]" ); + // No [Base] or [System] in user message + assert!(!prompt.contains("[Base]")); + assert!(!prompt.contains("[System]")); } + // ── Test 12: drop mode discards in-flight channel events ───────────────── #[test] diff --git a/crates/buzz-agent/src/config.rs b/crates/buzz-agent/src/config.rs index 06aa5ecff..b8ed1a099 100644 --- a/crates/buzz-agent/src/config.rs +++ b/crates/buzz-agent/src/config.rs @@ -1,8 +1,9 @@ use std::time::Duration; -pub const PROTOCOL_VERSION: u32 = 1; +pub const PROTOCOL_VERSION: u32 = 2; pub const MAX_PROMPT_BYTES: usize = 1024 * 1024; +pub const MAX_SYSTEM_PROMPT_BYTES: usize = 512 * 1024; /// Total per-result byte ceiling (text + images). Sized for image-bearing /// results — view_image can legitimately return multi-MiB base64 payloads. /// Text is governed by the much smaller `BUZZ_AGENT_MAX_TOOL_RESULT_TEXT_BYTES`. diff --git a/crates/buzz-agent/src/lib.rs b/crates/buzz-agent/src/lib.rs index 442447c64..5e4988b23 100644 --- a/crates/buzz-agent/src/lib.rs +++ b/crates/buzz-agent/src/lib.rs @@ -18,7 +18,7 @@ use tokio::io::BufReader; use tokio::sync::{mpsc, watch, Mutex}; use crate::agent::RunCtx; -use crate::config::{Config, PROTOCOL_VERSION}; +use crate::config::{Config, MAX_SYSTEM_PROMPT_BYTES, PROTOCOL_VERSION}; use crate::llm::Llm; use crate::mcp::McpRegistry; use crate::types::HistoryItem; @@ -215,13 +215,19 @@ async fn initialize(id: Value, params: Value, wire_tx: &WireSender) { Ok(p) => p, Err(m) => return reject(wire_tx, id, INVALID_PARAMS, &m).await, }; - let _ = p.protocol_version; + // Honest negotiation: respond with the minimum of what the client + // requested and what we support. + // NOTE: gating `[Base]` injection on `protocol_version < 2` is a deliberate + // temporary measure — we are squatting on ACP v2 ahead of the upstream ACP + // RFD. Revisit when that RFD merges; otherwise a genuine upstream-v2 agent + // would silently lose `[Base]`. + let negotiated_version = p.protocol_version.min(PROTOCOL_VERSION); wire::send( wire_tx, wire::ok( id, json!({ - "protocolVersion": PROTOCOL_VERSION, + "protocolVersion": negotiated_version, "agentCapabilities": { "loadSession": false, "promptCapabilities": { "image": false, "audio": false, "embeddedContext": false }, @@ -261,15 +267,39 @@ async fn session_new(app: &Arc, id: Value, params: Value, wire_tx: &WireSen .await; } } - let effective_system_prompt: Arc = if app.cfg.hints_enabled { - let hints = hints::build_hints_section(std::path::Path::new(&p.cwd)); - if hints.is_empty() { - Arc::from(app.cfg.system_prompt.as_str()) + let effective_system_prompt: Arc = { + let hints = if app.cfg.hints_enabled { + hints::build_hints_section(std::path::Path::new(&p.cwd)) } else { - Arc::from(format!("{}\n\n{}", app.cfg.system_prompt, hints)) + String::new() + }; + // When the harness provides a systemPrompt (base_prompt + persona), use + // it as the primary content and suppress the default. The default is only + // a fallback for legacy harnesses that don't send systemPrompt. + let base = match p.system_prompt.as_deref() { + Some(client_prompt) if !client_prompt.trim().is_empty() => client_prompt.to_owned(), + _ => app.cfg.system_prompt.clone(), + }; + let prompt = if hints.is_empty() { + base + } else { + format!("{base}\n\n{hints}") + }; + // Reject combined prompts exceeding 512KB. + if prompt.len() > MAX_SYSTEM_PROMPT_BYTES { + return reject( + wire_tx, + id, + INVALID_PARAMS, + &format!( + "session/new: combined system prompt exceeds {}KB limit ({} bytes)", + MAX_SYSTEM_PROMPT_BYTES / 1024, + prompt.len() + ), + ) + .await; } - } else { - Arc::from(app.cfg.system_prompt.as_str()) + Arc::from(prompt) }; let mcp = match McpRegistry::spawn_all(&app.cfg, &p.mcp_servers, &p.cwd).await { Ok(m) => Arc::new(m), diff --git a/crates/buzz-agent/src/wire.rs b/crates/buzz-agent/src/wire.rs index ba69eb1d9..6250d0e63 100644 --- a/crates/buzz-agent/src/wire.rs +++ b/crates/buzz-agent/src/wire.rs @@ -49,6 +49,8 @@ pub struct SessionNewParams { pub cwd: String, #[serde(default)] pub mcp_servers: Vec, + #[serde(default)] + pub system_prompt: Option, } #[derive(Debug, Deserialize)] @@ -178,3 +180,59 @@ pub async fn writer_task(mut rx: mpsc::Receiver) { let _ = stdout.flush().await; } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn session_new_params_deserializes_system_prompt() { + let json = serde_json::json!({ + "cwd": "/tmp/test", + "mcpServers": [], + "systemPrompt": "You are a helpful agent." + }); + let params: SessionNewParams = serde_json::from_value(json).unwrap(); + assert_eq!(params.cwd, "/tmp/test"); + assert_eq!( + params.system_prompt.as_deref(), + Some("You are a helpful agent.") + ); + } + + #[test] + fn session_new_params_system_prompt_defaults_to_none() { + let json = serde_json::json!({ + "cwd": "/tmp/test", + "mcpServers": [] + }); + let params: SessionNewParams = serde_json::from_value(json).unwrap(); + assert_eq!(params.cwd, "/tmp/test"); + assert!(params.system_prompt.is_none()); + } + + #[test] + fn session_new_params_ignores_unknown_fields() { + // Backward compat: old agents with new harness — unknown fields are ignored. + let json = serde_json::json!({ + "cwd": "/tmp/test", + "mcpServers": [], + "unknownField": "should be ignored" + }); + let params: SessionNewParams = serde_json::from_value(json).unwrap(); + assert_eq!(params.cwd, "/tmp/test"); + assert!(params.system_prompt.is_none()); + } + + #[test] + fn session_new_params_empty_string_system_prompt() { + // An explicit empty string is distinct from absent — deserializes to Some(""). + let json = serde_json::json!({ + "cwd": "/tmp/test", + "mcpServers": [], + "systemPrompt": "" + }); + let params: SessionNewParams = serde_json::from_value(json).unwrap(); + assert_eq!(params.system_prompt, Some(String::new())); + } +} diff --git a/crates/buzz-agent/tests/fake_llm.rs b/crates/buzz-agent/tests/fake_llm.rs index 740288819..e3e83a6f0 100644 --- a/crates/buzz-agent/tests/fake_llm.rs +++ b/crates/buzz-agent/tests/fake_llm.rs @@ -59,6 +59,91 @@ async fn spawn_fake_llm(responses: Vec) -> String { url } +// ─── Request-capturing fake LLM server ────────────────────────────────────── + +/// Like `spawn_fake_llm` but also captures the full JSON request body from each +/// incoming HTTP request. Returns (url, captured_requests). +async fn spawn_capturing_fake_llm(responses: Vec) -> (String, Arc>>) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let url = format!("http://{}", listener.local_addr().unwrap()); + let queue = Arc::new(Mutex::new(VecDeque::from(responses))); + let captures: Arc>> = Arc::new(Mutex::new(Vec::new())); + let captures_clone = captures.clone(); + tokio::spawn(async move { + loop { + let (mut sock, _) = match listener.accept().await { + Ok(p) => p, + Err(_) => return, + }; + let queue = queue.clone(); + let captures = captures_clone.clone(); + tokio::spawn(async move { + // Read headers. + let mut buf = Vec::new(); + let mut tmp = [0u8; 4096]; + while !buf.windows(4).any(|w| w == b"\r\n\r\n") { + match sock.read(&mut tmp).await { + Ok(0) | Err(_) => return, + Ok(n) => buf.extend_from_slice(&tmp[..n]), + } + if buf.len() > 2_000_000 { + return; + } + } + // Parse Content-Length from headers to read the body. + let header_end = buf.windows(4).position(|w| w == b"\r\n\r\n").unwrap() + 4; + let header_str = String::from_utf8_lossy(&buf[..header_end]); + let content_length: usize = header_str + .lines() + .find_map(|line| { + let lower = line.to_lowercase(); + if lower.starts_with("content-length:") { + lower + .trim_start_matches("content-length:") + .trim() + .parse() + .ok() + } else { + None + } + }) + .unwrap_or(0); + + // Collect body bytes (some may already be in buf after headers). + let mut body_buf = buf[header_end..].to_vec(); + while body_buf.len() < content_length { + match sock.read(&mut tmp).await { + Ok(0) | Err(_) => break, + Ok(n) => body_buf.extend_from_slice(&tmp[..n]), + } + } + + // Parse and store the request body. + if let Ok(parsed) = + serde_json::from_slice::(&body_buf[..content_length.min(body_buf.len())]) + { + captures.lock().await.push(parsed); + } + + // Send canned response. + let body = queue + .lock() + .await + .pop_front() + .unwrap_or_else(|| json!({ "error": "no canned response" })); + let body_s = serde_json::to_string(&body).unwrap(); + let resp = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body_s.len(), body_s, + ); + let _ = sock.write_all(resp.as_bytes()).await; + let _ = sock.shutdown().await; + }); + } + }); + (url, captures) +} + // ─── ACP harness ──────────────────────────────────────────────────────────── struct Harness { @@ -169,11 +254,11 @@ fn openai_tool_call(id: &str, name: &str, args: Value) -> Value { async fn init_session(h: &mut Harness) -> String { h.send( "initialize", - json!({"protocolVersion":1,"clientCapabilities":{}}), + json!({"protocolVersion":2,"clientCapabilities":{}}), ) .await; let r = h.recv().await; - assert_eq!(r["result"]["protocolVersion"], 1); + assert_eq!(r["result"]["protocolVersion"], 2); assert_eq!(r["result"]["agentInfo"]["name"], "buzz-agent"); h.send("session/new", json!({"cwd":"/tmp","mcpServers":[]})) .await; @@ -335,3 +420,167 @@ async fn rejects_oversized_line() { .await .expect("agent didn't exit after oversized line"); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn session_new_rejects_oversized_system_prompt() { + // A systemPrompt exceeding 512KB must produce a JSON-RPC error, not a panic. + let url = spawn_fake_llm(vec![]).await; + let mut h = Harness::spawn(&url).await; + h.send( + "initialize", + json!({"protocolVersion":2,"clientCapabilities":{}}), + ) + .await; + let r = h.recv().await; + assert_eq!(r["result"]["protocolVersion"], 2); + + // 600KB payload — exceeds the 512KB limit. + let big_prompt = "x".repeat(600 * 1024); + let id = h + .send( + "session/new", + json!({"cwd":"/tmp","mcpServers":[],"systemPrompt": big_prompt}), + ) + .await; + let r = h.recv_until(|v| v["id"] == json!(id)).await; + assert!( + r.get("error").is_some(), + "expected JSON-RPC error for oversized systemPrompt, got: {r}" + ); + let err_msg = r["error"]["message"].as_str().unwrap_or(""); + assert!( + err_msg.contains("512KB limit"), + "error message should mention 512KB limit, got: {err_msg}" + ); + h.shutdown().await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn system_prompt_reaches_llm_system_role() { + // Proves the full contract: systemPrompt sent via session/new → agent appends + // it to the effective system prompt → LLM receives it in the system role. + let canary = "CANARY_E2E_TEST_MARKER_7f3a9b"; + let (url, captures) = spawn_capturing_fake_llm(vec![openai_text("done")]).await; + let mut h = Harness::spawn(&url).await; + + // initialize. + h.send( + "initialize", + json!({"protocolVersion":2,"clientCapabilities":{}}), + ) + .await; + let r = h.recv().await; + assert_eq!(r["result"]["protocolVersion"], 2); + + // session/new with systemPrompt containing the canary. + let sn_id = h + .send( + "session/new", + json!({"cwd":"/tmp","mcpServers":[],"systemPrompt": canary}), + ) + .await; + let r = h.recv_until(|v| v["id"] == json!(sn_id)).await; + let sid = r["result"]["sessionId"].as_str().unwrap().to_owned(); + assert!(sid.starts_with("ses_")); + + // session/prompt — triggers the LLM call. + let p_id = h + .send( + "session/prompt", + json!({ + "sessionId": sid, + "prompt": [{"type":"text","text":"hello"}], + }), + ) + .await; + let _ = h.recv_until(|v| v["id"] == json!(p_id)).await; + + // Inspect the captured LLM request. + let reqs = captures.lock().await; + assert!(!reqs.is_empty(), "expected at least one LLM request"); + let llm_req = &reqs[0]; + let messages = llm_req["messages"].as_array().expect("messages array"); + + // First message should be the system role. + let system_msg = &messages[0]; + assert_eq!( + system_msg["role"], "system", + "first message must be system role" + ); + let system_content = system_msg["content"].as_str().unwrap_or(""); + + // Canary must appear in the system message (proves systemPrompt was used as base). + assert!( + system_content.contains(canary), + "system message must contain the canary string.\nGot: {system_content}" + ); + + // The agent's default prompt must NOT appear — it is suppressed when + // the harness provides a systemPrompt. + let default_prompt = "You are buzz-agent"; + assert!( + !system_content.contains(default_prompt), + "system message must NOT contain the default prompt when systemPrompt is provided.\nGot: {system_content}" + ); + + h.shutdown().await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn system_prompt_absent_no_canary() { + // Negative case: when systemPrompt is NOT sent in session/new, the canary + // must NOT appear in the LLM system message. + let canary = "CANARY_E2E_TEST_MARKER_7f3a9b"; + let (url, captures) = spawn_capturing_fake_llm(vec![openai_text("done")]).await; + let mut h = Harness::spawn(&url).await; + + // initialize. + h.send( + "initialize", + json!({"protocolVersion":2,"clientCapabilities":{}}), + ) + .await; + let _ = h.recv().await; + + // session/new WITHOUT systemPrompt field. + let sn_id = h + .send("session/new", json!({"cwd":"/tmp","mcpServers":[]})) + .await; + let r = h.recv_until(|v| v["id"] == json!(sn_id)).await; + let sid = r["result"]["sessionId"].as_str().unwrap().to_owned(); + + // session/prompt — triggers the LLM call. + let p_id = h + .send( + "session/prompt", + json!({ + "sessionId": sid, + "prompt": [{"type":"text","text":"hello"}], + }), + ) + .await; + let _ = h.recv_until(|v| v["id"] == json!(p_id)).await; + + // Inspect the captured LLM request. + let reqs = captures.lock().await; + assert!(!reqs.is_empty(), "expected at least one LLM request"); + let llm_req = &reqs[0]; + let messages = llm_req["messages"].as_array().expect("messages array"); + let system_msg = &messages[0]; + assert_eq!(system_msg["role"], "system"); + let system_content = system_msg["content"].as_str().unwrap_or(""); + + // Canary must NOT appear (it was never sent). + assert!( + !system_content.contains(canary), + "system message must NOT contain canary when systemPrompt is absent.\nGot: {system_content}" + ); + + // But the agent's default prompt should still be there. + assert!( + system_content.contains("You are buzz-agent"), + "system message must still contain the agent's default prompt" + ); + + h.shutdown().await; +} diff --git a/crates/buzz-agent/tests/golden_transcripts.rs b/crates/buzz-agent/tests/golden_transcripts.rs index e691e4411..1b1926617 100644 --- a/crates/buzz-agent/tests/golden_transcripts.rs +++ b/crates/buzz-agent/tests/golden_transcripts.rs @@ -181,11 +181,11 @@ async fn handshake(h: &mut Harness) -> String { let init_id = h .send( "initialize", - json!({ "protocolVersion": 1, "clientCapabilities": {} }), + json!({ "protocolVersion": 2, "clientCapabilities": {} }), ) .await; let init = h.recv_for_id(init_id).await; - assert_eq!(init["result"]["protocolVersion"], 1); + assert_eq!(init["result"]["protocolVersion"], 2); assert_eq!(init["result"]["agentInfo"]["name"], "buzz-agent"); assert_eq!( init["result"]["agentCapabilities"]["promptCapabilities"]["image"], @@ -296,7 +296,7 @@ async fn test_initialize_version_check() { ) .await; let resp = h.recv_for_id(id).await; - assert_eq!(resp["result"]["protocolVersion"], 1); + assert_eq!(resp["result"]["protocolVersion"], 2); let id2 = h .send(