From 5cfae82db60ab3cacbd08f0204770f685739fff6 Mon Sep 17 00:00:00 2001 From: Adam Kowalski Date: Wed, 25 Feb 2026 20:09:16 -0800 Subject: [PATCH] fix(streamable-http): map stale session 401 to status-aware error --- .../common/reqwest/streamable_http_client.rs | 9 ++ .../test_streamable_http_stale_session.rs | 97 +++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 crates/rmcp/tests/test_streamable_http_stale_session.rs diff --git a/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs b/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs index b4cdafd1..9682d9f1 100644 --- a/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs +++ b/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs @@ -165,6 +165,15 @@ impl StreamableHttpClient for reqwest::Client { ) { return Ok(StreamableHttpPostResponse::Accepted); } + if !status.is_success() { + let body = response + .text() + .await + .unwrap_or_else(|_| "".to_owned()); + return Err(StreamableHttpError::UnexpectedServerResponse(Cow::Owned( + format!("HTTP {status}: {body}"), + ))); + } let content_type = response.headers().get(reqwest::header::CONTENT_TYPE); let session_id = response.headers().get(HEADER_SESSION_ID); let session_id = session_id diff --git a/crates/rmcp/tests/test_streamable_http_stale_session.rs b/crates/rmcp/tests/test_streamable_http_stale_session.rs new file mode 100644 index 00000000..a973e9f4 --- /dev/null +++ b/crates/rmcp/tests/test_streamable_http_stale_session.rs @@ -0,0 +1,97 @@ +#![cfg(all( + feature = "transport-streamable-http-client", + feature = "transport-streamable-http-client-reqwest", + feature = "transport-streamable-http-server" +))] + +use std::{collections::HashMap, sync::Arc}; + +use rmcp::{ + model::{ClientJsonRpcMessage, ClientRequest, PingRequest, RequestId}, + transport::{ + streamable_http_client::{StreamableHttpClient, StreamableHttpError}, + streamable_http_server::{ + StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager, + }, + }, +}; +use tokio_util::sync::CancellationToken; + +mod common; +use common::calculator::Calculator; + +#[tokio::test] +async fn test_stale_session_id_returns_status_aware_error() -> anyhow::Result<()> { + let ct = CancellationToken::new(); + let service: StreamableHttpService = + StreamableHttpService::new( + || Ok(Calculator::new()), + Default::default(), + StreamableHttpServerConfig { + stateful_mode: true, + sse_keep_alive: None, + cancellation_token: ct.child_token(), + ..Default::default() + }, + ); + + let router = axum::Router::new().nest_service("/mcp", service); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + + let handle = tokio::spawn({ + let ct = ct.clone(); + async move { + let _ = axum::serve(listener, router) + .with_graceful_shutdown(async move { ct.cancelled_owned().await }) + .await; + } + }); + + let uri = Arc::::from(format!("http://{addr}/mcp")); + let message = ClientJsonRpcMessage::request( + ClientRequest::PingRequest(PingRequest::default()), + RequestId::Number(1), + ); + + let client = reqwest::Client::new(); + let result = client + .post_message( + uri.clone(), + message, + Some(Arc::from("stale-session-id")), + None, + HashMap::new(), + ) + .await; + + let raw_response = reqwest::Client::new() + .post(uri.as_ref()) + .header("accept", "application/json, text/event-stream") + .header("content-type", "application/json") + .header("mcp-session-id", "stale-session-id") + .body(r#"{"jsonrpc":"2.0","id":1,"method":"ping","params":{}}"#) + .send() + .await?; + + assert_eq!(raw_response.status(), reqwest::StatusCode::UNAUTHORIZED); + match result { + Err(StreamableHttpError::UnexpectedServerResponse(message)) => { + let message = message.to_string(); + assert!( + message.contains("401"), + "error should include HTTP status code, got: {message}" + ); + assert!( + message.to_ascii_lowercase().contains("session not found"), + "error should include session-not-found hint, got: {message}" + ); + } + other => panic!("expected UnexpectedServerResponse, got: {other:?}"), + } + + ct.cancel(); + handle.await?; + + Ok(()) +}