Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

/**
* The Model Context Protocol (MCP) client implementation that provides asynchronous
Expand Down Expand Up @@ -181,7 +182,7 @@ public class McpAsyncClient {
* @param features the MCP Client supported features. responses against output
* schemas.
*/
McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
public McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
JsonSchemaValidator jsonSchemaValidator, McpClientFeatures.Async features) {

Assert.notNull(transport, "Transport must not be null");
Expand Down Expand Up @@ -317,13 +318,20 @@ public class McpAsyncClient {
};

this.initializer = new LifecycleInitializer(clientCapabilities, clientInfo, transport.protocolVersions(),
initializationTimeout, ctx -> new McpClientSession(requestTimeout, transport, requestHandlers,
notificationHandlers, con -> con.contextWrite(ctx)),
initializationTimeout,
ctx -> buildClientSession(requestTimeout, transport, requestHandlers, notificationHandlers, ctx),
postInitializationHook);

this.transport.setExceptionHandler(this.initializer::handleException);
}

protected McpClientSession buildClientSession(Duration requestTimeout, McpClientTransport transport,
Map<String, RequestHandler<?>> requestHandlers, Map<String, NotificationHandler> notificationHandlers,
ContextView ctx) {
return new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers,
con -> con.contextWrite(ctx));
}

