diff --git a/README.md b/README.md
index c1f5f10c6..34133a796 100644
--- a/README.md
+++ b/README.md
@@ -20,9 +20,9 @@ For comprehensive guides and SDK API documentation
- [Java MCP Server](https://modelcontextprotocol.github.io/java-sdk/server/) - Learn how to implement and configure a MCP servers.
#### Spring AI MCP documentation
-[Spring AI MCP](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-overview.html) extends the MCP Java SDK with Spring Boot integration, providing both [client](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-client-boot-starter-docs.html) and [server](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-server-boot-starter-docs.html) starters.
-The [MCP Annotations](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-annotations-overview.html) - provides annotation-based method handling for MCP servers and clients in Java.
-The [MCP Security](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-security.html) - provides comprehensive OAuth 2.0 and API key-based security support for Model Context Protocol implementations in Spring AI.
+[Spring AI MCP](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) extends the MCP Java SDK with Spring Boot integration, providing both [client](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-client-boot-starter-docs.html) and [server](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-server-boot-starter-docs.html) starters.
+The [MCP Annotations](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-annotations-overview.html) - provides annotation-based method handling for MCP servers and clients in Java.
+The [MCP Security](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-security.html) - provides comprehensive OAuth 2.0 and API key-based security support for Model Context Protocol implementations in Spring AI.
Bootstrap your AI applications with MCP support using [Spring Initializer](https://start.spring.io).
## Development
@@ -139,21 +139,21 @@ MCP supports both clients (applications consuming MCP servers) and servers (appl
#### Client Transport in the SDK
-* **SDK Choice**: JDK HttpClient (Java 11+) as the default client, with optional Spring WebClient support
+* **SDK Choice**: JDK HttpClient (Java 11+) as the default client
-* **Why**: The JDK HttpClient is built-in, portable, and supports streaming responses. This keeps the default lightweight with no extra dependencies. Spring WebClient support is available for Spring-based projects.
+* **Why**: The JDK HttpClient is built-in, portable, and supports streaming responses. This keeps the default lightweight with no extra dependencies.
-* **How we expose it**: MCP Client APIs are transport-agnostic. The core module ships with JDK HttpClient transport. A Spring module provides WebClient integration.
+* **How we expose it**: MCP Client APIs are transport-agnostic. The core module ships with JDK HttpClient transport. Spring WebClient-based transport is available in [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+.
* **How it fits the SDK**: This ensures all applications can talk to MCP servers out of the box, while allowing richer integration in Spring and other environments.
#### Server Transport in the SDK
-* **SDK Choice**: Jakarta Servlet implementation in core, with optional Spring WebFlux and Spring WebMVC providers
+* **SDK Choice**: Jakarta Servlet implementation in core
-* **Why**: Servlet is the most widely deployed Java server API. WebFlux and WebMVC cover a significant part of the Spring community. Together these provide reach across blocking and non-blocking models.
+* **Why**: Servlet is the most widely deployed Java server API, providing broad reach across blocking and non-blocking models without additional dependencies.
-* **How we expose it**: Server APIs are transport-agnostic. Core includes Servlet support. Spring modules extend support for WebFlux and WebMVC.
+* **How we expose it**: Server APIs are transport-agnostic. Core includes Servlet support. Spring WebFlux and WebMVC server transports are available in [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+.
* **How it fits the SDK**: This allows developers to expose MCP servers in the most common Java environments today, while enabling other transport implementations such as Netty, Vert.x, or Helidon.
@@ -176,9 +176,10 @@ The SDK is organized into modules to separate concerns and allow adopters to bri
* `mcp-json-jackson3` – Jackson 3 implementation of JSON binding
* `mcp` – Convenience bundle (core + Jackson 3)
* `mcp-test` – Shared testing utilities
-* `mcp-spring` – Spring integrations (WebClient, WebFlux, WebMVC)
-For example, a minimal adopter may depend only on `mcp` (core + Jackson), while a Spring-based application can use `mcp-spring` for deeper framework integration.
+Spring integrations (WebClient, WebFlux, WebMVC) are now part of [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`).
+
+For example, a minimal adopter may depend only on `mcp` (core + Jackson), while a Spring-based application can use the Spring AI `mcp-spring-webflux` or `mcp-spring-webmvc` artifacts for deeper framework integration.
Additionally, `mcp-test` contains integration tests for `mcp-core`.
`mcp-core` needs a JSON implementation to run full integration tests.
diff --git a/SECURITY.md b/SECURITY.md
index 74e9880fd..502924200 100644
--- a/SECURITY.md
+++ b/SECURITY.md
@@ -1,21 +1,21 @@
# Security Policy
-Thank you for helping us keep the SDKs and systems they interact with secure.
+Thank you for helping keep the Model Context Protocol and its ecosystem secure.
## Reporting Security Issues
-This SDK is maintained by [Anthropic](https://www.anthropic.com/) as part of the Model
-Context Protocol project.
+If you discover a security vulnerability in this repository, please report it through
+the [GitHub Security Advisory process](https://docs.github.com/en/code-security/security-advisories/guidance-on-reporting-and-writing-information-about-vulnerabilities/privately-reporting-a-security-vulnerability)
+for this repository.
-The security of our systems and user data is Anthropic’s top priority. We appreciate the
-work of security researchers acting in good faith in identifying and reporting potential
-vulnerabilities.
+Please **do not** report security vulnerabilities through public GitHub issues, discussions,
+or pull requests.
-Our security program is managed on HackerOne and we ask that any validated vulnerability
-in this functionality be reported through their
-[submission form](https://hackerone.com/anthropic-vdp/reports/new?type=team&report_type=vulnerability).
+## What to Include
-## Vulnerability Disclosure Program
+To help us triage and respond quickly, please include:
-Our Vulnerability Program Guidelines are defined on our
-[HackerOne program page](https://hackerone.com/anthropic-vdp).
\ No newline at end of file
+- A description of the vulnerability
+- Steps to reproduce the issue
+- The potential impact
+- Any suggested fixes (optional)
diff --git a/docs/blog/index.md b/docs/blog/index.md
index 05761ac57..e61459078 100644
--- a/docs/blog/index.md
+++ b/docs/blog/index.md
@@ -1 +1 @@
-# Blog
+# News
diff --git a/docs/client.md b/docs/client.md
index 29cfcc3b7..6a99928c5 100644
--- a/docs/client.md
+++ b/docs/client.md
@@ -19,7 +19,8 @@ The MCP Client is a key component in the Model Context Protocol (MCP) architectu
!!! tip
The core `io.modelcontextprotocol.sdk:mcp` module provides STDIO, SSE, and Streamable HTTP client transport implementations without requiring external web frameworks.
- Spring-specific transport implementations are available as an **optional** dependency `io.modelcontextprotocol.sdk:mcp-spring-webflux` for [Spring Framework](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-client-boot-starter-docs.html) users.
+ The Spring-specific WebFlux transport (`mcp-spring-webflux`) is now part of [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`) and is no longer shipped by this SDK.
+ See the [MCP Client Boot Starter](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-client-boot-starter-docs.html) documentation for Spring-based client setup.
The client provides both synchronous and asynchronous APIs for flexibility in different application contexts.
@@ -135,26 +136,20 @@ The client provides both synchronous and asynchronous APIs for flexibility in di
The transport layer handles the communication between MCP clients and servers, providing different implementations for various use cases. The client transport manages message serialization, connection establishment, and protocol-specific communication patterns.
-=== "STDIO"
+### STDIO
- Creates transport for process-based communication using stdin/stdout:
+Creates transport for process-based communication using stdin/stdout:
- ```java
- ServerParameters params = ServerParameters.builder("npx")
- .args("-y", "@modelcontextprotocol/server-everything", "dir")
- .build();
- McpTransport transport = new StdioClientTransport(params);
- ```
-
-=== "SSE (HttpClient)"
-
- Creates a framework-agnostic (pure Java API) SSE client transport. Included in the core `mcp` module:
+```java
+ServerParameters params = ServerParameters.builder("npx")
+ .args("-y", "@modelcontextprotocol/server-everything", "dir")
+ .build();
+McpTransport transport = new StdioClientTransport(params);
+```
- ```java
- McpTransport transport = new HttpClientSseClientTransport("http://your-mcp-server");
- ```
+### Streamable HTTP
-=== "Streamable HTTP"
+=== "Streamable HttpClient"
Creates a Streamable HTTP client transport for efficient bidirectional communication. Included in the core `mcp` module:
@@ -172,9 +167,28 @@ The transport layer handles the communication between MCP clients and servers, p
- Custom HTTP request customization
- Multiple protocol version negotiation
-=== "SSE (WebFlux)"
+=== "Streamable WebClient (external)"
+
+ Creates Streamable HTTP WebClient-based client transport. Requires the `mcp-spring-webflux` dependency from [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`):
- Creates WebFlux-based SSE client transport. Requires the `mcp-spring-webflux` dependency:
+ ```java
+ McpTransport transport = WebFluxSseClientTransport
+ .builder(WebClient.builder().baseUrl("http://your-mcp-server"))
+ .build();
+ ```
+
+### SSE HTTP (Legacy)
+
+=== "SSE HttpClient"
+
+ Creates a framework-agnostic (pure Java API) SSE client transport. Included in the core `mcp` module:
+
+ ```java
+ McpTransport transport = new HttpClientSseClientTransport("http://your-mcp-server");
+ ```
+=== "SSE WebClient (external)"
+
+ Creates WebFlux-based SSE client transport. Requires the `mcp-spring-webflux` dependency from [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`):
```java
WebClient.Builder webClientBuilder = WebClient.builder()
@@ -182,6 +196,7 @@ The transport layer handles the communication between MCP clients and servers, p
McpTransport transport = new WebFluxSseClientTransport(webClientBuilder);
```
+
## Client Capabilities
The client can be configured with various capabilities:
diff --git a/docs/index.md b/docs/index.md
index 71dcecfa1..e6062b5ff 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -1,5 +1,5 @@
---
-title: Overview
+title: Index
description: Introduction to the Model Context Protocol (MCP) Java SDK
---
@@ -27,7 +27,7 @@ enables standardized integration between AI models and tools.
- Java HttpClient-based SSE client transport for HTTP SSE Client-side streaming
- Servlet-based SSE server transport for HTTP SSE Server streaming
- [Streamable HTTP](https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#streamable-http) transport for efficient bidirectional communication (client and server)
- - Optional Spring-based transports (convenience if using Spring Framework):
+ - Optional Spring-based transports (available in [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+, no longer part of this SDK):
- WebFlux SSE client and server transports for reactive HTTP streaming
- WebFlux Streamable HTTP server transport
- WebMVC SSE server transport for servlet-based HTTP streaming
@@ -41,56 +41,9 @@ enables standardized integration between AI models and tools.
!!! tip
The core `io.modelcontextprotocol.sdk:mcp` module provides default STDIO, SSE, and Streamable HTTP client and server transport implementations without requiring external web frameworks.
- Spring-specific transports are available as optional dependencies for convenience when using the [MCP Client Boot Starter](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-client-boot-starter-docs.html) and [MCP Server Boot Starter](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-server-boot-starter-docs.html).
- Also consider the [MCP Annotations](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-annotations-overview.html) and [MCP Security](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-security.html).
-
-## Architecture
-
-The SDK follows a layered architecture with clear separation of concerns:
-
-
-
-- **Client/Server Layer (McpClient/McpServer)**: Both use McpSession for sync/async operations,
- with McpClient handling client-side protocol operations and McpServer managing server-side protocol operations.
-- **Session Layer (McpSession)**: Manages communication patterns and state.
-- **Transport Layer (McpTransport)**: Handles JSON-RPC message serialization/deserialization via:
- - StdioTransport (stdin/stdout) in the core module
- - HTTP SSE transports in dedicated transport modules (Java HttpClient, Spring WebFlux, Spring WebMVC)
- - Streamable HTTP transports for efficient bidirectional communication
-
-The MCP Client is a key component in the Model Context Protocol (MCP) architecture, responsible for establishing and managing connections with MCP servers.
-It implements the client-side of the protocol.
-
-
-
-The MCP Server is a foundational component in the Model Context Protocol (MCP) architecture that provides tools, resources, and capabilities to clients.
-It implements the server-side of the protocol.
-
-
-
-Key Interactions:
-
-- **Client/Server Initialization**: Transport setup, protocol compatibility check, capability negotiation, and implementation details exchange.
-- **Message Flow**: JSON-RPC message handling with validation, type-safe response processing, and error handling.
-- **Resource Management**: Resource discovery, URI template-based access, subscription system, and content retrieval.
-
-## Module Structure
-
-The SDK is organized into modules to separate concerns and allow adopters to bring in only what they need:
-
-| Module | Artifact ID | Purpose |
-|--------|------------|---------|
-| `mcp-bom` | `mcp-bom` | Bill of Materials for dependency management |
-| `mcp-core` | `mcp-core` | Core reference implementation (STDIO, JDK HttpClient, Servlet, Streamable HTTP) |
-| `mcp-json-jackson2` | `mcp-json-jackson2` | Jackson 2.x JSON serialization implementation |
-| `mcp-json-jackson3` | `mcp-json-jackson3` | Jackson 3.x JSON serialization implementation |
-| `mcp` | `mcp` | Convenience bundle (`mcp-core` + `mcp-json-jackson3`) |
-| `mcp-test` | `mcp-test` | Shared testing utilities and integration tests |
-| `mcp-spring-webflux` | `mcp-spring-webflux` | Spring WebFlux integration (SSE and Streamable HTTP) |
-| `mcp-spring-webmvc` | `mcp-spring-webmvc` | Spring WebMVC integration (SSE and Streamable HTTP) |
-
-!!! tip
- A minimal adopter may depend only on `mcp` (core + Jackson 3), while a Spring-based application can add `mcp-spring-webflux` or `mcp-spring-webmvc` for deeper framework integration.
+ Spring-specific transports (WebFlux, WebMVC) are now part of [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ and are no longer shipped by this SDK.
+ Use the [MCP Client Boot Starter](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-client-boot-starter-docs.html) and [MCP Server Boot Starter](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-server-boot-starter-docs.html) from Spring AI.
+ Also consider the [MCP Annotations](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-annotations-overview.html) and [MCP Security](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-security.html).
## Next Steps
diff --git a/docs/overview.md b/docs/overview.md
new file mode 100644
index 000000000..9084b6a6a
--- /dev/null
+++ b/docs/overview.md
@@ -0,0 +1,93 @@
+---
+title: Overview
+description: Introduction to the Model Context Protocol (MCP) Java SDK
+---
+
+# Overview
+
+## Architecture
+
+The SDK follows a layered architecture with clear separation of concerns:
+
+
+
+- **Client/Server Layer (McpClient/McpServer)**: Both use McpSession for sync/async operations,
+ with McpClient handling client-side protocol operations and McpServer managing server-side protocol operations.
+- **Session Layer (McpSession)**: Manages communication patterns and state.
+- **Transport Layer (McpTransport)**: Handles JSON-RPC message serialization/deserialization via:
+ - StdioTransport (stdin/stdout) in the core module
+ - HTTP SSE transports in dedicated transport modules (Java HttpClient, Servlet)
+ - Streamable HTTP transports for efficient bidirectional communication
+ - Spring WebFlux and Spring WebMVC transports (available in [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+)
+
+The MCP Client is a key component in the Model Context Protocol (MCP) architecture, responsible for establishing and managing connections with MCP servers.
+It implements the client-side of the protocol.
+
+
+
+The MCP Server is a foundational component in the Model Context Protocol (MCP) architecture that provides tools, resources, and capabilities to clients.
+It implements the server-side of the protocol.
+
+
+
+Key Interactions:
+
+- **Client/Server Initialization**: Transport setup, protocol compatibility check, capability negotiation, and implementation details exchange.
+- **Message Flow**: JSON-RPC message handling with validation, type-safe response processing, and error handling.
+- **Resource Management**: Resource discovery, URI template-based access, subscription system, and content retrieval.
+
+## Module Structure
+
+The SDK is organized into modules to separate concerns and allow adopters to bring in only what they need:
+
+| Module | Artifact ID | Group | Purpose |
+|--------|------------|-------|---------|
+| `mcp-bom` | `mcp-bom` | `io.modelcontextprotocol.sdk` | Bill of Materials for dependency management |
+| `mcp-core` | `mcp-core` | `io.modelcontextprotocol.sdk` | Core reference implementation (STDIO, JDK HttpClient, Servlet, Streamable HTTP) |
+| `mcp-json-jackson2` | `mcp-json-jackson2` | `io.modelcontextprotocol.sdk` | Jackson 2.x JSON serialization implementation |
+| `mcp-json-jackson3` | `mcp-json-jackson3` | `io.modelcontextprotocol.sdk` | Jackson 3.x JSON serialization implementation |
+| `mcp` | `mcp` | `io.modelcontextprotocol.sdk` | Convenience bundle (`mcp-core` + `mcp-json-jackson3`) |
+| `mcp-test` | `mcp-test` | `io.modelcontextprotocol.sdk` | Shared testing utilities and integration tests |
+| `mcp-spring-webflux` _(external)_ | `mcp-spring-webflux` | `org.springframework.ai` | Spring WebFlux integration — part of [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ |
+| `mcp-spring-webmvc` _(external)_ | `mcp-spring-webmvc` | `org.springframework.ai` | Spring WebMVC integration — part of [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ |
+
+!!! tip
+ A minimal adopter may depend only on `mcp` (core + Jackson 3). Spring-based applications should use the `mcp-spring-webflux` or `mcp-spring-webmvc` artifacts from [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`), no longer part of this SDK.
+
+## Next Steps
+
+
+
+- :rocket:{ .lg .middle } **Quickstart**
+
+ ---
+
+ Get started with dependencies and BOM configuration.
+
+ [:octicons-arrow-right-24: Quickstart](quickstart.md)
+
+- :material-monitor:{ .lg .middle } **MCP Client**
+
+ ---
+
+ Learn how to create and configure MCP clients.
+
+ [:octicons-arrow-right-24: Client](client.md)
+
+- :material-server:{ .lg .middle } **MCP Server**
+
+ ---
+
+ Learn how to implement and configure MCP servers.
+
+ [:octicons-arrow-right-24: Server](server.md)
+
+- :fontawesome-brands-github:{ .lg .middle } **GitHub**
+
+ ---
+
+ View the source code and contribute.
+
+ [:octicons-arrow-right-24: Repository](https://github.com/modelcontextprotocol/java-sdk)
+
+
diff --git a/docs/quickstart.md b/docs/quickstart.md
index 23cf2f75b..1ef69be04 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -44,22 +44,25 @@ Add the following dependency to your project:
```
- If you're using the Spring Framework and want Spring-specific transport implementations, add one of the following optional dependencies:
+ If you're using Spring Framework, the Spring-specific transport implementations are now part of [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`):
```xml
-
+
- io.modelcontextprotocol.sdk
+ org.springframework.aimcp-spring-webflux
-
+
- io.modelcontextprotocol.sdk
+ org.springframework.aimcp-spring-webmvc
```
+ !!! note
+ When using the `spring-ai-bom` or Spring AI starter dependencies (`spring-ai-starter-mcp-server-webflux`, `spring-ai-starter-mcp-server-webmvc`, `spring-ai-starter-mcp-client-webflux`) no explicit version is needed — the BOM manages it automatically.
+
=== "Gradle"
The convenience `mcp` module bundles `mcp-core` with Jackson 3.x JSON serialization:
@@ -89,17 +92,17 @@ Add the following dependency to your project:
}
```
- If you're using the Spring Framework and want Spring-specific transport implementations, add one of the following optional dependencies:
+ If you're using Spring Framework, the Spring-specific transport implementations are now part of [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`):
```groovy
- // Optional: Spring WebFlux-based SSE and Streamable HTTP client and server transport
+ // Optional: Spring WebFlux-based SSE and Streamable HTTP client and server transport (Spring AI 2.0+)
dependencies {
- implementation "io.modelcontextprotocol.sdk:mcp-spring-webflux"
+ implementation "org.springframework.ai:mcp-spring-webflux"
}
- // Optional: Spring WebMVC-based SSE and Streamable HTTP server transport
+ // Optional: Spring WebMVC-based SSE and Streamable HTTP server transport (Spring AI 2.0+)
dependencies {
- implementation "io.modelcontextprotocol.sdk:mcp-spring-webmvc"
+ implementation "org.springframework.ai:mcp-spring-webmvc"
}
```
@@ -153,8 +156,8 @@ The following dependencies are available and managed by the BOM:
- **JSON Serialization**
- `io.modelcontextprotocol.sdk:mcp-json-jackson3` - Jackson 3.x JSON serialization implementation (included in `mcp` bundle).
- `io.modelcontextprotocol.sdk:mcp-json-jackson2` - Jackson 2.x JSON serialization implementation for projects that require Jackson 2.x compatibility.
-- **Optional Transport Dependencies** (convenience if using Spring Framework)
- - `io.modelcontextprotocol.sdk:mcp-spring-webflux` - WebFlux-based SSE and Streamable HTTP transport implementation for reactive applications.
- - `io.modelcontextprotocol.sdk:mcp-spring-webmvc` - WebMVC-based SSE and Streamable HTTP transport implementation for servlet-based applications.
+- **Optional Spring Transport Dependencies** (part of [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+, group `org.springframework.ai`)
+ - `org.springframework.ai:mcp-spring-webflux` - WebFlux-based SSE and Streamable HTTP transport implementation for reactive applications.
+ - `org.springframework.ai:mcp-spring-webmvc` - WebMVC-based SSE and Streamable HTTP transport implementation for servlet-based applications.
- **Testing Dependencies**
- `io.modelcontextprotocol.sdk:mcp-test` - Testing utilities and support for MCP-based applications.
diff --git a/docs/server.md b/docs/server.md
index 3c05aee30..0753726e2 100644
--- a/docs/server.md
+++ b/docs/server.md
@@ -21,7 +21,8 @@ The MCP Server is a foundational component in the Model Context Protocol (MCP) a
!!! tip
The core `io.modelcontextprotocol.sdk:mcp` module provides STDIO, SSE, and Streamable HTTP server transport implementations without requiring external web frameworks.
- Spring-specific transport implementations are available as **optional** dependencies `io.modelcontextprotocol.sdk:mcp-spring-webflux`, `io.modelcontextprotocol.sdk:mcp-spring-webmvc` for [Spring Framework](https://docs.spring.io/spring-ai/reference/api/mcp/mcp-client-boot-starter-docs.html) users.
+ Spring-specific transport implementations (`mcp-spring-webflux`, `mcp-spring-webmvc`) are now part of [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`) and are no longer shipped by this SDK.
+ See the [MCP Server Boot Starter](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-server-boot-starter-docs.html) documentation for Spring-based server setup.
The server supports both synchronous and asynchronous APIs, allowing for flexible integration in different application contexts.
@@ -104,25 +105,27 @@ The transport layer in the MCP SDK is responsible for handling the communication
It provides different implementations to support various communication protocols and patterns.
The SDK includes several built-in transport provider implementations:
-=== "STDIO"
+### STDIO
- Create process-based transport using stdin/stdout:
+Create process-based transport using stdin/stdout:
- ```java
- StdioServerTransportProvider transportProvider =
- new StdioServerTransportProvider(new ObjectMapper());
- ```
+```java
+StdioServerTransportProvider transportProvider =
+ new StdioServerTransportProvider(new ObjectMapper());
+```
- Provides bidirectional JSON-RPC message handling over standard input/output streams with non-blocking message processing, serialization/deserialization, and graceful shutdown support.
+Provides bidirectional JSON-RPC message handling over standard input/output streams with non-blocking message processing, serialization/deserialization, and graceful shutdown support.
- Key features:
+Key features:
- - Bidirectional communication through stdin/stdout
- - Process-based integration support
- - Simple setup and configuration
- - Lightweight implementation
+- Bidirectional communication through stdin/stdout
+- Process-based integration support
+- Simple setup and configuration
+- Lightweight implementation
-=== "Streamable HTTP (Servlet)"
+### Streamable HTTP
+
+=== "Streamable HTTP Servlet"
Creates a Servlet-based Streamable HTTP server transport. Included in the core `mcp` module:
@@ -165,9 +168,9 @@ The SDK includes several built-in transport provider implementations:
- Security validation support
- Graceful shutdown support
-=== "Streamable HTTP (WebFlux)"
+=== "Streamable HTTP WebFlux (external)"
- Creates WebFlux-based Streamable HTTP server transport. Requires the `mcp-spring-webflux` dependency:
+ Creates WebFlux-based Streamable HTTP server transport. Requires the `mcp-spring-webflux` dependency from [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`):
```java
@Configuration
@@ -195,9 +198,9 @@ The SDK includes several built-in transport provider implementations:
- Configurable keep-alive intervals
- Security validation support
-=== "Streamable HTTP (WebMvc)"
+=== "Streamable HTTP WebMvc (external)"
- Creates WebMvc-based Streamable HTTP server transport. Requires the `mcp-spring-webmvc` dependency:
+ Creates WebMvc-based Streamable HTTP server transport. Requires the `mcp-spring-webmvc` dependency from [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`):
```java
@Configuration
@@ -219,9 +222,45 @@ The SDK includes several built-in transport provider implementations:
}
```
-=== "SSE (WebFlux)"
+### SSE HTTP (Legacy)
+
+=== "SSE Servlet"
+
+ Creates a Servlet-based SSE server transport. Included in the core `mcp` module.
+ The `HttpServletSseServerTransportProvider` can be used with any Servlet container.
+ To use it with a Spring Web application, you can register it as a Servlet bean:
+
+ ```java
+ @Configuration
+ @EnableWebMvc
+ public class McpServerConfig implements WebMvcConfigurer {
+
+ @Bean
+ public HttpServletSseServerTransportProvider servletSseServerTransportProvider() {
+ return new HttpServletSseServerTransportProvider(new ObjectMapper(), "/mcp/message");
+ }
+
+ @Bean
+ public ServletRegistrationBean> customServletBean(
+ HttpServletSseServerTransportProvider transportProvider) {
+ return new ServletRegistrationBean<>(transportProvider);
+ }
+ }
+ ```
+
+ Implements the MCP HTTP with SSE transport specification using the traditional Servlet API, providing:
+
+ - Asynchronous message handling using Servlet 6.0 async support
+ - Session management for multiple client connections
+ - Two types of endpoints:
+ - SSE endpoint (`/sse`) for server-to-client events
+ - Message endpoint (configurable) for client-to-server requests
+ - Error handling and response formatting
+ - Graceful shutdown support
+
+=== "SSE WebFlux (external)"
- Creates WebFlux-based SSE server transport. Requires the `mcp-spring-webflux` dependency:
+ Creates WebFlux-based SSE server transport. Requires the `mcp-spring-webflux` dependency from [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`):
```java
@Configuration
@@ -245,9 +284,9 @@ The SDK includes several built-in transport provider implementations:
- Message routing and session management
- Graceful shutdown capabilities
-=== "SSE (WebMvc)"
+=== "SSE WebMvc (external)"
- Creates WebMvc-based SSE server transport. Requires the `mcp-spring-webmvc` dependency:
+ Creates WebMvc-based SSE server transport. Requires the `mcp-spring-webmvc` dependency from [Spring AI](https://docs.spring.io/spring-ai/reference/2.0-SNAPSHOT/api/mcp/mcp-overview.html) 2.0+ (group `org.springframework.ai`):
```java
@Configuration
@@ -273,39 +312,6 @@ The SDK includes several built-in transport provider implementations:
- Support for traditional web applications
- Synchronous operation handling
-=== "SSE (Servlet)"
-
- Creates a Servlet-based SSE server transport. Included in the core `mcp` module.
- The `HttpServletSseServerTransportProvider` can be used with any Servlet container.
- To use it with a Spring Web application, you can register it as a Servlet bean:
-
- ```java
- @Configuration
- @EnableWebMvc
- public class McpServerConfig implements WebMvcConfigurer {
-
- @Bean
- public HttpServletSseServerTransportProvider servletSseServerTransportProvider() {
- return new HttpServletSseServerTransportProvider(new ObjectMapper(), "/mcp/message");
- }
-
- @Bean
- public ServletRegistrationBean> customServletBean(
- HttpServletSseServerTransportProvider transportProvider) {
- return new ServletRegistrationBean<>(transportProvider);
- }
- }
- ```
-
- Implements the MCP HTTP with SSE transport specification using the traditional Servlet API, providing:
-
- - Asynchronous message handling using Servlet 6.0 async support
- - Session management for multiple client connections
- - Two types of endpoints:
- - SSE endpoint (`/sse`) for server-to-client events
- - Message endpoint (configurable) for client-to-server requests
- - Error handling and response formatting
- - Graceful shutdown support
## Server Capabilities
diff --git a/mcp-bom/pom.xml b/mcp-bom/pom.xml
index ce24f9b11..b43b703fa 100644
--- a/mcp-bom/pom.xml
+++ b/mcp-bom/pom.xml
@@ -54,20 +54,6 @@
${project.version}
-
-
- io.modelcontextprotocol.sdk
- mcp-spring-webflux
- ${project.version}
-
-
-
-
- io.modelcontextprotocol.sdk
- mcp-spring-webmvc
- ${project.version}
-
-
diff --git a/mcp-spring/mcp-spring-webflux/README.md b/mcp-spring/mcp-spring-webflux/README.md
deleted file mode 100644
index e701e41e6..000000000
--- a/mcp-spring/mcp-spring-webflux/README.md
+++ /dev/null
@@ -1,55 +0,0 @@
-# WebFlux SSE Transport
-
-```xml
-
- io.modelcontextprotocol.sdk
- mcp-spring-webflux
-
-```
-
-```java
-String MESSAGE_ENDPOINT = "/mcp/message";
-
-@Configuration
-static class MyConfig {
-
- // SSE transport
- @Bean
- public WebFluxSseServerTransport sseServerTransport() {
- return new WebFluxSseServerTransport(new ObjectMapper(), "/mcp/message");
- }
-
- // Router function for SSE transport used by Spring WebFlux to start an HTTP
- // server.
- @Bean
- public RouterFunction> mcpRouterFunction(WebFluxSseServerTransport transport) {
- return transport.getRouterFunction();
- }
-
- @Bean
- public McpAsyncServer mcpServer(ServerMcpTransport transport, OpenLibrary openLibrary) {
-
- // Configure server capabilities with resource support
- var capabilities = McpSchema.ServerCapabilities.builder()
- .resources(false, true) // No subscribe support, but list changes notifications
- .tools(true) // Tool support with list changes notifications
- .prompts(true) // Prompt support with list changes notifications
- .logging() // Logging support
- .build();
-
- // Create the server with both tool and resource capabilities
- var server = McpServer.using(transport)
- .serverInfo("MCP Demo Server", "1.0.0")
- .capabilities(capabilities)
- .resources(systemInfoResourceRegistration())
- .prompts(greetingPromptRegistration())
- .tools(openLibraryToolRegistrations(openLibrary))
- .async();
-
- return server;
- }
-
- // ...
-
-}
-```
diff --git a/mcp-spring/mcp-spring-webflux/pom.xml b/mcp-spring/mcp-spring-webflux/pom.xml
deleted file mode 100644
index 875ade2e9..000000000
--- a/mcp-spring/mcp-spring-webflux/pom.xml
+++ /dev/null
@@ -1,148 +0,0 @@
-
-
- 4.0.0
-
- io.modelcontextprotocol.sdk
- mcp-parent
- 1.0.0-SNAPSHOT
- ../../pom.xml
-
- mcp-spring-webflux
- jar
- WebFlux transports
- WebFlux implementation for the SSE and Streamable Http Client and Server transports
- https://github.com/modelcontextprotocol/java-sdk
-
-
- https://github.com/modelcontextprotocol/java-sdk
- git://github.com/modelcontextprotocol/java-sdk.git
- git@github.com/modelcontextprotocol/java-sdk.git
-
-
-
-
-
- io.modelcontextprotocol.sdk
- mcp-core
- 1.0.0-SNAPSHOT
-
-
-
- io.modelcontextprotocol.sdk
- mcp-test
- 1.0.0-SNAPSHOT
- test
-
-
-
- org.springframework
- spring-webflux
- ${springframework.version}
-
-
-
- io.modelcontextprotocol.sdk
- mcp-json-jackson2
- 1.0.0-SNAPSHOT
- test
-
-
-
- io.projectreactor.netty
- reactor-netty-http
- test
-
-
-
-
- org.springframework
- spring-context
- ${springframework.version}
- test
-
-
-
- org.springframework
- spring-test
- ${springframework.version}
- test
-
-
-
- org.assertj
- assertj-core
- ${assert4j.version}
- test
-
-
- org.junit.jupiter
- junit-jupiter-api
- ${junit.version}
- test
-
-
- org.mockito
- mockito-core
- ${mockito.version}
- test
-
-
- net.bytebuddy
- byte-buddy
- ${byte-buddy.version}
- test
-
-
- io.projectreactor
- reactor-test
- test
-
-
- org.testcontainers
- junit-jupiter
- ${testcontainers.version}
- test
-
-
- org.testcontainers
- toxiproxy
- ${toxiproxy.version}
- test
-
-
-
- org.awaitility
- awaitility
- ${awaitility.version}
- test
-
-
-
- ch.qos.logback
- logback-classic
- ${logback.version}
- test
-
-
-
- org.junit.jupiter
- junit-jupiter-params
- ${junit.version}
- test
-
-
-
- net.javacrumbs.json-unit
- json-unit-assertj
- ${json-unit-assertj.version}
- test
-
-
-
-
-
-
diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java
deleted file mode 100644
index 18e9d8ecc..000000000
--- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/*
- * Copyright 2025-2025 the original author or authors.
- */
-
-package io.modelcontextprotocol.client.transport;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.reactivestreams.Publisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.core.ParameterizedTypeReference;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.http.codec.ServerSentEvent;
-import org.springframework.web.reactive.function.client.ClientResponse;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
-
-import io.modelcontextprotocol.client.McpAsyncClient;
-import io.modelcontextprotocol.json.McpJsonDefaults;
-import io.modelcontextprotocol.json.McpJsonMapper;
-import io.modelcontextprotocol.json.TypeRef;
-import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
-import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
-import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
-import io.modelcontextprotocol.spec.HttpHeaders;
-import io.modelcontextprotocol.spec.McpClientTransport;
-import io.modelcontextprotocol.spec.McpError;
-import io.modelcontextprotocol.spec.McpSchema;
-import io.modelcontextprotocol.spec.McpTransportException;
-import io.modelcontextprotocol.spec.McpTransportSession;
-import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
-import io.modelcontextprotocol.spec.McpTransportStream;
-import io.modelcontextprotocol.spec.ProtocolVersions;
-import io.modelcontextprotocol.util.Assert;
-import io.modelcontextprotocol.util.Utils;
-import reactor.core.Disposable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
-
-/**
- * An implementation of the Streamable HTTP protocol as defined by the
- * 2025-03-26 version of the MCP specification.
- *
- *
- * The transport is capable of resumability and reconnects. It reacts to transport-level
- * session invalidation and will propagate {@link McpTransportSessionNotFoundException
- * appropriate exceptions} to the higher level abstraction layer when needed in order to
- * allow proper state management. The implementation handles servers that are stateful and
- * provide session meta information, but can also communicate with stateless servers that
- * do not provide a session identifier and do not support SSE streams.
- *
- *
- * This implementation does not handle backwards compatibility with the "HTTP
- * with SSE" transport. In order to communicate over the phased-out
- * 2024-11-05 protocol, use {@link HttpClientSseClientTransport} or
- * {@link WebFluxSseClientTransport}.
- *
- *
- * @author Dariusz Jędrzejczyk
- * @see Streamable
- * HTTP transport specification
- */
-public class WebClientStreamableHttpTransport implements McpClientTransport {
-
- private static final String MISSING_SESSION_ID = "[missing_session_id]";
-
- private static final Logger logger = LoggerFactory.getLogger(WebClientStreamableHttpTransport.class);
-
- private static final String DEFAULT_ENDPOINT = "/mcp";
-
- /**
- * Event type for JSON-RPC messages received through the SSE connection. The server
- * sends messages with this event type to transmit JSON-RPC protocol data.
- */
- private static final String MESSAGE_EVENT_TYPE = "message";
-
- private static final ParameterizedTypeReference> PARAMETERIZED_TYPE_REF = new ParameterizedTypeReference<>() {
- };
-
- private final McpJsonMapper jsonMapper;
-
- private final WebClient webClient;
-
- private final String endpoint;
-
- private final boolean openConnectionOnStartup;
-
- private final boolean resumableStreams;
-
- private final AtomicReference> activeSession = new AtomicReference<>();
-
- private final AtomicReference, Mono>> handler = new AtomicReference<>();
-
- private final AtomicReference> exceptionHandler = new AtomicReference<>();
-
- private final List supportedProtocolVersions;
-
- private final String latestSupportedProtocolVersion;
-
- private WebClientStreamableHttpTransport(McpJsonMapper jsonMapper, WebClient.Builder webClientBuilder,
- String endpoint, boolean resumableStreams, boolean openConnectionOnStartup,
- List supportedProtocolVersions) {
- this.jsonMapper = jsonMapper;
- this.webClient = webClientBuilder.build();
- this.endpoint = endpoint;
- this.resumableStreams = resumableStreams;
- this.openConnectionOnStartup = openConnectionOnStartup;
- this.activeSession.set(createTransportSession());
- this.supportedProtocolVersions = List.copyOf(supportedProtocolVersions);
- this.latestSupportedProtocolVersion = this.supportedProtocolVersions.stream()
- .sorted(Comparator.reverseOrder())
- .findFirst()
- .get();
- }
-
- @Override
- public List protocolVersions() {
- return supportedProtocolVersions;
- }
-
- /**
- * Create a stateful builder for creating {@link WebClientStreamableHttpTransport}
- * instances.
- * @param webClientBuilder the {@link WebClient.Builder} to use
- * @return a builder which will create an instance of
- * {@link WebClientStreamableHttpTransport} once {@link Builder#build()} is called
- */
- public static Builder builder(WebClient.Builder webClientBuilder) {
- return new Builder(webClientBuilder);
- }
-
- @Override
- public Mono connect(Function, Mono> handler) {
- return Mono.deferContextual(ctx -> {
- this.handler.set(handler);
- if (openConnectionOnStartup) {
- logger.debug("Eagerly opening connection on startup");
- return this.reconnect(null).then();
- }
- return Mono.empty();
- });
- }
-
- private McpTransportSession createTransportSession() {
- Function> onClose = sessionId -> sessionId == null ? Mono.empty()
- : webClient.delete()
- .uri(this.endpoint)
- .header(HttpHeaders.MCP_SESSION_ID, sessionId)
- .header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion)
- .retrieve()
- .toBodilessEntity()
- .onErrorComplete(e -> {
- logger.warn("Got error when closing transport", e);
- return true;
- })
- .then();
- return new DefaultMcpTransportSession(onClose);
- }
-
- private McpTransportSession createClosedSession(McpTransportSession existingSession) {
- var existingSessionId = Optional.ofNullable(existingSession)
- .filter(session -> !(session instanceof ClosedMcpTransportSession))
- .flatMap(McpTransportSession::sessionId)
- .orElse(null);
- return new ClosedMcpTransportSession<>(existingSessionId);
- }
-
- @Override
- public void setExceptionHandler(Consumer handler) {
- logger.debug("Exception handler registered");
- this.exceptionHandler.set(handler);
- }
-
- private void handleException(Throwable t) {
- logger.debug("Handling exception for session {}", sessionIdOrPlaceholder(this.activeSession.get()), t);
- if (t instanceof McpTransportSessionNotFoundException) {
- McpTransportSession> invalidSession = this.activeSession.getAndSet(createTransportSession());
- logger.warn("Server does not recognize session {}. Invalidating.", invalidSession.sessionId());
- invalidSession.close();
- }
- Consumer handler = this.exceptionHandler.get();
- if (handler != null) {
- handler.accept(t);
- }
- }
-
- @Override
- public Mono closeGracefully() {
- return Mono.defer(() -> {
- logger.debug("Graceful close triggered");
- McpTransportSession currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
- if (currentSession != null) {
- return Mono.from(currentSession.closeGracefully());
- }
- return Mono.empty();
- });
- }
-
- private Mono reconnect(McpTransportStream stream) {
- return Mono.deferContextual(ctx -> {
- if (stream != null) {
- logger.debug("Reconnecting stream {} with lastId {}", stream.streamId(), stream.lastId());
- }
- else {
- logger.debug("Reconnecting with no prior stream");
- }
- // Here we attempt to initialize the client. In case the server supports SSE,
- // we will establish a long-running
- // session here and listen for messages. If it doesn't, that's ok, the server
- // is a simple, stateless one.
- final AtomicReference disposableRef = new AtomicReference<>();
- final McpTransportSession transportSession = this.activeSession.get();
-
- Disposable connection = webClient.get()
- .uri(this.endpoint)
- .accept(MediaType.TEXT_EVENT_STREAM)
- .header(HttpHeaders.PROTOCOL_VERSION,
- ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
- this.latestSupportedProtocolVersion))
- .headers(httpHeaders -> {
- transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id));
- if (stream != null) {
- stream.lastId().ifPresent(id -> httpHeaders.add(HttpHeaders.LAST_EVENT_ID, id));
- }
- })
- .exchangeToFlux(response -> {
- if (isEventStream(response)) {
- logger.debug("Established SSE stream via GET");
- return eventStream(stream, response);
- }
- else if (isNotAllowed(response)) {
- logger.debug("The server does not support SSE streams, using request-response mode.");
- return Flux.empty();
- }
- else if (isNotFound(response)) {
- if (transportSession.sessionId().isPresent()) {
- String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
- return mcpSessionNotFoundError(sessionIdRepresentation);
- }
- else {
- return this.extractError(response, MISSING_SESSION_ID);
- }
- }
- else {
- return response.createError().doOnError(e -> {
- logger.info("Opening an SSE stream failed. This can be safely ignored.", e);
- }).flux();
- }
- })
- .flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
- .onErrorComplete(t -> {
- this.handleException(t);
- return true;
- })
- .doFinally(s -> {
- Disposable ref = disposableRef.getAndSet(null);
- if (ref != null) {
- transportSession.removeConnection(ref);
- }
- })
- .contextWrite(ctx)
- .subscribe();
-
- disposableRef.set(connection);
- transportSession.addConnection(connection);
- return Mono.just(connection);
- });
- }
-
- @Override
- public Mono sendMessage(McpSchema.JSONRPCMessage message) {
- String jsonText;
- try {
- jsonText = jsonMapper.writeValueAsString(message);
- }
- catch (IOException e) {
- return Mono.error(new RuntimeException("Failed to serialize message", e));
- }
- return Mono.create(sink -> {
- logger.debug("Sending message {}", message);
- // Here we attempt to initialize the client.
- // In case the server supports SSE, we will establish a long-running session
- // here and
- // listen for messages.
- // If it doesn't, nothing actually happens here, that's just the way it is...
- final AtomicReference disposableRef = new AtomicReference<>();
- final McpTransportSession transportSession = this.activeSession.get();
-
- Disposable connection = Flux.deferContextual(ctx -> webClient.post()
- .uri(this.endpoint)
- .contentType(MediaType.APPLICATION_JSON)
- .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
- .header(HttpHeaders.PROTOCOL_VERSION,
- ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
- this.latestSupportedProtocolVersion))
- .headers(httpHeaders -> {
- transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id));
- })
- .bodyValue(jsonText)
- .exchangeToFlux(response -> {
- if (transportSession
- .markInitialized(response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID))) {
- // Once we have a session, we try to open an async stream for
- // the server to send notifications and requests out-of-band.
- reconnect(null).contextWrite(sink.contextView()).subscribe();
- }
-
- String sessionRepresentation = sessionIdOrPlaceholder(transportSession);
-
- // The spec mentions only ACCEPTED, but the existing SDKs can return
- // 200 OK for notifications
- if (response.statusCode().is2xxSuccessful()) {
- Optional contentType = response.headers().contentType();
- long contentLength = response.headers().contentLength().orElse(-1);
- // Existing SDKs consume notifications with no response body nor
- // content type
- if (contentType.isEmpty() || contentLength == 0
- || response.statusCode().equals(HttpStatus.ACCEPTED)) {
- logger.trace("Message was successfully sent via POST for session {}",
- sessionRepresentation);
- // signal the caller that the message was successfully
- // delivered
- sink.success();
- // communicate to downstream there is no streamed data coming
- return Flux.empty();
- }
- else {
- MediaType mediaType = contentType.get();
- if (mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) {
- logger.debug("Established SSE stream via POST");
- // communicate to caller that the message was delivered
- sink.success();
- // starting a stream
- return newEventStream(response, sessionRepresentation);
- }
- else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
- logger.trace("Received response to POST for session {}", sessionRepresentation);
- // communicate to caller the message was delivered
- sink.success();
- return directResponseFlux(message, response);
- }
- else {
- logger.warn("Unknown media type {} returned for POST in session {}", contentType,
- sessionRepresentation);
- return Flux.error(new RuntimeException("Unknown media type returned: " + contentType));
- }
- }
- }
- else {
- if (isNotFound(response) && !sessionRepresentation.equals(MISSING_SESSION_ID)) {
- return mcpSessionNotFoundError(sessionRepresentation);
- }
- return this.extractError(response, sessionRepresentation);
- }
- }))
- .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
- .onErrorComplete(t -> {
- // handle the error first
- this.handleException(t);
- // inform the caller of sendMessage
- sink.error(t);
- return true;
- })
- .doFinally(s -> {
- Disposable ref = disposableRef.getAndSet(null);
- if (ref != null) {
- transportSession.removeConnection(ref);
- }
- })
- .contextWrite(sink.contextView())
- .subscribe();
- disposableRef.set(connection);
- transportSession.addConnection(connection);
- });
- }
-
- private static Flux mcpSessionNotFoundError(String sessionRepresentation) {
- logger.warn("Session {} was not found on the MCP server", sessionRepresentation);
- // inform the stream/connection subscriber
- return Flux.error(new McpTransportSessionNotFoundException(sessionRepresentation));
- }
-
- private Flux extractError(ClientResponse response, String sessionRepresentation) {
- return response.createError().onErrorResume(e -> {
- WebClientResponseException responseException = (WebClientResponseException) e;
- byte[] body = responseException.getResponseBodyAsByteArray();
- McpSchema.JSONRPCResponse.JSONRPCError jsonRpcError = null;
- Exception toPropagate;
- try {
- McpSchema.JSONRPCResponse jsonRpcResponse = jsonMapper.readValue(body, McpSchema.JSONRPCResponse.class);
- jsonRpcError = jsonRpcResponse.error();
- toPropagate = jsonRpcError != null ? new McpError(jsonRpcError)
- : new McpTransportException("Can't parse the jsonResponse " + jsonRpcResponse);
- }
- catch (IOException ex) {
- toPropagate = new McpTransportException("Sending request failed, " + e.getMessage(), e);
- logger.debug("Received content together with {} HTTP code response: {}", response.statusCode(), body);
- }
-
- // Some implementations can return 400 when presented with a
- // session id that it doesn't know about, so we will
- // invalidate the session
- // https://github.com/modelcontextprotocol/typescript-sdk/issues/389
- if (responseException.getStatusCode().isSameCodeAs(HttpStatus.BAD_REQUEST)) {
- if (!sessionRepresentation.equals(MISSING_SESSION_ID)) {
- return Mono.error(new McpTransportSessionNotFoundException(sessionRepresentation, toPropagate));
- }
- return Mono.error(new McpTransportException("Received 400 BAD REQUEST for session "
- + sessionRepresentation + ". " + toPropagate.getMessage(), toPropagate));
- }
- return Mono.error(toPropagate);
- }).flux();
- }
-
- private Flux eventStream(McpTransportStream stream, ClientResponse response) {
- McpTransportStream sessionStream = stream != null ? stream
- : new DefaultMcpTransportStream<>(this.resumableStreams, this::reconnect);
- logger.debug("Connected stream {}", sessionStream.streamId());
-
- var idWithMessages = response.bodyToFlux(PARAMETERIZED_TYPE_REF).map(this::parse);
- return Flux.from(sessionStream.consumeSseStream(idWithMessages));
- }
-
- private static boolean isNotFound(ClientResponse response) {
- return response.statusCode().isSameCodeAs(HttpStatus.NOT_FOUND);
- }
-
- private static boolean isNotAllowed(ClientResponse response) {
- return response.statusCode().isSameCodeAs(HttpStatus.METHOD_NOT_ALLOWED);
- }
-
- private static boolean isEventStream(ClientResponse response) {
- return response.statusCode().is2xxSuccessful() && response.headers().contentType().isPresent()
- && response.headers().contentType().get().isCompatibleWith(MediaType.TEXT_EVENT_STREAM);
- }
-
- private static String sessionIdOrPlaceholder(McpTransportSession> transportSession) {
- return transportSession.sessionId().orElse(MISSING_SESSION_ID);
- }
-
- private Flux directResponseFlux(McpSchema.JSONRPCMessage sentMessage,
- ClientResponse response) {
- return response.bodyToMono(String.class).>handle((responseMessage, s) -> {
- try {
- if (sentMessage instanceof McpSchema.JSONRPCNotification) {
- logger.warn("Notification: {} received non-compliant response: {}", sentMessage,
- Utils.hasText(responseMessage) ? responseMessage : "[empty]");
- s.complete();
- }
- else {
- McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(jsonMapper,
- responseMessage);
- s.next(List.of(jsonRpcResponse));
- }
- }
- catch (IOException e) {
- s.error(new McpTransportException(e));
- }
- }).flatMapIterable(Function.identity());
- }
-
- private Flux newEventStream(ClientResponse response, String sessionRepresentation) {
- McpTransportStream sessionStream = new DefaultMcpTransportStream<>(this.resumableStreams,
- this::reconnect);
- logger.trace("Sent POST and opened a stream ({}) for session {}", sessionStream.streamId(),
- sessionRepresentation);
- return eventStream(sessionStream, response);
- }
-
- @Override
- public T unmarshalFrom(Object data, TypeRef typeRef) {
- return this.jsonMapper.convertValue(data, typeRef);
- }
-
- private Tuple2, Iterable> parse(ServerSentEvent event) {
- if (MESSAGE_EVENT_TYPE.equals(event.event())) {
- try {
- // We don't support batching ATM and probably won't since the next version
- // considers removing it.
- McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, event.data());
- return Tuples.of(Optional.ofNullable(event.id()), List.of(message));
- }
- catch (IOException ioException) {
- throw new McpTransportException("Error parsing JSON-RPC message: " + event.data(), ioException);
- }
- }
- else {
- logger.debug("Received SSE event with type: {}", event);
- return Tuples.of(Optional.empty(), List.of());
- }
- }
-
- /**
- * Builder for {@link WebClientStreamableHttpTransport}.
- */
- public static class Builder {
-
- private McpJsonMapper jsonMapper;
-
- private WebClient.Builder webClientBuilder;
-
- private String endpoint = DEFAULT_ENDPOINT;
-
- private boolean resumableStreams = true;
-
- private boolean openConnectionOnStartup = false;
-
- private List supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
- ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18, ProtocolVersions.MCP_2025_11_25);
-
- private Builder(WebClient.Builder webClientBuilder) {
- Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
- this.webClientBuilder = webClientBuilder;
- }
-
- /**
- * Configure the {@link McpJsonMapper} to use.
- * @param jsonMapper instance to use
- * @return the builder instance
- */
- public Builder jsonMapper(McpJsonMapper jsonMapper) {
- Assert.notNull(jsonMapper, "JsonMapper must not be null");
- this.jsonMapper = jsonMapper;
- return this;
- }
-
- /**
- * Configure the {@link WebClient.Builder} to construct the {@link WebClient}.
- * @param webClientBuilder instance to use
- * @return the builder instance
- */
- public Builder webClientBuilder(WebClient.Builder webClientBuilder) {
- Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
- this.webClientBuilder = webClientBuilder;
- return this;
- }
-
- /**
- * Configure the endpoint to make HTTP requests against.
- * @param endpoint endpoint to use
- * @return the builder instance
- */
- public Builder endpoint(String endpoint) {
- Assert.hasText(endpoint, "endpoint must be a non-empty String");
- this.endpoint = endpoint;
- return this;
- }
-
- /**
- * Configure whether to use the stream resumability feature by keeping track of
- * SSE event ids.
- * @param resumableStreams if {@code true} event ids will be tracked and upon
- * disconnection, the last seen id will be used upon reconnection as a header to
- * resume consuming messages.
- * @return the builder instance
- */
- public Builder resumableStreams(boolean resumableStreams) {
- this.resumableStreams = resumableStreams;
- return this;
- }
-
- /**
- * Configure whether the client should open an SSE connection upon startup. Not
- * all servers support this (although it is in theory possible with the current
- * specification), so use with caution. By default, this value is {@code false}.
- * @param openConnectionOnStartup if {@code true} the {@link #connect(Function)}
- * method call will try to open an SSE connection before sending any JSON-RPC
- * request
- * @return the builder instance
- */
- public Builder openConnectionOnStartup(boolean openConnectionOnStartup) {
- this.openConnectionOnStartup = openConnectionOnStartup;
- return this;
- }
-
- /**
- * Sets the list of supported protocol versions used in version negotiation. By
- * default, the client will send the latest of those versions in the
- * {@code MCP-Protocol-Version} header.
- *
- * Setting this value only updates the values used in version negotiation, and
- * does NOT impact the actual capabilities of the transport. It should only be
- * used for compatibility with servers having strict requirements around the
- * {@code MCP-Protocol-Version} header.
- * @param supportedProtocolVersions protocol versions supported by this transport
- * @return this builder
- * @see version
- * negotiation specification
- * @see Protocol
- * Version Header
- */
- public Builder supportedProtocolVersions(List supportedProtocolVersions) {
- Assert.notEmpty(supportedProtocolVersions, "supportedProtocolVersions must not be empty");
- this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
- return this;
- }
-
- /**
- * Construct a fresh instance of {@link WebClientStreamableHttpTransport} using
- * the current builder configuration.
- * @return a new instance of {@link WebClientStreamableHttpTransport}
- */
- public WebClientStreamableHttpTransport build() {
- return new WebClientStreamableHttpTransport(jsonMapper == null ? McpJsonDefaults.getMapper() : jsonMapper,
- webClientBuilder, endpoint, resumableStreams, openConnectionOnStartup, supportedProtocolVersions);
- }
-
- }
-
-}
diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java
deleted file mode 100644
index 304a3435f..000000000
--- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Copyright 2024 - 2024 the original author or authors.
- */
-
-package io.modelcontextprotocol.client.transport;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-
-import io.modelcontextprotocol.json.McpJsonDefaults;
-import io.modelcontextprotocol.json.McpJsonMapper;
-import io.modelcontextprotocol.json.TypeRef;
-
-import io.modelcontextprotocol.spec.HttpHeaders;
-import io.modelcontextprotocol.spec.McpClientTransport;
-import io.modelcontextprotocol.spec.McpSchema;
-import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
-import io.modelcontextprotocol.spec.ProtocolVersions;
-import io.modelcontextprotocol.util.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import reactor.core.Disposable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.SynchronousSink;
-import reactor.core.scheduler.Schedulers;
-import reactor.util.retry.Retry;
-import reactor.util.retry.Retry.RetrySignal;
-
-import org.springframework.core.ParameterizedTypeReference;
-import org.springframework.http.MediaType;
-import org.springframework.http.codec.ServerSentEvent;
-import org.springframework.web.reactive.function.client.WebClient;
-
-/**
- * Server-Sent Events (SSE) implementation of the
- * {@link io.modelcontextprotocol.spec.McpTransport} that follows the MCP HTTP with SSE
- * transport specification.
- *
- *
- * This transport establishes a bidirectional communication channel where:
- *
- *
Inbound messages are received through an SSE connection from the server
- *
Outbound messages are sent via HTTP POST requests to a server-provided
- * endpoint
- *
- *
- *
- * The message flow follows these steps:
- *
- *
The client establishes an SSE connection to the server's /sse endpoint
- *
The server sends an 'endpoint' event containing the URI for sending messages
- *
- *
- * This implementation uses {@link WebClient} for HTTP communications and supports JSON
- * serialization/deserialization of messages.
- *
- * @author Christian Tzolov
- * @see MCP
- * HTTP with SSE Transport Specification
- */
-public class WebFluxSseClientTransport implements McpClientTransport {
-
- private static final Logger logger = LoggerFactory.getLogger(WebFluxSseClientTransport.class);
-
- private static final String MCP_PROTOCOL_VERSION = ProtocolVersions.MCP_2024_11_05;
-
- /**
- * Event type for JSON-RPC messages received through the SSE connection. The server
- * sends messages with this event type to transmit JSON-RPC protocol data.
- */
- private static final String MESSAGE_EVENT_TYPE = "message";
-
- /**
- * Event type for receiving the message endpoint URI from the server. The server MUST
- * send this event when a client connects, providing the URI where the client should
- * send its messages via HTTP POST.
- */
- private static final String ENDPOINT_EVENT_TYPE = "endpoint";
-
- /**
- * Default SSE endpoint path as specified by the MCP transport specification. This
- * endpoint is used to establish the SSE connection with the server.
- */
- private static final String DEFAULT_SSE_ENDPOINT = "/sse";
-
- /**
- * Type reference for parsing SSE events containing string data.
- */
- private static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {
- };
-
- /**
- * WebClient instance for handling both SSE connections and HTTP POST requests. Used
- * for establishing the SSE connection and sending outbound messages.
- */
- private final WebClient webClient;
-
- /**
- * JSON mapper for serializing outbound messages and deserializing inbound messages.
- * Handles conversion between JSON-RPC messages and their string representation.
- */
- protected McpJsonMapper jsonMapper;
-
- /**
- * Subscription for the SSE connection handling inbound messages. Used for cleanup
- * during transport shutdown.
- */
- private Disposable inboundSubscription;
-
- /**
- * Flag indicating if the transport is in the process of shutting down. Used to
- * prevent new operations during shutdown and handle cleanup gracefully.
- */
- private volatile boolean isClosing = false;
-
- /**
- * Sink for managing the message endpoint URI provided by the server. Stores the most
- * recent endpoint URI and makes it available for outbound message processing.
- */
- protected final Sinks.One messageEndpointSink = Sinks.one();
-
- /**
- * The SSE endpoint URI provided by the server. Used for sending outbound messages via
- * HTTP POST requests.
- */
- private String sseEndpoint;
-
- /**
- * Constructs a new SseClientTransport with the specified WebClient builder and
- * ObjectMapper. Initializes both inbound and outbound message processing pipelines.
- * @param webClientBuilder the WebClient.Builder to use for creating the WebClient
- * instance
- * @param jsonMapper the ObjectMapper to use for JSON processing
- * @throws IllegalArgumentException if either parameter is null
- */
- public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, McpJsonMapper jsonMapper) {
- this(webClientBuilder, jsonMapper, DEFAULT_SSE_ENDPOINT);
- }
-
- /**
- * Constructs a new SseClientTransport with the specified WebClient builder and
- * ObjectMapper. Initializes both inbound and outbound message processing pipelines.
- * @param webClientBuilder the WebClient.Builder to use for creating the WebClient
- * instance
- * @param jsonMapper the ObjectMapper to use for JSON processing
- * @param sseEndpoint the SSE endpoint URI to use for establishing the connection
- * @throws IllegalArgumentException if either parameter is null
- */
- public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, McpJsonMapper jsonMapper, String sseEndpoint) {
- Assert.notNull(jsonMapper, "jsonMapper must not be null");
- Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
- Assert.hasText(sseEndpoint, "SSE endpoint must not be null or empty");
-
- this.jsonMapper = jsonMapper;
- this.webClient = webClientBuilder.build();
- this.sseEndpoint = sseEndpoint;
- }
-
- @Override
- public List protocolVersions() {
- return List.of(MCP_PROTOCOL_VERSION);
- }
-
- /**
- * Establishes a connection to the MCP server using Server-Sent Events (SSE). This
- * method initiates the SSE connection and sets up the message processing pipeline.
- *
- *
- * The connection process follows these steps:
- *
- *
Establishes an SSE connection to the server's /sse endpoint
- *
Waits for the server to send an 'endpoint' event with the message posting
- * URI
- *
Sets up message handling for incoming JSON-RPC messages
- *
- *
- *
- * The connection is considered established only after receiving the endpoint event
- * from the server.
- * @param handler a function that processes incoming JSON-RPC messages and returns
- * responses
- * @return a Mono that completes when the connection is fully established
- */
- @Override
- public Mono connect(Function, Mono> handler) {
- // TODO: Avoid eager connection opening and enable resilience
- // -> upon disconnects, re-establish connection
- // -> allow optimizing for eager connection start using a constructor flag
- Flux> events = eventStream();
- this.inboundSubscription = events.concatMap(event -> Mono.just(event).handle((e, s) -> {
- if (ENDPOINT_EVENT_TYPE.equals(event.event())) {
- String messageEndpointUri = event.data();
- if (messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
- s.complete();
- }
- else {
- // TODO: clarify with the spec if multiple events can be
- // received
- s.error(new RuntimeException("Failed to handle SSE endpoint event"));
- }
- }
- else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
- try {
- JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, event.data());
- s.next(message);
- }
- catch (IOException ioException) {
- s.error(ioException);
- }
- }
- else {
- logger.debug("Received unrecognized SSE event type: {}", event);
- s.complete();
- }
- }).transform(handler)).subscribe();
-
- // The connection is established once the server sends the endpoint event
- return messageEndpointSink.asMono().then();
- }
-
- /**
- * Sends a JSON-RPC message to the server using the endpoint provided during
- * connection.
- *
- *
- * Messages are sent via HTTP POST requests to the server-provided endpoint URI. The
- * message is serialized to JSON before transmission. If the transport is in the
- * process of closing, the message send operation is skipped gracefully.
- * @param message the JSON-RPC message to send
- * @return a Mono that completes when the message has been sent successfully
- * @throws RuntimeException if message serialization fails
- */
- @Override
- public Mono sendMessage(JSONRPCMessage message) {
- // The messageEndpoint is the endpoint URI to send the messages
- // It is provided by the server as part of the endpoint event
- return messageEndpointSink.asMono().flatMap(messageEndpointUri -> {
- if (isClosing) {
- return Mono.empty();
- }
- try {
- String jsonText = this.jsonMapper.writeValueAsString(message);
- return webClient.post()
- .uri(messageEndpointUri)
- .contentType(MediaType.APPLICATION_JSON)
- .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
- .bodyValue(jsonText)
- .retrieve()
- .toBodilessEntity()
- .doOnSuccess(response -> {
- logger.debug("Message sent successfully");
- })
- .doOnError(error -> {
- if (!isClosing) {
- logger.error("Error sending message: {}", error.getMessage());
- }
- });
- }
- catch (IOException e) {
- if (!isClosing) {
- return Mono.error(new RuntimeException("Failed to serialize message", e));
- }
- return Mono.empty();
- }
- }).then(); // TODO: Consider non-200-ok response
- }
-
- /**
- * Initializes and starts the inbound SSE event processing. Establishes the SSE
- * connection and sets up event handling for both message and endpoint events.
- * Includes automatic retry logic for handling transient connection failures.
- */
- // visible for tests
- protected Flux> eventStream() {// @formatter:off
- return this.webClient
- .get()
- .uri(this.sseEndpoint)
- .accept(MediaType.TEXT_EVENT_STREAM)
- .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
- .retrieve()
- .bodyToFlux(SSE_TYPE)
- .retryWhen(Retry.from(retrySignal -> retrySignal.handle(inboundRetryHandler)));
- } // @formatter:on
-
- /**
- * Retry handler for the inbound SSE stream. Implements the retry logic for handling
- * connection failures and other errors.
- */
- private BiConsumer> inboundRetryHandler = (retrySpec, sink) -> {
- if (isClosing) {
- logger.debug("SSE connection closed during shutdown");
- sink.error(retrySpec.failure());
- return;
- }
- if (retrySpec.failure() instanceof IOException) {
- logger.debug("Retrying SSE connection after IO error");
- sink.next(retrySpec);
- return;
- }
- logger.error("Fatal SSE error, not retrying: {}", retrySpec.failure().getMessage());
- sink.error(retrySpec.failure());
- };
-
- /**
- * Implements graceful shutdown of the transport. Cleans up all resources including
- * subscriptions and schedulers. Ensures orderly shutdown of both inbound and outbound
- * message processing.
- * @return a Mono that completes when shutdown is finished
- */
- @Override
- public Mono closeGracefully() { // @formatter:off
- return Mono.fromRunnable(() -> {
- isClosing = true;
-
- // Dispose of subscriptions
-
- if (inboundSubscription != null) {
- inboundSubscription.dispose();
- }
-
- })
- .then()
- .subscribeOn(Schedulers.boundedElastic());
- } // @formatter:on
-
- /**
- * Unmarshalls data from a generic Object into the specified type using the configured
- * ObjectMapper.
- *
- *
- * This method is particularly useful when working with JSON-RPC parameters or result
- * objects that need to be converted to specific Java types. It leverages Jackson's
- * type conversion capabilities to handle complex object structures.
- * @param the target type to convert the data into
- * @param data the source object to convert
- * @param typeRef the TypeRef describing the target type
- * @return the unmarshalled object of type T
- * @throws IllegalArgumentException if the conversion cannot be performed
- */
- @Override
- public T unmarshalFrom(Object data, TypeRef typeRef) {
- return this.jsonMapper.convertValue(data, typeRef);
- }
-
- /**
- * Creates a new builder for {@link WebFluxSseClientTransport}.
- * @param webClientBuilder the WebClient.Builder to use for creating the WebClient
- * instance
- * @return a new builder instance
- */
- public static Builder builder(WebClient.Builder webClientBuilder) {
- return new Builder(webClientBuilder);
- }
-
- /**
- * Builder for {@link WebFluxSseClientTransport}.
- */
- public static class Builder {
-
- private final WebClient.Builder webClientBuilder;
-
- private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
-
- private McpJsonMapper jsonMapper;
-
- /**
- * Creates a new builder with the specified WebClient.Builder.
- * @param webClientBuilder the WebClient.Builder to use
- */
- public Builder(WebClient.Builder webClientBuilder) {
- Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
- this.webClientBuilder = webClientBuilder;
- }
-
- /**
- * Sets the SSE endpoint path.
- * @param sseEndpoint the SSE endpoint path
- * @return this builder
- */
- public Builder sseEndpoint(String sseEndpoint) {
- Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
- this.sseEndpoint = sseEndpoint;
- return this;
- }
-
- /**
- * Sets the JSON mapper for serialization/deserialization.
- * @param jsonMapper the JsonMapper to use
- * @return this builder
- */
- public Builder jsonMapper(McpJsonMapper jsonMapper) {
- Assert.notNull(jsonMapper, "jsonMapper must not be null");
- this.jsonMapper = jsonMapper;
- return this;
- }
-
- /**
- * Builds a new {@link WebFluxSseClientTransport} instance.
- * @return a new transport instance
- */
- public WebFluxSseClientTransport build() {
- return new WebFluxSseClientTransport(webClientBuilder,
- jsonMapper == null ? McpJsonDefaults.getMapper() : jsonMapper, sseEndpoint);
- }
-
- }
-
-}
diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java
deleted file mode 100644
index e950417d4..000000000
--- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java
+++ /dev/null
@@ -1,571 +0,0 @@
-/*
- * Copyright 2025-2026 the original author or authors.
- */
-
-package io.modelcontextprotocol.server.transport;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import io.modelcontextprotocol.common.McpTransportContext;
-import io.modelcontextprotocol.json.McpJsonDefaults;
-import io.modelcontextprotocol.json.McpJsonMapper;
-import io.modelcontextprotocol.json.TypeRef;
-import io.modelcontextprotocol.server.McpTransportContextExtractor;
-import io.modelcontextprotocol.spec.McpError;
-import io.modelcontextprotocol.spec.McpSchema;
-import io.modelcontextprotocol.spec.McpServerSession;
-import io.modelcontextprotocol.spec.McpServerTransport;
-import io.modelcontextprotocol.spec.McpServerTransportProvider;
-import io.modelcontextprotocol.spec.ProtocolVersions;
-import io.modelcontextprotocol.util.Assert;
-import io.modelcontextprotocol.util.KeepAliveScheduler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import reactor.core.Exceptions;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
-import reactor.core.publisher.Mono;
-
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.http.codec.ServerSentEvent;
-import org.springframework.web.reactive.function.server.RouterFunction;
-import org.springframework.web.reactive.function.server.RouterFunctions;
-import org.springframework.web.reactive.function.server.ServerRequest;
-import org.springframework.web.reactive.function.server.ServerResponse;
-import org.springframework.web.util.UriComponentsBuilder;
-
-/**
- * Server-side implementation of the MCP (Model Context Protocol) HTTP transport using
- * Server-Sent Events (SSE). This implementation provides a bidirectional communication
- * channel between MCP clients and servers using HTTP POST for client-to-server messages
- * and SSE for server-to-client messages.
- *
- *
- * Key features:
- *
- *
Implements the {@link McpServerTransportProvider} interface that allows managing
- * {@link McpServerSession} instances and enabling their communication with the
- * {@link McpServerTransport} abstraction.
- *
Uses WebFlux for non-blocking request handling and SSE support
- *
Maintains client sessions for reliable message delivery
- *
Supports graceful shutdown with session cleanup
- *
Thread-safe message broadcasting to multiple clients
- *
- *
- *
- * The transport sets up two main endpoints:
- *
- *
SSE endpoint (/sse) - For establishing SSE connections with clients
- *
Message endpoint (configurable) - For receiving JSON-RPC messages from clients
- *
- *
- *
- * This implementation is thread-safe and can handle multiple concurrent client
- * connections. It uses {@link ConcurrentHashMap} for session management and Project
- * Reactor's non-blocking APIs for message processing and delivery.
- *
- * @author Christian Tzolov
- * @author Alexandros Pappas
- * @author Dariusz Jędrzejczyk
- * @see McpServerTransport
- * @see ServerSentEvent
- */
-public class WebFluxSseServerTransportProvider implements McpServerTransportProvider {
-
- private static final Logger logger = LoggerFactory.getLogger(WebFluxSseServerTransportProvider.class);
-
- /**
- * Event type for JSON-RPC messages sent through the SSE connection.
- */
- public static final String MESSAGE_EVENT_TYPE = "message";
-
- /**
- * Event type for sending the message endpoint URI to clients.
- */
- public static final String ENDPOINT_EVENT_TYPE = "endpoint";
-
- private static final String MCP_PROTOCOL_VERSION = "2025-06-18";
-
- /**
- * Default SSE endpoint path as specified by the MCP transport specification.
- */
- public static final String DEFAULT_SSE_ENDPOINT = "/sse";
-
- public static final String SESSION_ID = "sessionId";
-
- public static final String DEFAULT_BASE_URL = "";
-
- private final McpJsonMapper jsonMapper;
-
- /**
- * Base URL for the message endpoint. This is used to construct the full URL for
- * clients to send their JSON-RPC messages.
- */
- private final String baseUrl;
-
- private final String messageEndpoint;
-
- private final String sseEndpoint;
-
- private final RouterFunction> routerFunction;
-
- private McpServerSession.Factory sessionFactory;
-
- /**
- * Map of active client sessions, keyed by session ID.
- */
- private final ConcurrentHashMap sessions = new ConcurrentHashMap<>();
-
- private McpTransportContextExtractor contextExtractor;
-
- /**
- * Flag indicating if the transport is shutting down.
- */
- private volatile boolean isClosing = false;
-
- /**
- * Keep-alive scheduler for managing session pings. Activated if keepAliveInterval is
- * set. Disabled by default.
- */
- private KeepAliveScheduler keepAliveScheduler;
-
- /**
- * Security validator for validating HTTP requests.
- */
- private final ServerTransportSecurityValidator securityValidator;
-
- /**
- * Constructs a new WebFlux SSE server transport provider instance.
- * @param jsonMapper The ObjectMapper to use for JSON serialization/deserialization of
- * MCP messages. Must not be null.
- * @param baseUrl webflux message base path
- * @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
- * messages. This endpoint will be communicated to clients during SSE connection
- * setup. Must not be null.
- * @param sseEndpoint The SSE endpoint path. Must not be null.
- * @param keepAliveInterval The interval for sending keep-alive pings to clients.
- * @param contextExtractor The context extractor to use for extracting MCP transport
- * context from HTTP requests. Must not be null.
- * @param securityValidator The security validator for validating HTTP requests.
- * @throws IllegalArgumentException if either parameter is null
- */
- private WebFluxSseServerTransportProvider(McpJsonMapper jsonMapper, String baseUrl, String messageEndpoint,
- String sseEndpoint, Duration keepAliveInterval,
- McpTransportContextExtractor contextExtractor,
- ServerTransportSecurityValidator securityValidator) {
- Assert.notNull(jsonMapper, "ObjectMapper must not be null");
- Assert.notNull(baseUrl, "Message base path must not be null");
- Assert.notNull(messageEndpoint, "Message endpoint must not be null");
- Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
- Assert.notNull(contextExtractor, "Context extractor must not be null");
- Assert.notNull(securityValidator, "Security validator must not be null");
-
- this.jsonMapper = jsonMapper;
- this.baseUrl = baseUrl;
- this.messageEndpoint = messageEndpoint;
- this.sseEndpoint = sseEndpoint;
- this.contextExtractor = contextExtractor;
- this.securityValidator = securityValidator;
- this.routerFunction = RouterFunctions.route()
- .GET(this.sseEndpoint, this::handleSseConnection)
- .POST(this.messageEndpoint, this::handleMessage)
- .build();
-
- if (keepAliveInterval != null) {
-
- this.keepAliveScheduler = KeepAliveScheduler
- .builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(sessions.values()))
- .initialDelay(keepAliveInterval)
- .interval(keepAliveInterval)
- .build();
-
- this.keepAliveScheduler.start();
- }
- }
-
- @Override
- public List protocolVersions() {
- return List.of(ProtocolVersions.MCP_2024_11_05);
- }
-
- @Override
- public void setSessionFactory(McpServerSession.Factory sessionFactory) {
- this.sessionFactory = sessionFactory;
- }
-
- /**
- * Broadcasts a JSON-RPC message to all connected clients through their SSE
- * connections. The message is serialized to JSON and sent as a server-sent event to
- * each active session.
- *
- *
- * The method:
- *
- *
Serializes the message to JSON
- *
Creates a server-sent event with the message data
- *
Attempts to send the event to all active sessions
- *
Tracks and reports any delivery failures
- *
- * @param method The JSON-RPC method to send to clients
- * @param params The method parameters to send to clients
- * @return A Mono that completes when the message has been sent to all sessions, or
- * errors if any session fails to receive the message
- */
- @Override
- public Mono notifyClients(String method, Object params) {
- if (sessions.isEmpty()) {
- logger.debug("No active sessions to broadcast message to");
- return Mono.empty();
- }
-
- logger.debug("Attempting to broadcast message to {} active sessions", sessions.size());
-
- return Flux.fromIterable(sessions.values())
- .flatMap(session -> session.sendNotification(method, params)
- .doOnError(
- e -> logger.error("Failed to send message to session {}: {}", session.getId(), e.getMessage()))
- .onErrorComplete())
- .then();
- }
-
- // FIXME: This javadoc makes claims about using isClosing flag but it's not
- // actually
- // doing that.
-
- /**
- * Initiates a graceful shutdown of all the sessions. This method ensures all active
- * sessions are properly closed and cleaned up.
- * @return A Mono that completes when all sessions have been closed
- */
- @Override
- public Mono closeGracefully() {
- return Flux.fromIterable(sessions.values())
- .doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()))
- .flatMap(McpServerSession::closeGracefully)
- .then()
- .doOnSuccess(v -> {
- logger.debug("Graceful shutdown completed");
- sessions.clear();
- if (this.keepAliveScheduler != null) {
- this.keepAliveScheduler.shutdown();
- }
- });
- }
-
- /**
- * Returns the WebFlux router function that defines the transport's HTTP endpoints.
- * This router function should be integrated into the application's web configuration.
- *
- *
- * The router function defines two endpoints:
- *
- *
GET {sseEndpoint} - For establishing SSE connections
- *
POST {messageEndpoint} - For receiving client messages
- *
- * @return The configured {@link RouterFunction} for handling HTTP requests
- */
- public RouterFunction> getRouterFunction() {
- return this.routerFunction;
- }
-
- /**
- * Handles new SSE connection requests from clients. Creates a new session for each
- * connection and sets up the SSE event stream.
- * @param request The incoming server request
- * @return A Mono which emits a response with the SSE event stream
- */
- private Mono handleSseConnection(ServerRequest request) {
- if (isClosing) {
- return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
- }
-
- try {
- Map> headers = request.headers().asHttpHeaders();
- this.securityValidator.validateHeaders(headers);
- }
- catch (ServerTransportSecurityException e) {
- return ServerResponse.status(e.getStatusCode()).bodyValue(e.getMessage());
- }
-
- McpTransportContext transportContext = this.contextExtractor.extract(request);
-
- return ServerResponse.ok()
- .contentType(MediaType.TEXT_EVENT_STREAM)
- .body(Flux.>create(sink -> {
- WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink);
-
- McpServerSession session = sessionFactory.create(sessionTransport);
- String sessionId = session.getId();
-
- logger.debug("Created new SSE connection for session: {}", sessionId);
- sessions.put(sessionId, session);
-
- // Send initial endpoint event
- logger.debug("Sending initial endpoint event to session: {}", sessionId);
- sink.next(
- ServerSentEvent.builder().event(ENDPOINT_EVENT_TYPE).data(buildEndpointUrl(sessionId)).build());
- sink.onCancel(() -> {
- logger.debug("Session {} cancelled", sessionId);
- sessions.remove(sessionId);
- });
- }).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)), ServerSentEvent.class);
- }
-
- /**
- * Constructs the full message endpoint URL by combining the base URL, message path,
- * and the required session_id query parameter.
- * @param sessionId the unique session identifier
- * @return the fully qualified endpoint URL as a string
- */
- private String buildEndpointUrl(String sessionId) {
- // for WebMVC compatibility
- return UriComponentsBuilder.fromUriString(this.baseUrl)
- .path(this.messageEndpoint)
- .queryParam(SESSION_ID, sessionId)
- .build()
- .toUriString();
- }
-
- /**
- * Handles incoming JSON-RPC messages from clients. Deserializes the message and
- * processes it through the configured message handler.
- *
- *
- * The handler:
- *
- *
Deserializes the incoming JSON-RPC message
- *
Passes it through the message handler chain
- *
Returns appropriate HTTP responses based on processing results
- *
Handles various error conditions with appropriate error responses
- *
- * @param request The incoming server request containing the JSON-RPC message
- * @return A Mono emitting the response indicating the message processing result
- */
- private Mono handleMessage(ServerRequest request) {
- if (isClosing) {
- return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
- }
-
- try {
- Map> headers = request.headers().asHttpHeaders();
- this.securityValidator.validateHeaders(headers);
- }
- catch (ServerTransportSecurityException e) {
- return ServerResponse.status(e.getStatusCode()).bodyValue(e.getMessage());
- }
-
- if (request.queryParam("sessionId").isEmpty()) {
- return ServerResponse.badRequest().bodyValue(new McpError("Session ID missing in message endpoint"));
- }
-
- McpServerSession session = sessions.get(request.queryParam("sessionId").get());
-
- if (session == null) {
- return ServerResponse.status(HttpStatus.NOT_FOUND)
- .bodyValue(new McpError("Session not found: " + request.queryParam("sessionId").get()));
- }
-
- McpTransportContext transportContext = this.contextExtractor.extract(request);
-
- return request.bodyToMono(String.class).flatMap(body -> {
- try {
- McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, body);
- return session.handle(message).flatMap(response -> ServerResponse.ok().build()).onErrorResume(error -> {
- logger.error("Error processing message: {}", error.getMessage());
- // TODO: instead of signalling the error, just respond with 200 OK
- // - the error is signalled on the SSE connection
- // return ServerResponse.ok().build();
- return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
- .bodyValue(new McpError(error.getMessage()));
- });
- }
- catch (IllegalArgumentException | IOException e) {
- logger.error("Failed to deserialize message: {}", e.getMessage());
- return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
- }
- }).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
- }
-
- private class WebFluxMcpSessionTransport implements McpServerTransport {
-
- private final FluxSink> sink;
-
- public WebFluxMcpSessionTransport(FluxSink> sink) {
- this.sink = sink;
- }
-
- @Override
- public Mono sendMessage(McpSchema.JSONRPCMessage message) {
- return Mono.fromSupplier(() -> {
- try {
- return jsonMapper.writeValueAsString(message);
- }
- catch (IOException e) {
- throw Exceptions.propagate(e);
- }
- }).doOnNext(jsonText -> {
- ServerSentEvent