diff --git a/crates/pu-engine/src/ipc_server.rs b/crates/pu-engine/src/ipc_server.rs deleted file mode 100644 index 167cf65..0000000 --- a/crates/pu-engine/src/ipc_server.rs +++ /dev/null @@ -1,831 +0,0 @@ -use std::path::Path; -use std::sync::Arc; - -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::net::UnixListener; -use tokio::sync::{Notify, Semaphore}; - -const MAX_MESSAGE_SIZE: u64 = 1024 * 1024; // 1MB -const MAX_CONNECTIONS: usize = 64; -const ATTACH_OUTPUT_CHUNK_SIZE: usize = 64 * 1024; - -use crate::engine::Engine; -use pu_core::protocol::{Request, Response}; - -type IpcReader = BufReader; -type IpcWriter = tokio::net::unix::OwnedWriteHalf; - -enum StreamMode { - None, - Attach(String), - Grid(String), - Status(String), -} - -impl StreamMode { - fn from_request(request: &Request) -> Self { - match request { - Request::Attach { agent_id } => Self::Attach(agent_id.clone()), - Request::SubscribeGrid { project_root } => Self::Grid(project_root.clone()), - Request::SubscribeStatus { project_root } => Self::Status(project_root.clone()), - _ => Self::None, - } - } -} - -fn parse_stream_request(result: std::io::Result, line: &str) -> Option { - match result { - Ok(0) | Err(_) => None, - Ok(_) => serde_json::from_str(line.trim()).ok(), - } -} - -pub struct IpcServer { - listener: UnixListener, - engine: Arc, - shutdown: Arc, - conn_limit: Arc, -} - -impl IpcServer { - pub fn bind(socket_path: &Path, engine: Engine) -> Result { - // Remove stale socket if it exists - let _ = std::fs::remove_file(socket_path); - let listener = UnixListener::bind(socket_path)?; - Ok(Self { - listener, - engine: Arc::new(engine), - shutdown: Arc::new(Notify::new()), - conn_limit: Arc::new(Semaphore::new(MAX_CONNECTIONS)), - }) - } - - /// Get a reference to the engine for starting background tasks. 
- pub fn engine(&self) -> &Arc { - &self.engine - } - - pub async fn run(self) -> Result<(), std::io::Error> { - let mut sigterm = - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; - let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?; - - loop { - tokio::select! { - accept = self.listener.accept() => { - let (stream, _addr) = accept?; - let engine = self.engine.clone(); - let shutdown = self.shutdown.clone(); - let permit = match self.conn_limit.clone().acquire_owned().await { - Ok(p) => p, - Err(_) => continue, // semaphore closed - }; - tokio::spawn(async move { - let _permit = permit; - Self::handle_connection(stream, engine, shutdown).await; - }); - } - _ = self.shutdown.notified() => { - tracing::info!("shutdown requested via IPC"); - return Ok(()); - } - _ = sigterm.recv() => { - tracing::info!("received SIGTERM, shutting down"); - return Ok(()); - } - _ = sigint.recv() => { - tracing::info!("received SIGINT, shutting down"); - return Ok(()); - } - } - } - } - - async fn handle_connection( - stream: tokio::net::UnixStream, - engine: Arc, - shutdown: Arc, - ) { - let (reader, writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - let mut writer = writer; - let mut line = String::new(); - - loop { - line.clear(); - match (&mut reader) - .take(MAX_MESSAGE_SIZE) - .read_line(&mut line) - .await - { - Ok(0) => break, // EOF - Ok(_) => { - if !Self::dispatch_request(&line, &mut reader, &mut writer, &engine, &shutdown) - .await - { - break; - } - } - Err(_) => break, - } - } - } - - /// Handles a single parsed request line. Returns `true` to continue the - /// connection loop, `false` to break. 
- async fn dispatch_request( - line: &str, - reader: &mut IpcReader, - writer: &mut IpcWriter, - engine: &Engine, - shutdown: &Notify, - ) -> bool { - let request: Request = match serde_json::from_str(line.trim()) { - Ok(r) => r, - Err(e) => { - let resp = Response::Error { - code: "PARSE_ERROR".into(), - message: e.to_string(), - }; - return Self::write_response(writer, &resp).await.is_ok(); - } - }; - - let is_shutdown = matches!(request, Request::Shutdown); - let stream_mode = StreamMode::from_request(&request); - - let response = engine.handle_request(request).await; - if Self::write_response(writer, &response).await.is_err() { - if is_shutdown { - shutdown.notify_one(); - } - return false; - } - - if is_shutdown { - shutdown.notify_one(); - return false; - } - - match stream_mode { - StreamMode::Attach(agent_id) => { - if !matches!(response, Response::AttachReady { .. }) { - return false; - } - Self::handle_attach_stream(reader, writer, engine, &agent_id).await; - } - StreamMode::Grid(project_root) => { - if matches!(response, Response::GridSubscribed) { - Self::handle_grid_stream(reader, writer, engine, &project_root).await; - } - } - StreamMode::Status(project_root) => { - if matches!(response, Response::StatusSubscribed) { - Self::handle_status_stream(reader, writer, engine, &project_root).await; - } - } - StreamMode::None => {} - } - - true - } - - /// Streaming attach sub-loop: sends all buffered output, then streams new output - /// while accepting Input/Resize commands from the client. 
- async fn handle_attach_stream( - reader: &mut IpcReader, - writer: &mut IpcWriter, - engine: &Engine, - agent_id: &str, - ) { - let (buffer, master_fd, mut exit_rx) = match engine.get_attach_handles(agent_id).await { - Some(handles) => handles, - None => { - let _ = Self::write_response( - writer, - &Response::Error { - code: "AGENT_NOT_FOUND".into(), - message: format!("agent {agent_id} was removed during attach"), - }, - ) - .await; - return; - } - }; - - tracing::debug!(agent_id, "attach stream started"); - - let mut watcher = buffer.subscribe(); - - // Send buffered output in fixed-size chunks so the client can start rendering quickly. - let mut offset = 0; - let (data, new_offset) = buffer.read_from(offset); - offset = new_offset; - if !data.is_empty() - && Self::write_output_chunks(writer, agent_id, &data) - .await - .is_err() - { - tracing::debug!(agent_id, "attach stream ended: write error on initial data"); - return; - } - - // If process already exited, we've sent all buffered output above — return - // immediately. Without this, the streaming loop deadlocks: watcher never fires - // (no new output), exit_rx.changed() is disabled, and reader blocks forever. - if exit_rx.borrow().is_some() { - tracing::debug!(agent_id, "attach stream: process already exited"); - return; - } - - let mut line = String::new(); - loop { - tokio::select! 
{ - Ok(()) = watcher.changed() => { - let (data, new_offset) = buffer.read_from(offset); - offset = new_offset; - if !data.is_empty() - && Self::write_output_chunks(writer, agent_id, &data).await.is_err() - { - tracing::debug!(agent_id, "attach stream ended: write error"); - break; - } - } - Ok(()) = exit_rx.changed() => { - // Drain any remaining buffered output - let (data, _) = buffer.read_from(offset); - if !data.is_empty() { - let _ = Self::write_output_chunks(writer, agent_id, &data).await; - } - tracing::debug!(agent_id, "attach stream ended: process exited"); - break; - } - result = async { - line.clear(); - reader.take(MAX_MESSAGE_SIZE).read_line(&mut line).await - } => { - match parse_stream_request(result, &line) { - Some(Request::Input { data, .. }) => { - engine.write_to_pty(&master_fd, &data).await.ok(); - } - Some(Request::Resize { cols, rows, .. }) => { - engine.resize_pty(&master_fd, cols, rows).await.ok(); - } - _ => break, - } - } - } - } - tracing::debug!(agent_id, "attach stream ended"); - } - - /// Streaming grid subscription: forwards GridEvent broadcasts to subscriber, - /// accepts incoming GridCommand requests from the subscriber connection. - async fn handle_grid_stream( - reader: &mut IpcReader, - writer: &mut IpcWriter, - engine: &Engine, - project_root: &str, - ) { - let mut rx = engine.subscribe_grid(project_root).await; - let pr = project_root.to_string(); - - tracing::debug!(project_root, "grid stream started"); - - let mut line = String::new(); - loop { - tokio::select! 
{ - result = rx.recv() => { - match result { - Ok(command) => { - let resp = Response::GridEvent { - project_root: pr.clone(), - command, - }; - if Self::write_response(writer, &resp).await.is_err() { - break; - } - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!(project_root, "grid subscriber lagged {n} messages"); - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - result = async { - line.clear(); - reader.take(MAX_MESSAGE_SIZE).read_line(&mut line).await - } => { - match parse_stream_request(result, &line) { - Some(Request::GridCommand { command, .. }) => { - let resp = engine.handle_grid_command(&pr, command).await; - if Self::write_response(writer, &resp).await.is_err() { - break; - } - } - _ => break, - } - } - } - } - tracing::debug!(project_root, "grid stream ended"); - } - - /// Computes and sends a full status snapshot. Returns `false` if the write failed. - async fn send_status(writer: &mut IpcWriter, engine: &Engine, project_root: &str) -> bool { - if let Ok((worktrees, agents)) = engine.compute_full_status(project_root).await { - let resp = Response::StatusEvent { worktrees, agents }; - Self::write_response(writer, &resp).await.is_ok() - } else { - true - } - } - - /// Streaming status subscription: pushes full StatusEvent on every state change. - /// Client receives real-time updates without polling. - async fn handle_status_stream( - reader: &mut IpcReader, - writer: &mut IpcWriter, - engine: &Engine, - project_root: &str, - ) { - let mut rx = engine.subscribe_status(project_root).await; - let pr = project_root.to_string(); - - tracing::debug!(project_root, "status stream started"); - - // Send initial status immediately - if !Self::send_status(writer, engine, &pr).await { - return; - } - - let mut line = String::new(); - loop { - tokio::select! 
{ - result = rx.recv() => { - match result { - Ok(()) => { - // Drain any queued signals (batch rapid changes) - while rx.try_recv().is_ok() {} - if !Self::send_status(writer, engine, &pr).await { - break; - } - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!(project_root, "status subscriber lagged {n} messages"); - if !Self::send_status(writer, engine, &pr).await { - break; - } - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - _ = async { - line.clear(); - reader.take(MAX_MESSAGE_SIZE).read_line(&mut line).await - } => { - break; - } - } - } - tracing::debug!(project_root, "status stream ended"); - } - - async fn write_output_chunks( - writer: &mut IpcWriter, - agent_id: &str, - data: &[u8], - ) -> std::io::Result<()> { - for chunk in data.chunks(ATTACH_OUTPUT_CHUNK_SIZE) { - let resp = Response::Output { - agent_id: agent_id.to_string(), - data: chunk.to_vec(), - }; - Self::write_response(writer, &resp).await?; - } - Ok(()) - } - - async fn write_response(writer: &mut IpcWriter, response: &Response) -> std::io::Result<()> { - let json = serde_json::to_string(response) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; - writer.write_all(json.as_bytes()).await?; - writer.write_all(b"\n").await - } -} - -#[cfg(test)] -mod tests { - use super::*; - use pu_core::protocol::{KillTarget, Request, Response}; - use tempfile::TempDir; - use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; - use tokio::net::UnixStream; - - #[tokio::test(flavor = "current_thread")] - async fn given_ipc_server_should_accept_connection() { - let tmp = TempDir::new().unwrap(); - let sock_path = tmp.path().join("test.sock"); - let engine = Engine::new(); - - let server = IpcServer::bind(&sock_path, engine).unwrap(); - let handle = tokio::spawn(async move { server.run().await }); - - // Connect as client - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - let stream = 
UnixStream::connect(&sock_path).await.unwrap(); - assert!(stream.peer_addr().is_ok()); - - handle.abort(); - } - - #[tokio::test(flavor = "current_thread")] - async fn given_health_request_should_respond_with_report() { - let tmp = TempDir::new().unwrap(); - let sock_path = tmp.path().join("test.sock"); - let engine = Engine::new(); - - let server = IpcServer::bind(&sock_path, engine).unwrap(); - let handle = tokio::spawn(async move { server.run().await }); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - // Send health request - let req = serde_json::to_string(&Request::Health).unwrap(); - writer - .write_all(format!("{req}\n").as_bytes()) - .await - .unwrap(); - - // Read response - let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); - let resp: Response = serde_json::from_str(&line).unwrap(); - - match resp { - Response::HealthReport { - protocol_version, .. 
- } => { - assert_eq!(protocol_version, pu_core::protocol::PROTOCOL_VERSION); - } - other => panic!("expected HealthReport, got {other:?}"), - } - - handle.abort(); - } - - #[tokio::test(flavor = "current_thread")] - async fn given_init_request_should_create_manifest() { - let tmp = TempDir::new().unwrap(); - let sock_path = tmp.path().join("test.sock"); - let project_root = tmp.path().join("project"); - std::fs::create_dir_all(&project_root).unwrap(); - let engine = Engine::new(); - - let server = IpcServer::bind(&sock_path, engine).unwrap(); - let handle = tokio::spawn(async move { server.run().await }); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - let req = serde_json::to_string(&Request::Init { - project_root: project_root.to_string_lossy().into(), - }) - .unwrap(); - writer - .write_all(format!("{req}\n").as_bytes()) - .await - .unwrap(); - - let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); - let resp: Response = serde_json::from_str(&line).unwrap(); - - match resp { - Response::InitResult { created } => assert!(created), - other => panic!("expected InitResult, got {other:?}"), - } - - // Verify manifest exists - let manifest_path = project_root.join(".pu/manifest.json"); - assert!(manifest_path.exists()); - - handle.abort(); - } - - #[tokio::test(flavor = "current_thread")] - async fn given_shutdown_request_should_respond_and_stop() { - let tmp = TempDir::new().unwrap(); - let sock_path = tmp.path().join("test.sock"); - let engine = Engine::new(); - - let server = IpcServer::bind(&sock_path, engine).unwrap(); - let handle = tokio::spawn(async move { server.run().await }); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - 
let mut reader = BufReader::new(reader); - - let req = serde_json::to_string(&Request::Shutdown).unwrap(); - writer - .write_all(format!("{req}\n").as_bytes()) - .await - .unwrap(); - - let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); - let resp: Response = serde_json::from_str(&line).unwrap(); - assert!(matches!(resp, Response::ShuttingDown)); - - // Server should stop - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - assert!(handle.is_finished()); - } - - /// Helper: init project + spawn agent via shared helper, then bind IPC server. - /// Returns (sock_path, agent_id, server_handle, _tmp). - async fn setup_with_agent() -> ( - std::path::PathBuf, - String, - tokio::task::JoinHandle>, - TempDir, - ) { - let (engine, agent_id, tmp) = crate::test_helpers::init_and_spawn().await; - let sock_path = tmp.path().join("test.sock"); - - let server = IpcServer::bind(&sock_path, engine).unwrap(); - let handle = tokio::spawn(async move { server.run().await }); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - (sock_path.clone(), agent_id, handle, tmp) - } - - #[tokio::test(flavor = "current_thread")] - async fn given_attach_request_should_stream_output_continuously() { - let (sock_path, agent_id, server_handle, _tmp) = setup_with_agent().await; - - // Wait for the agent to produce some output - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - - // Connect a new client and attach - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - let req = serde_json::to_string(&Request::Attach { - agent_id: agent_id.clone(), - }) - .unwrap(); - writer - .write_all(format!("{req}\n").as_bytes()) - .await - .unwrap(); - - // Should get AttachReady first - let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); - let resp: Response = serde_json::from_str(line.trim()).unwrap(); - 
assert!( - matches!(resp, Response::AttachReady { .. }), - "expected AttachReady, got {resp:?}" - ); - - // Should get at least one Output message with buffered data - line.clear(); - let read_result = tokio::time::timeout( - std::time::Duration::from_secs(2), - reader.read_line(&mut line), - ) - .await; - assert!(read_result.is_ok(), "timed out waiting for Output"); - let resp: Response = serde_json::from_str(line.trim()).unwrap(); - match resp { - Response::Output { agent_id: id, data } => { - assert_eq!(id, agent_id); - assert!(!data.is_empty(), "expected non-empty output data"); - } - other => panic!("expected Output, got {other:?}"), - } - - server_handle.abort(); - } - - #[tokio::test(flavor = "current_thread")] - async fn given_attach_with_input_should_forward_to_pty() { - // This test uses a separate IPC server with a cat process to test input forwarding. - // We can't easily override the spawn command through the config, so we test the - // Input path by verifying it doesn't error — the PTY write path is already tested - // in pty_manager tests. Here we verify the IPC plumbing works end-to-end. - let (sock_path, agent_id, server_handle, _tmp) = setup_with_agent().await; - tokio::time::sleep(std::time::Duration::from_millis(300)).await; - - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - // Attach - let req = serde_json::to_string(&Request::Attach { - agent_id: agent_id.clone(), - }) - .unwrap(); - writer - .write_all(format!("{req}\n").as_bytes()) - .await - .unwrap(); - - let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); // AttachReady - - // Send input — even if the process has exited, the write to the PTY fd should - // not crash the server. The server handles EPIPE/EIO gracefully. 
- let input_req = serde_json::to_string(&Request::Input { - agent_id: agent_id.clone(), - data: b"hello\n".to_vec(), - submit: false, - }) - .unwrap(); - writer - .write_all(format!("{input_req}\n").as_bytes()) - .await - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - - // Verify server is still running — connection didn't crash - drop(writer); - drop(reader); - - // Health check on a new connection - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader2, mut writer2) = stream.into_split(); - let mut reader2 = BufReader::new(reader2); - - let health_req = serde_json::to_string(&Request::Health).unwrap(); - writer2 - .write_all(format!("{health_req}\n").as_bytes()) - .await - .unwrap(); - let mut line2 = String::new(); - reader2.read_line(&mut line2).await.unwrap(); - let resp: Response = serde_json::from_str(line2.trim()).unwrap(); - assert!( - matches!(resp, Response::HealthReport { .. }), - "server still healthy after input during attach" - ); - - server_handle.abort(); - } - - #[tokio::test(flavor = "current_thread")] - async fn given_attach_with_resize_should_update_pty_size() { - let (sock_path, agent_id, server_handle, _tmp) = setup_with_agent().await; - tokio::time::sleep(std::time::Duration::from_millis(300)).await; - - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - // Attach - let req = serde_json::to_string(&Request::Attach { - agent_id: agent_id.clone(), - }) - .unwrap(); - writer - .write_all(format!("{req}\n").as_bytes()) - .await - .unwrap(); - - let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); // AttachReady - - // Send resize — should not error or crash - let resize_req = serde_json::to_string(&Request::Resize { - agent_id: agent_id.clone(), - cols: 200, - rows: 50, - }) - .unwrap(); - writer - .write_all(format!("{resize_req}\n").as_bytes()) - .await - 
.unwrap(); - - // If resize caused a crash we'd get EOF; give it a moment - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - - // Connection should still be alive — we can drop cleanly - drop(writer); - server_handle.abort(); - } - - #[tokio::test(flavor = "current_thread")] - async fn given_killed_agent_attach_should_return_error() { - let (sock_path, agent_id, server_handle, tmp) = setup_with_agent().await; - let pr = tmp.path().join("project").to_string_lossy().to_string(); - tokio::time::sleep(std::time::Duration::from_millis(300)).await; - - // Kill the agent via IPC - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - let kill_req = serde_json::to_string(&Request::Kill { - project_root: pr, - target: KillTarget::Agent(agent_id.clone()), - exclude: vec![], - }) - .unwrap(); - writer - .write_all(format!("{kill_req}\n").as_bytes()) - .await - .unwrap(); - - let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); - let resp: Response = serde_json::from_str(line.trim()).unwrap(); - assert!( - matches!(resp, Response::KillResult { .. }), - "expected KillResult, got {resp:?}" - ); - drop(writer); - drop(reader); - - // Now attempt to attach — agent session is gone, should get AGENT_NOT_FOUND - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - let attach_req = serde_json::to_string(&Request::Attach { agent_id }).unwrap(); - writer - .write_all(format!("{attach_req}\n").as_bytes()) - .await - .unwrap(); - - let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); - let resp: Response = serde_json::from_str(line.trim()).unwrap(); - match resp { - Response::Error { code, .. 
} => { - assert_eq!(code, "AGENT_NOT_FOUND"); - } - other => panic!("expected AGENT_NOT_FOUND error, got {other:?}"), - } - - server_handle.abort(); - } - - #[tokio::test(flavor = "current_thread")] - async fn given_attach_disconnect_should_not_crash_server() { - let (sock_path, agent_id, server_handle, _tmp) = setup_with_agent().await; - tokio::time::sleep(std::time::Duration::from_millis(300)).await; - - // Attach and immediately disconnect - { - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - let req = serde_json::to_string(&Request::Attach { - agent_id: agent_id.clone(), - }) - .unwrap(); - writer - .write_all(format!("{req}\n").as_bytes()) - .await - .unwrap(); - - let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); // AttachReady - // Drop stream — disconnect - } - - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - - // Server should still work — connect again and do a health check - let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - let req = serde_json::to_string(&Request::Health).unwrap(); - writer - .write_all(format!("{req}\n").as_bytes()) - .await - .unwrap(); - - let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); - let resp: Response = serde_json::from_str(line.trim()).unwrap(); - assert!( - matches!(resp, Response::HealthReport { .. 
}), - "server still healthy after attach disconnect" - ); - - server_handle.abort(); - } -} diff --git a/crates/pu-engine/src/ipc_server/mod.rs b/crates/pu-engine/src/ipc_server/mod.rs new file mode 100644 index 0000000..5ab5eed --- /dev/null +++ b/crates/pu-engine/src/ipc_server/mod.rs @@ -0,0 +1,204 @@ +use std::path::Path; +use std::sync::Arc; + +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixListener; +use tokio::sync::{Notify, Semaphore}; + +const MAX_MESSAGE_SIZE: u64 = 1024 * 1024; // 1MB +const MAX_CONNECTIONS: usize = 64; +const ATTACH_OUTPUT_CHUNK_SIZE: usize = 64 * 1024; + +use crate::engine::Engine; +use pu_core::protocol::{Request, Response}; + +type IpcReader = BufReader; +type IpcWriter = tokio::net::unix::OwnedWriteHalf; + +mod streams; + +#[cfg(test)] +mod tests; + +enum StreamMode { + None, + Attach(String), + Grid(String), + Status(String), +} + +impl StreamMode { + fn from_request(request: &Request) -> Self { + match request { + Request::Attach { agent_id } => Self::Attach(agent_id.clone()), + Request::SubscribeGrid { project_root } => Self::Grid(project_root.clone()), + Request::SubscribeStatus { project_root } => Self::Status(project_root.clone()), + _ => Self::None, + } + } +} + +fn parse_stream_request(result: std::io::Result, line: &str) -> Option { + match result { + Ok(0) | Err(_) => None, + Ok(_) => serde_json::from_str(line.trim()).ok(), + } +} + +pub struct IpcServer { + listener: UnixListener, + engine: Arc, + shutdown: Arc, + conn_limit: Arc, +} + +impl IpcServer { + pub fn bind(socket_path: &Path, engine: Engine) -> Result { + // Remove stale socket if it exists + let _ = std::fs::remove_file(socket_path); + let listener = UnixListener::bind(socket_path)?; + Ok(Self { + listener, + engine: Arc::new(engine), + shutdown: Arc::new(Notify::new()), + conn_limit: Arc::new(Semaphore::new(MAX_CONNECTIONS)), + }) + } + + /// Get a reference to the engine for starting background tasks. 
+ pub fn engine(&self) -> &Arc { + &self.engine + } + + pub async fn run(self) -> Result<(), std::io::Error> { + let mut sigterm = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; + let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?; + + loop { + tokio::select! { + accept = self.listener.accept() => { + let (stream, _addr) = accept?; + let engine = self.engine.clone(); + let shutdown = self.shutdown.clone(); + let permit = match self.conn_limit.clone().acquire_owned().await { + Ok(p) => p, + Err(_) => continue, // semaphore closed + }; + tokio::spawn(async move { + let _permit = permit; + Self::handle_connection(stream, engine, shutdown).await; + }); + } + _ = self.shutdown.notified() => { + tracing::info!("shutdown requested via IPC"); + return Ok(()); + } + _ = sigterm.recv() => { + tracing::info!("received SIGTERM, shutting down"); + return Ok(()); + } + _ = sigint.recv() => { + tracing::info!("received SIGINT, shutting down"); + return Ok(()); + } + } + } + } + + async fn handle_connection( + stream: tokio::net::UnixStream, + engine: Arc, + shutdown: Arc, + ) { + let (reader, writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + let mut writer = writer; + let mut line = String::new(); + + loop { + line.clear(); + match (&mut reader) + .take(MAX_MESSAGE_SIZE) + .read_line(&mut line) + .await + { + Ok(0) => break, // EOF + Ok(_) => { + if !Self::dispatch_request(&line, &mut reader, &mut writer, &engine, &shutdown) + .await + { + break; + } + } + Err(_) => break, + } + } + } + + /// Handles a single parsed request line. Returns `true` to continue the + /// connection loop, `false` to break. 
+ async fn dispatch_request( + line: &str, + reader: &mut IpcReader, + writer: &mut IpcWriter, + engine: &Engine, + shutdown: &Notify, + ) -> bool { + let request: Request = match serde_json::from_str(line.trim()) { + Ok(r) => r, + Err(e) => { + let resp = Response::Error { + code: "PARSE_ERROR".into(), + message: e.to_string(), + }; + return write_response(writer, &resp).await.is_ok(); + } + }; + + let is_shutdown = matches!(request, Request::Shutdown); + let stream_mode = StreamMode::from_request(&request); + + let response = engine.handle_request(request).await; + if write_response(writer, &response).await.is_err() { + if is_shutdown { + shutdown.notify_one(); + } + return false; + } + + if is_shutdown { + shutdown.notify_one(); + return false; + } + + match stream_mode { + StreamMode::Attach(agent_id) => { + if !matches!(response, Response::AttachReady { .. }) { + return false; + } + streams::handle_attach_stream(reader, writer, engine, &agent_id).await; + } + StreamMode::Grid(project_root) => { + if matches!(response, Response::GridSubscribed) { + streams::handle_grid_stream(reader, writer, engine, &project_root).await; + } + } + StreamMode::Status(project_root) => { + if matches!(response, Response::StatusSubscribed) { + streams::handle_status_stream(reader, writer, engine, &project_root).await; + } + } + StreamMode::None => {} + } + + true + } +} + +async fn write_response(writer: &mut IpcWriter, response: &Response) -> std::io::Result<()> { + let json = serde_json::to_string(response) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + writer.write_all(json.as_bytes()).await?; + writer.write_all(b"\n").await +} diff --git a/crates/pu-engine/src/ipc_server/streams.rs b/crates/pu-engine/src/ipc_server/streams.rs new file mode 100644 index 0000000..12d2b58 --- /dev/null +++ b/crates/pu-engine/src/ipc_server/streams.rs @@ -0,0 +1,221 @@ +use tokio::io::{AsyncBufReadExt, AsyncReadExt}; + +use crate::engine::Engine; +use 
pu_core::protocol::{Request, Response};
+
+use super::{
+    ATTACH_OUTPUT_CHUNK_SIZE, IpcReader, IpcWriter, MAX_MESSAGE_SIZE, parse_stream_request,
+    write_response,
+};
+
+/// Streaming attach sub-loop: sends all buffered output, then streams new output
+/// while accepting Input/Resize commands from the client.
+pub(super) async fn handle_attach_stream(
+    reader: &mut IpcReader,
+    writer: &mut IpcWriter,
+    engine: &Engine,
+    agent_id: &str,
+) {
+    // Resolve the agent's output buffer, PTY fd and exit watch. The agent may
+    // have been removed between the Attach request and this call, hence the
+    // AGENT_NOT_FOUND error path.
+    let (buffer, master_fd, mut exit_rx) = match engine.get_attach_handles(agent_id).await {
+        Some(handles) => handles,
+        None => {
+            let _ = write_response(
+                writer,
+                &Response::Error {
+                    code: "AGENT_NOT_FOUND".into(),
+                    message: format!("agent {agent_id} was removed during attach"),
+                },
+            )
+            .await;
+            return;
+        }
+    };
+
+    tracing::debug!(agent_id, "attach stream started");
+
+    // Subscribe before draining the buffer so output appended while the initial
+    // chunks are being written still wakes the watcher arm below.
+    let mut watcher = buffer.subscribe();
+
+    // Send buffered output in fixed-size chunks so the client can start rendering quickly.
+    let mut offset = 0;
+    let (data, new_offset) = buffer.read_from(offset);
+    offset = new_offset;
+    if !data.is_empty() && write_output_chunks(writer, agent_id, &data).await.is_err() {
+        tracing::debug!(agent_id, "attach stream ended: write error on initial data");
+        return;
+    }
+
+    // If process already exited, we've sent all buffered output above — return
+    // immediately. Without this, the streaming loop deadlocks: watcher never fires
+    // (no new output), exit_rx.changed() is disabled, and reader blocks forever.
+    if exit_rx.borrow().is_some() {
+        tracing::debug!(agent_id, "attach stream: process already exited");
+        return;
+    }
+
+    let mut line = String::new();
+    loop {
+        tokio::select! {
+            // New output was appended to the buffer; forward the delta. If the
+            // watch channel closes, the `Ok(())` pattern no longer matches and
+            // this arm is disabled rather than spinning on errors.
+            Ok(()) = watcher.changed() => {
+                let (data, new_offset) = buffer.read_from(offset);
+                offset = new_offset;
+                if !data.is_empty()
+                    && write_output_chunks(writer, agent_id, &data).await.is_err()
+                {
+                    tracing::debug!(agent_id, "attach stream ended: write error");
+                    break;
+                }
+            }
+            Ok(()) = exit_rx.changed() => {
+                // Drain any remaining buffered output
+                let (data, _) = buffer.read_from(offset);
+                if !data.is_empty() {
+                    let _ = write_output_chunks(writer, agent_id, &data).await;
+                }
+                tracing::debug!(agent_id, "attach stream ended: process exited");
+                break;
+            }
+            // Client -> server traffic while attached: Input and Resize are
+            // forwarded to the PTY; EOF, an oversized line or a parse failure
+            // (parse_stream_request -> None) ends the attach stream.
+            // NOTE(review): read_line is not cancellation-safe. If an output arm
+            // wins the select! race mid-read, partially read bytes sit in `line`
+            // and are discarded by the next `line.clear()` — confirm clients
+            // write each command atomically as a single line.
+            result = async {
+                line.clear();
+                reader.take(MAX_MESSAGE_SIZE).read_line(&mut line).await
+            } => {
+                match parse_stream_request(result, &line) {
+                    Some(Request::Input { data, .. }) => {
+                        // Errors writing to the PTY are deliberately ignored
+                        // (process may have exited); the exit arm handles teardown.
+                        engine.write_to_pty(&master_fd, &data).await.ok();
+                    }
+                    Some(Request::Resize { cols, rows, .. }) => {
+                        engine.resize_pty(&master_fd, cols, rows).await.ok();
+                    }
+                    _ => break,
+                }
+            }
+        }
+    }
+    tracing::debug!(agent_id, "attach stream ended");
+}
+
+/// Streaming grid subscription: forwards GridEvent broadcasts to subscriber,
+/// accepts incoming GridCommand requests from the subscriber connection.
+pub(super) async fn handle_grid_stream(
+    reader: &mut IpcReader,
+    writer: &mut IpcWriter,
+    engine: &Engine,
+    project_root: &str,
+) {
+    let mut rx = engine.subscribe_grid(project_root).await;
+    let pr = project_root.to_string();
+
+    tracing::debug!(project_root, "grid stream started");
+
+    let mut line = String::new();
+    loop {
+        tokio::select! {
+            // Broadcast fan-out: wrap each grid command in a GridEvent frame for
+            // this subscriber. A write failure means the client went away.
+            result = rx.recv() => {
+                match result {
+                    Ok(command) => {
+                        let resp = Response::GridEvent {
+                            project_root: pr.clone(),
+                            command,
+                        };
+                        if write_response(writer, &resp).await.is_err() {
+                            break;
+                        }
+                    }
+                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
+                        // NOTE(review): lagged events are dropped without a resync;
+                        // the subscriber may render stale grid state until the next
+                        // event arrives — confirm clients tolerate gaps.
+                        tracing::warn!(project_root, "grid subscriber lagged {n} messages");
+                    }
+                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+                }
+            }
+            // Inbound GridCommand requests from this subscriber; any other
+            // message, EOF or read error ends the stream.
+            // NOTE(review): same read_line cancellation-safety caveat as in
+            // handle_attach_stream applies here.
+            result = async {
+                line.clear();
+                reader.take(MAX_MESSAGE_SIZE).read_line(&mut line).await
+            } => {
+                match parse_stream_request(result, &line) {
+                    Some(Request::GridCommand { command, .. }) => {
+                        let resp = engine.handle_grid_command(&pr, command).await;
+                        if write_response(writer, &resp).await.is_err() {
+                            break;
+                        }
+                    }
+                    _ => break,
+                }
+            }
+        }
+    }
+    tracing::debug!(project_root, "grid stream ended");
+}
+
+/// Computes and sends a full status snapshot. Returns `false` if the write failed.
+async fn send_status(writer: &mut IpcWriter, engine: &Engine, project_root: &str) -> bool {
+    if let Ok((worktrees, agents)) = engine.compute_full_status(project_root).await {
+        let resp = Response::StatusEvent { worktrees, agents };
+        write_response(writer, &resp).await.is_ok()
+    } else {
+        // Snapshot computation failed: swallow the error and report success so
+        // the stream stays open — the next change signal retries.
+        true
+    }
+}
+
+/// Streaming status subscription: pushes full StatusEvent on every state change.
+/// Client receives real-time updates without polling.
+pub(super) async fn handle_status_stream(
+    reader: &mut IpcReader,
+    writer: &mut IpcWriter,
+    engine: &Engine,
+    project_root: &str,
+) {
+    let mut rx = engine.subscribe_status(project_root).await;
+    let pr = project_root.to_string();
+
+    tracing::debug!(project_root, "status stream started");
+
+    // Send initial status immediately
+    if !send_status(writer, engine, &pr).await {
+        return;
+    }
+
+    let mut line = String::new();
+    loop {
+        tokio::select! {
+            result = rx.recv() => {
+                match result {
+                    Ok(()) => {
+                        // Drain any queued signals (batch rapid changes)
+                        while rx.try_recv().is_ok() {}
+                        if !send_status(writer, engine, &pr).await {
+                            break;
+                        }
+                    }
+                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
+                        // Lagging is harmless here: every push is a full snapshot,
+                        // so recompute and resend instead of replaying the backlog.
+                        tracing::warn!(project_root, "status subscriber lagged {n} messages");
+                        if !send_status(writer, engine, &pr).await {
+                            break;
+                        }
+                    }
+                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+                }
+            }
+            // Status streams are server-push only: any client traffic, EOF or
+            // read error terminates the subscription.
+            _ = async {
+                line.clear();
+                reader.take(MAX_MESSAGE_SIZE).read_line(&mut line).await
+            } => {
+                break;
+            }
+        }
+    }
+    tracing::debug!(project_root, "status stream ended");
+}
+
+/// Writes `data` to the client as a sequence of `Response::Output` frames, each
+/// carrying at most `ATTACH_OUTPUT_CHUNK_SIZE` bytes. Propagates the first write
+/// error so callers can tear the stream down.
+async fn write_output_chunks(
+    writer: &mut IpcWriter,
+    agent_id: &str,
+    data: &[u8],
+) -> std::io::Result<()> {
+    for chunk in data.chunks(ATTACH_OUTPUT_CHUNK_SIZE) {
+        let resp = Response::Output {
+            agent_id: agent_id.to_string(),
+            data: chunk.to_vec(),
+        };
+        write_response(writer, &resp).await?;
+    }
+    Ok(())
+}
diff --git a/crates/pu-engine/src/ipc_server/tests.rs b/crates/pu-engine/src/ipc_server/tests.rs
new file mode 100644
index 0000000..00e8727
--- /dev/null
+++ b/crates/pu-engine/src/ipc_server/tests.rs
@@ -0,0 +1,412 @@
+use super::*;
+use pu_core::protocol::{KillTarget, Request, Response};
+use tempfile::TempDir;
+use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
+use tokio::net::UnixStream;
+
+#[tokio::test(flavor = "current_thread")]
+async fn given_ipc_server_should_accept_connection() {
+    let tmp = TempDir::new().unwrap();
+    let sock_path = tmp.path().join("test.sock");
+    let engine = Engine::new();
+
+    let server = IpcServer::bind(&sock_path, engine).unwrap();
+    let handle = tokio::spawn(async move { server.run().await });
+
+    // Connect as client
+    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+    let stream = UnixStream::connect(&sock_path).await.unwrap();
+    assert!(stream.peer_addr().is_ok());
+
+    handle.abort();
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn given_health_request_should_respond_with_report() { + let tmp = TempDir::new().unwrap(); + let sock_path = tmp.path().join("test.sock"); + let engine = Engine::new(); + + let server = IpcServer::bind(&sock_path, engine).unwrap(); + let handle = tokio::spawn(async move { server.run().await }); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + // Send health request + let req = serde_json::to_string(&Request::Health).unwrap(); + writer + .write_all(format!("{req}\n").as_bytes()) + .await + .unwrap(); + + // Read response + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); + let resp: Response = serde_json::from_str(&line).unwrap(); + + match resp { + Response::HealthReport { + protocol_version, .. + } => { + assert_eq!(protocol_version, pu_core::protocol::PROTOCOL_VERSION); + } + other => panic!("expected HealthReport, got {other:?}"), + } + + handle.abort(); +} + +#[tokio::test(flavor = "current_thread")] +async fn given_init_request_should_create_manifest() { + let tmp = TempDir::new().unwrap(); + let sock_path = tmp.path().join("test.sock"); + let project_root = tmp.path().join("project"); + std::fs::create_dir_all(&project_root).unwrap(); + let engine = Engine::new(); + + let server = IpcServer::bind(&sock_path, engine).unwrap(); + let handle = tokio::spawn(async move { server.run().await }); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + let req = serde_json::to_string(&Request::Init { + project_root: project_root.to_string_lossy().into(), + }) + .unwrap(); + writer + .write_all(format!("{req}\n").as_bytes()) + .await + .unwrap(); + + let mut line = String::new(); + 
reader.read_line(&mut line).await.unwrap(); + let resp: Response = serde_json::from_str(&line).unwrap(); + + match resp { + Response::InitResult { created } => assert!(created), + other => panic!("expected InitResult, got {other:?}"), + } + + // Verify manifest exists + let manifest_path = project_root.join(".pu/manifest.json"); + assert!(manifest_path.exists()); + + handle.abort(); +} + +#[tokio::test(flavor = "current_thread")] +async fn given_shutdown_request_should_respond_and_stop() { + let tmp = TempDir::new().unwrap(); + let sock_path = tmp.path().join("test.sock"); + let engine = Engine::new(); + + let server = IpcServer::bind(&sock_path, engine).unwrap(); + let handle = tokio::spawn(async move { server.run().await }); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + let req = serde_json::to_string(&Request::Shutdown).unwrap(); + writer + .write_all(format!("{req}\n").as_bytes()) + .await + .unwrap(); + + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); + let resp: Response = serde_json::from_str(&line).unwrap(); + assert!(matches!(resp, Response::ShuttingDown)); + + // Server should stop + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + assert!(handle.is_finished()); +} + +/// Helper: init project + spawn agent via shared helper, then bind IPC server. +/// Returns (sock_path, agent_id, server_handle, _tmp). 
+async fn setup_with_agent() -> ( + std::path::PathBuf, + String, + tokio::task::JoinHandle>, + TempDir, +) { + let (engine, agent_id, tmp) = crate::test_helpers::init_and_spawn().await; + let sock_path = tmp.path().join("test.sock"); + + let server = IpcServer::bind(&sock_path, engine).unwrap(); + let handle = tokio::spawn(async move { server.run().await }); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + (sock_path.clone(), agent_id, handle, tmp) +} + +#[tokio::test(flavor = "current_thread")] +async fn given_attach_request_should_stream_output_continuously() { + let (sock_path, agent_id, server_handle, _tmp) = setup_with_agent().await; + + // Wait for the agent to produce some output + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + // Connect a new client and attach + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + let req = serde_json::to_string(&Request::Attach { + agent_id: agent_id.clone(), + }) + .unwrap(); + writer + .write_all(format!("{req}\n").as_bytes()) + .await + .unwrap(); + + // Should get AttachReady first + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); + let resp: Response = serde_json::from_str(line.trim()).unwrap(); + assert!( + matches!(resp, Response::AttachReady { .. 
}), + "expected AttachReady, got {resp:?}" + ); + + // Should get at least one Output message with buffered data + line.clear(); + let read_result = tokio::time::timeout( + std::time::Duration::from_secs(2), + reader.read_line(&mut line), + ) + .await; + assert!(read_result.is_ok(), "timed out waiting for Output"); + let resp: Response = serde_json::from_str(line.trim()).unwrap(); + match resp { + Response::Output { agent_id: id, data } => { + assert_eq!(id, agent_id); + assert!(!data.is_empty(), "expected non-empty output data"); + } + other => panic!("expected Output, got {other:?}"), + } + + server_handle.abort(); +} + +#[tokio::test(flavor = "current_thread")] +async fn given_attach_with_input_should_forward_to_pty() { + // This test uses a separate IPC server with a cat process to test input forwarding. + // We can't easily override the spawn command through the config, so we test the + // Input path by verifying it doesn't error — the PTY write path is already tested + // in pty_manager tests. Here we verify the IPC plumbing works end-to-end. + let (sock_path, agent_id, server_handle, _tmp) = setup_with_agent().await; + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + // Attach + let req = serde_json::to_string(&Request::Attach { + agent_id: agent_id.clone(), + }) + .unwrap(); + writer + .write_all(format!("{req}\n").as_bytes()) + .await + .unwrap(); + + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); // AttachReady + + // Send input — even if the process has exited, the write to the PTY fd should + // not crash the server. The server handles EPIPE/EIO gracefully. 
+ let input_req = serde_json::to_string(&Request::Input { + agent_id: agent_id.clone(), + data: b"hello\n".to_vec(), + submit: false, + }) + .unwrap(); + writer + .write_all(format!("{input_req}\n").as_bytes()) + .await + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + + // Verify server is still running — connection didn't crash + drop(writer); + drop(reader); + + // Health check on a new connection + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader2, mut writer2) = stream.into_split(); + let mut reader2 = BufReader::new(reader2); + + let health_req = serde_json::to_string(&Request::Health).unwrap(); + writer2 + .write_all(format!("{health_req}\n").as_bytes()) + .await + .unwrap(); + let mut line2 = String::new(); + reader2.read_line(&mut line2).await.unwrap(); + let resp: Response = serde_json::from_str(line2.trim()).unwrap(); + assert!( + matches!(resp, Response::HealthReport { .. }), + "server still healthy after input during attach" + ); + + server_handle.abort(); +} + +#[tokio::test(flavor = "current_thread")] +async fn given_attach_with_resize_should_update_pty_size() { + let (sock_path, agent_id, server_handle, _tmp) = setup_with_agent().await; + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + // Attach + let req = serde_json::to_string(&Request::Attach { + agent_id: agent_id.clone(), + }) + .unwrap(); + writer + .write_all(format!("{req}\n").as_bytes()) + .await + .unwrap(); + + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); // AttachReady + + // Send resize — should not error or crash + let resize_req = serde_json::to_string(&Request::Resize { + agent_id: agent_id.clone(), + cols: 200, + rows: 50, + }) + .unwrap(); + writer + .write_all(format!("{resize_req}\n").as_bytes()) + .await + 
.unwrap(); + + // If resize caused a crash we'd get EOF; give it a moment + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + + // Connection should still be alive — we can drop cleanly + drop(writer); + server_handle.abort(); +} + +#[tokio::test(flavor = "current_thread")] +async fn given_killed_agent_attach_should_return_error() { + let (sock_path, agent_id, server_handle, tmp) = setup_with_agent().await; + let pr = tmp.path().join("project").to_string_lossy().to_string(); + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + + // Kill the agent via IPC + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + let kill_req = serde_json::to_string(&Request::Kill { + project_root: pr, + target: KillTarget::Agent(agent_id.clone()), + exclude: vec![], + }) + .unwrap(); + writer + .write_all(format!("{kill_req}\n").as_bytes()) + .await + .unwrap(); + + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); + let resp: Response = serde_json::from_str(line.trim()).unwrap(); + assert!( + matches!(resp, Response::KillResult { .. }), + "expected KillResult, got {resp:?}" + ); + drop(writer); + drop(reader); + + // Now attempt to attach — agent session is gone, should get AGENT_NOT_FOUND + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + let attach_req = serde_json::to_string(&Request::Attach { agent_id }).unwrap(); + writer + .write_all(format!("{attach_req}\n").as_bytes()) + .await + .unwrap(); + + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); + let resp: Response = serde_json::from_str(line.trim()).unwrap(); + match resp { + Response::Error { code, .. 
} => { + assert_eq!(code, "AGENT_NOT_FOUND"); + } + other => panic!("expected AGENT_NOT_FOUND error, got {other:?}"), + } + + server_handle.abort(); +} + +#[tokio::test(flavor = "current_thread")] +async fn given_attach_disconnect_should_not_crash_server() { + let (sock_path, agent_id, server_handle, _tmp) = setup_with_agent().await; + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + + // Attach and immediately disconnect + { + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + let req = serde_json::to_string(&Request::Attach { + agent_id: agent_id.clone(), + }) + .unwrap(); + writer + .write_all(format!("{req}\n").as_bytes()) + .await + .unwrap(); + + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); // AttachReady + // Drop stream — disconnect + } + + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + + // Server should still work — connect again and do a health check + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + let req = serde_json::to_string(&Request::Health).unwrap(); + writer + .write_all(format!("{req}\n").as_bytes()) + .await + .unwrap(); + + let mut line = String::new(); + reader.read_line(&mut line).await.unwrap(); + let resp: Response = serde_json::from_str(line.trim()).unwrap(); + assert!( + matches!(resp, Response::HealthReport { .. }), + "server still healthy after attach disconnect" + ); + + server_handle.abort(); +}