Skip to content

Conversation

@MackinnonBuck
Copy link
Collaborator

@MackinnonBuck MackinnonBuck commented Dec 9, 2025

Overview

This PR implements the following features:

  • Client-side polling via the Last-Event-ID header
  • An ISseEventStreamStore abstraction to allow for storage and replay of SSE events
  • Server-side resumability of SSE streams via the Last-Event-ID header
  • Server-side disconnect (to enable client-side polling)

Each of these features has been split into its own commit for ease of review.

Description

There are two disconnection scenarios covered by this PR:

  • Network errors (unintentional disconnection)
  • Server-side programmatic disconnection

If a network error occurs, and an ISseEventStreamStore is configured, then the client may attempt to reconnect by making a GET request with a Last-Event-ID header. The server will then replay stored events before continuing to use the new GET response to stream remaining events. If further disconnections happen, the client may continue to make new GET requests to resume the stream. This applies for both client-initiated requests (POST) and the unsolicited message stream (GET).

However, the server can also initiate a disconnection and force the client to poll for updates. This is useful for avoiding long-running streams. This can be done via the new RequestContext<TParams>.EnablePollingAsync(TimeSpan retryInterval, CancellationToken cancellationToken = default) API. When the client reconnects via GET with a Last-Event-ID, the response will only contain events currently available in the ISseEventStreamStore before completing. The client must continue initiating new GET requests at the specified retryInterval until the final response is received.

Fixes #510
Fixes #1020

Copy link
Contributor

@mikekistler mikekistler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get through all of this but want to share the comments I've made so far.

@MackinnonBuck MackinnonBuck force-pushed the mbuck/resumability-redelivery branch from 348fac0 to 493062a Compare December 17, 2025 00:14
@MackinnonBuck MackinnonBuck marked this pull request as ready for review December 17, 2025 04:31
public Task<bool> SendPrimingEventAsync(TimeSpan retryInterval, ISseEventStreamWriter eventStreamWriter, CancellationToken cancellationToken = default)
{
// Create a priming event: empty data with an event ID
var primingItem = new SseItem<JsonRpcMessage?>(null, "prime")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to write "event: prime"? Do the other SDK's do that? I think the TS SDK just omits the event despite emiting a redundant "event: message" for normal messages.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mikekistler said that the event should be "prime": #1077 (comment) , but the spec doesn't seem to require a specific event name. It's possible we could just use an empty event.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that clients broke without "event: message" previously, I know they can be very picky. I think we should align with the TS SDK as much as possible for the text/event-stream response format unless it's clearly wrong.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the .NET SseItem type always includes an event type. If you specify null, it defaults to "message".

So if we're forced to have an event type, maybe specifying "prime" is better.

SetSessionId(sessionId);

// Call the subscribe method to capture the McpServer instance.
using var getResponse = await HttpClient.GetAsync("", HttpCompletionOption.ResponseHeadersRead, TestContext.Current.CancellationToken);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I adjusted this test to initiate the GET request earlier, before the server attempts to send a notification.

The test previously had the server send a notification before the client started the GET request. This worked because the SseWriter's channel buffered the message rather than immediately attempting to write it to the response stream. However, I doubt this was officially a supported scenario, because writing a second notification would have created a deadlock due to the channel filling up (it only had a capacity of 1).

The new implementation just throws if SendNotificationAsync is called before StreamableHttpServerTransport.HandleGetRequestAsync(), because the transport now writes directly to the response stream instead of flowing the message through a channel.

{
// The response has completed, and there is no event stream to store the message.
// Rather than drop the message, fall back to sending it via the parent transport.
await parentTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether or not we send the message over the unsolicited GET SSE stream should never be dependent on whether an event store has been configured.

I understand that this logic is mostly for after the response or error for the given request has been written, but it could still be hit in a scenario where there never was an event store and the POST request was aborted prior to the associated request handler completing. We should drop the message in this scenario just like we would if there was an event store that the client never resumed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could still be hit in a scenario where there never was an event store and the POST request was aborted prior to the associated request handler completing.

FWIW, this was true prior to this PR. This new logic preserves the original behavior of writing to the parent transport if a message is sent after the response completes, which was introduced in #832.

This logic was creating problems when an ISseEventStore was configured, because messages may then be received twice (once over the original GET stream and again on the resumed stream). Hence the new condition.

Maybe we should update this logic to only send messages via the parent transport if a response or error message has already been sent. This would address the notification scenario that the original PR was intended to support, and it wouldn't interfere with the ISseEventStore (as we'd simply skip writing to the stream in that case).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just updated the logic to only fall back to the parent transport for messages sent after the final response/error message.

Copy link
Contributor

@halter73 halter73 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks really good. Thanks for responding to all the feedback!

// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

// Copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/src/System/Net/ArrayBuffer.cs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/src/System/Net/ArrayBuffer.cs
// Copied from https://github.com/dotnet/runtime/blob/3499f694c22a8ed2c4cbdc2bc47de9868bbe32de/src/libraries/Common/src/System/Net/ArrayBuffer.cs

#if NET
private static int ArrayMaxLength => Array.MaxLength;
#else
private const int ArrayMaxLength = 0X7FFFFFC7;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private const int ArrayMaxLength = 0X7FFFFFC7;
private const int ArrayMaxLength = 0X7FFFFFC7;

private ISseEventStreamWriter? _sseEventStreamWriter;
private RequestId _pendingRequest;
private bool _finalResponseMessageSent;
private bool _originalResponseCompleted;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there's both the JSON-RPC response and the HTTP response, I like adding HTTP to avoid conflating the two. It might be good even to rename responseStream to httpResponseStream.

These are just my suggestions though. You can definitely keep the current naming if you like it better.

Suggested change
private bool _originalResponseCompleted;
private readonly TaskCompletionSource<bool> _httpResponseTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly SseEventWriter _httpSseWriter = new(responseStream);
private TaskCompletionSource<bool>? _storeTcs;
private ISseEventStreamWriter? _storeSseWriter;
private RequestId _pendingRequest;
private bool _finalResponseMessageSent;
private bool _httpResponseCompleted;


if (message.Data is JsonRpcResponse or JsonRpcError && ((JsonRpcMessageWithId)message.Data).Id == _pendingRequest)
_streamTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_ = HandleStreamWriterDisposalAsync(_streamTcs.Task, cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot always bubble up unhandled exceptions through the public API now that code can continue running after

I think we're going to have to bite the bullet and add a new constructor to StreamableHttpServerTransport that takes an optional ILoggerFactory, so we can at least log errors if _sseEventStreamWriter!.DisposeAsync() throws in HandleStreamWriterDisposalAsync.

We don't have to do it in this PR, but I think we should follow up quickly if not.

/// The HTTP application should typically respond with an empty "202 Accepted" response in this scenario.
/// </returns>
public async ValueTask<bool> HandlePostAsync(JsonRpcMessage message, CancellationToken cancellationToken)
public async ValueTask<bool> HandlePostAsync(JsonRpcMessage message, CancellationToken postCancellationToken, CancellationToken sessionCancellationToken)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to recommend keeping the sessionCancellationToken naming in HandleStreamWriterDisposalAsync and TryStartSseEventStreamAsync, but could we just make the sessionCancellationToken a constructor parameter for StreamableHttpPostTransport? If not, I think we should be more explicit about the name in the private methods too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

SEP-1699: Support SSE polling via server-side disconnect Add support for resumability and redelivery to Streamable HTTP transport

6 participants