Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 78 additions & 5 deletions crates/buzz-acp/src/acp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,10 @@ impl AcpClient {
/// Must be called exactly once, before any other ACP method.
/// The caller may inspect `agentCapabilities` in the returned value.
pub async fn initialize(&mut self) -> Result<serde_json::Value, AcpError> {
// Requesting version 2 is an intentional temporary pin — we are squatting
// on ACP v2 ahead of the upstream ACP RFD. Revisit when that RFD merges.
let params = serde_json::json!({
"protocolVersion": 1,
"protocolVersion": 2,
"clientCapabilities": {},
"clientInfo": {
"name": "buzz-acp",
Expand All @@ -296,17 +298,23 @@ impl AcpClient {
/// Send `session/new` and return the full response alongside the session ID.
///
/// `cwd` must be an absolute path. `mcp_servers` may be empty.
/// `system_prompt` is included in the request when `Some` — agents that
/// support the field will use it; others ignore unknown fields per JSON-RPC.
/// Callers use [`extract_model_config_options`] and [`extract_model_state`]
/// to pull model info from the raw result.
pub async fn session_new_full(
&mut self,
cwd: &str,
mcp_servers: Vec<McpServer>,
system_prompt: Option<&str>,
) -> Result<SessionNewResponse, AcpError> {
let params = serde_json::json!({
let mut params = serde_json::json!({
"cwd": cwd,
"mcpServers": mcp_servers,
});
if let Some(sp) = system_prompt {
params["systemPrompt"] = serde_json::Value::String(sp.to_owned());
}
let result = self.send_request("session/new", params).await?;
let session_id = result["sessionId"]
.as_str()
Expand All @@ -327,8 +335,12 @@ impl AcpClient {
&mut self,
cwd: &str,
mcp_servers: Vec<McpServer>,
system_prompt: Option<&str>,
) -> Result<String, AcpError> {
Ok(self.session_new_full(cwd, mcp_servers).await?.session_id)
Ok(self
.session_new_full(cwd, mcp_servers, system_prompt)
.await?
.session_id)
}

/// Send `session/set_config_option` (stable ACP path).
Expand Down Expand Up @@ -1401,15 +1413,15 @@ mod tests {
"id": 0u64,
"method": "initialize",
"params": {
"protocolVersion": 1,
"protocolVersion": 2,
"clientCapabilities": {},
"clientInfo": {
"name": "buzz-acp",
"version": "0.1.0"
}
}
});
assert_eq!(msg["params"]["protocolVersion"].as_u64(), Some(1));
assert_eq!(msg["params"]["protocolVersion"].as_u64(), Some(2));
assert_eq!(
msg["params"]["clientInfo"]["name"].as_str(),
Some("buzz-acp")
Expand Down Expand Up @@ -2031,4 +2043,65 @@ mod tests {
"expected IdleTimeout after silence, got {result:?}"
);
}

// ── session_new_full systemPrompt serialization ──────────────────────

#[tokio::test]
async fn session_new_full_includes_system_prompt_when_some() {
// Script: respond to initialize, then echo back the session/new request.
let script = r#"
read -t 2 _init
echo '{"jsonrpc":"2.0","id":0,"result":{"protocolVersion":1,"agentCapabilities":{}}}'
read -t 2 REQ
echo '{"jsonrpc":"2.0","id":1,"result":{"sessionId":"ses_test","_receivedRequest":'"$REQ"'}}'
sleep 1
"#;
let mut client = spawn_script(script).await;
client
.initialize()
.await
.expect("initialize should succeed");

let resp = client
.session_new_full("/tmp", vec![], Some("Custom system prompt"))
.await
.expect("session_new_full should succeed");

assert_eq!(resp.session_id, "ses_test");
let received = &resp.raw["_receivedRequest"];
assert_eq!(
received["params"]["systemPrompt"].as_str(),
Some("Custom system prompt"),
"systemPrompt should be included in params when Some"
);
}

#[tokio::test]
async fn session_new_full_omits_system_prompt_when_none() {
// When system_prompt is None, the field should not appear in params.
let script = r#"
read -t 2 _init
echo '{"jsonrpc":"2.0","id":0,"result":{"protocolVersion":1,"agentCapabilities":{}}}'
read -t 2 REQ
echo '{"jsonrpc":"2.0","id":1,"result":{"sessionId":"ses_test","_receivedRequest":'"$REQ"'}}'
sleep 1
"#;
let mut client = spawn_script(script).await;
client
.initialize()
.await
.expect("initialize should succeed");

let resp = client
.session_new_full("/tmp", vec![], None)
.await
.expect("session_new_full should succeed");

assert_eq!(resp.session_id, "ses_test");
let received = &resp.raw["_receivedRequest"];
assert!(
received["params"]["systemPrompt"].is_null(),
"systemPrompt should NOT be in params when value is None"
);
}
}
61 changes: 49 additions & 12 deletions crates/buzz-acp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use pool::{
AgentPool, ControlSignal, OwnedAgent, PromptContext, PromptOutcome, PromptResult, PromptSource,
SessionState,
};
use queue::{prepend_base_prompt, EventQueue, QueuedEvent, ThreadTags};
use queue::{EventQueue, QueuedEvent, ThreadTags};
use relay::{HarnessRelay, RelayEventPublisher};
use tokio::sync::{mpsc, watch};
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -695,7 +695,7 @@ fn any_respawn_in_flight(crash_history: &[SlotCircuit]) -> bool {
/// Result of a background respawn task.
struct RespawnResult {
index: usize,
result: Result<AcpClient>,
result: Result<(AcpClient, u32)>,
}

/// RAII guard that ensures a `RespawnResult` is sent even if the task panics.
Expand All @@ -719,7 +719,7 @@ impl RespawnGuard {
/// Send the result and disarm the guard. Uses `try_send` (sync) so there
/// is no await boundary between marking `sent` and actually enqueueing —
/// cancellation cannot slip between the two.
fn send(mut self, result: Result<AcpClient>) {
fn send(mut self, result: Result<(AcpClient, u32)>) {
// Invariant: try_send succeeds because the channel capacity equals the
// slot count, and respawn_in_flight guarantees at most one outstanding
// result per slot. If this ever fails, the channel sizing or the
Expand Down Expand Up @@ -839,6 +839,8 @@ async fn tokio_main() -> Result<()> {
match tokio::time::timeout(Duration::from_secs(60), acp.initialize()).await {
Ok(Ok(init_result)) => {
tracing::info!(agent = i, "agent initialized: {init_result}");
let protocol_version =
init_result["protocolVersion"].as_u64().unwrap_or(1) as u32;
acp.observe(
"agent_initialized",
serde_json::json!({
Expand All @@ -852,6 +854,7 @@ async fn tokio_main() -> Result<()> {
state: SessionState::default(),
model_capabilities: None,
desired_model: config.model.clone(),
protocol_version,
}));
}
Ok(Err(e)) => {
Expand Down Expand Up @@ -1279,13 +1282,14 @@ async fn tokio_main() -> Result<()> {
while let Ok(rr) = respawn_rx.try_recv() {
crash_history[rr.index].respawn_in_flight = false;
match rr.result {
Ok(acp) => {
Ok((acp, protocol_version)) => {
let agent = OwnedAgent {
index: rr.index,
acp,
state: SessionState::default(),
model_capabilities: None,
desired_model: config.model.clone(),
protocol_version,
};
pool.return_agent(agent);
tracing::info!(agent = rr.index, "respawn complete");
Expand Down Expand Up @@ -1887,7 +1891,7 @@ async fn tokio_main() -> Result<()> {
// Drain any respawn results that completed before the abort. Explicitly
// shut down returned agents instead of relying on AcpClient::Drop.
while let Ok(rr) = respawn_rx.try_recv() {
if let Ok(mut acp) = rr.result {
if let Ok((mut acp, _)) = rr.result {
acp.shutdown().await;
tracing::debug!(agent = rr.index, "reaped respawned agent on shutdown");
}
Expand Down Expand Up @@ -2429,10 +2433,10 @@ fn dispatch_heartbeat(
.heartbeat_prompt
.clone()
.unwrap_or_else(default_heartbeat_prompt);
let prompt_text = match ctx.base_prompt {
Some(bp) => prepend_base_prompt(bp, &prompt_text),
None => prompt_text,
};
// For legacy agents (protocol_version < 2), prepend base_prompt to the
// heartbeat user message since they don't receive it via session/new.
let prompt_text =
pool::prepend_base_for_legacy(agent.protocol_version, ctx.base_prompt, &prompt_text);
let result_tx = pool.result_tx();
let ctx_clone = Arc::clone(ctx);
let agent_index = agent.index;
Expand Down Expand Up @@ -2546,7 +2550,7 @@ async fn spawn_and_init(
extra_env: &[(String, String)],
agent_index: usize,
observer: Option<observer::ObserverHandle>,
) -> Result<AcpClient> {
) -> Result<(AcpClient, u32)> {
let mut acp = AcpClient::spawn(command, args, extra_env)
.await
.map_err(|e| anyhow::anyhow!("failed to spawn agent: {e}"))?;
Expand All @@ -2555,14 +2559,15 @@ async fn spawn_and_init(
match acp.initialize().await {
Ok(init_result) => {
tracing::info!("agent initialized: {init_result}");
let protocol_version = init_result["protocolVersion"].as_u64().unwrap_or(1) as u32;
acp.observe(
"agent_initialized",
serde_json::json!({
"agentIndex": agent_index,
"initializeResult": init_result,
}),
);
Ok(acp)
Ok((acp, protocol_version))
}
Err(e) => {
// Explicitly shut down the spawned child to prevent zombie/leak.
Expand Down Expand Up @@ -2605,7 +2610,7 @@ async fn run_models(args: ModelsArgs) -> Result<()> {
// so shutdown() runs on all paths (success, error, timeout).
let protocol_result = tokio::time::timeout(MODELS_TIMEOUT, async {
let init = client.initialize().await?;
let session = client.session_new_full(&cwd, vec![]).await?;
let session = client.session_new_full(&cwd, vec![], None).await?;
Ok::<_, acp::AcpError>((init, session))
})
.await;
Expand Down Expand Up @@ -2764,6 +2769,38 @@ fn build_mcp_servers(config: &Config) -> Vec<McpServer> {

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod heartbeat_base_prompt_tests {
use super::*;

// Pins the heartbeat dispatch path (dispatch_heartbeat, ~line 2359): a
// legacy agent WITH a base_prompt must get [Base] prepended to the
// heartbeat user message, composed as `[Base]\n{bp}\n\n{prompt}`. This is
// the second half of the round-2 regression (the first being initial_message).

#[test]
fn test_heartbeat_legacy_agent_gets_base_prepended() {
// protocol_version 1 + Some(base_prompt): heartbeat prompt is prefixed
// with the [Base] section exactly as the legacy session/new path would.
let prompt = "[System: Heartbeat]\nrun feed get";
let composed = pool::prepend_base_for_legacy(1, Some("you are a helpful agent"), prompt);
assert_eq!(
composed,
"[Base]\nyou are a helpful agent\n\n[System: Heartbeat]\nrun feed get"
);
assert!(composed.starts_with("[Base]\nyou are a helpful agent\n\n"));
}

#[test]
fn test_heartbeat_modern_agent_omits_base() {
// protocol_version 2 gets base_prompt via session/new; the heartbeat
// prompt is sent verbatim.
let prompt = "[System: Heartbeat]\nrun feed get";
let composed = pool::prepend_base_for_legacy(2, Some("you are a helpful agent"), prompt);
assert_eq!(composed, prompt);
}
}

#[cfg(test)]
mod owner_control_command_tests {
use super::*;
Expand Down
Loading