-
Notifications
You must be signed in to change notification settings - Fork 598
Streamable HTTP resumability + redelivery + SSE polling via server-side disconnect #1077
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
src/ModelContextProtocol.Core/Protocol/JsonRpcMessageContext.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs
Outdated
Show resolved
Hide resolved
tests/ModelContextProtocol.AspNetCore.Tests/Utils/InMemoryEventStore.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.AspNetCore/HttpServerTransportOptions.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Client/StreamableHttpClientSessionTransport.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Client/StreamableHttpClientSessionTransport.cs
Show resolved
Hide resolved
mikekistler
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't get through all of this but want to share the comments I've made so far.
348fac0 to
493062a
Compare
src/ModelContextProtocol.AspNetCore/McpEndpointRouteBuilderExtensions.cs
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.AspNetCore/McpEndpointRouteBuilderExtensions.cs
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs
Outdated
Show resolved
Hide resolved
tests/ModelContextProtocol.AspNetCore.Tests/Utils/KestrelInMemoryTest.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Client/HttpClientTransportOptions.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs
Outdated
Show resolved
Hide resolved
…s.cs Co-authored-by: Stephen Halter <halter73@gmail.com>
src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs
Outdated
Show resolved
Hide resolved
| public Task<bool> SendPrimingEventAsync(TimeSpan retryInterval, ISseEventStreamWriter eventStreamWriter, CancellationToken cancellationToken = default) | ||
| { | ||
| // Create a priming event: empty data with an event ID | ||
| var primingItem = new SseItem<JsonRpcMessage?>(null, "prime") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to write "event: prime"? Do the other SDK's do that? I think the TS SDK just omits the event despite emiting a redundant "event: message" for normal messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mikekistler said that the event should be "prime": #1077 (comment) , but the spec doesn't seem to require a specific event name. It's possible we could just use an empty event.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that clients broke without "event: message" previously, I know they can be very picky. I think we should align with the TS SDK as much as possible for the text/event-stream response format unless it's clearly wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the .NET SseItem type always includes an event type. If you specify null, it defaults to "message".
So if we're forced to have an event type, maybe specifying "prime" is better.
src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs
Outdated
Show resolved
Hide resolved
tests/ModelContextProtocol.AspNetCore.Tests/SseEventStreamStoreTests.cs
Outdated
Show resolved
Hide resolved
| SetSessionId(sessionId); | ||
|
|
||
| // Call the subscribe method to capture the McpServer instance. | ||
| using var getResponse = await HttpClient.GetAsync("", HttpCompletionOption.ResponseHeadersRead, TestContext.Current.CancellationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I adjusted this test to initiate the GET request earlier, before the server attempts to send a notification.
The test previously had the server send a notification before the client started the GET request. This worked because the SseWriter's channel buffered the message rather than immediately attempting to write it to the response stream. However, I doubt this was officially a supported scenario, because writing a second notification would have created a deadlock due to the channel filling up (it only had a capacity of 1).
The new implementation just throws if SendNotificationAsync is called before StreamableHttpServerTransport.HandleGetRequestAsync(), because the transport now writes directly to the response stream instead of flowing the message through a channel.
src/ModelContextProtocol.Core/Client/StreamableHttpClientSessionTransport.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Client/StreamableHttpClientSessionTransport.cs
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs
Outdated
Show resolved
Hide resolved
| { | ||
| // The response has completed, and there is no event stream to store the message. | ||
| // Rather than drop the message, fall back to sending it via the parent transport. | ||
| await parentTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether or not we send the message over the unsolicited GET SSE stream should never be dependent on whether an event store has been configured.
I understand that this logic is mostly for after the response or error for the given request has been written, but it could still be hit in a scenario where there never was an event store and the POST request was aborted prior to the associated request handler completing. We should drop the message in this scenario just like we would if there was an event store that the client never resumed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it could still be hit in a scenario where there never was an event store and the POST request was aborted prior to the associated request handler completing.
FWIW, this was true prior to this PR. This new logic preserves the original behavior of writing to the parent transport if a message is sent after the response completes, which was introduced in #832.
This logic was creating problems when an ISseEventStore was configured, because messages may then be received twice (once over the original GET stream and again on the resumed stream). Hence the new condition.
Maybe we should update this logic to only send messages via the parent transport if a response or error message has already been sent. This would address the notification scenario that the original PR was intended to support, and it wouldn't interfere with the ISseEventStore (as we'd simply skip writing to the stream in that case).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've just updated the logic to only fall back to the parent transport for messages sent after the final response/error message.
src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs
Outdated
Show resolved
Hide resolved
src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs
Outdated
Show resolved
Hide resolved
tests/ModelContextProtocol.AspNetCore.Tests/ResumabilityIntegrationTests.cs
Show resolved
Hide resolved
…onTransport.cs Co-authored-by: Stephen Halter <halter73@gmail.com>
halter73
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks really good. Thanks for responding to all the feedback!
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| // Copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/src/System/Net/ArrayBuffer.cs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // Copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/src/System/Net/ArrayBuffer.cs | |
| // Copied from https://github.com/dotnet/runtime/blob/3499f694c22a8ed2c4cbdc2bc47de9868bbe32de/src/libraries/Common/src/System/Net/ArrayBuffer.cs |
| #if NET | ||
| private static int ArrayMaxLength => Array.MaxLength; | ||
| #else | ||
| private const int ArrayMaxLength = 0X7FFFFFC7; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private const int ArrayMaxLength = 0X7FFFFFC7; | |
| private const int ArrayMaxLength = 0X7FFFFFC7; |
| private ISseEventStreamWriter? _sseEventStreamWriter; | ||
| private RequestId _pendingRequest; | ||
| private bool _finalResponseMessageSent; | ||
| private bool _originalResponseCompleted; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there's both the JSON-RPC response and the HTTP response, I like adding HTTP to avoid conflating the two. It might be good even to rename responseStream to httpResponseStream.
These are just my suggestions though. You can definitely keep the current naming if you like it better.
| private bool _originalResponseCompleted; | |
| private readonly TaskCompletionSource<bool> _httpResponseTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); | |
| private readonly SseEventWriter _httpSseWriter = new(responseStream); | |
| private TaskCompletionSource<bool>? _storeTcs; | |
| private ISseEventStreamWriter? _storeSseWriter; | |
| private RequestId _pendingRequest; | |
| private bool _finalResponseMessageSent; | |
| private bool _httpResponseCompleted; |
|
|
||
| if (message.Data is JsonRpcResponse or JsonRpcError && ((JsonRpcMessageWithId)message.Data).Id == _pendingRequest) | ||
| _streamTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); | ||
| _ = HandleStreamWriterDisposalAsync(_streamTcs.Task, cancellationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot always bubble up unhandled exceptions through the public API now that code can continue running after
I think we're going to have to bite the bullet and add a new constructor to StreamableHttpServerTransport that takes an optional ILoggerFactory, so we can at least log errors if _sseEventStreamWriter!.DisposeAsync() throws in HandleStreamWriterDisposalAsync.
We don't have to do it in this PR, but I think we should follow up quickly if not.
| /// The HTTP application should typically respond with an empty "202 Accepted" response in this scenario. | ||
| /// </returns> | ||
| public async ValueTask<bool> HandlePostAsync(JsonRpcMessage message, CancellationToken cancellationToken) | ||
| public async ValueTask<bool> HandlePostAsync(JsonRpcMessage message, CancellationToken postCancellationToken, CancellationToken sessionCancellationToken) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was going to recommend keeping the sessionCancellationToken naming in HandleStreamWriterDisposalAsync and TryStartSseEventStreamAsync, but could we just make the sessionCancellationToken a constructor parameter for StreamableHttpPostTransport? If not, I think we should be more explicit about the name in the private methods too.
Overview
This PR implements the following features:
Last-Event-IDheaderISseEventStreamStoreabstraction to allow for storage and replay of SSE eventsLast-Event-IDheaderEach of these features has been split into its own commit for ease of review.
Description
There are two disconnection scenarios covered by this PR:
If a network error occurs, and an
ISseEventStreamStoreis configured, then the client may attempt to reconnect by making a GET request with aLast-Event-IDheader. The server will then replay stored events before continuing to use the new GET response to stream remaining events. If further disconnections happen, the client may continue to make new GET requests to resume the stream. This applies for both client-initiated requests (POST) and the unsolicited message stream (GET).However, the server can also initiate a disconnection and force the client to poll for updates. This is useful for avoiding long-running streams. This can be done via the new
RequestContext<TParams>.EnablePollingAsync(TimeSpan retryInterval, CancellationToken cancellationToken = default)API. When the client reconnects via GET with aLast-Event-ID, the response will only contain events currently available in theISseEventStreamStorebefore completing. The client must continue initiating new GET requests at the specifiedretryIntervaluntil the final response is received.Fixes #510
Fixes #1020