Skip to content

Commit d6703da

Browse files
authored
feat(streamable-http): add json_response option for stateless server mode (#683)
* feat(streamable-http): add json_response option for stateless server mode Adds `json_response: bool` field to `StreamableHttpServerConfig`. When true and `stateful_mode` is false, the server returns `Content-Type: application/json` directly instead of `text/event-stream`, eliminating SSE framing overhead for simple request-response patterns. This completes server-side JSON response support (client-side was added in #540) and contributes to the stateless server goals of SEP-1442 (#526). Backwards-compatible: `json_response: false` (default) preserves all existing SSE behaviour unchanged, and `stateful_mode: true` is unaffected. Benchmark evidence (50 VUs, 5min, 2 CPUs): - RPS: 770 → 1139 (+48%) - get_user_cart latency: 41ms → 0.76ms (-98%) - checkout latency: 41ms → 0.55ms (-99%) - Zero regressions, zero errors * fix(tower): add cancellation awareness and logging to JSON response path * fix(test): add missing Default to StreamableHttpServerConfig in concurrent streams test Made-with: Cursor
1 parent 4677a65 commit d6703da

4 files changed

Lines changed: 210 additions & 14 deletions

File tree

crates/rmcp/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ name = "test_streamable_http_priming"
210210
required-features = ["server", "client", "transport-streamable-http-server", "reqwest"]
211211
path = "tests/test_streamable_http_priming.rs"
212212

213+
[[test]]
214+
name = "test_streamable_http_json_response"
215+
required-features = ["server", "client", "transport-streamable-http-server", "reqwest"]
216+
path = "tests/test_streamable_http_json_response.rs"
217+
213218

214219
[[test]]
215220
name = "test_custom_request"

crates/rmcp/src/transport/streamable_http_server/tower.rs

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ pub struct StreamableHttpServerConfig {
3838
/// If true, the server will create a session for each request and keep it alive.
3939
/// When enabled, SSE priming events are sent to enable client reconnection.
4040
pub stateful_mode: bool,
41+
/// When true and `stateful_mode` is false, the server returns
42+
/// `Content-Type: application/json` directly instead of `text/event-stream`.
43+
/// This eliminates SSE framing overhead for simple request-response tools,
44+
/// allowed by the MCP Streamable HTTP spec (2025-06-18).
45+
pub json_response: bool,
4146
/// Cancellation token for the Streamable HTTP server.
4247
///
4348
/// When this token is cancelled, all active sessions are terminated and
@@ -51,6 +56,7 @@ impl Default for StreamableHttpServerConfig {
5156
sse_keep_alive: Some(Duration::from_secs(15)),
5257
sse_retry: Some(Duration::from_secs(3)),
5358
stateful_mode: true,
59+
json_response: false,
5460
cancellation_token: CancellationToken::new(),
5561
}
5662
}
@@ -585,27 +591,56 @@ where
585591
match message {
586592
ClientJsonRpcMessage::Request(mut request) => {
587593
request.request.extensions_mut().insert(part);
588-
let (transport, receiver) =
594+
let (transport, mut receiver) =
589595
OneshotTransport::<RoleServer>::new(ClientJsonRpcMessage::Request(request));
590596
let service = serve_directly(service, transport, None);
591597
tokio::spawn(async move {
592598
// on service created
593599
let _ = service.waiting().await;
594600
});
595-
// Stateless mode: no priming (no session to resume)
596-
let stream = ReceiverStream::new(receiver).map(|message| {
597-
tracing::info!(?message);
598-
ServerSseMessage {
599-
event_id: None,
600-
message: Some(Arc::new(message)),
601-
retry: None,
601+
if self.config.json_response {
602+
// JSON-direct mode: await the single response and return as
603+
// application/json, eliminating SSE framing overhead.
604+
// Allowed by MCP Streamable HTTP spec (2025-06-18).
605+
let cancel = self.config.cancellation_token.child_token();
606+
match tokio::select! {
607+
res = receiver.recv() => res,
608+
_ = cancel.cancelled() => None,
609+
} {
610+
Some(message) => {
611+
tracing::info!(?message);
612+
let body = serde_json::to_vec(&message).map_err(|e| {
613+
internal_error_response("serialize json response")(e)
614+
})?;
615+
Ok(Response::builder()
616+
.status(http::StatusCode::OK)
617+
.header(http::header::CONTENT_TYPE, JSON_MIME_TYPE)
618+
.body(Full::new(Bytes::from(body)).boxed())
619+
.expect("valid response"))
620+
}
621+
None => Err(internal_error_response("empty response")(
622+
std::io::Error::new(
623+
std::io::ErrorKind::UnexpectedEof,
624+
"no response message received from handler",
625+
),
626+
)),
602627
}
603-
});
604-
Ok(sse_stream_response(
605-
stream,
606-
self.config.sse_keep_alive,
607-
self.config.cancellation_token.child_token(),
608-
))
628+
} else {
629+
// SSE mode (default): original behaviour preserved unchanged
630+
let stream = ReceiverStream::new(receiver).map(|message| {
631+
tracing::info!(?message);
632+
ServerSseMessage {
633+
event_id: None,
634+
message: Some(Arc::new(message)),
635+
retry: None,
636+
}
637+
});
638+
Ok(sse_stream_response(
639+
stream,
640+
self.config.sse_keep_alive,
641+
self.config.cancellation_token.child_token(),
642+
))
643+
}
609644
}
610645
ClientJsonRpcMessage::Notification(_notification) => {
611646
// ignore

crates/rmcp/tests/test_sse_concurrent_streams.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ async fn start_test_server(ct: CancellationToken, trigger: Arc<Notify>) -> Strin
8484
sse_keep_alive: Some(Duration::from_secs(15)),
8585
sse_retry: Some(Duration::from_secs(3)),
8686
cancellation_token: ct.child_token(),
87+
..Default::default()
8788
},
8889
);
8990

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
use rmcp::transport::streamable_http_server::{
2+
StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
3+
};
4+
use tokio_util::sync::CancellationToken;
5+
6+
mod common;
7+
use common::calculator::Calculator;
8+
9+
const INIT_BODY: &str = r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#;
10+
11+
async fn spawn_server(
12+
config: StreamableHttpServerConfig,
13+
) -> (reqwest::Client, String, CancellationToken) {
14+
let ct = config.cancellation_token.clone();
15+
let service: StreamableHttpService<Calculator, LocalSessionManager> =
16+
StreamableHttpService::new(|| Ok(Calculator::new()), Default::default(), config);
17+
18+
let router = axum::Router::new().nest_service("/mcp", service);
19+
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
20+
let addr = tcp_listener.local_addr().unwrap();
21+
22+
tokio::spawn({
23+
let ct = ct.clone();
24+
async move {
25+
let _ = axum::serve(tcp_listener, router)
26+
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
27+
.await;
28+
}
29+
});
30+
31+
let client = reqwest::Client::new();
32+
let base_url = format!("http://{addr}/mcp");
33+
(client, base_url, ct)
34+
}
35+
36+
#[tokio::test]
37+
async fn stateless_json_response_returns_application_json() -> anyhow::Result<()> {
38+
let ct = CancellationToken::new();
39+
let (client, url, ct) = spawn_server(StreamableHttpServerConfig {
40+
stateful_mode: false,
41+
json_response: true,
42+
sse_keep_alive: None,
43+
cancellation_token: ct.child_token(),
44+
..Default::default()
45+
})
46+
.await;
47+
48+
let response = client
49+
.post(&url)
50+
.header("Content-Type", "application/json")
51+
.header("Accept", "application/json, text/event-stream")
52+
.body(INIT_BODY)
53+
.send()
54+
.await?;
55+
56+
assert_eq!(response.status(), 200);
57+
58+
let content_type = response
59+
.headers()
60+
.get("content-type")
61+
.and_then(|v| v.to_str().ok())
62+
.unwrap_or("");
63+
assert!(
64+
content_type.contains("application/json"),
65+
"Expected application/json, got: {content_type}"
66+
);
67+
68+
let body = response.text().await?;
69+
let parsed: serde_json::Value = serde_json::from_str(&body)?;
70+
assert_eq!(parsed["jsonrpc"], "2.0");
71+
assert_eq!(parsed["id"], 1);
72+
assert!(parsed["result"].is_object(), "Expected result object");
73+
74+
ct.cancel();
75+
Ok(())
76+
}
77+
78+
#[tokio::test]
79+
async fn stateless_sse_mode_default_unchanged() -> anyhow::Result<()> {
80+
let ct = CancellationToken::new();
81+
let (client, url, ct) = spawn_server(StreamableHttpServerConfig {
82+
stateful_mode: false,
83+
json_response: false,
84+
sse_keep_alive: None,
85+
cancellation_token: ct.child_token(),
86+
..Default::default()
87+
})
88+
.await;
89+
90+
let response = client
91+
.post(&url)
92+
.header("Content-Type", "application/json")
93+
.header("Accept", "application/json, text/event-stream")
94+
.body(INIT_BODY)
95+
.send()
96+
.await?;
97+
98+
assert_eq!(response.status(), 200);
99+
100+
let content_type = response
101+
.headers()
102+
.get("content-type")
103+
.and_then(|v| v.to_str().ok())
104+
.unwrap_or("");
105+
assert!(
106+
content_type.contains("text/event-stream"),
107+
"Expected text/event-stream, got: {content_type}"
108+
);
109+
110+
let body = response.text().await?;
111+
assert!(
112+
body.contains("data:"),
113+
"Expected SSE framing (data: prefix), got: {body}"
114+
);
115+
116+
ct.cancel();
117+
Ok(())
118+
}
119+
120+
#[tokio::test]
121+
async fn json_response_ignored_in_stateful_mode() -> anyhow::Result<()> {
122+
let ct = CancellationToken::new();
123+
// json_response: true has no effect when stateful_mode: true — server still uses SSE
124+
let (client, url, ct) = spawn_server(StreamableHttpServerConfig {
125+
stateful_mode: true,
126+
json_response: true,
127+
sse_keep_alive: None,
128+
cancellation_token: ct.child_token(),
129+
..Default::default()
130+
})
131+
.await;
132+
133+
let response = client
134+
.post(&url)
135+
.header("Content-Type", "application/json")
136+
.header("Accept", "application/json, text/event-stream")
137+
.body(INIT_BODY)
138+
.send()
139+
.await?;
140+
141+
assert_eq!(response.status(), 200);
142+
143+
let content_type = response
144+
.headers()
145+
.get("content-type")
146+
.and_then(|v| v.to_str().ok())
147+
.unwrap_or("");
148+
assert!(
149+
content_type.contains("text/event-stream"),
150+
"Stateful mode should always use SSE regardless of json_response, got: {content_type}"
151+
);
152+
153+
ct.cancel();
154+
Ok(())
155+
}

0 commit comments

Comments
 (0)