From 4095e2132681459dbe036622e2841533b98db4e0 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Tue, 10 Mar 2026 10:10:39 -0500 Subject: [PATCH] refactor: Extract shared patterns from IPC server streaming handlers Introduce StreamMode enum and parse_stream_request helper to consolidate duplicated read-parse-dispatch logic across attach, grid, and status streaming handlers. Extract dispatch_request method to flatten the connection loop. Reduces cyclomatic complexity and improves code density. Co-Authored-By: Claude Opus 4.6 --- crates/pu-engine/src/ipc_server.rs | 279 +++++++++++++---------------- 1 file changed, 129 insertions(+), 150 deletions(-) diff --git a/crates/pu-engine/src/ipc_server.rs b/crates/pu-engine/src/ipc_server.rs index d5c2b03..167cf65 100644 --- a/crates/pu-engine/src/ipc_server.rs +++ b/crates/pu-engine/src/ipc_server.rs @@ -12,6 +12,34 @@ const ATTACH_OUTPUT_CHUNK_SIZE: usize = 64 * 1024; use crate::engine::Engine; use pu_core::protocol::{Request, Response}; +type IpcReader = BufReader; +type IpcWriter = tokio::net::unix::OwnedWriteHalf; + +enum StreamMode { + None, + Attach(String), + Grid(String), + Status(String), +} + +impl StreamMode { + fn from_request(request: &Request) -> Self { + match request { + Request::Attach { agent_id } => Self::Attach(agent_id.clone()), + Request::SubscribeGrid { project_root } => Self::Grid(project_root.clone()), + Request::SubscribeStatus { project_root } => Self::Status(project_root.clone()), + _ => Self::None, + } + } +} + +fn parse_stream_request(result: std::io::Result, line: &str) -> Option { + match result { + Ok(0) | Err(_) => None, + Ok(_) => serde_json::from_str(line.trim()).ok(), + } +} + pub struct IpcServer { listener: UnixListener, engine: Arc, @@ -78,8 +106,9 @@ impl IpcServer { engine: Arc, shutdown: Arc, ) { - let (reader, mut writer) = stream.into_split(); + let (reader, writer) = stream.into_split(); let mut reader = BufReader::new(reader); + let mut writer = writer; let mut line = String::new(); loop { @@ -91,99 +120,81 @@ impl IpcServer { { Ok(0) => break, // EOF Ok(_) => { - let request: Request = match serde_json::from_str(line.trim()) { - Ok(r) => r, - Err(e) => { - let resp = Response::Error { - code: "PARSE_ERROR".into(), - message: e.to_string(), - }; - if Self::write_response(&mut writer, &resp).await.is_err() { - break; - } - continue; - } - }; - - let is_shutdown = matches!(request, Request::Shutdown); - - // Detect attach/subscribe requests to enter streaming mode - let attach_agent_id = if let Request::Attach { ref agent_id } = request { - Some(agent_id.clone()) - } else { - None - }; - let subscribe_grid_root = - if let Request::SubscribeGrid { ref project_root } = request { - Some(project_root.clone()) - } else { - None - }; - let subscribe_status_root = - if let Request::SubscribeStatus { ref project_root } = request { - Some(project_root.clone()) - } else { - None - }; - - let response = engine.handle_request(request).await; - if Self::write_response(&mut writer, &response).await.is_err() { - if is_shutdown { - shutdown.notify_one(); - } + if !Self::dispatch_request(&line, &mut reader, &mut writer, &engine, &shutdown) + .await + { break; } + } + Err(_) => break, + } + } + } - if is_shutdown { - shutdown.notify_one(); - break; - } + /// Handles a single parsed request line. Returns `true` to continue the + /// connection loop, `false` to break. + async fn dispatch_request( + line: &str, + reader: &mut IpcReader, + writer: &mut IpcWriter, + engine: &Engine, + shutdown: &Notify, + ) -> bool { + let request: Request = match serde_json::from_str(line.trim()) { + Ok(r) => r, + Err(e) => { + let resp = Response::Error { + code: "PARSE_ERROR".into(), + message: e.to_string(), + }; + return Self::write_response(writer, &resp).await.is_ok(); + } + }; - // Enter streaming sub-loop for attach - if let Some(agent_id) = attach_agent_id { - if !matches!(response, Response::AttachReady { .. }) { - break; - } - Self::handle_attach_stream(&mut reader, &mut writer, &engine, &agent_id) - .await; - } + let is_shutdown = matches!(request, Request::Shutdown); + let stream_mode = StreamMode::from_request(&request); - // Enter streaming sub-loop for grid subscription - if let Some(project_root) = subscribe_grid_root { - if matches!(response, Response::GridSubscribed) { - Self::handle_grid_stream( - &mut reader, - &mut writer, - &engine, - &project_root, - ) - .await; - } - } + let response = engine.handle_request(request).await; + if Self::write_response(writer, &response).await.is_err() { + if is_shutdown { + shutdown.notify_one(); + } + return false; + } - // Enter streaming sub-loop for status subscription - if let Some(project_root) = subscribe_status_root { - if matches!(response, Response::StatusSubscribed) { - Self::handle_status_stream( - &mut reader, - &mut writer, - &engine, - &project_root, - ) - .await; - } - } + if is_shutdown { + shutdown.notify_one(); + return false; + } + + match stream_mode { + StreamMode::Attach(agent_id) => { + if !matches!(response, Response::AttachReady { .. }) { + return false; + } + Self::handle_attach_stream(reader, writer, engine, &agent_id).await; + } + StreamMode::Grid(project_root) => { + if matches!(response, Response::GridSubscribed) { + Self::handle_grid_stream(reader, writer, engine, &project_root).await; } - Err(_) => break, } + StreamMode::Status(project_root) => { + if matches!(response, Response::StatusSubscribed) { + Self::handle_status_stream(reader, writer, engine, &project_root).await; + } + } + StreamMode::None => {} } + + true } /// Streaming attach sub-loop: sends all buffered output, then streams new output /// while accepting Input/Resize commands from the client. async fn handle_attach_stream( - reader: &mut BufReader, - writer: &mut tokio::net::unix::OwnedWriteHalf, + reader: &mut IpcReader, + writer: &mut IpcWriter, engine: &Engine, agent_id: &str, ) { @@ -219,11 +230,10 @@ impl IpcServer { return; } - let process_exited = exit_rx.borrow().is_some(); // If process already exited, we've sent all buffered output above — return // immediately. Without this, the streaming loop deadlocks: watcher never fires // (no new output), exit_rx.changed() is disabled, and reader blocks forever. - if process_exited { + if exit_rx.borrow().is_some() { tracing::debug!(agent_id, "attach stream: process already exited"); return; } @@ -241,7 +251,7 @@ impl IpcServer { break; } } - Ok(()) = exit_rx.changed(), if !process_exited => { + Ok(()) = exit_rx.changed() => { // Drain any remaining buffered output let (data, _) = buffer.read_from(offset); if !data.is_empty() { @@ -254,27 +264,14 @@ impl IpcServer { line.clear(); reader.take(MAX_MESSAGE_SIZE).read_line(&mut line).await } => { - match result { - Ok(0) => { - tracing::debug!(agent_id, "attach stream ended: client disconnected"); - break; + match parse_stream_request(result, &line) { + Some(Request::Input { data, .. }) => { + engine.write_to_pty(&master_fd, &data).await.ok(); } - Ok(_) => { - let request: Request = match serde_json::from_str(line.trim()) { - Ok(r) => r, - Err(_) => break, - }; - match request { - Request::Input { data, .. } => { - engine.write_to_pty(&master_fd, &data).await.ok(); - } - Request::Resize { cols, rows, .. } => { - engine.resize_pty(&master_fd, cols, rows).await.ok(); - } - _ => break, // Any other request exits the attach loop - } + Some(Request::Resize { cols, rows, .. }) => { + engine.resize_pty(&master_fd, cols, rows).await.ok(); } - Err(_) => break, + _ => break, } } } @@ -285,8 +282,8 @@ impl IpcServer { /// Streaming grid subscription: forwards GridEvent broadcasts to subscriber, /// accepts incoming GridCommand requests from the subscriber connection. async fn handle_grid_stream( - reader: &mut BufReader, - writer: &mut tokio::net::unix::OwnedWriteHalf, + reader: &mut IpcReader, + writer: &mut IpcWriter, engine: &Engine, project_root: &str, ) { @@ -319,25 +316,14 @@ impl IpcServer { line.clear(); reader.take(MAX_MESSAGE_SIZE).read_line(&mut line).await } => { - match result { - Ok(0) => break, // Client disconnected - Ok(_) => { - let request: Request = match serde_json::from_str(line.trim()) { - Ok(r) => r, - Err(_) => break, - }; - // Only accept GridCommand during grid stream - match request { - Request::GridCommand { command, .. } => { - let resp = engine.handle_grid_command(&pr, command).await; - if Self::write_response(writer, &resp).await.is_err() { - break; - } - } - _ => break, // Any other request exits the grid stream + match parse_stream_request(result, &line) { + Some(Request::GridCommand { command, .. }) => { + let resp = engine.handle_grid_command(&pr, command).await; + if Self::write_response(writer, &resp).await.is_err() { + break; } } - Err(_) => break, + _ => break, } } } @@ -345,11 +331,21 @@ impl IpcServer { tracing::debug!(project_root, "grid stream ended"); } + /// Computes and sends a full status snapshot. Returns `false` if the write failed. + async fn send_status(writer: &mut IpcWriter, engine: &Engine, project_root: &str) -> bool { + if let Ok((worktrees, agents)) = engine.compute_full_status(project_root).await { + let resp = Response::StatusEvent { worktrees, agents }; + Self::write_response(writer, &resp).await.is_ok() + } else { + true + } + } + /// Streaming status subscription: pushes full StatusEvent on every state change. /// Client receives real-time updates without polling. async fn handle_status_stream( - reader: &mut BufReader, - writer: &mut tokio::net::unix::OwnedWriteHalf, + reader: &mut IpcReader, + writer: &mut IpcWriter, engine: &Engine, project_root: &str, ) { @@ -359,11 +355,8 @@ impl IpcServer { tracing::debug!(project_root, "status stream started"); // Send initial status immediately - if let Ok((worktrees, agents)) = engine.compute_full_status(&pr).await { - let resp = Response::StatusEvent { worktrees, agents }; - if Self::write_response(writer, &resp).await.is_err() { - return; - } + if !Self::send_status(writer, engine, &pr).await { + return; } let mut line = String::new(); @@ -374,35 +367,24 @@ impl IpcServer { Ok(()) => { // Drain any queued signals (batch rapid changes) while rx.try_recv().is_ok() {} - if let Ok((worktrees, agents)) = engine.compute_full_status(&pr).await { - let resp = Response::StatusEvent { worktrees, agents }; - if Self::write_response(writer, &resp).await.is_err() { - break; - } + if !Self::send_status(writer, engine, &pr).await { + break; } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { tracing::warn!(project_root, "status subscriber lagged {n} messages"); - // Send fresh status after lag - if let Ok((worktrees, agents)) = engine.compute_full_status(&pr).await { - let resp = Response::StatusEvent { worktrees, agents }; - if Self::write_response(writer, &resp).await.is_err() { - break; - } + if !Self::send_status(writer, engine, &pr).await { + break; } } Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } - result = async { + _ = async { line.clear(); reader.take(MAX_MESSAGE_SIZE).read_line(&mut line).await } => { - match result { - Ok(0) => break, // Client disconnected - Ok(_) => break, // Any request exits the status stream - Err(_) => break, - } + break; } } } @@ -410,7 +392,7 @@ impl IpcServer { } async fn write_output_chunks( - writer: &mut tokio::net::unix::OwnedWriteHalf, + writer: &mut IpcWriter, agent_id: &str, data: &[u8], ) -> std::io::Result<()> { @@ -424,10 +406,7 @@ impl IpcServer { Ok(()) } - async fn write_response( - writer: &mut tokio::net::unix::OwnedWriteHalf, - response: &Response, - ) -> std::io::Result<()> { + async fn write_response(writer: &mut IpcWriter, response: &Response) -> std::io::Result<()> { let json = serde_json::to_string(response) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; writer.write_all(json.as_bytes()).await?;