/**
* Get the current initialization result.
* @return the initialization result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* @see McpSchema.Implementation
* @see McpSchema.ClientCapabilities
*/
class McpClientFeatures {
public class McpClientFeatures {

/**
* Asynchronous client features specification providing the capabilities and request
Expand All @@ -64,7 +64,7 @@ class McpClientFeatures {
* @param elicitationHandler the elicitation handler.
* @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
public record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map<String, McpSchema.Root> roots, List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers,
List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers,
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class McpSyncClient implements AutoCloseable {
// is not a requirement?
private static final long DEFAULT_CLOSE_TIMEOUT_MS = 10_000L;

private final McpAsyncClient delegate;
protected final McpAsyncClient delegate;

private final Supplier<McpTransportContext> contextProvider;

Expand All @@ -75,7 +75,7 @@ public class McpSyncClient implements AutoCloseable {
* @param contextProvider the supplier of context before calling any non-blocking
* operation on underlying delegate
*/
McpSyncClient(McpAsyncClient delegate, Supplier<McpTransportContext> contextProvider) {
public McpSyncClient(McpAsyncClient delegate, Supplier<McpTransportContext> contextProvider) {
Assert.notNull(delegate, "The delegate can not be null");
Assert.notNull(contextProvider, "The contextProvider can not be null");
this.delegate = delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ protected TypeRef() {
this.type = ((ParameterizedType) superClass).getActualTypeArguments()[0];
}

protected TypeRef(Type type) {
this.type = type;
}

/**
* Returns the captured type information.
* @return the Type representing the actual type argument captured by this TypeRef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,35 +91,35 @@ public class McpAsyncServer {

private static final Logger logger = LoggerFactory.getLogger(McpAsyncServer.class);

private final McpServerTransportProviderBase mcpTransportProvider;
protected final McpServerTransportProviderBase mcpTransportProvider;

private final McpJsonMapper jsonMapper;
protected final McpJsonMapper jsonMapper;

private final JsonSchemaValidator jsonSchemaValidator;
protected final JsonSchemaValidator jsonSchemaValidator;

private final McpSchema.ServerCapabilities serverCapabilities;
protected final McpSchema.ServerCapabilities serverCapabilities;

private final McpSchema.Implementation serverInfo;
protected final McpSchema.Implementation serverInfo;

private final String instructions;
protected final String instructions;

private final CopyOnWriteArrayList<McpServerFeatures.AsyncToolSpecification> tools = new CopyOnWriteArrayList<>();
protected final CopyOnWriteArrayList<McpServerFeatures.AsyncToolSpecification> tools = new CopyOnWriteArrayList<>();

private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceSpecification> resources = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceSpecification> resources = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();

// FIXME: this field is deprecated and should be remvoed together with the
// broadcasting loggingNotification.
private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG;

private final ConcurrentHashMap<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions = new ConcurrentHashMap<>();

private List<String> protocolVersions;
protected List<String> protocolVersions;

private McpUriTemplateManagerFactory uriTemplateManagerFactory = new DefaultMcpUriTemplateManagerFactory();
protected McpUriTemplateManagerFactory uriTemplateManagerFactory = new DefaultMcpUriTemplateManagerFactory();

/**
* Create a new McpAsyncServer with the given transport provider and capabilities.
Expand All @@ -128,7 +128,7 @@ public class McpAsyncServer {
* @param features The MCP server supported features.
* @param jsonMapper The JsonMapper to use for JSON serialization/deserialization
*/
McpAsyncServer(McpServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
public McpAsyncServer(McpServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
McpServerFeatures.Async features, Duration requestTimeout,
McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) {
this.mcpTransportProvider = mcpTransportProvider;
Expand All @@ -153,7 +153,7 @@ public class McpAsyncServer {
requestTimeout, transport, this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
}

McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
public McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
McpServerFeatures.Async features, Duration requestTimeout,
McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) {
this.mcpTransportProvider = mcpTransportProvider;
Expand All @@ -178,7 +178,7 @@ public class McpAsyncServer {
this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
}

private Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServerFeatures.Async features) {
protected Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServerFeatures.Async features) {
Map<String, McpNotificationHandler> notificationHandlers = new HashMap<>();

notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());
Expand All @@ -196,7 +196,7 @@ private Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServe
return notificationHandlers;
}

private Map<String, McpRequestHandler<?>> prepareRequestHandlers() {
protected Map<String, McpRequestHandler<?>> prepareRequestHandlers() {
Map<String, McpRequestHandler<?>> requestHandlers = new HashMap<>();

// Initialize request handlers for standard MCP methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class McpServerFeatures {
* roots list changes
* @param instructions The server instructions text
*/
record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
public record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
List<McpServerFeatures.AsyncToolSpecification> tools, Map<String, AsyncResourceSpecification> resources,
Map<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates,
Map<String, McpServerFeatures.AsyncPromptSpecification> prompts,
Expand All @@ -59,7 +59,7 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
* the roots list changes
* @param instructions The server instructions text
*/
Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
public Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
List<McpServerFeatures.AsyncToolSpecification> tools, Map<String, AsyncResourceSpecification> resources,
Map<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates,
Map<String, McpServerFeatures.AsyncPromptSpecification> prompts,
Expand Down Expand Up @@ -101,7 +101,7 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
* @return a specification which is protected from blocking calls specified by the
* user.
*/
static Async fromSync(Sync syncSpec, boolean immediateExecution) {
public static Async fromSync(Sync syncSpec, boolean immediateExecution) {
List<McpServerFeatures.AsyncToolSpecification> tools = new ArrayList<>();
for (var tool : syncSpec.tools()) {
tools.add(AsyncToolSpecification.fromSync(tool, immediateExecution));
Expand Down Expand Up @@ -153,7 +153,7 @@ static Async fromSync(Sync syncSpec, boolean immediateExecution) {
* roots list changes
* @param instructions The server instructions text
*/
record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
public record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
List<McpServerFeatures.SyncToolSpecification> tools,
Map<String, McpServerFeatures.SyncResourceSpecification> resources,
Map<String, McpServerFeatures.SyncResourceTemplateSpecification> resourceTemplates,
Expand All @@ -173,7 +173,7 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se
* the roots list changes
* @param instructions The server instructions text
*/
Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
public Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
List<McpServerFeatures.SyncToolSpecification> tools,
Map<String, McpServerFeatures.SyncResourceSpecification> resources,
Map<String, McpServerFeatures.SyncResourceTemplateSpecification> resourceTemplates,
Expand Down Expand Up @@ -354,7 +354,8 @@ public static Builder builder() {
public record AsyncResourceSpecification(McpSchema.Resource resource,
BiFunction<McpAsyncServerExchange, McpSchema.ReadResourceRequest, Mono<McpSchema.ReadResourceResult>> readHandler) {

static AsyncResourceSpecification fromSync(SyncResourceSpecification resource, boolean immediateExecution) {
public static AsyncResourceSpecification fromSync(SyncResourceSpecification resource,
boolean immediateExecution) {
// FIXME: This is temporary, proper validation should be implemented
if (resource == null) {
return null;
Expand Down Expand Up @@ -394,7 +395,7 @@ static AsyncResourceSpecification fromSync(SyncResourceSpecification resource, b
public record AsyncResourceTemplateSpecification(McpSchema.ResourceTemplate resourceTemplate,
BiFunction<McpAsyncServerExchange, McpSchema.ReadResourceRequest, Mono<McpSchema.ReadResourceResult>> readHandler) {

static AsyncResourceTemplateSpecification fromSync(SyncResourceTemplateSpecification resource,
public static AsyncResourceTemplateSpecification fromSync(SyncResourceTemplateSpecification resource,
boolean immediateExecution) {
// FIXME: This is temporary, proper validation should be implemented
if (resource == null) {
Expand Down Expand Up @@ -442,7 +443,7 @@ static AsyncResourceTemplateSpecification fromSync(SyncResourceTemplateSpecifica
public record AsyncPromptSpecification(McpSchema.Prompt prompt,
BiFunction<McpAsyncServerExchange, McpSchema.GetPromptRequest, Mono<McpSchema.GetPromptResult>> promptHandler) {

static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt, boolean immediateExecution) {
public static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt, boolean immediateExecution) {
// FIXME: This is temporary, proper validation should be implemented
if (prompt == null) {
return null;
Expand Down Expand Up @@ -482,7 +483,7 @@ public record AsyncCompletionSpecification(McpSchema.CompleteReference reference
* @return an asynchronous wrapper of the provided sync specification, or
* {@code null} if input is null
*/
static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion,
public static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion,
boolean immediateExecution) {
if (completion == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public class McpStatelessAsyncServer {

private static final Logger logger = LoggerFactory.getLogger(McpStatelessAsyncServer.class);

private final McpStatelessServerTransport mcpTransportProvider;
protected final McpStatelessServerTransport mcpTransportProvider;

private final McpJsonMapper jsonMapper;
protected final McpJsonMapper jsonMapper;

private final McpSchema.ServerCapabilities serverCapabilities;

Expand All @@ -77,7 +77,7 @@ public class McpStatelessAsyncServer {

private final JsonSchemaValidator jsonSchemaValidator;

McpStatelessAsyncServer(McpStatelessServerTransport mcpTransport, McpJsonMapper jsonMapper,
public McpStatelessAsyncServer(McpStatelessServerTransport mcpTransport, McpJsonMapper jsonMapper,
McpStatelessServerFeatures.Async features, Duration requestTimeout,
McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) {
this.mcpTransportProvider = mcpTransport;
Expand Down
Loading
Loading