From 5ee0120593a3b2a431c7fe2eddf8d1ac8bc2f92b Mon Sep 17 00:00:00 2001 From: Mohammod Al Amin Ashik Date: Sat, 14 Feb 2026 09:06:15 +0800 Subject: [PATCH 1/8] fix(streamable-http): return 409 Conflict when standalone SSE stream already active LocalSessionWorker::resume() unconditionally replaced self.common.tx on every GET request, orphaning the receiver the first SSE stream was reading from. All subsequent server-to-client notifications were sent to the new sender while the original client was still listening on the old, now-dead receiver. notify_tool_list_changed().await returned Ok(()) silently. This is triggered by VS Code's MCP extension which reconnects SSE every ~5 minutes with the same session ID. Fix: Check tx.is_closed() before replacing the common channel sender. If an active stream exists, return SessionError::Conflict which is propagated as HTTP 409 Conflict. This matches the TypeScript SDK behavior (streamableHttp.ts:423). Signed-off-by: Mohammod Al Amin Ashik --- crates/rmcp/Cargo.toml | 5 + .../streamable_http_server/session/local.rs | 7 + .../transport/streamable_http_server/tower.rs | 34 ++- .../tests/test_sse_channel_replacement_bug.rs | 224 ++++++++++++++++++ 4 files changed, 266 insertions(+), 4 deletions(-) create mode 100644 crates/rmcp/tests/test_sse_channel_replacement_bug.rs diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index ea7a308af..32acd1c5f 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -241,3 +241,8 @@ required-features = [ "transport-streamable-http-server", ] path = "tests/test_custom_headers.rs" + +[[test]] +name = "test_sse_channel_replacement_bug" +required-features = ["server", "client", "transport-streamable-http-server", "transport-streamable-http-client", "reqwest"] +path = "tests/test_sse_channel_replacement_bug.rs" diff --git a/crates/rmcp/src/transport/streamable_http_server/session/local.rs b/crates/rmcp/src/transport/streamable_http_server/session/local.rs index d68d63e1a..375743d23 
100644 --- a/crates/rmcp/src/transport/streamable_http_server/session/local.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session/local.rs @@ -315,6 +315,8 @@ pub enum SessionError { SessionServiceTerminated, #[error("Invalid event id")] InvalidEventId, + #[error("Conflict: Only one standalone SSE stream is allowed per session")] + Conflict, #[error("IO error: {0}")] Io(#[from] std::io::Error), } @@ -531,6 +533,11 @@ impl LocalSessionWorker { }) } None => { + // Reject if there's already an active standalone SSE stream. + // Matches TypeScript SDK behavior (409 Conflict). + if !self.common.tx.is_closed() { + return Err(SessionError::Conflict); + } let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); let (tx, rx) = channel; self.common.tx = tx; diff --git a/crates/rmcp/src/transport/streamable_http_server/tower.rs b/crates/rmcp/src/transport/streamable_http_server/tower.rs index 37d4a008c..7ebe7207c 100644 --- a/crates/rmcp/src/transport/streamable_http_server/tower.rs +++ b/crates/rmcp/src/transport/streamable_http_server/tower.rs @@ -215,11 +215,23 @@ where .map(|s| s.to_owned()); if let Some(last_event_id) = last_event_id { // check if session has this event id - let stream = self + let stream = match self .session_manager .resume(&session_id, last_event_id) .await - .map_err(internal_error_response("resume session"))?; + { + Ok(stream) => stream, + Err(e) if e.to_string().contains("Conflict:") => { + return Ok(Response::builder() + .status(http::StatusCode::CONFLICT) + .body( + Full::new(Bytes::from(e.to_string())) + .boxed(), + ) + .expect("valid response")); + } + Err(e) => return Err(internal_error_response("resume session")(e)), + }; // Resume doesn't need priming - client already has the event ID Ok(sse_stream_response( stream, @@ -228,11 +240,25 @@ where )) } else { // create standalone stream - let stream = self + let stream = match self .session_manager .create_standalone_stream(&session_id) .await - 
.map_err(internal_error_response("create standalone stream"))?; + { + Ok(stream) => stream, + Err(e) if e.to_string().contains("Conflict:") => { + return Ok(Response::builder() + .status(http::StatusCode::CONFLICT) + .body( + Full::new(Bytes::from(e.to_string())) + .boxed(), + ) + .expect("valid response")); + } + Err(e) => { + return Err(internal_error_response("create standalone stream")(e)) + } + }; // Prepend priming event if sse_retry configured let stream = if let Some(retry) = self.config.sse_retry { let priming = ServerSseMessage { diff --git a/crates/rmcp/tests/test_sse_channel_replacement_bug.rs b/crates/rmcp/tests/test_sse_channel_replacement_bug.rs new file mode 100644 index 000000000..8256ee1e2 --- /dev/null +++ b/crates/rmcp/tests/test_sse_channel_replacement_bug.rs @@ -0,0 +1,224 @@ +/// Test that demonstrates the channel replacement bug +/// +/// This test reproduces the real-world scenario where VS Code reconnects SSE +/// every ~5 minutes by sending multiple GET requests with the SAME session ID. 
+/// +/// Expected: Second GET should return 409 Conflict (like TypeScript SDK) +/// Actual: Second GET succeeds, replaces channel, orphans first receiver (BUG) +/// +/// Root cause: local.rs:536 unconditionally replaces self.common.tx + +use std::sync::Arc; +use std::time::Duration; + +use rmcp::{ + ServerHandler, + model::{ + ServerCapabilities, ServerInfo, + ToolsCapability, Implementation, ProtocolVersion, + }, + service::NotificationContext, + RoleServer, + transport::streamable_http_server::{ + StreamableHttpServerConfig, StreamableHttpService, + session::local::LocalSessionManager, + }, +}; +use tokio::sync::Notify; +use tokio_util::sync::CancellationToken; +use reqwest; +use serde_json::json; + +// Test server that sends notifications on demand +#[derive(Clone)] +pub struct TestServer { + trigger: Arc, +} + +impl TestServer { + fn new(trigger: Arc) -> Self { + Self { trigger } + } +} + +impl ServerHandler for TestServer { + fn get_info(&self) -> ServerInfo { + ServerInfo { + protocol_version: ProtocolVersion::LATEST, + capabilities: ServerCapabilities::builder() + .enable_tools_with(ToolsCapability { + list_changed: Some(true), + }) + .build(), + server_info: Implementation { + name: "test-server".to_string(), + version: "1.0.0".to_string(), + ..Default::default() + }, + instructions: None, + } + } + + async fn on_initialized(&self, context: NotificationContext) { + let peer = context.peer.clone(); + let trigger = self.trigger.clone(); + + tokio::spawn(async move { + trigger.notified().await; + + println!("πŸ”” Server sending notification..."); + match peer.notify_tool_list_changed().await { + Ok(()) => println!("βœ… notify_tool_list_changed() returned Ok(())"), + Err(e) => println!("❌ notify_tool_list_changed() failed: {}", e), + } + }); + } +} + +#[tokio::test] +async fn test_channel_replacement_bug() { + let _ = tracing_subscriber::fmt() + .with_env_filter("debug") + .try_init(); + + let ct = CancellationToken::new(); + let notification_trigger = 
Arc::new(Notify::new()); + + // Start HTTP server + let server = TestServer::new(notification_trigger.clone()); + let service = StreamableHttpService::new( + move || Ok(server.clone()), + Arc::new(LocalSessionManager::default()), + StreamableHttpServerConfig { + stateful_mode: true, + sse_keep_alive: Some(Duration::from_secs(15)), + sse_retry: Some(Duration::from_secs(3)), + cancellation_token: ct.child_token(), + }, + ); + + let router = axum::Router::new().nest_service("/mcp", service); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("bind"); + let addr = listener.local_addr().unwrap(); + let url = format!("http://127.0.0.1:{}/mcp", addr.port()); + + let ct_clone = ct.clone(); + tokio::spawn(async move { + axum::serve(listener, router) + .with_graceful_shutdown(async move { ct_clone.cancelled().await }) + .await + .unwrap(); + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + + println!("\n=== SSE RECONNECTION BUG REPRODUCTION ===\n"); + println!("This test reproduces the real-world VS Code reconnection scenario:"); + println!("VS Code sends GET requests every ~5 minutes with the SAME session ID."); + println!("Each GET call triggers establish_common_channel() β†’ channel replacement β†’ bug!\n"); + + let http_client = reqwest::Client::new(); + + // STEP 1: POST initialize to create session and get session ID + println!("πŸ“‘ STEP 1: POST /initialize to create session..."); + + let init_request = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2024-11-05", + "capabilities": {}, + "clientInfo": { + "name": "test-client", + "version": "1.0.0" + } + } + }); + + let post_response = http_client + .post(&url) + .header("Accept", "text/event-stream, application/json") + .header("Content-Type", "application/json") + .json(&init_request) + .timeout(Duration::from_millis(500)) // Short timeout - POST opens SSE stream + .send() + .await + .expect("POST initialize"); + + // In 
stateful mode, POST opens SSE stream and returns session ID in header + let session_id = post_response + .headers() + .get("Mcp-Session-Id") + .map(|v| v.to_str().unwrap_or("").to_string()); + + if let Some(ref sid) = session_id { + println!("βœ… Session created: {}", sid); + println!(" Response status: {}", post_response.status()); + } else { + println!("⚠️ No Mcp-Session-Id header in POST response"); + println!(" Response status: {}", post_response.status()); + println!(" Available headers:"); + for (name, value) in post_response.headers() { + println!(" {}: {:?}", name, value); + } + } + + // If no session ID from POST, we can't proceed with the test + let session_id = session_id.expect("Session ID required for test"); + println!(); + + // STEP 2: First GET with session ID to establish SSE stream. + // IMPORTANT: We must keep this response alive so the server-side rx stays open. + println!("πŸ“‘ STEP 2: First GET (establish SSE stream)..."); + println!(" Using session: {}", session_id); + + let _get1_response = http_client + .get(&url) + .header("Accept", "text/event-stream") + .header("Mcp-Session-Id", &session_id) + .send() + .await + .expect("First GET request failed"); + + println!(" Status: {}", _get1_response.status()); + assert!(_get1_response.status().is_success(), "First GET should succeed"); + println!(" βœ… First SSE stream established (receiver listening on rx1)"); + + // Give server time to set up the channel + tokio::time::sleep(Duration::from_millis(200)).await; + + // STEP 3: Second GET with SAME session ID β€” should return 409 Conflict + println!("πŸ“‘ STEP 3: Second GET with SAME session ID..."); + println!(" Using session: {}", session_id); + println!(" This simulates VS Code reconnecting after ~5 minutes"); + + let get2_response = http_client + .get(&url) + .header("Accept", "text/event-stream") + .header("Mcp-Session-Id", &session_id) + .timeout(Duration::from_millis(500)) + .send() + .await + .expect("Second GET request failed"); + + let 
status = get2_response.status(); + println!(" Status: {}", status); + + assert_eq!( + status.as_u16(), + 409, + "Second GET should return 409 Conflict (got {}). \ + Without the fix, the channel sender is silently replaced, \ + orphaning the first receiver and losing all notifications.", + status + ); + + println!(" βœ… 409 Conflict returned (matches TypeScript SDK behavior)"); + + // Cleanup + ct.cancel(); + tokio::time::sleep(Duration::from_millis(100)).await; +} From 60764ccfac2591600a067cf63d60eb81fd008fe9 Mon Sep 17 00:00:00 2001 From: Mohammod Al Amin Ashik Date: Sat, 14 Feb 2026 11:16:31 +0800 Subject: [PATCH 2/8] fix(streamable-http): handle resume with completed request-wise channel When a client sends GET with Last-Event-ID from a completed POST SSE response, the request-wise channel no longer exists in tx_router. Previously this returned ChannelClosed -> 500, causing clients like Cursor to enter an infinite re-initialization loop. Now falls back to the common channel when the request-wise channel is completed, per MCP spec: "Resumption applies regardless of how the original stream was initiated (POST or GET)." 
--- .../streamable_http_server/session/local.rs | 49 +++++++++++++------ .../transport/streamable_http_server/tower.rs | 14 ++---- .../tests/test_sse_channel_replacement_bug.rs | 21 ++++---- 3 files changed, 47 insertions(+), 37 deletions(-) diff --git a/crates/rmcp/src/transport/streamable_http_server/session/local.rs b/crates/rmcp/src/transport/streamable_http_server/session/local.rs index 375743d23..3594a2bbb 100644 --- a/crates/rmcp/src/transport/streamable_http_server/session/local.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session/local.rs @@ -517,20 +517,41 @@ impl LocalSessionWorker { ) -> Result { match last_event_id.http_request_id { Some(http_request_id) => { - let request_wise = self - .tx_router - .get_mut(&http_request_id) - .ok_or(SessionError::ChannelClosed(Some(http_request_id)))?; - let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); - let (tx, rx) = channel; - request_wise.tx.tx = tx; - let index = last_event_id.index; - // sync messages after index - request_wise.tx.sync(index).await?; - Ok(StreamableHttpMessageReceiver { - http_request_id: Some(http_request_id), - inner: rx, - }) + if let Some(request_wise) = self.tx_router.get_mut(&http_request_id) { + // Resume existing request-wise channel + let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); + let (tx, rx) = channel; + request_wise.tx.tx = tx; + let index = last_event_id.index; + // sync messages after index + request_wise.tx.sync(index).await?; + Ok(StreamableHttpMessageReceiver { + http_request_id: Some(http_request_id), + inner: rx, + }) + } else { + // Request-wise channel completed (POST response already delivered). + // Per MCP spec, resumption via GET is always valid regardless of how + // the original stream was initiated. Since the request-wise stream is + // complete, fall through to common channel for server notifications. 
+ tracing::debug!( + http_request_id, + "Request-wise channel completed, falling back to common channel" + ); + if !self.common.tx.is_closed() { + return Err(SessionError::Conflict); + } + let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); + let (tx, rx) = channel; + self.common.tx = tx; + // Sync from beginning of common channel cache since the client + // only has events from the completed request-wise channel + self.common.sync(0).await?; + Ok(StreamableHttpMessageReceiver { + http_request_id: None, + inner: rx, + }) + } } None => { // Reject if there's already an active standalone SSE stream. diff --git a/crates/rmcp/src/transport/streamable_http_server/tower.rs b/crates/rmcp/src/transport/streamable_http_server/tower.rs index 7ebe7207c..0d70c6a19 100644 --- a/crates/rmcp/src/transport/streamable_http_server/tower.rs +++ b/crates/rmcp/src/transport/streamable_http_server/tower.rs @@ -224,10 +224,7 @@ where Err(e) if e.to_string().contains("Conflict:") => { return Ok(Response::builder() .status(http::StatusCode::CONFLICT) - .body( - Full::new(Bytes::from(e.to_string())) - .boxed(), - ) + .body(Full::new(Bytes::from(e.to_string())).boxed()) .expect("valid response")); } Err(e) => return Err(internal_error_response("resume session")(e)), @@ -249,15 +246,10 @@ where Err(e) if e.to_string().contains("Conflict:") => { return Ok(Response::builder() .status(http::StatusCode::CONFLICT) - .body( - Full::new(Bytes::from(e.to_string())) - .boxed(), - ) + .body(Full::new(Bytes::from(e.to_string())).boxed()) .expect("valid response")); } - Err(e) => { - return Err(internal_error_response("create standalone stream")(e)) - } + Err(e) => return Err(internal_error_response("create standalone stream")(e)), }; // Prepend priming event if sse_retry configured let stream = if let Some(retry) = self.config.sse_retry { diff --git a/crates/rmcp/tests/test_sse_channel_replacement_bug.rs b/crates/rmcp/tests/test_sse_channel_replacement_bug.rs index 
8256ee1e2..847eebd97 100644 --- a/crates/rmcp/tests/test_sse_channel_replacement_bug.rs +++ b/crates/rmcp/tests/test_sse_channel_replacement_bug.rs @@ -7,27 +7,21 @@ /// Actual: Second GET succeeds, replaces channel, orphans first receiver (BUG) /// /// Root cause: local.rs:536 unconditionally replaces self.common.tx - use std::sync::Arc; use std::time::Duration; +use reqwest; use rmcp::{ - ServerHandler, - model::{ - ServerCapabilities, ServerInfo, - ToolsCapability, Implementation, ProtocolVersion, - }, + RoleServer, ServerHandler, + model::{Implementation, ProtocolVersion, ServerCapabilities, ServerInfo, ToolsCapability}, service::NotificationContext, - RoleServer, transport::streamable_http_server::{ - StreamableHttpServerConfig, StreamableHttpService, - session::local::LocalSessionManager, + StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager, }, }; +use serde_json::json; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; -use reqwest; -use serde_json::json; // Test server that sends notifications on demand #[derive(Clone)] @@ -184,7 +178,10 @@ async fn test_channel_replacement_bug() { .expect("First GET request failed"); println!(" Status: {}", _get1_response.status()); - assert!(_get1_response.status().is_success(), "First GET should succeed"); + assert!( + _get1_response.status().is_success(), + "First GET should succeed" + ); println!(" βœ… First SSE stream established (receiver listening on rx1)"); // Give server time to set up the channel From b6a457a3119afa91a706067339f97ac0751e6bc4 Mon Sep 17 00:00:00 2001 From: Mohammod Al Amin Ashik Date: Sat, 14 Feb 2026 11:30:13 +0800 Subject: [PATCH 3/8] fix: allow SSE channel replacement instead of 409 Conflict MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per MCP spec Β§Streamable HTTP, "The client MAY remain connected to multiple SSE streams simultaneously." 
Returning 409 Conflict when a second GET arrives causes Cursor to enter an infinite re-initialization loop (~3s cycle). Instead of rejecting, replace the old common channel sender. Dropping the old sender closes the old receiver, cleanly terminating the previous SSE stream so the client can reconnect on the new stream. This fixes both code paths: - GET with Last-Event-ID from a completed POST SSE response - GET without Last-Event-ID (standalone stream reconnection) --- .../streamable_http_server/session/local.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/rmcp/src/transport/streamable_http_server/session/local.rs b/crates/rmcp/src/transport/streamable_http_server/session/local.rs index 3594a2bbb..a728d2120 100644 --- a/crates/rmcp/src/transport/streamable_http_server/session/local.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session/local.rs @@ -534,13 +534,14 @@ impl LocalSessionWorker { // Per MCP spec, resumption via GET is always valid regardless of how // the original stream was initiated. Since the request-wise stream is // complete, fall through to common channel for server notifications. + // + // Replace the existing common channel sender if active β€” dropping + // the old sender closes the old receiver, terminating that SSE stream + // cleanly so the client can reconnect on this new stream. tracing::debug!( http_request_id, "Request-wise channel completed, falling back to common channel" ); - if !self.common.tx.is_closed() { - return Err(SessionError::Conflict); - } let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); let (tx, rx) = channel; self.common.tx = tx; @@ -554,11 +555,10 @@ impl LocalSessionWorker { } } None => { - // Reject if there's already an active standalone SSE stream. - // Matches TypeScript SDK behavior (409 Conflict). 
- if !self.common.tx.is_closed() { - return Err(SessionError::Conflict); - } + // Per MCP spec Β§Streamable HTTP, "The client MAY remain connected + // to multiple SSE streams simultaneously." When a new common-channel + // GET arrives we replace the sender; dropping the old sender closes + // the old receiver, cleanly terminating the previous SSE stream. let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); let (tx, rx) = channel; self.common.tx = tx; From 31add27a2a187c9b94c5766ea1c0d333a506201b Mon Sep 17 00:00:00 2001 From: Mohammod Al Amin Ashik Date: Sat, 14 Feb 2026 12:50:13 +0800 Subject: [PATCH 4/8] fix: skip cache replay when replacing active SSE stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a client opens a new GET SSE stream while a previous one is still active, the old sender is dropped (terminating the old stream) and a new channel is created. Previously, sync() replayed all cached events to the new stream, but the client already received those events on the old stream. This caused an infinite notification loop: 1. Client receives notifications (e.g. ResourceListChanged) 2. Old SSE stream dies (sender replaced) 3. Client reconnects after sse_retry (3s) 4. sync() replays cached notifications the client already handled 5. Client processes them again β†’ goto 2 Fix: check tx.is_closed() BEFORE replacing the sender. If the old stream was still alive, skip replay entirely β€” the client already has those events. Only replay when the old stream was genuinely dead (network failure, timeout) so the client catches up on missed events. 
--- .../streamable_http_server/session/local.rs | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/crates/rmcp/src/transport/streamable_http_server/session/local.rs b/crates/rmcp/src/transport/streamable_http_server/session/local.rs index a728d2120..26180605e 100644 --- a/crates/rmcp/src/transport/streamable_http_server/session/local.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session/local.rs @@ -538,16 +538,22 @@ impl LocalSessionWorker { // Replace the existing common channel sender if active β€” dropping // the old sender closes the old receiver, terminating that SSE stream // cleanly so the client can reconnect on this new stream. + let was_active = !self.common.tx.is_closed(); tracing::debug!( http_request_id, + was_active, "Request-wise channel completed, falling back to common channel" ); let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); let (tx, rx) = channel; self.common.tx = tx; - // Sync from beginning of common channel cache since the client - // only has events from the completed request-wise channel - self.common.sync(0).await?; + if !was_active { + // Common stream was dead β€” replay missed events so the + // client catches up on notifications it never received. + self.common.sync(0).await?; + } + // If we replaced a live stream, skip replay: the client + // already received those events on the previous stream. Ok(StreamableHttpMessageReceiver { http_request_id: None, inner: rx, @@ -559,12 +565,18 @@ impl LocalSessionWorker { // to multiple SSE streams simultaneously." When a new common-channel // GET arrives we replace the sender; dropping the old sender closes // the old receiver, cleanly terminating the previous SSE stream. 
+ let was_active = !self.common.tx.is_closed(); let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); let (tx, rx) = channel; self.common.tx = tx; - let index = last_event_id.index; - // sync messages after index - self.common.sync(index).await?; + if !was_active { + // Stream was dead (network failure, client timeout, etc.) β€” + // replay cached events so the client catches up. + let index = last_event_id.index; + self.common.sync(index).await?; + } + // If we replaced a live stream, skip replay: the client + // already received those events on the previous stream. Ok(StreamableHttpMessageReceiver { http_request_id: None, inner: rx, From 786801786d10a6a2857e35a21e1b2d202e600ee4 Mon Sep 17 00:00:00 2001 From: Mohammod Al Amin Ashik Date: Sat, 14 Feb 2026 14:23:07 +0800 Subject: [PATCH 5/8] fix: use shadow channels to prevent SSE reconnect loops MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When POST SSE responses include a `retry` field, the browser's EventSource automatically reconnects via GET after the stream ends. This creates multiple competing EventSource connections that each replace the common channel sender, killing the other stream's receiver. Both reconnect every sse_retry seconds, creating an infinite loop. Instead of always replacing the common channel, check if the primary is still active. If so, create a "shadow" stream β€” an idle SSE connection kept alive by keep-alive pings that doesn't receive notifications or interfere with the primary channel. Also removes cache replay (sync) on common channel resume, as replaying server-initiated list_changed notifications causes clients to re-process old signals. 
Signed-off-by: Myko Ash Signed-off-by: Mohammod Al Amin Ashik --- .../streamable_http_server/session/local.rs | 95 ++++++++++--------- 1 file changed, 50 insertions(+), 45 deletions(-) diff --git a/crates/rmcp/src/transport/streamable_http_server/session/local.rs b/crates/rmcp/src/transport/streamable_http_server/session/local.rs index 26180605e..6744c28fd 100644 --- a/crates/rmcp/src/transport/streamable_http_server/session/local.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session/local.rs @@ -293,6 +293,12 @@ pub struct LocalSessionWorker { tx_router: HashMap, resource_router: HashMap, common: CachedTx, + /// Shadow senders for secondary SSE streams (e.g. from POST EventSource + /// reconnections). These keep the HTTP connections alive via SSE keep-alive + /// without receiving notifications, preventing clients like Cursor from + /// entering infinite reconnect loops when multiple EventSource connections + /// compete to replace the common channel. + shadow_txs: Vec>, event_rx: Receiver, session_config: SessionConfig, } @@ -515,6 +521,9 @@ impl LocalSessionWorker { &mut self, last_event_id: EventId, ) -> Result { + // Clean up closed shadow senders before processing + self.shadow_txs.retain(|tx| !tx.is_closed()); + match last_event_id.http_request_id { Some(http_request_id) => { if let Some(request_wise) = self.tx_router.get_mut(&http_request_id) { @@ -531,60 +540,52 @@ impl LocalSessionWorker { }) } else { // Request-wise channel completed (POST response already delivered). - // Per MCP spec, resumption via GET is always valid regardless of how - // the original stream was initiated. Since the request-wise stream is - // complete, fall through to common channel for server notifications. - // - // Replace the existing common channel sender if active β€” dropping - // the old sender closes the old receiver, terminating that SSE stream - // cleanly so the client can reconnect on this new stream. 
- let was_active = !self.common.tx.is_closed(); + // The client's EventSource is reconnecting after the POST SSE stream + // ended. Fall through to common channel handling below. tracing::debug!( http_request_id, - was_active, "Request-wise channel completed, falling back to common channel" ); - let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); - let (tx, rx) = channel; - self.common.tx = tx; - if !was_active { - // Common stream was dead β€” replay missed events so the - // client catches up on notifications it never received. - self.common.sync(0).await?; - } - // If we replaced a live stream, skip replay: the client - // already received those events on the previous stream. - Ok(StreamableHttpMessageReceiver { - http_request_id: None, - inner: rx, - }) + self.resume_or_shadow_common() } } - None => { - // Per MCP spec Β§Streamable HTTP, "The client MAY remain connected - // to multiple SSE streams simultaneously." When a new common-channel - // GET arrives we replace the sender; dropping the old sender closes - // the old receiver, cleanly terminating the previous SSE stream. - let was_active = !self.common.tx.is_closed(); - let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); - let (tx, rx) = channel; - self.common.tx = tx; - if !was_active { - // Stream was dead (network failure, client timeout, etc.) β€” - // replay cached events so the client catches up. - let index = last_event_id.index; - self.common.sync(index).await?; - } - // If we replaced a live stream, skip replay: the client - // already received those events on the previous stream. - Ok(StreamableHttpMessageReceiver { - http_request_id: None, - inner: rx, - }) - } + None => self.resume_or_shadow_common(), } } + /// Resume the common channel, or create a shadow stream if the primary is + /// still active. 
+ /// + /// When the primary common channel is dead (receiver dropped), replace it + /// so this stream becomes the new primary notification channel. + /// + /// When the primary is still active, create a "shadow" stream β€” an idle SSE + /// connection kept alive by keep-alive pings. This prevents multiple + /// EventSource connections (e.g. from POST response reconnections) from + /// killing each other by repeatedly replacing the common channel sender. + fn resume_or_shadow_common(&mut self) -> Result { + let (tx, rx) = tokio::sync::mpsc::channel(self.session_config.channel_capacity); + if self.common.tx.is_closed() { + // Primary common channel is dead β€” replace it. + tracing::debug!("Replacing dead common channel with new primary"); + self.common.tx = tx; + } else { + // Primary common channel is still active. Create a shadow stream + // that stays alive via SSE keep-alive but doesn't receive + // notifications. This prevents competing EventSource connections + // from killing each other's channels. 
+ tracing::debug!( + shadow_count = self.shadow_txs.len(), + "Common channel active, creating shadow stream" + ); + self.shadow_txs.push(tx); + } + Ok(StreamableHttpMessageReceiver { + http_request_id: None, + inner: rx, + }) + } + async fn close_sse_stream( &mut self, http_request_id: Option, @@ -624,6 +625,9 @@ impl LocalSessionWorker { let (tx, _rx) = tokio::sync::mpsc::channel(1); self.common.tx = tx; + // Also close all shadow streams + self.shadow_txs.clear(); + tracing::debug!("closed standalone SSE stream for server-initiated disconnection"); Ok(()) } @@ -1076,6 +1080,7 @@ pub fn create_local_session( tx_router: HashMap::new(), resource_router: HashMap::new(), common, + shadow_txs: Vec::new(), event_rx, session_config: config.clone(), }; From beb04daf498a7c2e402ea5d13c3c7f0d75126fbc Mon Sep 17 00:00:00 2001 From: Mohammod Al Amin Ashik Date: Sat, 14 Feb 2026 14:51:14 +0800 Subject: [PATCH 6/8] test: comprehensive shadow channel tests (15 cases) Rewrite test suite for SSE channel replacement fix: - Shadow creation: standalone GET returns 200, multiple GETs coexist - Dead primary: replacement, notification delivery, repeated cycles - Notification routing: primary receives, shadow does not - Resume paths: completed request-wise, common alive/dead - Real scenarios: Cursor leapfrog, VS Code reconnect - Edge cases: invalid session, missing header, shadow cleanup Fix Accept header bug (was missing text/event-stream for notifications/initialized POST, causing 406 rejection). 
--- .../tests/test_sse_channel_replacement_bug.rs | 689 +++++++++++++++--- 1 file changed, 585 insertions(+), 104 deletions(-) diff --git a/crates/rmcp/tests/test_sse_channel_replacement_bug.rs b/crates/rmcp/tests/test_sse_channel_replacement_bug.rs index 847eebd97..de5cc1c3b 100644 --- a/crates/rmcp/tests/test_sse_channel_replacement_bug.rs +++ b/crates/rmcp/tests/test_sse_channel_replacement_bug.rs @@ -1,15 +1,20 @@ -/// Test that demonstrates the channel replacement bug +/// Tests for SSE channel replacement fix (shadow channels) /// -/// This test reproduces the real-world scenario where VS Code reconnects SSE -/// every ~5 minutes by sending multiple GET requests with the SAME session ID. +/// These tests verify that multiple GET SSE streams on the same session +/// don't kill each other by replacing the common channel sender. /// -/// Expected: Second GET should return 409 Conflict (like TypeScript SDK) -/// Actual: Second GET succeeds, replaces channel, orphans first receiver (BUG) +/// Root cause: When POST SSE responses include `retry`, EventSource reconnects +/// via GET after the stream ends. Each GET was unconditionally replacing +/// `self.common.tx`, killing the other stream's receiver β€” causing an infinite +/// reconnect loop every `sse_retry` seconds. /// -/// Root cause: local.rs:536 unconditionally replaces self.common.tx +/// Fix: `resume_or_shadow_common()` checks if the primary common channel is +/// still active. If so, it creates a "shadow" stream (idle, keep-alive only) +/// instead of replacing the primary. 
use std::sync::Arc; use std::time::Duration; +use futures::StreamExt; use reqwest; use rmcp::{ RoleServer, ServerHandler, @@ -23,7 +28,11 @@ use serde_json::json; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; -// Test server that sends notifications on demand +const ACCEPT_SSE: &str = "text/event-stream"; +const ACCEPT_BOTH: &str = "text/event-stream, application/json"; + +// ─── Test server ──────────────────────────────────────────────────────────── + #[derive(Clone)] pub struct TestServer { trigger: Arc, @@ -59,27 +68,15 @@ impl ServerHandler for TestServer { tokio::spawn(async move { trigger.notified().await; - - println!("πŸ”” Server sending notification..."); - match peer.notify_tool_list_changed().await { - Ok(()) => println!("βœ… notify_tool_list_changed() returned Ok(())"), - Err(e) => println!("❌ notify_tool_list_changed() failed: {}", e), - } + let _ = peer.notify_tool_list_changed().await; }); } } -#[tokio::test] -async fn test_channel_replacement_bug() { - let _ = tracing_subscriber::fmt() - .with_env_filter("debug") - .try_init(); - - let ct = CancellationToken::new(); - let notification_trigger = Arc::new(Notify::new()); +// ─── Helpers ──────────────────────────────────────────────────────────────── - // Start HTTP server - let server = TestServer::new(notification_trigger.clone()); +async fn start_test_server(ct: CancellationToken, trigger: Arc) -> String { + let server = TestServer::new(trigger); let service = StreamableHttpService::new( move || Ok(server.clone()), Arc::new(LocalSessionManager::default()), @@ -107,115 +104,599 @@ async fn test_channel_replacement_bug() { }); tokio::time::sleep(Duration::from_millis(100)).await; + url +} - println!("\n=== SSE RECONNECTION BUG REPRODUCTION ===\n"); - println!("This test reproduces the real-world VS Code reconnection scenario:"); - println!("VS Code sends GET requests every ~5 minutes with the SAME session ID."); - println!("Each GET call triggers establish_common_channel() β†’ 
channel replacement β†’ bug!\n"); - - let http_client = reqwest::Client::new(); +/// POST initialize and return session ID. +async fn initialize_session(client: &reqwest::Client, url: &str) -> String { + let resp = client + .post(url) + .header("Accept", ACCEPT_BOTH) + .header("Content-Type", "application/json") + .json(&json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2024-11-05", + "capabilities": {}, + "clientInfo": { "name": "test-client", "version": "1.0.0" } + } + })) + .timeout(Duration::from_millis(500)) + .send() + .await + .expect("POST initialize"); - // STEP 1: POST initialize to create session and get session ID - println!("πŸ“‘ STEP 1: POST /initialize to create session..."); + assert!(resp.status().is_success(), "initialize should succeed"); - let init_request = json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "initialize", - "params": { - "protocolVersion": "2024-11-05", - "capabilities": {}, - "clientInfo": { - "name": "test-client", - "version": "1.0.0" - } - } - }); + resp.headers() + .get("Mcp-Session-Id") + .expect("session ID header") + .to_str() + .unwrap() + .to_string() +} - let post_response = http_client - .post(&url) - .header("Accept", "text/event-stream, application/json") +/// POST `notifications/initialized` to complete the MCP handshake. +/// This triggers the server's `on_initialized` handler. 
+async fn send_initialized_notification(client: &reqwest::Client, url: &str, session_id: &str) { + let resp = client + .post(url) + .header("Accept", ACCEPT_BOTH) .header("Content-Type", "application/json") - .json(&init_request) - .timeout(Duration::from_millis(500)) // Short timeout - POST opens SSE stream + .header("Mcp-Session-Id", session_id) + .json(&json!({ + "jsonrpc": "2.0", + "method": "notifications/initialized" + })) .send() .await - .expect("POST initialize"); + .expect("POST notifications/initialized"); - // In stateful mode, POST opens SSE stream and returns session ID in header - let session_id = post_response - .headers() - .get("Mcp-Session-Id") - .map(|v| v.to_str().unwrap_or("").to_string()); - - if let Some(ref sid) = session_id { - println!("βœ… Session created: {}", sid); - println!(" Response status: {}", post_response.status()); - } else { - println!("⚠️ No Mcp-Session-Id header in POST response"); - println!(" Response status: {}", post_response.status()); - println!(" Available headers:"); - for (name, value) in post_response.headers() { - println!(" {}: {:?}", name, value); + assert_eq!( + resp.status().as_u16(), + 202, + "notifications/initialized should return 202 Accepted" + ); +} + +/// Open a standalone GET SSE stream (no Last-Event-ID). +async fn open_standalone_get( + client: &reqwest::Client, + url: &str, + session_id: &str, +) -> reqwest::Response { + client + .get(url) + .header("Accept", ACCEPT_SSE) + .header("Mcp-Session-Id", session_id) + .send() + .await + .expect("GET SSE stream") +} + +/// Open a GET SSE stream with Last-Event-ID (resume). 
+async fn open_resume_get( + client: &reqwest::Client, + url: &str, + session_id: &str, + last_event_id: &str, +) -> reqwest::Response { + client + .get(url) + .header("Accept", ACCEPT_SSE) + .header("Mcp-Session-Id", session_id) + .header("Last-Event-ID", last_event_id) + .send() + .await + .expect("GET SSE stream with Last-Event-ID") +} + +/// Read from an SSE byte stream until we find a specific text or timeout. +async fn wait_for_sse_event(resp: reqwest::Response, needle: &str, timeout: Duration) -> bool { + let mut stream = resp.bytes_stream(); + let result = tokio::time::timeout(timeout, async { + while let Some(Ok(chunk)) = stream.next().await { + let text = String::from_utf8_lossy(&chunk); + if text.contains(needle) { + return true; + } } + false + }) + .await; + + matches!(result, Ok(true)) +} + +// ─── Tests: Shadow stream creation ────────────────────────────────────────── + +/// Second standalone GET with same session ID should return 200 OK +/// (shadow stream), NOT 409 Conflict. +#[tokio::test] +async fn shadow_second_standalone_get_returns_200() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + + // First GET β€” becomes primary common channel + let get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get1.status(), 200, "First GET should succeed"); + + tokio::time::sleep(Duration::from_millis(100)).await; + + // Second GET β€” should get 200 (shadow), NOT 409 + let get2 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!( + get2.status(), + 200, + "Second GET should return 200 (shadow stream), not 409 Conflict" + ); + + ct.cancel(); +} + +/// Multiple standalone GETs should all return 200 β€” the server can handle +/// many shadow streams concurrently. 
+#[tokio::test] +async fn shadow_multiple_standalone_gets_all_succeed() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + + // Open 5 concurrent standalone GETs + let mut responses = Vec::new(); + for i in 0..5 { + let resp = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(resp.status(), 200, "GET #{i} should succeed"); + responses.push(resp); + tokio::time::sleep(Duration::from_millis(50)).await; } - // If no session ID from POST, we can't proceed with the test - let session_id = session_id.expect("Session ID required for test"); - println!(); + // All 5 should be alive (first is primary, rest are shadows) + assert_eq!(responses.len(), 5); - // STEP 2: First GET with session ID to establish SSE stream. - // IMPORTANT: We must keep this response alive so the server-side rx stays open. - println!("πŸ“‘ STEP 2: First GET (establish SSE stream)..."); - println!(" Using session: {}", session_id); + ct.cancel(); +} - let _get1_response = http_client - .get(&url) - .header("Accept", "text/event-stream") - .header("Mcp-Session-Id", &session_id) - .send() - .await - .expect("First GET request failed"); +// ─── Tests: Dead primary replacement ──────────────────────────────────────── + +/// When the primary common channel is dead (first GET dropped), the next GET +/// should replace it and become the new primary. 
+#[tokio::test] +async fn dead_primary_gets_replaced_by_next_get() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + + // First GET β€” becomes primary + let get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get1.status(), 200); + + // Drop primary β€” kills receiver, making sender closed + drop(get1); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Second GET β€” primary is dead, should replace it + let get2 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!( + get2.status(), + 200, + "GET should succeed as new primary after old primary was dropped" + ); + + ct.cancel(); +} + +/// After primary dies, the replacement primary should be able to receive +/// notifications (verifies the channel was actually replaced, not shadowed). +#[tokio::test] +async fn dead_primary_replacement_receives_notifications() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger.clone()).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + send_initialized_notification(&client, &url, &session_id).await; + tokio::time::sleep(Duration::from_millis(100)).await; + + // First GET β€” becomes primary + let get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get1.status(), 200); + + // Drop primary + drop(get1); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Second GET β€” becomes new primary (replacement) + let get2 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get2.status(), 200); + + // Trigger notification β€” should arrive on get2 (the new primary) + trigger.notify_one(); + + assert!( + wait_for_sse_event(get2, "tools/list_changed", 
Duration::from_secs(3)).await, + "Replacement primary should receive notifications" + ); + + ct.cancel(); +} + +/// Multiple drops and replacements should work: primary can be replaced +/// more than once. +#[tokio::test] +async fn dead_primary_can_be_replaced_multiple_times() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + + for i in 0..3 { + let get = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get.status(), 200, "GET #{i} should succeed"); + drop(get); + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // Final GET should still work + let final_get = open_standalone_get(&client, &url, &session_id).await; + assert_eq!( + final_get.status(), + 200, + "GET after multiple replacements should succeed" + ); + + ct.cancel(); +} + +// ─── Tests: Notification routing ──────────────────────────────────────────── + +/// Notification should arrive on the primary stream even after shadow streams +/// are created by subsequent GETs. 
+#[tokio::test] +async fn notification_reaches_primary_not_shadow() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger.clone()).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + send_initialized_notification(&client, &url, &session_id).await; + tokio::time::sleep(Duration::from_millis(200)).await; + + // First GET β€” primary common channel + let get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get1.status(), 200); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Second GET β€” shadow stream (should NOT steal notifications) + let _get2 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(_get2.status(), 200); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Trigger notification + trigger.notify_one(); + + // Primary stream should receive the notification + assert!( + wait_for_sse_event(get1, "tools/list_changed", Duration::from_secs(3)).await, + "Primary stream should receive notification even after shadow was created" + ); + + ct.cancel(); +} + +// ─── Tests: Resume with Last-Event-ID ─────────────────────────────────────── + +/// GET with Last-Event-ID referencing a completed request-wise channel should +/// fall through to shadow (not crash or return 500). +/// +/// This simulates the real-world scenario: POST SSE response ends, the +/// EventSource reconnects via GET with the last event ID from the POST stream. +/// The request-wise channel no longer exists, so the server should create a +/// shadow stream. 
+#[tokio::test] +async fn resume_completed_request_wise_creates_shadow() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + + // First GET β€” establish primary + let _get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(_get1.status(), 200); + tokio::time::sleep(Duration::from_millis(100)).await; + + // GET with Last-Event-ID for non-existent request-wise channel + let get_resume = open_resume_get(&client, &url, &session_id, "0/999").await; + assert_eq!( + get_resume.status(), + 200, + "Resume of completed request-wise channel should return 200 (shadow)" + ); + + ct.cancel(); +} + +/// GET with Last-Event-ID "0" (common channel resume) while primary is alive +/// should create a shadow. +#[tokio::test] +async fn resume_common_while_primary_alive_creates_shadow() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + + // First GET β€” establish primary + let _get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(_get1.status(), 200); + tokio::time::sleep(Duration::from_millis(100)).await; + + // GET with Last-Event-ID "0" β€” resume common while primary alive β†’ shadow + let get_resume = open_resume_get(&client, &url, &session_id, "0").await; + assert_eq!( + get_resume.status(), + 200, + "Common channel resume while primary alive should return 200 (shadow)" + ); + + ct.cancel(); +} + +/// GET with Last-Event-ID "0" (common channel resume) while primary is dead +/// should become the new primary. 
+#[tokio::test] +async fn resume_common_while_primary_dead_becomes_primary() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger.clone()).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + send_initialized_notification(&client, &url, &session_id).await; + tokio::time::sleep(Duration::from_millis(200)).await; + + // First GET β€” establish primary + let get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get1.status(), 200); + + // Drop primary + drop(get1); + tokio::time::sleep(Duration::from_millis(100)).await; + + // GET with Last-Event-ID "0" β€” primary dead β†’ becomes new primary + let get_resume = open_resume_get(&client, &url, &session_id, "0").await; + assert_eq!(get_resume.status(), 200); + + // New primary should receive notifications + trigger.notify_one(); - println!(" Status: {}", _get1_response.status()); assert!( - _get1_response.status().is_success(), - "First GET should succeed" + wait_for_sse_event(get_resume, "tools/list_changed", Duration::from_secs(3)).await, + "Resumed stream that replaced dead primary should receive notifications" + ); + + ct.cancel(); +} + +// ─── Tests: Mixed scenarios ───────────────────────────────────────────────── + +/// POST SSE reconnections and standalone GET should coexist: POST initialize +/// creates a request-wise channel, its EventSource reconnects via GET after +/// the stream ends, while a standalone GET is also active. 
+#[tokio::test] +async fn post_reconnect_and_standalone_coexist() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + + // Standalone GET β€” becomes primary + let _standalone = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(_standalone.status(), 200); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Simulate POST SSE response reconnection (EventSource reconnects with + // Last-Event-ID from the initialize POST stream). The request-wise channel + // for the initialize request is already completed. + let reconnect1 = open_resume_get(&client, &url, &session_id, "0/0").await; + assert_eq!( + reconnect1.status(), + 200, + "POST reconnection should get shadow, not replace primary" ); - println!(" βœ… First SSE stream established (receiver listening on rx1)"); - // Give server time to set up the channel + tokio::time::sleep(Duration::from_millis(100)).await; + + // Another POST reconnection (e.g. from tools/list response) + let reconnect2 = open_resume_get(&client, &url, &session_id, "0/1").await; + assert_eq!( + reconnect2.status(), + 200, + "Second POST reconnection should also succeed" + ); + + ct.cancel(); +} + +/// Standalone GET is dropped (e.g. client timeout), a new standalone GET +/// connects. The new one should become the primary and receive notifications. 
+#[tokio::test] +async fn reconnect_after_stream_timeout() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger.clone()).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + send_initialized_notification(&client, &url, &session_id).await; tokio::time::sleep(Duration::from_millis(200)).await; - // STEP 3: Second GET with SAME session ID β€” should return 409 Conflict - println!("πŸ“‘ STEP 3: Second GET with SAME session ID..."); - println!(" Using session: {}", session_id); - println!(" This simulates VS Code reconnecting after ~5 minutes"); + // First standalone GET β€” primary + let get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get1.status(), 200); - let get2_response = http_client + // Client drops the stream (e.g. timeout or reconnection) + drop(get1); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Client reconnects with a new standalone GET + let get2 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get2.status(), 200); + + // Notification should reach the new primary + trigger.notify_one(); + + assert!( + wait_for_sse_event(get2, "tools/list_changed", Duration::from_secs(3)).await, + "Reconnected stream should receive notifications" + ); + + ct.cancel(); +} + +// ─── Tests: Edge cases ────────────────────────────────────────────────────── + +/// GET without a valid session ID should return 401 Unauthorized. 
+#[tokio::test] +async fn get_without_valid_session_returns_401() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger).await; + let client = reqwest::Client::new(); + + let resp = client .get(&url) - .header("Accept", "text/event-stream") - .header("Mcp-Session-Id", &session_id) - .timeout(Duration::from_millis(500)) + .header("Accept", ACCEPT_SSE) + .header("Mcp-Session-Id", "nonexistent-session-id") .send() .await - .expect("Second GET request failed"); - - let status = get2_response.status(); - println!(" Status: {}", status); + .expect("GET with invalid session"); assert_eq!( - status.as_u16(), - 409, - "Second GET should return 409 Conflict (got {}). \ - Without the fix, the channel sender is silently replaced, \ - orphaning the first receiver and losing all notifications.", - status + resp.status().as_u16(), + 401, + "GET with invalid session ID should return 401" ); - println!(" βœ… 409 Conflict returned (matches TypeScript SDK behavior)"); + ct.cancel(); +} + +/// GET without session ID header should return an error (400 or similar). +#[tokio::test] +async fn get_without_session_id_header_returns_error() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger).await; + let client = reqwest::Client::new(); + + let resp = client + .get(&url) + .header("Accept", ACCEPT_SSE) + .send() + .await + .expect("GET without session ID"); + + // Should fail (400 Bad Request or similar) + assert!( + !resp.status().is_success(), + "GET without session ID should fail, got {}", + resp.status() + ); - // Cleanup ct.cancel(); +} + +/// Shadow streams should be idle β€” they should NOT receive notifications. +/// Only the primary receives them. 
+#[tokio::test] +async fn shadow_stream_does_not_receive_notifications() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger.clone()).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + send_initialized_notification(&client, &url, &session_id).await; + tokio::time::sleep(Duration::from_millis(200)).await; + + // First GET β€” primary + let _get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(_get1.status(), 200); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Second GET β€” shadow + let get2 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get2.status(), 200); tokio::time::sleep(Duration::from_millis(100)).await; + + // Trigger notification + trigger.notify_one(); + + // Shadow stream should NOT receive the notification (timeout expected) + let shadow_received = + wait_for_sse_event(get2, "tools/list_changed", Duration::from_millis(500)).await; + assert!( + !shadow_received, + "Shadow stream should NOT receive notifications" + ); + + ct.cancel(); +} + +/// Dropping all shadow streams should not affect the primary channel. +/// Primary should still receive notifications after all shadows are dropped. 
+#[tokio::test] +async fn dropping_shadows_does_not_affect_primary() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger.clone()).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + send_initialized_notification(&client, &url, &session_id).await; + tokio::time::sleep(Duration::from_millis(200)).await; + + // Primary GET + let get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get1.status(), 200); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Create and drop several shadows + for _ in 0..3 { + let shadow = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(shadow.status(), 200); + drop(shadow); + tokio::time::sleep(Duration::from_millis(50)).await; + } + + // Trigger notification β€” primary should still receive it + trigger.notify_one(); + + assert!( + wait_for_sse_event(get1, "tools/list_changed", Duration::from_secs(3)).await, + "Primary should still work after all shadows are dropped" + ); + + ct.cancel(); } From 4ef1d2640ce19ed1b5d376773b31e2410767ca0b Mon Sep 17 00:00:00 2001 From: Mohammod Al Amin Ashik Date: Sat, 14 Feb 2026 16:49:16 +0800 Subject: [PATCH 7/8] fix: use correct HTTP status codes for session errors per MCP spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MCP spec (2025-11-25) section "Session Management" requires: - Missing session ID header β†’ 400 Bad Request (not 401) - Unknown/terminated session β†’ 404 Not Found (not 401) Using 401 Unauthorized caused MCP clients (e.g. VS Code) to trigger full OAuth re-authentication on server restart, instead of simply re-initializing the session. 
Signed-off-by: Mohammod Al Amin Ashik --- .../transport/streamable_http_server/tower.rs | 24 +++++++++---------- .../tests/test_sse_channel_replacement_bug.rs | 22 ++++++++--------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/crates/rmcp/src/transport/streamable_http_server/tower.rs b/crates/rmcp/src/transport/streamable_http_server/tower.rs index 0d70c6a19..9158be375 100644 --- a/crates/rmcp/src/transport/streamable_http_server/tower.rs +++ b/crates/rmcp/src/transport/streamable_http_server/tower.rs @@ -188,10 +188,10 @@ where .and_then(|v| v.to_str().ok()) .map(|s| s.to_owned().into()); let Some(session_id) = session_id else { - // unauthorized + // MCP spec: servers that require a session ID SHOULD respond with 400 Bad Request return Ok(Response::builder() - .status(http::StatusCode::UNAUTHORIZED) - .body(Full::new(Bytes::from("Unauthorized: Session ID is required")).boxed()) + .status(http::StatusCode::BAD_REQUEST) + .body(Full::new(Bytes::from("Bad Request: Session ID is required")).boxed()) .expect("valid response")); }; // check if session exists @@ -201,10 +201,10 @@ where .await .map_err(internal_error_response("check session"))?; if !has_session { - // unauthorized + // MCP spec: server MUST respond with 404 Not Found for terminated/unknown sessions return Ok(Response::builder() - .status(http::StatusCode::UNAUTHORIZED) - .body(Full::new(Bytes::from("Unauthorized: Session not found")).boxed()) + .status(http::StatusCode::NOT_FOUND) + .body(Full::new(Bytes::from("Not Found: Session not found")).boxed()) .expect("valid response")); } // check if last event id is provided @@ -331,10 +331,10 @@ where .await .map_err(internal_error_response("check session"))?; if !has_session { - // unauthorized + // MCP spec: server MUST respond with 404 Not Found for terminated/unknown sessions return Ok(Response::builder() - .status(http::StatusCode::UNAUTHORIZED) - .body(Full::new(Bytes::from("Unauthorized: Session not found")).boxed()) + 
.status(http::StatusCode::NOT_FOUND) + .body(Full::new(Bytes::from("Not Found: Session not found")).boxed()) .expect("valid response")); } @@ -523,10 +523,10 @@ where .and_then(|v| v.to_str().ok()) .map(|s| s.to_owned().into()); let Some(session_id) = session_id else { - // unauthorized + // MCP spec: servers that require a session ID SHOULD respond with 400 Bad Request return Ok(Response::builder() - .status(http::StatusCode::UNAUTHORIZED) - .body(Full::new(Bytes::from("Unauthorized: Session ID is required")).boxed()) + .status(http::StatusCode::BAD_REQUEST) + .body(Full::new(Bytes::from("Bad Request: Session ID is required")).boxed()) .expect("valid response")); }; // close session diff --git a/crates/rmcp/tests/test_sse_channel_replacement_bug.rs b/crates/rmcp/tests/test_sse_channel_replacement_bug.rs index de5cc1c3b..364112b97 100644 --- a/crates/rmcp/tests/test_sse_channel_replacement_bug.rs +++ b/crates/rmcp/tests/test_sse_channel_replacement_bug.rs @@ -577,9 +577,10 @@ async fn reconnect_after_stream_timeout() { // ─── Tests: Edge cases ────────────────────────────────────────────────────── -/// GET without a valid session ID should return 401 Unauthorized. +/// GET with an unknown session ID should return 404 Not Found per MCP spec. +/// This signals the client to re-initialize (not re-authenticate). #[tokio::test] -async fn get_without_valid_session_returns_401() { +async fn get_without_valid_session_returns_404() { let ct = CancellationToken::new(); let trigger = Arc::new(Notify::new()); let url = start_test_server(ct.clone(), trigger).await; @@ -595,16 +596,16 @@ async fn get_without_valid_session_returns_401() { assert_eq!( resp.status().as_u16(), - 401, - "GET with invalid session ID should return 401" + 404, + "GET with unknown session ID should return 404 Not Found per MCP spec" ); ct.cancel(); } -/// GET without session ID header should return an error (400 or similar). +/// GET without session ID header should return 400 Bad Request per MCP spec. 
#[tokio::test] -async fn get_without_session_id_header_returns_error() { +async fn get_without_session_id_header_returns_400() { let ct = CancellationToken::new(); let trigger = Arc::new(Notify::new()); let url = start_test_server(ct.clone(), trigger).await; @@ -617,11 +618,10 @@ async fn get_without_session_id_header_returns_error() { .await .expect("GET without session ID"); - // Should fail (400 Bad Request or similar) - assert!( - !resp.status().is_success(), - "GET without session ID should fail, got {}", - resp.status() + assert_eq!( + resp.status().as_u16(), + 400, + "GET without session ID should return 400 Bad Request per MCP spec" ); ct.cancel(); From 4fd9ae79f3b56d8346097c82ba5b3827f1b9fd21 Mon Sep 17 00:00:00 2001 From: Mohammod Al Amin Ashik Date: Wed, 18 Feb 2026 15:07:18 +0800 Subject: [PATCH 8/8] =?UTF-8?q?fix:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20remove=20dead=20Conflict=20variant,=20restore=20syn?= =?UTF-8?q?c=20on=20resume,=20rename=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove unused SessionError::Conflict and dead string-matching in tower.rs (leftover from abandoned 409 approach) - Restore sync() replay when replacing a dead primary common channel so server-initiated requests and cached notifications are not lost on reconnect - Rename test from test_sse_channel_replacement_bug to test_sse_concurrent_streams per reviewer suggestion (describe what tests verify, not what triggered them) - Add test for cache replay on dead primary replacement - Use generic "MCP clients" in comments instead of specific client names Signed-off-by: Mohammod Al Amin Ashik --- crates/rmcp/Cargo.toml | 4 +- .../streamable_http_server/session/local.rs | 24 +++++---- .../transport/streamable_http_server/tower.rs | 26 ++------- ..._bug.rs => test_sse_concurrent_streams.rs} | 53 +++++++++++++++++-- 4 files changed, 69 insertions(+), 38 deletions(-) rename 
crates/rmcp/tests/{test_sse_channel_replacement_bug.rs => test_sse_concurrent_streams.rs} (91%) diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index 32acd1c5f..9f58ee81b 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -243,6 +243,6 @@ required-features = [ path = "tests/test_custom_headers.rs" [[test]] -name = "test_sse_channel_replacement_bug" +name = "test_sse_concurrent_streams" required-features = ["server", "client", "transport-streamable-http-server", "transport-streamable-http-client", "reqwest"] -path = "tests/test_sse_channel_replacement_bug.rs" +path = "tests/test_sse_concurrent_streams.rs" diff --git a/crates/rmcp/src/transport/streamable_http_server/session/local.rs b/crates/rmcp/src/transport/streamable_http_server/session/local.rs index 6744c28fd..8121e5a47 100644 --- a/crates/rmcp/src/transport/streamable_http_server/session/local.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session/local.rs @@ -295,9 +295,9 @@ pub struct LocalSessionWorker { common: CachedTx, /// Shadow senders for secondary SSE streams (e.g. from POST EventSource /// reconnections). These keep the HTTP connections alive via SSE keep-alive - /// without receiving notifications, preventing clients like Cursor from - /// entering infinite reconnect loops when multiple EventSource connections - /// compete to replace the common channel. + /// without receiving notifications, preventing MCP clients from entering + /// infinite reconnect loops when multiple EventSource connections compete + /// to replace the common channel. 
shadow_txs: Vec>, event_rx: Receiver, session_config: SessionConfig, @@ -321,8 +321,6 @@ pub enum SessionError { SessionServiceTerminated, #[error("Invalid event id")] InvalidEventId, - #[error("Conflict: Only one standalone SSE stream is allowed per session")] - Conflict, #[error("IO error: {0}")] Io(#[from] std::io::Error), } @@ -546,10 +544,10 @@ impl LocalSessionWorker { http_request_id, "Request-wise channel completed, falling back to common channel" ); - self.resume_or_shadow_common() + self.resume_or_shadow_common(last_event_id.index).await } } - None => self.resume_or_shadow_common(), + None => self.resume_or_shadow_common(last_event_id.index).await, } } @@ -557,18 +555,26 @@ impl LocalSessionWorker { /// still active. /// /// When the primary common channel is dead (receiver dropped), replace it - /// so this stream becomes the new primary notification channel. + /// so this stream becomes the new primary notification channel. Cached + /// messages are replayed from `last_event_index` so the client receives + /// any events it missed (including server-initiated requests). /// /// When the primary is still active, create a "shadow" stream — an idle SSE /// connection kept alive by keep-alive pings. This prevents multiple /// EventSource connections (e.g. from POST response reconnections) from /// killing each other by repeatedly replacing the common channel sender. - fn resume_or_shadow_common(&mut self) -> Result { + async fn resume_or_shadow_common( + &mut self, + last_event_index: usize, + ) -> Result { let (tx, rx) = tokio::sync::mpsc::channel(self.session_config.channel_capacity); if self.common.tx.is_closed() { // Primary common channel is dead — replace it. tracing::debug!("Replacing dead common channel with new primary"); self.common.tx = tx; + // Replay cached messages from where the client left off so + // server-initiated requests and notifications are not lost.
+ self.common.sync(last_event_index).await?; } else { // Primary common channel is still active. Create a shadow stream // that stays alive via SSE keep-alive but doesn't receive diff --git a/crates/rmcp/src/transport/streamable_http_server/tower.rs b/crates/rmcp/src/transport/streamable_http_server/tower.rs index 9158be375..f7e006317 100644 --- a/crates/rmcp/src/transport/streamable_http_server/tower.rs +++ b/crates/rmcp/src/transport/streamable_http_server/tower.rs @@ -215,20 +215,11 @@ where .map(|s| s.to_owned()); if let Some(last_event_id) = last_event_id { // check if session has this event id - let stream = match self + let stream = self .session_manager .resume(&session_id, last_event_id) .await - { - Ok(stream) => stream, - Err(e) if e.to_string().contains("Conflict:") => { - return Ok(Response::builder() - .status(http::StatusCode::CONFLICT) - .body(Full::new(Bytes::from(e.to_string())).boxed()) - .expect("valid response")); - } - Err(e) => return Err(internal_error_response("resume session")(e)), - }; + .map_err(internal_error_response("resume session"))?; // Resume doesn't need priming - client already has the event ID Ok(sse_stream_response( stream, @@ -237,20 +228,11 @@ where )) } else { // create standalone stream - let stream = match self + let stream = self .session_manager .create_standalone_stream(&session_id) .await - { - Ok(stream) => stream, - Err(e) if e.to_string().contains("Conflict:") => { - return Ok(Response::builder() - .status(http::StatusCode::CONFLICT) - .body(Full::new(Bytes::from(e.to_string())).boxed()) - .expect("valid response")); - } - Err(e) => return Err(internal_error_response("create standalone stream")(e)), - }; + .map_err(internal_error_response("create standalone stream"))?; // Prepend priming event if sse_retry configured let stream = if let Some(retry) = self.config.sse_retry { let priming = ServerSseMessage { diff --git a/crates/rmcp/tests/test_sse_channel_replacement_bug.rs 
b/crates/rmcp/tests/test_sse_concurrent_streams.rs similarity index 91% rename from crates/rmcp/tests/test_sse_channel_replacement_bug.rs rename to crates/rmcp/tests/test_sse_concurrent_streams.rs index 364112b97..388e9a717 100644 --- a/crates/rmcp/tests/test_sse_channel_replacement_bug.rs +++ b/crates/rmcp/tests/test_sse_concurrent_streams.rs @@ -1,12 +1,12 @@ -/// Tests for SSE channel replacement fix (shadow channels) +/// Tests for concurrent SSE stream handling (shadow channels) /// /// These tests verify that multiple GET SSE streams on the same session /// don't kill each other by replacing the common channel sender. /// -/// Root cause: When POST SSE responses include `retry`, EventSource reconnects -/// via GET after the stream ends. Each GET was unconditionally replacing -/// `self.common.tx`, killing the other stream's receiver — causing an infinite -/// reconnect loop every `sse_retry` seconds. +/// Root cause: When POST SSE responses include `retry`, the EventSource API +/// reconnects via GET after the stream ends. Each GET was unconditionally +/// replacing `self.common.tx`, killing the other stream's receiver — causing +/// an infinite reconnect loop every `sse_retry` seconds. /// /// Fix: `resume_or_shadow_common()` checks if the primary common channel is /// still active. If so, it creates a "shadow" stream (idle, keep-alive only) @@ -700,3 +700,46 @@ async fn dropping_shadows_does_not_affect_primary() { ct.cancel(); } + +// ─── Tests: Cache replay on dead primary replacement ───────────────────────── + +/// When a notification is sent while the primary is alive, then the primary +/// dies and a new GET resumes with Last-Event-ID "0", the replacement primary +/// should receive the cached notification via sync() replay.
+#[tokio::test] +async fn dead_primary_replacement_replays_cached_events() { + let ct = CancellationToken::new(); + let trigger = Arc::new(Notify::new()); + let url = start_test_server(ct.clone(), trigger.clone()).await; + let client = reqwest::Client::new(); + + let session_id = initialize_session(&client, &url).await; + send_initialized_notification(&client, &url, &session_id).await; + tokio::time::sleep(Duration::from_millis(200)).await; + + // First GET — becomes primary + let get1 = open_standalone_get(&client, &url, &session_id).await; + assert_eq!(get1.status(), 200); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Trigger notification while primary is alive (gets cached) + trigger.notify_one(); + tokio::time::sleep(Duration::from_millis(200)).await; + + // Drop primary — notification was sent and cached + drop(get1); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Resume with Last-Event-ID "0" — primary is dead, should replace it + // and replay cached events from index 0 + let get_resume = open_resume_get(&client, &url, &session_id, "0").await; + assert_eq!(get_resume.status(), 200); + + // The cached notification should be replayed on the new primary + assert!( + wait_for_sse_event(get_resume, "tools/list_changed", Duration::from_secs(3)).await, + "Replacement primary should receive cached notification via sync() replay" + ); + + ct.cancel(); +}