From eab4708105769ea9db34e5d1a01a2d855331a5b0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 19 Feb 2026 23:36:04 +0000 Subject: [PATCH 1/7] Initial plan From ed1ed14ea9c64489c4349aa7f64872eed2aadfd8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 19 Feb 2026 23:56:38 +0000 Subject: [PATCH 2/7] Avoid string intermediates in MCP transport read side using PipeReader Co-authored-by: ericstj <8918108+ericstj@users.noreply.github.com> --- .../System/Text/EncodingExtensions.cs | 19 +++ .../Client/StdioClientSessionTransport.cs | 2 +- .../Client/StreamClientSessionTransport.cs | 103 ++++++++++--- .../Client/StreamClientTransport.cs | 1 - .../Server/StreamServerTransport.cs | 137 +++++++++++++----- 5 files changed, 200 insertions(+), 62 deletions(-) diff --git a/src/Common/Polyfills/System/Text/EncodingExtensions.cs b/src/Common/Polyfills/System/Text/EncodingExtensions.cs index 65369adbb..436fa3ea2 100644 --- a/src/Common/Polyfills/System/Text/EncodingExtensions.cs +++ b/src/Common/Polyfills/System/Text/EncodingExtensions.cs @@ -45,6 +45,25 @@ public static int GetBytes(this Encoding encoding, ReadOnlySpan chars, Spa } } } + + /// + /// Decodes all the bytes in the specified span into a string. + /// + public static string GetString(this Encoding encoding, ReadOnlySpan bytes) + { + if (bytes.IsEmpty) + { + return string.Empty; + } + + unsafe + { + fixed (byte* bytesPtr = bytes) + { + return encoding.GetString(bytesPtr, bytes.Length); + } + } + } } #endif diff --git a/src/ModelContextProtocol.Core/Client/StdioClientSessionTransport.cs b/src/ModelContextProtocol.Core/Client/StdioClientSessionTransport.cs index f8b10746f..2844ca1e1 100644 --- a/src/ModelContextProtocol.Core/Client/StdioClientSessionTransport.cs +++ b/src/ModelContextProtocol.Core/Client/StdioClientSessionTransport.cs @@ -7,7 +7,7 @@ namespace ModelContextProtocol.Client; /// Provides the client side of a stdio-based session transport. internal sealed class StdioClientSessionTransport( StdioClientTransportOptions options, Process process, string endpointName, Queue stderrRollingLog, ILoggerFactory? loggerFactory) : - StreamClientSessionTransport(process.StandardInput.BaseStream, process.StandardOutput.BaseStream, encoding: null, endpointName, loggerFactory) + StreamClientSessionTransport(process.StandardInput.BaseStream, process.StandardOutput.BaseStream, endpointName, loggerFactory) { private readonly StdioClientTransportOptions _options = options; private readonly Process _process = process; diff --git a/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs b/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs index 19306349f..e8229ce94 100644 --- a/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs +++ b/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs @@ -1,5 +1,7 @@ using Microsoft.Extensions.Logging; using ModelContextProtocol.Protocol; +using System.Buffers; +using System.IO.Pipelines; using System.Text; using System.Text.Json; @@ -12,7 +14,7 @@ internal class StreamClientSessionTransport : TransportBase internal static UTF8Encoding NoBomUtf8Encoding { get; } = new(encoderShouldEmitUTF8Identifier: false); - private readonly TextReader _serverOutput; + private readonly PipeReader _serverOutputPipe; private readonly Stream _serverInputStream; private readonly SemaphoreSlim _sendLock = new(1, 1); private CancellationTokenSource? _shutdownCts = new(); @@ -27,9 +29,6 @@ internal class StreamClientSessionTransport : TransportBase /// /// The server's output stream. Messages read from this stream will be received from the server. /// - /// - /// The encoding used for reading and writing messages from the input and output streams. Defaults to UTF-8 without BOM if null. - /// /// /// A name that identifies this transport endpoint in logs. /// @@ -40,18 +39,14 @@ internal class StreamClientSessionTransport : TransportBase /// This constructor starts a background task to read messages from the server output stream. /// The transport will be marked as connected once initialized. /// - public StreamClientSessionTransport(Stream serverInput, Stream serverOutput, Encoding? encoding, string endpointName, ILoggerFactory? loggerFactory) + public StreamClientSessionTransport(Stream serverInput, Stream serverOutput, string endpointName, ILoggerFactory? loggerFactory) : base(endpointName, loggerFactory) { Throw.IfNull(serverInput); Throw.IfNull(serverOutput); _serverInputStream = serverInput; -#if NET - _serverOutput = new StreamReader(serverOutput, encoding ?? NoBomUtf8Encoding); -#else - _serverOutput = new CancellableStreamReader(serverOutput, encoding ?? NoBomUtf8Encoding); -#endif + _serverOutputPipe = PipeReader.Create(serverOutput); SetConnected(); @@ -105,20 +100,41 @@ private async Task ReadMessagesAsync(CancellationToken cancellationToken) while (true) { - if (await _serverOutput.ReadLineAsync(cancellationToken).ConfigureAwait(false) is not string line) - { - LogTransportEndOfStream(Name); - break; - } + ReadResult result = await _serverOutputPipe.ReadAsync(cancellationToken).ConfigureAwait(false); + ReadOnlySequence buffer = result.Buffer; - if (string.IsNullOrWhiteSpace(line)) + SequencePosition? position; + while ((position = buffer.PositionOf((byte)'\n')) != null) { - continue; + ReadOnlySequence line = buffer.Slice(0, position.Value); + + // Trim trailing \r for Windows-style CRLF line endings. + if (EndsWithCarriageReturn(line)) + { + line = line.Slice(0, line.Length - 1); + } + + if (!line.IsEmpty) + { + if (Logger.IsEnabled(LogLevel.Trace)) + { + LogTransportReceivedMessageSensitive(Name, GetString(line)); + } + + await ProcessLineAsync(line, cancellationToken).ConfigureAwait(false); + } + + // Advance past the '\n'. + buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); } - LogTransportReceivedMessageSensitive(Name, line); + _serverOutputPipe.AdvanceTo(buffer.Start, buffer.End); - await ProcessMessageAsync(line, cancellationToken).ConfigureAwait(false); + if (result.IsCompleted) + { + LogTransportEndOfStream(Name); + break; + } } } catch (OperationCanceledException) @@ -137,25 +153,38 @@ private async Task ReadMessagesAsync(CancellationToken cancellationToken) } } - private async Task ProcessMessageAsync(string line, CancellationToken cancellationToken) + private async Task ProcessLineAsync(ReadOnlySequence line, CancellationToken cancellationToken) { try { - var message = (JsonRpcMessage?)JsonSerializer.Deserialize(line.AsSpan().Trim(), McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage))); - if (message != null) + JsonRpcMessage? message; + if (line.IsSingleSegment) + { + message = JsonSerializer.Deserialize(line.First.Span, McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage))) as JsonRpcMessage; + } + else + { + var reader = new Utf8JsonReader(line, isFinalBlock: true, state: default); + message = JsonSerializer.Deserialize(ref reader, McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage))) as JsonRpcMessage; + } + + if (message is not null) { await WriteMessageAsync(message, cancellationToken).ConfigureAwait(false); } else { - LogTransportMessageParseUnexpectedTypeSensitive(Name, line); + if (Logger.IsEnabled(LogLevel.Trace)) + { + LogTransportMessageParseUnexpectedTypeSensitive(Name, GetString(line)); + } } } catch (JsonException ex) { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportMessageParseFailedSensitive(Name, line, ex); + LogTransportMessageParseFailedSensitive(Name, GetString(line), ex); } else { @@ -164,6 +193,32 @@ private async Task ProcessMessageAsync(string line, CancellationToken cancellati } } + private static string GetString(in ReadOnlySequence sequence) => + sequence.IsSingleSegment + ? Encoding.UTF8.GetString(sequence.First.Span) + : Encoding.UTF8.GetString(sequence.ToArray()); + + private static bool EndsWithCarriageReturn(in ReadOnlySequence sequence) + { + if (sequence.IsSingleSegment) + { + ReadOnlySpan span = sequence.First.Span; + return span.Length > 0 && span[span.Length - 1] == (byte)'\r'; + } + + // Multi-segment: find the last non-empty segment to check its last byte. + ReadOnlyMemory last = default; + foreach (ReadOnlyMemory segment in sequence) + { + if (!segment.IsEmpty) + { + last = segment; + } + } + + return !last.IsEmpty && last.Span[last.Length - 1] == (byte)'\r'; + } + protected virtual async ValueTask CleanupAsync(Exception? error = null, CancellationToken cancellationToken = default) { LogTransportShuttingDown(Name); diff --git a/src/ModelContextProtocol.Core/Client/StreamClientTransport.cs b/src/ModelContextProtocol.Core/Client/StreamClientTransport.cs index deca7e6ef..c2429cdec 100644 --- a/src/ModelContextProtocol.Core/Client/StreamClientTransport.cs +++ b/src/ModelContextProtocol.Core/Client/StreamClientTransport.cs @@ -50,7 +50,6 @@ public Task ConnectAsync(CancellationToken cancellationToken = defau return Task.FromResult(new StreamClientSessionTransport( _serverInput, _serverOutput, - encoding: null, "Client (stream)", _loggerFactory)); } diff --git a/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs index 2202337f1..78262d2ed 100644 --- a/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs @@ -1,6 +1,8 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using ModelContextProtocol.Protocol; +using System.Buffers; +using System.IO.Pipelines; using System.Text; using System.Text.Json; @@ -20,7 +22,8 @@ public class StreamServerTransport : TransportBase private readonly ILogger _logger; - private readonly TextReader _inputReader; + private readonly Stream _inputStream; + private readonly PipeReader _inputPipeReader; private readonly Stream _outputStream; private readonly SemaphoreSlim _sendLock = new(1, 1); @@ -45,11 +48,8 @@ public StreamServerTransport(Stream inputStream, Stream outputStream, string? se _logger = loggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance; -#if NET - _inputReader = new StreamReader(inputStream, Encoding.UTF8); -#else - _inputReader = new CancellableStreamReader(inputStream, Encoding.UTF8); -#endif + _inputStream = inputStream; + _inputPipeReader = PipeReader.Create(inputStream); _outputStream = outputStream; SetConnected(); @@ -97,43 +97,35 @@ private async Task ReadMessagesAsync() while (!shutdownToken.IsCancellationRequested) { - var line = await _inputReader.ReadLineAsync(shutdownToken).ConfigureAwait(false); - if (string.IsNullOrWhiteSpace(line)) - { - if (line is null) - { - LogTransportEndOfStream(Name); - break; - } + ReadResult result = await _inputPipeReader.ReadAsync(shutdownToken).ConfigureAwait(false); + ReadOnlySequence buffer = result.Buffer; - continue; - } - - LogTransportReceivedMessageSensitive(Name, line); - - try + SequencePosition? position; + while ((position = buffer.PositionOf((byte)'\n')) != null) { - if (JsonSerializer.Deserialize(line, McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage))) is JsonRpcMessage message) + ReadOnlySequence line = buffer.Slice(0, position.Value); + + // Trim trailing \r for Windows-style CRLF line endings. + if (EndsWithCarriageReturn(line)) { - await WriteMessageAsync(message, shutdownToken).ConfigureAwait(false); + line = line.Slice(0, line.Length - 1); } - else + + if (!line.IsEmpty) { - LogTransportMessageParseUnexpectedTypeSensitive(Name, line); + await ProcessLineAsync(line, shutdownToken).ConfigureAwait(false); } + + // Advance past the '\n'. + buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); } - catch (JsonException ex) - { - if (Logger.IsEnabled(LogLevel.Trace)) - { - LogTransportMessageParseFailedSensitive(Name, line, ex); - } - else - { - LogTransportMessageParseFailed(Name, ex); - } - // Continue reading even if we fail to parse a message + _inputPipeReader.AdvanceTo(buffer.Start, buffer.End); + + if (result.IsCompleted) + { + LogTransportEndOfStream(Name); + break; } } } @@ -152,6 +144,79 @@ private async Task ReadMessagesAsync() } } + private async Task ProcessLineAsync(ReadOnlySequence line, CancellationToken cancellationToken) + { + if (Logger.IsEnabled(LogLevel.Trace)) + { + LogTransportReceivedMessageSensitive(Name, GetString(line)); + } + + try + { + JsonRpcMessage? message; + if (line.IsSingleSegment) + { + message = JsonSerializer.Deserialize(line.First.Span, McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage))) as JsonRpcMessage; + } + else + { + var reader = new Utf8JsonReader(line, isFinalBlock: true, state: default); + message = JsonSerializer.Deserialize(ref reader, McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage))) as JsonRpcMessage; + } + + if (message is not null) + { + await WriteMessageAsync(message, cancellationToken).ConfigureAwait(false); + } + else + { + if (Logger.IsEnabled(LogLevel.Trace)) + { + LogTransportMessageParseUnexpectedTypeSensitive(Name, GetString(line)); + } + } + } + catch (JsonException ex) + { + if (Logger.IsEnabled(LogLevel.Trace)) + { + LogTransportMessageParseFailedSensitive(Name, GetString(line), ex); + } + else + { + LogTransportMessageParseFailed(Name, ex); + } + + // Continue reading even if we fail to parse a message. + } + } + + private static string GetString(in ReadOnlySequence sequence) => + sequence.IsSingleSegment + ? Encoding.UTF8.GetString(sequence.First.Span) + : Encoding.UTF8.GetString(sequence.ToArray()); + + private static bool EndsWithCarriageReturn(in ReadOnlySequence sequence) + { + if (sequence.IsSingleSegment) + { + ReadOnlySpan span = sequence.First.Span; + return span.Length > 0 && span[span.Length - 1] == (byte)'\r'; + } + + // Multi-segment: find the last non-empty segment to check its last byte. + ReadOnlyMemory last = default; + foreach (ReadOnlyMemory segment in sequence) + { + if (!segment.IsEmpty) + { + last = segment; + } + } + + return !last.IsEmpty && last.Span[last.Length - 1] == (byte)'\r'; + } + /// public override async ValueTask DisposeAsync() { @@ -170,7 +235,7 @@ public override async ValueTask DisposeAsync() // Dispose of stdin/out. Cancellation may not be able to wake up operations // synchronously blocked in a syscall; we need to forcefully close the handle / file descriptor. - _inputReader?.Dispose(); + _inputStream?.Dispose(); _outputStream?.Dispose(); // Make sure the work has quiesced. From b530d53cbf3b00766430a634eaeee873677492b9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Feb 2026 16:16:17 +0000 Subject: [PATCH 3/7] Factor out PipeReader line-reading into shared PipeReaderExtensions helper Co-authored-by: ericstj <8918108+ericstj@users.noreply.github.com> --- .../Client/StreamClientSessionTransport.cs | 76 +++--------------- .../Protocol/PipeReaderExtensions.cs | 79 +++++++++++++++++++ .../Server/StreamServerTransport.cs | 69 ++-------------- 3 files changed, 93 insertions(+), 131 deletions(-) create mode 100644 src/ModelContextProtocol.Core/Protocol/PipeReaderExtensions.cs diff --git a/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs b/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs index e8229ce94..b382b6751 100644 --- a/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs +++ b/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs @@ -97,45 +97,8 @@ private async Task ReadMessagesAsync(CancellationToken cancellationToken) try { LogTransportEnteringReadMessagesLoop(Name); - - while (true) - { - ReadResult result = await _serverOutputPipe.ReadAsync(cancellationToken).ConfigureAwait(false); - ReadOnlySequence buffer = result.Buffer; - - SequencePosition? position; - while ((position = buffer.PositionOf((byte)'\n')) != null) - { - ReadOnlySequence line = buffer.Slice(0, position.Value); - - // Trim trailing \r for Windows-style CRLF line endings. - if (EndsWithCarriageReturn(line)) - { - line = line.Slice(0, line.Length - 1); - } - - if (!line.IsEmpty) - { - if (Logger.IsEnabled(LogLevel.Trace)) - { - LogTransportReceivedMessageSensitive(Name, GetString(line)); - } - - await ProcessLineAsync(line, cancellationToken).ConfigureAwait(false); - } - - // Advance past the '\n'. - buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); - } - - _serverOutputPipe.AdvanceTo(buffer.Start, buffer.End); - - if (result.IsCompleted) - { - LogTransportEndOfStream(Name); - break; - } - } + await _serverOutputPipe.ReadLinesAsync(ProcessLineAsync, cancellationToken).ConfigureAwait(false); + LogTransportEndOfStream(Name); } catch (OperationCanceledException) { @@ -155,6 +118,11 @@ private async Task ReadMessagesAsync(CancellationToken cancellationToken) private async Task ProcessLineAsync(ReadOnlySequence line, CancellationToken cancellationToken) { + if (Logger.IsEnabled(LogLevel.Trace)) + { + LogTransportReceivedMessageSensitive(Name, PipeReaderExtensions.GetUtf8String(line)); + } + try { JsonRpcMessage? message; @@ -176,7 +144,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportMessageParseUnexpectedTypeSensitive(Name, GetString(line)); + LogTransportMessageParseUnexpectedTypeSensitive(Name, PipeReaderExtensions.GetUtf8String(line)); } } } @@ -184,7 +152,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportMessageParseFailedSensitive(Name, GetString(line), ex); + LogTransportMessageParseFailedSensitive(Name, PipeReaderExtensions.GetUtf8String(line), ex); } else { @@ -193,32 +161,6 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok } } - private static string GetString(in ReadOnlySequence sequence) => - sequence.IsSingleSegment - ? Encoding.UTF8.GetString(sequence.First.Span) - : Encoding.UTF8.GetString(sequence.ToArray()); - - private static bool EndsWithCarriageReturn(in ReadOnlySequence sequence) - { - if (sequence.IsSingleSegment) - { - ReadOnlySpan span = sequence.First.Span; - return span.Length > 0 && span[span.Length - 1] == (byte)'\r'; - } - - // Multi-segment: find the last non-empty segment to check its last byte. - ReadOnlyMemory last = default; - foreach (ReadOnlyMemory segment in sequence) - { - if (!segment.IsEmpty) - { - last = segment; - } - } - - return !last.IsEmpty && last.Span[last.Length - 1] == (byte)'\r'; - } - protected virtual async ValueTask CleanupAsync(Exception? error = null, CancellationToken cancellationToken = default) { LogTransportShuttingDown(Name); diff --git a/src/ModelContextProtocol.Core/Protocol/PipeReaderExtensions.cs b/src/ModelContextProtocol.Core/Protocol/PipeReaderExtensions.cs new file mode 100644 index 000000000..79d5d054c --- /dev/null +++ b/src/ModelContextProtocol.Core/Protocol/PipeReaderExtensions.cs @@ -0,0 +1,79 @@ +using System.Buffers; +using System.IO.Pipelines; +using System.Text; + +namespace ModelContextProtocol.Protocol; + +/// Internal helper for reading newline-delimited UTF-8 lines from a . +internal static class PipeReaderExtensions +{ + /// + /// Reads newline-delimited lines from , invoking + /// for each non-empty line, until the reader signals completion. + /// + internal static async Task ReadLinesAsync( + this PipeReader reader, + Func, CancellationToken, Task> processLine, + CancellationToken cancellationToken) + { + while (true) + { + ReadResult result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); + ReadOnlySequence buffer = result.Buffer; + + SequencePosition? position; + while ((position = buffer.PositionOf((byte)'\n')) != null) + { + ReadOnlySequence line = buffer.Slice(0, position.Value); + + // Trim trailing \r for Windows-style CRLF line endings. + if (EndsWithCarriageReturn(line)) + { + line = line.Slice(0, line.Length - 1); + } + + if (!line.IsEmpty) + { + await processLine(line, cancellationToken).ConfigureAwait(false); + } + + // Advance past the '\n'. + buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); + } + + reader.AdvanceTo(buffer.Start, buffer.End); + + if (result.IsCompleted) + { + break; + } + } + } + + /// Decodes a UTF-8 to a . + internal static string GetUtf8String(in ReadOnlySequence sequence) => + sequence.IsSingleSegment + ? Encoding.UTF8.GetString(sequence.First.Span) + : Encoding.UTF8.GetString(sequence.ToArray()); + + private static bool EndsWithCarriageReturn(in ReadOnlySequence sequence) + { + if (sequence.IsSingleSegment) + { + ReadOnlySpan span = sequence.First.Span; + return span.Length > 0 && span[span.Length - 1] == (byte)'\r'; + } + + // Multi-segment: find the last non-empty segment to check its last byte. + ReadOnlyMemory last = default; + foreach (ReadOnlyMemory segment in sequence) + { + if (!segment.IsEmpty) + { + last = segment; + } + } + + return !last.IsEmpty && last.Span[last.Length - 1] == (byte)'\r'; + } +} diff --git a/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs index 78262d2ed..8aa15a63f 100644 --- a/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs @@ -3,7 +3,6 @@ using ModelContextProtocol.Protocol; using System.Buffers; using System.IO.Pipelines; -using System.Text; using System.Text.Json; namespace ModelContextProtocol.Server; @@ -94,40 +93,8 @@ private async Task ReadMessagesAsync() try { LogTransportEnteringReadMessagesLoop(Name); - - while (!shutdownToken.IsCancellationRequested) - { - ReadResult result = await _inputPipeReader.ReadAsync(shutdownToken).ConfigureAwait(false); - ReadOnlySequence buffer = result.Buffer; - - SequencePosition? position; - while ((position = buffer.PositionOf((byte)'\n')) != null) - { - ReadOnlySequence line = buffer.Slice(0, position.Value); - - // Trim trailing \r for Windows-style CRLF line endings. - if (EndsWithCarriageReturn(line)) - { - line = line.Slice(0, line.Length - 1); - } - - if (!line.IsEmpty) - { - await ProcessLineAsync(line, shutdownToken).ConfigureAwait(false); - } - - // Advance past the '\n'. - buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); - } - - _inputPipeReader.AdvanceTo(buffer.Start, buffer.End); - - if (result.IsCompleted) - { - LogTransportEndOfStream(Name); - break; - } - } + await _inputPipeReader.ReadLinesAsync(ProcessLineAsync, shutdownToken).ConfigureAwait(false); + LogTransportEndOfStream(Name); } catch (OperationCanceledException) { @@ -148,7 +115,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportReceivedMessageSensitive(Name, GetString(line)); + LogTransportReceivedMessageSensitive(Name, PipeReaderExtensions.GetUtf8String(line)); } try @@ -172,7 +139,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportMessageParseUnexpectedTypeSensitive(Name, GetString(line)); + LogTransportMessageParseUnexpectedTypeSensitive(Name, PipeReaderExtensions.GetUtf8String(line)); } } } @@ -180,7 +147,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportMessageParseFailedSensitive(Name, GetString(line), ex); + LogTransportMessageParseFailedSensitive(Name, PipeReaderExtensions.GetUtf8String(line), ex); } else { @@ -191,32 +158,6 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok } } - private static string GetString(in ReadOnlySequence sequence) => - sequence.IsSingleSegment - ? Encoding.UTF8.GetString(sequence.First.Span) - : Encoding.UTF8.GetString(sequence.ToArray()); - - private static bool EndsWithCarriageReturn(in ReadOnlySequence sequence) - { - if (sequence.IsSingleSegment) - { - ReadOnlySpan span = sequence.First.Span; - return span.Length > 0 && span[span.Length - 1] == (byte)'\r'; - } - - // Multi-segment: find the last non-empty segment to check its last byte. - ReadOnlyMemory last = default; - foreach (ReadOnlyMemory segment in sequence) - { - if (!segment.IsEmpty) - { - last = segment; - } - } - - return !last.IsEmpty && last.Span[last.Length - 1] == (byte)'\r'; - } - /// public override async ValueTask DisposeAsync() { From 09cf5eb8351bef225d210dd8da5cee7fb3c9bfab Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Feb 2026 19:53:09 +0000 Subject: [PATCH 4/7] Address review feedback: move GetUtf8String to EncodingUtilities, remove NoBomUtf8Encoding from StreamClientSessionTransport Co-authored-by: ericstj <8918108+ericstj@users.noreply.github.com> --- src/Common/EncodingUtilities.cs | 13 +++++++++++++ .../Client/StdioClientTransport.cs | 10 ++++++---- .../Client/StreamClientSessionTransport.cs | 9 +++------ .../Protocol/PipeReaderExtensions.cs | 7 ------- .../Server/StreamServerTransport.cs | 6 +++--- 5 files changed, 25 insertions(+), 20 deletions(-) diff --git a/src/Common/EncodingUtilities.cs b/src/Common/EncodingUtilities.cs index 50127882b..7e5b69d7c 100644 --- a/src/Common/EncodingUtilities.cs +++ b/src/Common/EncodingUtilities.cs @@ -20,6 +20,19 @@ public static byte[] GetUtf8Bytes(ReadOnlySpan utf16) return bytes; } + /// Decodes a UTF-8 to a . + public static string GetUtf8String(in ReadOnlySequence sequence) + { + if (sequence.IsEmpty) + { + return string.Empty; + } + + return sequence.IsSingleSegment + ? Encoding.UTF8.GetString(sequence.First.Span) + : Encoding.UTF8.GetString(sequence.ToArray()); + } + /// /// Encodes binary data to base64-encoded UTF-8 bytes. /// diff --git a/src/ModelContextProtocol.Core/Client/StdioClientTransport.cs b/src/ModelContextProtocol.Core/Client/StdioClientTransport.cs index 191a512b1..906ae2ebf 100644 --- a/src/ModelContextProtocol.Core/Client/StdioClientTransport.cs +++ b/src/ModelContextProtocol.Core/Client/StdioClientTransport.cs @@ -31,6 +31,8 @@ public sealed partial class StdioClientTransport : IClientTransport private static readonly object s_consoleEncodingLock = new(); #endif + private static readonly UTF8Encoding s_noBomUtf8Encoding = new(encoderShouldEmitUTF8Identifier: false); + private readonly StdioClientTransportOptions _options; private readonly ILoggerFactory? _loggerFactory; @@ -85,10 +87,10 @@ public async Task ConnectAsync(CancellationToken cancellationToken = UseShellExecute = false, CreateNoWindow = true, WorkingDirectory = _options.WorkingDirectory ?? Environment.CurrentDirectory, - StandardOutputEncoding = StreamClientSessionTransport.NoBomUtf8Encoding, - StandardErrorEncoding = StreamClientSessionTransport.NoBomUtf8Encoding, + StandardOutputEncoding = s_noBomUtf8Encoding, + StandardErrorEncoding = s_noBomUtf8Encoding, #if NET - StandardInputEncoding = StreamClientSessionTransport.NoBomUtf8Encoding, + StandardInputEncoding = s_noBomUtf8Encoding, #endif }; @@ -173,7 +175,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken = Encoding originalInputEncoding = Console.InputEncoding; try { - Console.InputEncoding = StreamClientSessionTransport.NoBomUtf8Encoding; + Console.InputEncoding = s_noBomUtf8Encoding; processStarted = process.Start(); } finally diff --git a/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs b/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs index b382b6751..7a493cbe5 100644 --- a/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs +++ b/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs @@ -2,7 +2,6 @@ using ModelContextProtocol.Protocol; using System.Buffers; using System.IO.Pipelines; -using System.Text; using System.Text.Json; namespace ModelContextProtocol.Client; @@ -12,8 +11,6 @@ internal class StreamClientSessionTransport : TransportBase { private static readonly byte[] s_newlineBytes = "\n"u8.ToArray(); - internal static UTF8Encoding NoBomUtf8Encoding { get; } = new(encoderShouldEmitUTF8Identifier: false); - private readonly PipeReader _serverOutputPipe; private readonly Stream _serverInputStream; private readonly SemaphoreSlim _sendLock = new(1, 1); @@ -120,7 +117,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportReceivedMessageSensitive(Name, PipeReaderExtensions.GetUtf8String(line)); + LogTransportReceivedMessageSensitive(Name, EncodingUtilities.GetUtf8String(line)); } try @@ -144,7 +141,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportMessageParseUnexpectedTypeSensitive(Name, PipeReaderExtensions.GetUtf8String(line)); + LogTransportMessageParseUnexpectedTypeSensitive(Name, EncodingUtilities.GetUtf8String(line)); } } } @@ -152,7 +149,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportMessageParseFailedSensitive(Name, PipeReaderExtensions.GetUtf8String(line), ex); + LogTransportMessageParseFailedSensitive(Name, EncodingUtilities.GetUtf8String(line), ex); } else { diff --git a/src/ModelContextProtocol.Core/Protocol/PipeReaderExtensions.cs b/src/ModelContextProtocol.Core/Protocol/PipeReaderExtensions.cs index 79d5d054c..101691f09 100644 --- a/src/ModelContextProtocol.Core/Protocol/PipeReaderExtensions.cs +++ b/src/ModelContextProtocol.Core/Protocol/PipeReaderExtensions.cs @@ -1,6 +1,5 @@ using System.Buffers; using System.IO.Pipelines; -using System.Text; namespace ModelContextProtocol.Protocol; @@ -50,12 +49,6 @@ internal static async Task ReadLinesAsync( } } - /// Decodes a UTF-8 to a . - internal static string GetUtf8String(in ReadOnlySequence sequence) => - sequence.IsSingleSegment - ? Encoding.UTF8.GetString(sequence.First.Span) - : Encoding.UTF8.GetString(sequence.ToArray()); - private static bool EndsWithCarriageReturn(in ReadOnlySequence sequence) { if (sequence.IsSingleSegment) diff --git a/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs index 8aa15a63f..49d1c17ca 100644 --- a/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs @@ -115,7 +115,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportReceivedMessageSensitive(Name, PipeReaderExtensions.GetUtf8String(line)); + LogTransportReceivedMessageSensitive(Name, EncodingUtilities.GetUtf8String(line)); } try @@ -139,7 +139,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportMessageParseUnexpectedTypeSensitive(Name, PipeReaderExtensions.GetUtf8String(line)); + LogTransportMessageParseUnexpectedTypeSensitive(Name, EncodingUtilities.GetUtf8String(line)); } } } @@ -147,7 +147,7 @@ private async Task ProcessLineAsync(ReadOnlySequence line, CancellationTok { if (Logger.IsEnabled(LogLevel.Trace)) { - LogTransportMessageParseFailedSensitive(Name, PipeReaderExtensions.GetUtf8String(line), ex); + LogTransportMessageParseFailedSensitive(Name, EncodingUtilities.GetUtf8String(line), ex); } else { From abb18790f120e96d16e7e57f3c891ea4cb1e05e3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Feb 2026 20:33:28 +0000 Subject: [PATCH 5/7] Add PipeReaderExtensionsTests covering line-splitting edge cases Co-authored-by: ericstj <8918108+ericstj@users.noreply.github.com> --- .../Transport/PipeReaderExtensionsTests.cs | 289 ++++++++++++++++++ 1 file changed, 289 insertions(+) create mode 100644 tests/ModelContextProtocol.Tests/Transport/PipeReaderExtensionsTests.cs diff --git a/tests/ModelContextProtocol.Tests/Transport/PipeReaderExtensionsTests.cs b/tests/ModelContextProtocol.Tests/Transport/PipeReaderExtensionsTests.cs new file mode 100644 index 000000000..3dd225e25 --- /dev/null +++ b/tests/ModelContextProtocol.Tests/Transport/PipeReaderExtensionsTests.cs @@ -0,0 +1,289 @@ +using ModelContextProtocol.Protocol; +using ModelContextProtocol.Server; +using System.IO.Pipelines; +using System.Text; +using System.Text.Json; + +namespace ModelContextProtocol.Tests.Transport; + +/// +/// Tests for the line-reading behavior of PipeReaderExtensions (exercised via StreamServerTransport). +/// Covers: empty lines, LF/CRLF endings, multi-segment buffers, non-ASCII characters, +/// and invalid byte sequences adjacent to newlines. +/// +public class PipeReaderExtensionsTests +{ + private static readonly JsonRpcRequest s_testMessage = new() { Method = "ping", Id = new RequestId(1) }; + private static readonly string s_testJson = JsonSerializer.Serialize(s_testMessage, McpJsonUtilities.DefaultOptions); + private static readonly byte[] s_testJsonBytes = Encoding.UTF8.GetBytes(s_testJson); + + // Writes bytes to a pipe, wires it up to a StreamServerTransport, and returns the first received message. + private static async Task ReadOneMessageAsync(byte[] lineBytes) + { + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + // Write the line, then close the writer so the transport loop terminates. + await pipe.Writer.WriteAsync(lineBytes, ct); + await pipe.Writer.CompleteAsync(); + + transport.MessageReader.TryPeek(out _); // prime + + if (!await transport.MessageReader.WaitToReadAsync(ct)) + { + return null; + } + + transport.MessageReader.TryRead(out var message); + return message; + } + + // Writes bytes to a pipe one byte at a time, exercising multi-segment PipeReader buffers. + private static async Task ReadOneMessageByteByByteAsync(byte[] lineBytes) + { + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + foreach (byte b in lineBytes) + { + await pipe.Writer.WriteAsync(new[] { b }, ct); + await pipe.Writer.FlushAsync(ct); + } + await pipe.Writer.CompleteAsync(); + + if (!await transport.MessageReader.WaitToReadAsync(ct)) + { + return null; + } + + transport.MessageReader.TryRead(out var message); + return message; + } + + [Fact] + public async Task EmptyInput_ProducesNoMessages() + { + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + await pipe.Writer.CompleteAsync(); + + // Channel should complete immediately with no messages. + Assert.False(await transport.MessageReader.WaitToReadAsync(ct)); + } + + [Fact] + public async Task EmptyLine_LF_IsSkipped() + { + // A bare \n (empty line) must not cause a parse error and must not deliver a message. + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + // Write an empty line, then a real message. + var bytes = Encoding.UTF8.GetBytes($"\n{s_testJson}\n"); + await pipe.Writer.WriteAsync(bytes, ct); + await pipe.Writer.CompleteAsync(); + + var message = await transport.MessageReader.ReadAsync(ct); + Assert.IsType(message); + } + + [Fact] + public async Task EmptyLine_CRLF_IsSkipped() + { + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + var bytes = Encoding.UTF8.GetBytes($"\r\n{s_testJson}\n"); + await pipe.Writer.WriteAsync(bytes, ct); + await pipe.Writer.CompleteAsync(); + + var message = await transport.MessageReader.ReadAsync(ct); + Assert.IsType(message); + } + + [Fact] + public async Task LfTerminatedLine_IsDelivered() + { + var msg = await ReadOneMessageAsync(Encoding.UTF8.GetBytes(s_testJson + "\n")); + Assert.IsType(msg); + Assert.Equal(s_testMessage.Method, ((JsonRpcRequest)msg!).Method); + } + + [Fact] + public async Task CrLfTerminatedLine_IsDelivered() + { + var msg = await ReadOneMessageAsync(Encoding.UTF8.GetBytes(s_testJson + "\r\n")); + Assert.IsType(msg); + Assert.Equal(s_testMessage.Method, ((JsonRpcRequest)msg!).Method); + } + + [Fact] + public async Task MultipleLines_AllDelivered() + { + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + // Three messages: LF, CRLF, LF + var line1 = JsonSerializer.Serialize(new JsonRpcRequest { Method = "m1", Id = new RequestId(1) }, McpJsonUtilities.DefaultOptions); + var line2 = JsonSerializer.Serialize(new JsonRpcRequest { Method = "m2", Id = new RequestId(2) }, McpJsonUtilities.DefaultOptions); + var line3 = JsonSerializer.Serialize(new JsonRpcRequest { Method = "m3", Id = new RequestId(3) }, McpJsonUtilities.DefaultOptions); + + var bytes = Encoding.UTF8.GetBytes($"{line1}\n{line2}\r\n{line3}\n"); + await pipe.Writer.WriteAsync(bytes, ct); + await pipe.Writer.CompleteAsync(); + + var methods = new List(); + await foreach (var msg in transport.MessageReader.ReadAllAsync(ct)) + { + methods.Add(((JsonRpcRequest)msg).Method); + } + + Assert.Equal(["m1", "m2", "m3"], methods); + } + + [Fact] + public async Task LfTerminatedLine_MultiSegment_IsDelivered() + { + var msg = await ReadOneMessageByteByByteAsync(Encoding.UTF8.GetBytes(s_testJson + "\n")); + Assert.IsType(msg); + } + + [Fact] + public async Task CrLfTerminatedLine_MultiSegment_IsDelivered() + { + var msg = await ReadOneMessageByteByByteAsync(Encoding.UTF8.GetBytes(s_testJson + "\r\n")); + Assert.IsType(msg); + } + + [Fact] + public async Task CrLfWhereCrIsLastByteOfSegment_IsTrimmed() + { + // Force \r and \n into separate pipe writes to exercise multi-segment CRLF trimming. + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(s_testJson + "\r"), ct); + await pipe.Writer.FlushAsync(ct); + await pipe.Writer.WriteAsync("\n"u8.ToArray(), ct); + await pipe.Writer.CompleteAsync(); + + var msg = await transport.MessageReader.ReadAsync(ct); + Assert.IsType(msg); + } + + [Fact] + public async Task NonAsciiContentInJsonValue_IsPreserved() + { + var ct = TestContext.Current.CancellationToken; + // Build a request whose method name contains multi-byte UTF-8 and an emoji. + string method = "上下文伺服器🚀"; + var request = new JsonRpcRequest { Method = method, Id = new RequestId(99) }; + string json = JsonSerializer.Serialize(request, McpJsonUtilities.DefaultOptions); + + var msg = await ReadOneMessageAsync(Encoding.UTF8.GetBytes(json + "\n")); + + Assert.IsType(msg); + Assert.Equal(method, ((JsonRpcRequest)msg!).Method); + } + + [Fact] + public async Task NonAsciiContentInJsonValue_MultiSegment_IsPreserved() + { + string method = "上下文伺服器🚀"; + var request = new JsonRpcRequest { Method = method, Id = new RequestId(99) }; + string json = JsonSerializer.Serialize(request, McpJsonUtilities.DefaultOptions); + + var msg = await ReadOneMessageByteByByteAsync(Encoding.UTF8.GetBytes(json + "\n")); + + Assert.IsType(msg); + Assert.Equal(method, ((JsonRpcRequest)msg!).Method); + } + + [Fact] + public async Task MultiByteCharacterSplitAcrossSegments_IsPreserved() + { + // '€' is the 3-byte UTF-8 sequence 0xE2 0x82 0xAC. Place it in a method value so we + // can assert round-trip integrity, then split the encode bytes across two writes. + string method = "test€method"; + var request = new JsonRpcRequest { Method = method, Id = new RequestId(5) }; + byte[] jsonBytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request, McpJsonUtilities.DefaultOptions) + "\n"); + + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + // Split at the midpoint so the multi-byte euro sign (near the method value) is likely split. + int mid = jsonBytes.Length / 2; + await pipe.Writer.WriteAsync(jsonBytes.AsMemory(0, mid), ct); + await pipe.Writer.FlushAsync(ct); + await pipe.Writer.WriteAsync(jsonBytes.AsMemory(mid), ct); + await pipe.Writer.CompleteAsync(); + + var msg = await transport.MessageReader.ReadAsync(ct); + Assert.IsType(msg); + Assert.Equal(method, ((JsonRpcRequest)msg!).Method); + } + + [Fact] + public async Task InvalidJsonLine_IsSkippedAndNextLineIsDelivered() + { + // An invalid JSON line must be silently skipped; subsequent valid lines must still be delivered. + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + var bytes = Encoding.UTF8.GetBytes($"not-valid-json\n{s_testJson}\n"); + await pipe.Writer.WriteAsync(bytes, ct); + await pipe.Writer.CompleteAsync(); + + var message = await transport.MessageReader.ReadAsync(ct); + Assert.IsType(message); + } + + [Fact] + public async Task StandaloneCrNotFollowedByLf_IsIncludedInLine() + { + // A \r that is NOT immediately before \n must remain in the payload. + // We can't embed a raw \r in a JSON method name as JSON would reject it, so we verify + // the behavior by confirming the transport still delivers the first valid JSON after an invalid line. + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + // Craft a sequence where a line contains a lone \r before valid JSON data. + // The first line has a lone \r inside it (invalid JSON), so it should be skipped. + // The second line is valid JSON. + byte[] line1 = Encoding.UTF8.GetBytes("has\ra\ralone\n"); + byte[] line2 = Encoding.UTF8.GetBytes(s_testJson + "\n"); + byte[] combined = [.. line1, .. line2]; + await pipe.Writer.WriteAsync(combined, ct); + await pipe.Writer.CompleteAsync(); + + var message = await transport.MessageReader.ReadAsync(ct); + Assert.IsType(message); + } + + [Fact] + public async Task LineWithNoTerminatingNewline_IsNotDelivered() + { + // Data without a trailing newline should not produce a message. + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + await pipe.Writer.WriteAsync(s_testJsonBytes, ct); + await pipe.Writer.CompleteAsync(); + + // Wait briefly — no message should arrive. + using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + Assert.False(await transport.MessageReader.WaitToReadAsync(timeoutCts.Token)); + } +} From d050187683d384c3ef20d3f36002518201862bbc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Feb 2026 20:53:13 +0000 Subject: [PATCH 6/7] Add test for multi-byte sequence interrupted by newline Co-authored-by: ericstj <8918108+ericstj@users.noreply.github.com> --- .../Transport/PipeReaderExtensionsTests.cs | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/ModelContextProtocol.Tests/Transport/PipeReaderExtensionsTests.cs b/tests/ModelContextProtocol.Tests/Transport/PipeReaderExtensionsTests.cs index 3dd225e25..7b9491d31 100644 --- a/tests/ModelContextProtocol.Tests/Transport/PipeReaderExtensionsTests.cs +++ b/tests/ModelContextProtocol.Tests/Transport/PipeReaderExtensionsTests.cs @@ -271,6 +271,42 @@ public async Task StandaloneCrNotFollowedByLf_IsIncludedInLine() Assert.IsType(message); } + [Fact] + public async Task MultiByteSequenceInterruptedByNewline_BothLinesSkipped_NextValidLineDelivered() + { + // '€' encodes as 3 UTF-8 bytes: 0xE2 0x82 0xAC. If a newline is injected after the + // first byte, the two resulting lines both contain invalid byte sequences: + // Line 1: ...0xE2\n — a truncated 3-byte lead byte; invalid JSON in both old and new impl + // Line 2: 0x82 0xAC...\n — continuation bytes without a lead byte; also invalid JSON + // + // Both the old StreamReader-based path (which produced U+FFFD replacement chars before + // passing to JsonSerializer) and the new PipeReader-based path (which passes raw bytes to + // JsonSerializer) raise JsonException for each line and silently skip them. A subsequent + // valid JSON line must still be delivered. + var ct = TestContext.Current.CancellationToken; + var pipe = new Pipe(); + await using var transport = new StreamServerTransport(pipe.Reader.AsStream(), Stream.Null); + + // Build line 1: a JSON string where '€' is split after byte 0xE2, terminated with \n. + byte[] euroBytes = Encoding.UTF8.GetBytes("€"); // [0xE2, 0x82, 0xAC] + byte[] line1 = [.. Encoding.UTF8.GetBytes("{\"method\":\"te"), euroBytes[0], (byte)'\n']; + // Build line 2: the remaining continuation bytes + rest of JSON, terminated with \n. + byte[] line2 = [euroBytes[1], euroBytes[2], .. Encoding.UTF8.GetBytes("st\"}\n")]; + // Build line 3: a valid JSON message that must survive the two bad lines. + byte[] line3 = Encoding.UTF8.GetBytes(s_testJson + "\n"); + + byte[] allBytes = [.. line1, .. line2, .. line3]; + await pipe.Writer.WriteAsync(allBytes, ct); + await pipe.Writer.CompleteAsync(); + + // Only the valid line 3 should produce a message; lines 1 and 2 are silently skipped. + var message = await transport.MessageReader.ReadAsync(ct); + Assert.IsType(message); + + // No further messages. + Assert.False(transport.MessageReader.TryRead(out _)); + } + [Fact] public async Task LineWithNoTerminatingNewline_IsNotDelivered() { From a5951b910f5aaf6c3fcd1338920cb09354d153e5 Mon Sep 17 00:00:00 2001 From: Eric StJohn Date: Mon, 23 Feb 2026 15:18:00 -0800 Subject: [PATCH 7/7] Use larger buffer in for PipeReader --- .../Client/StreamClientSessionTransport.cs | 3 ++- src/ModelContextProtocol.Core/Server/StreamServerTransport.cs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs b/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs index 7a493cbe5..51c433841 100644 --- a/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs +++ b/src/ModelContextProtocol.Core/Client/StreamClientSessionTransport.cs @@ -10,6 +10,7 @@ namespace ModelContextProtocol.Client; internal class StreamClientSessionTransport : TransportBase { private static readonly byte[] s_newlineBytes = "\n"u8.ToArray(); + private static readonly StreamPipeReaderOptions s_pipeReaderOptions = new(bufferSize: 64 * 1024); // 64KB minimum buffer private readonly PipeReader _serverOutputPipe; private readonly Stream _serverInputStream; @@ -43,7 +44,7 @@ public StreamClientSessionTransport(Stream serverInput, Stream serverOutput, str Throw.IfNull(serverOutput); _serverInputStream = serverInput; - _serverOutputPipe = PipeReader.Create(serverOutput); + _serverOutputPipe = PipeReader.Create(serverOutput, s_pipeReaderOptions); SetConnected(); diff --git a/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs index 49d1c17ca..323d1ddd8 100644 --- a/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamServerTransport.cs @@ -18,6 +18,7 @@ namespace ModelContextProtocol.Server; public class StreamServerTransport : TransportBase { private static readonly byte[] s_newlineBytes = "\n"u8.ToArray(); + private static readonly StreamPipeReaderOptions s_pipeReaderOptions = new(bufferSize: 64 * 1024); // 64KB minimum buffer private readonly ILogger _logger; @@ -48,7 +49,7 @@ public StreamServerTransport(Stream inputStream, Stream outputStream, string? se _logger = loggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance; _inputStream = inputStream; - _inputPipeReader = PipeReader.Create(inputStream); + _inputPipeReader = PipeReader.Create(inputStream, s_pipeReaderOptions); _outputStream = outputStream; SetConnected();