From 8b1cab9778262fa1b8417d4c829022df1de57664 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 10:05:23 +0100 Subject: [PATCH 01/12] chore: add .worktrees/ to .gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 9f7eb8283..67fdbf38d 100644 --- a/.gitignore +++ b/.gitignore @@ -316,3 +316,5 @@ version.txt /docs/.yarn/ BenchmarkDotNet.Artifacts/ + +.worktrees/ From 61274b54e99b3ff471a17f10014914bafa007042 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 10:17:02 +0100 Subject: [PATCH 02/12] feat(signalr): scaffold Server, Client, and test projects --- Directory.Packages.props | 5 +++++ Eventuous.slnx | 8 +++++++ .../Eventuous.SignalR.Client.csproj | 12 +++++++++++ .../Eventuous.SignalR.Server.csproj | 21 +++++++++++++++++++ .../Eventuous.Tests.SignalR.csproj | 12 +++++++++++ 5 files changed, 58 insertions(+) create mode 100644 src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj create mode 100644 src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj diff --git a/Directory.Packages.props b/Directory.Packages.props index c321698cf..4b5fb3e1b 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -95,6 +95,11 @@ + + + + + diff --git a/Eventuous.slnx b/Eventuous.slnx index 83e61f82a..497eb4039 100644 --- a/Eventuous.slnx +++ b/Eventuous.slnx @@ -162,6 +162,14 @@ + + + + + + + + diff --git a/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj b/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj new file mode 100644 index 000000000..32f13e196 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj b/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj new file mode 100644 index 000000000..ec3a283af --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + Tools\TaskExtensions.cs + + + + diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj b/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj new file mode 100644 index 000000000..10044f313 --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj @@ -0,0 +1,12 @@ + + + Exe + + + + + + + + + From dffb605f699bc193f81c863dd7f774515db889df Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 10:17:49 +0100 Subject: [PATCH 03/12] feat(signalr): add wire contracts (StreamEventEnvelope, error, method names) --- .../Contracts/SignalRSubscriptionMethods.cs | 11 +++++++++++ .../Contracts/StreamEventEnvelope.cs | 15 +++++++++++++++ .../Contracts/StreamSubscriptionError.cs | 9 +++++++++ 3 files changed, 35 insertions(+) create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/Contracts/SignalRSubscriptionMethods.cs create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamEventEnvelope.cs create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamSubscriptionError.cs diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Contracts/SignalRSubscriptionMethods.cs b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/SignalRSubscriptionMethods.cs new file mode 100644 index 000000000..7677d9d99 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/SignalRSubscriptionMethods.cs @@ -0,0 +1,11 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR; + +public static class SignalRSubscriptionMethods { + public const string Subscribe = "SubscribeToStream"; + public const string Unsubscribe = "UnsubscribeFromStream"; + public const string StreamEvent = "StreamEvent"; + public const string StreamError = "StreamError"; +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamEventEnvelope.cs b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamEventEnvelope.cs new file mode 100644 index 000000000..cb5d98bbc --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamEventEnvelope.cs @@ -0,0 +1,15 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR; + +public record StreamEventEnvelope { + public required Guid EventId { get; init; } + public required string Stream { get; init; } + public required string EventType { get; init; } + public required ulong StreamPosition { get; init; } + public required ulong GlobalPosition { get; init; } + public required DateTime Timestamp { get; init; } + public required string JsonPayload { get; init; } + public string? JsonMetadata { get; init; } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamSubscriptionError.cs b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamSubscriptionError.cs new file mode 100644 index 000000000..d600b3066 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamSubscriptionError.cs @@ -0,0 +1,9 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR; + +public record StreamSubscriptionError { + public required string Stream { get; init; } + public required string Message { get; init; } +} From 9478b1ab302a6f6d210e7b05a81e5d516df20a3e Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 10:24:29 +0100 Subject: [PATCH 04/12] feat(signalr): add SignalRTransform (RouteAndTransform factory) Implements SignalRTransform.Create() which produces a RouteAndTransform delegate that serializes events into StreamEventEnvelope and wraps them in GatewayMessage. Also fixes build issues in SignalRProducer (FrameworkReference for Hub/IHubContext, internal AttrConstants) and test project (extern alias for duplicate contract types, ISingleClientProxy mock). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Eventuous.SignalR.Server.csproj | 2 +- .../SignalRProduceOptions.cs | 6 ++ .../SignalRProducer.cs | 31 ++++++++ .../SignalRTransform.cs | 33 +++++++++ .../Eventuous.Tests.SignalR.csproj | 9 ++- .../SignalRProducerTests.cs | 45 ++++++++++++ .../SignalRTransformTests.cs | 73 +++++++++++++++++++ 7 files changed, 197 insertions(+), 2 deletions(-) create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/SignalRProduceOptions.cs create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/SignalRTransform.cs create mode 100644 src/SignalR/test/Eventuous.Tests.SignalR/SignalRProducerTests.cs create mode 100644 src/SignalR/test/Eventuous.Tests.SignalR/SignalRTransformTests.cs diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj b/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj index ec3a283af..175c5f4ea 100644 --- a/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj +++ b/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj @@ -5,7 +5,7 @@ - + diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRProduceOptions.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProduceOptions.cs new file mode 100644 index 000000000..ae6cea2bb --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProduceOptions.cs @@ -0,0 +1,6 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR.Server; + +public record SignalRProduceOptions(string ConnectionId); diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs new file mode 100644 index 000000000..e91d7dec0 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs @@ -0,0 +1,31 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Producers.Diagnostics; +using Microsoft.AspNetCore.SignalR; + +namespace Eventuous.SignalR.Server; + +public class SignalRProducer(IHubContext hubContext) + : BaseProducer(new ProducerTracingOptions { MessagingSystem = "signalr" }) + where THub : Hub { + + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + protected override async Task ProduceMessages( + StreamName stream, + IEnumerable messages, + SignalRProduceOptions? options, + CancellationToken cancellationToken = default + ) { + var client = hubContext.Clients.Client(options!.ConnectionId); + + foreach (var msg in messages) { + await client.SendAsync( + SignalRSubscriptionMethods.StreamEvent, + msg.Message, + cancellationToken + ).NoContext(); + } + } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRTransform.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRTransform.cs new file mode 100644 index 000000000..256a535b0 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRTransform.cs @@ -0,0 +1,33 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Text; +using System.Text.Json; +using Eventuous.Subscriptions.Context; + +namespace Eventuous.SignalR.Server; + +public static class SignalRTransform { + public static RouteAndTransform Create( + string connectionId, string stream, IEventSerializer serializer + ) => ctx => { + var result = serializer.SerializeEvent(ctx.Message!); + var envelope = new StreamEventEnvelope { + EventId = Guid.TryParse(ctx.MessageId, out var id) ? id : Guid.NewGuid(), + Stream = stream, + EventType = ctx.MessageType, + StreamPosition = ctx.StreamPosition, + GlobalPosition = ctx.GlobalPosition, + Timestamp = ctx.Created, + JsonPayload = Encoding.UTF8.GetString(result.Payload), + JsonMetadata = ctx.Metadata is { Count: > 0 } + ? JsonSerializer.Serialize(ctx.Metadata.ToDictionary(kv => kv.Key, kv => kv.Value)) + : null + }; + return ValueTask.FromResult(new[] { + new GatewayMessage( + new StreamName(stream), envelope, ctx.Metadata, new SignalRProduceOptions(connectionId) + ) + }); + }; +} diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj b/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj index 10044f313..b67c99fc8 100644 --- a/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj +++ b/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj @@ -4,9 +4,16 @@ - + + + + + + + + diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/SignalRProducerTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/SignalRProducerTests.cs new file mode 100644 index 000000000..f66fe6d11 --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR/SignalRProducerTests.cs @@ -0,0 +1,45 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.SignalR; +using Eventuous.SignalR.Server; +using Microsoft.AspNetCore.SignalR; +using NSubstitute; + +namespace Eventuous.Tests.SignalR; + +public class SignalRProducerTests { + [Test] + public async Task ProduceMessages_SendsEnvelopeToCorrectConnection() { + var hubContext = Substitute.For>(); + var hubClients = Substitute.For(); + var clientProxy = Substitute.For(); + hubContext.Clients.Returns(hubClients); + hubClients.Client("conn-1").Returns(clientProxy); + + var producer = new SignalRProducer(hubContext); + var envelope = new Eventuous.SignalR.StreamEventEnvelope { + EventId = Guid.NewGuid(), + Stream = "Test-1", + EventType = "TestEvent", + StreamPosition = 0, + GlobalPosition = 0, + Timestamp = DateTime.UtcNow, + JsonPayload = "{}" + }; + + await producer.Produce( + new StreamName("Test-1"), + [new ProducedMessage(envelope, new Metadata())], + new SignalRProduceOptions("conn-1") + ); + + await clientProxy.Received(1).SendCoreAsync( + Eventuous.SignalR.SignalRSubscriptionMethods.StreamEvent, + Arg.Is(args => args.Length == 1 && args[0] is Eventuous.SignalR.StreamEventEnvelope), + Arg.Any() + ).ConfigureAwait(false); + } +} + +public class TestHub : Hub; diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/SignalRTransformTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/SignalRTransformTests.cs new file mode 100644 index 000000000..7f2772cfc --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR/SignalRTransformTests.cs @@ -0,0 +1,73 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.SignalR; +using Eventuous.SignalR.Server; +using Eventuous.Subscriptions.Context; + +namespace Eventuous.Tests.SignalR; + +public class SignalRTransformTests { + [Test] + public async Task Transform_CreatesCorrectEnvelope() { + TypeMap.RegisterKnownEventTypes(typeof(TransformTestEvent).Assembly); + var serializer = DefaultEventSerializer.Instance; + var transform = SignalRTransform.Create("conn-1", "Test-1", serializer); + + var ctx = new MessageConsumeContext( + "aabbccdd-1234-5678-9012-aabbccddeeff", + "TransformTestEvent", + "application/json", + "Test-1", + 0, 5, 42, 0, + DateTime.UtcNow, + new TransformTestEvent("hello"), + null, + "test-sub", + CancellationToken.None + ); + + var result = await transform(ctx); + + await Assert.That(result).HasCount().EqualTo(1); + + var gm = result[0]; + var envelope = (Eventuous.SignalR.StreamEventEnvelope)gm.Message; + await Assert.That(envelope.Stream).IsEqualTo("Test-1"); + await Assert.That(envelope.EventType).IsEqualTo("TransformTestEvent"); + await Assert.That(envelope.StreamPosition).IsEqualTo(5UL); + await Assert.That(envelope.GlobalPosition).IsEqualTo(42UL); + await Assert.That(envelope.JsonPayload).IsNotNull(); + await Assert.That(envelope.JsonMetadata).IsNull(); + await Assert.That(gm.ProduceOptions.ConnectionId).IsEqualTo("conn-1"); + } + + [Test] + public async Task Transform_IncludesMetadataWhenPresent() { + TypeMap.RegisterKnownEventTypes(typeof(TransformTestEvent).Assembly); + var serializer = DefaultEventSerializer.Instance; + var transform = SignalRTransform.Create("conn-2", "Test-2", serializer); + + var meta = new Metadata { ["key1"] = "value1" }; + var ctx = new MessageConsumeContext( + Guid.NewGuid().ToString(), + "TransformTestEvent", + "application/json", + "Test-2", + 0, 10, 100, 0, + DateTime.UtcNow, + new TransformTestEvent("world"), + meta, + "test-sub", + CancellationToken.None + ); + + var result = await transform(ctx); + var envelope = (Eventuous.SignalR.StreamEventEnvelope)result[0].Message; + + await Assert.That(envelope.JsonMetadata).IsNotNull(); + } +} + +[EventType("TransformTestEvent")] +record TransformTestEvent(string Value); From 86612c91de307756533ed1c9139ec320932f69eb Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 10:30:24 +0100 Subject: [PATCH 05/12] feat(signalr): add SubscriptionGateway with per-connection lifecycle Co-Authored-By: Claude Sonnet 4.6 --- .../SignalRGatewayOptions.cs | 14 +++ .../SubscriptionGateway.cs | 102 ++++++++++++++++ .../SubscriptionGatewayTests.cs | 110 ++++++++++++++++++ 3 files changed, 226 insertions(+) create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/SignalRGatewayOptions.cs create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs create mode 100644 src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRGatewayOptions.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRGatewayOptions.cs new file mode 100644 index 000000000..c925fa4f3 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRGatewayOptions.cs @@ -0,0 +1,14 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Subscriptions.Filters; + +namespace Eventuous.SignalR.Server; + +public delegate IMessageSubscription SubscriptionFactory( + StreamName stream, ulong? fromPosition, ConsumePipe pipe, string subscriptionId +); + +public class SignalRGatewayOptions { + public required SubscriptionFactory SubscriptionFactory { get; set; } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs b/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs new file mode 100644 index 000000000..0cd0c745d --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs @@ -0,0 +1,102 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Collections.Concurrent; +using Eventuous.Subscriptions.Filters; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; + +namespace Eventuous.SignalR.Server; + +public class SubscriptionGateway : IAsyncDisposable where THub : Hub { + readonly SignalRProducer _producer; + readonly SubscriptionFactory _subscriptionFactory; + readonly IEventSerializer _eventSerializer; + readonly ILogger _logger; + readonly ConcurrentDictionary<(string ConnectionId, string Stream), SubscriptionState> _subscriptions = new(); + + public SubscriptionGateway( + SignalRProducer producer, + SignalRGatewayOptions options, + IEventSerializer eventSerializer, + ILoggerFactory loggerFactory + ) { + _producer = producer; + _subscriptionFactory = options.SubscriptionFactory; + _eventSerializer = eventSerializer; + _logger = loggerFactory.CreateLogger>(); + } + + public async Task SubscribeAsync(string connectionId, string stream, ulong? fromPosition, CancellationToken ct = default) { + var key = (connectionId, stream); + + // Remove existing subscription for same key + if (_subscriptions.TryRemove(key, out var existing)) { + await StopSubscription(existing).NoContext(); + } + + var transform = SignalRTransform.Create(connectionId, stream, _eventSerializer); + var handler = GatewayHandlerFactory.Create(_producer, transform, awaitProduce: true); + var pipe = new ConsumePipe(); + pipe.AddDefaultConsumer(handler); + + var subscriptionId = $"signalr-{connectionId}-{stream}"; + var subscription = _subscriptionFactory(new StreamName(stream), fromPosition, pipe, subscriptionId); + + var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + var state = new SubscriptionState(subscription, pipe, cts); + + _subscriptions[key] = state; + + // Start subscription in background + _ = Task.Run(async () => { + try { + await subscription.Subscribe( + _ => _logger.LogDebug("Subscribed {SubscriptionId}", subscriptionId), + (id, reason, ex) => _logger.LogWarning(ex, "Subscription {SubscriptionId} dropped: {Reason}", id, reason), + cts.Token + ).NoContext(); + } catch (OperationCanceledException) { + // Expected on unsubscribe + } catch (Exception ex) { + _logger.LogError(ex, "Subscription {SubscriptionId} failed", subscriptionId); + _subscriptions.TryRemove(key, out _); + } + }, cts.Token); + } + + public async Task UnsubscribeAsync(string connectionId, string stream) { + if (_subscriptions.TryRemove((connectionId, stream), out var state)) { + await StopSubscription(state).NoContext(); + } + } + + public async Task RemoveConnectionAsync(string connectionId) { + var keys = _subscriptions.Keys.Where(k => k.ConnectionId == connectionId).ToList(); + foreach (var key in keys) { + if (_subscriptions.TryRemove(key, out var state)) { + await StopSubscription(state).NoContext(); + } + } + } + + static async Task StopSubscription(SubscriptionState state) { + await state.Cts.CancelAsync(); + try { + await state.Subscription.Unsubscribe(_ => { }, CancellationToken.None).NoContext(); + } catch { + // Best effort cleanup + } + await state.Pipe.DisposeAsync().NoContext(); + state.Cts.Dispose(); + } + + public async ValueTask DisposeAsync() { + foreach (var (_, state) in _subscriptions) { + await StopSubscription(state).NoContext(); + } + _subscriptions.Clear(); + } + + record SubscriptionState(IMessageSubscription Subscription, ConsumePipe Pipe, CancellationTokenSource Cts); +} diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs new file mode 100644 index 000000000..64d27ec16 --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs @@ -0,0 +1,110 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Collections.Concurrent; +using Eventuous.SignalR.Server; +using Eventuous.Subscriptions; +using Eventuous.Subscriptions.Filters; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; + +namespace Eventuous.Tests.SignalR; + +public class SubscriptionGatewayTests { + readonly List<(StreamName Stream, string SubId)> _factoryCalls = []; + readonly ConcurrentDictionary _createdSubscriptions = new(); + + SubscriptionGateway CreateGateway() { + var hubContext = Substitute.For>(); + var hubClients = Substitute.For(); + var clientProxy = Substitute.For(); + hubContext.Clients.Returns(hubClients); + hubClients.Client(Arg.Any()).Returns(clientProxy); + + var producer = new SignalRProducer(hubContext); + var options = new SignalRGatewayOptions { + SubscriptionFactory = (stream, fromPosition, pipe, subscriptionId) => { + _factoryCalls.Add((stream, subscriptionId)); + var sub = Substitute.For(); + sub.SubscriptionId.Returns(subscriptionId); + // Make Subscribe block until cancelled + sub.Subscribe(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(ci => new ValueTask(Task.Run(async () => { + var token = ci.ArgAt(2); + try { await Task.Delay(Timeout.Infinite, token); } + catch (OperationCanceledException) { } + }))); + sub.Unsubscribe(Arg.Any(), Arg.Any()) + .Returns(ValueTask.CompletedTask); + _createdSubscriptions[subscriptionId] = sub; + return sub; + } + }; + var serializer = DefaultEventSerializer.Instance; + + return new SubscriptionGateway(producer, options, serializer, NullLoggerFactory.Instance); + } + + [Test] + public async Task SubscribeAsync_CreatesSubscriptionViaFactory() { + await using var gateway = CreateGateway(); + await gateway.SubscribeAsync("conn-1", "Test-1", null); + await Task.Delay(50); // Let background task start + + await Assert.That(_factoryCalls).HasCount().EqualTo(1); + await Assert.That(_factoryCalls[0].SubId).IsEqualTo("signalr-conn-1-Test-1"); + } + + [Test] + public async Task UnsubscribeAsync_RemovesSubscription() { + await using var gateway = CreateGateway(); + await gateway.SubscribeAsync("conn-1", "Test-1", null); + await Task.Delay(50); + + await gateway.UnsubscribeAsync("conn-1", "Test-1"); + await Task.Delay(50); + + // Verify unsubscribe was called + var sub = _createdSubscriptions["signalr-conn-1-Test-1"]; + await sub.Received(1).Unsubscribe(Arg.Any(), Arg.Any()); + } + + [Test] + public async Task RemoveConnectionAsync_CleansUpAllSubscriptions() { + await using var gateway = CreateGateway(); + await gateway.SubscribeAsync("conn-1", "Stream-A", null); + await gateway.SubscribeAsync("conn-1", "Stream-B", null); + await gateway.SubscribeAsync("conn-2", "Stream-A", null); + await Task.Delay(50); + + await gateway.RemoveConnectionAsync("conn-1"); + await Task.Delay(50); + + // conn-1 subs should be unsubscribed + var subA = _createdSubscriptions["signalr-conn-1-Stream-A"]; + var subB = _createdSubscriptions["signalr-conn-1-Stream-B"]; + await subA.Received(1).Unsubscribe(Arg.Any(), Arg.Any()); + await subB.Received(1).Unsubscribe(Arg.Any(), Arg.Any()); + + // conn-2 sub should NOT be unsubscribed + var subC = _createdSubscriptions["signalr-conn-2-Stream-A"]; + await subC.DidNotReceive().Unsubscribe(Arg.Any(), Arg.Any()); + } + + [Test] + public async Task DuplicateSubscribe_ReplacesPrevious() { + await using var gateway = CreateGateway(); + await gateway.SubscribeAsync("conn-1", "Test-1", null); + await Task.Delay(50); + + await gateway.SubscribeAsync("conn-1", "Test-1", 42); + await Task.Delay(50); + + await Assert.That(_factoryCalls).HasCount().EqualTo(2); + // First subscription should have been stopped + // Note: since both use the same key, the second overwrites in _createdSubscriptions + // So we check factory was called twice + await Assert.That(_factoryCalls).HasCount().EqualTo(2); + } +} From 2e4e3d16965316710421aa0044fa078da67a0c12 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 10:31:53 +0100 Subject: [PATCH 06/12] feat(signalr): add SignalRSubscriptionHub and DI registrations --- .../SignalRGatewayRegistrations.cs | 23 +++++++++++++++++++ .../SignalRSubscriptionHub.cs | 17 ++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/Registrations/SignalRGatewayRegistrations.cs create mode 100644 src/SignalR/src/Eventuous.SignalR.Server/SignalRSubscriptionHub.cs diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Registrations/SignalRGatewayRegistrations.cs b/src/SignalR/src/Eventuous.SignalR.Server/Registrations/SignalRGatewayRegistrations.cs new file mode 100644 index 000000000..8f218eb93 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/Registrations/SignalRGatewayRegistrations.cs @@ -0,0 +1,23 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.SignalR.Server; +using Microsoft.AspNetCore.SignalR; + +namespace Microsoft.Extensions.DependencyInjection; + +public static class SignalRGatewayRegistrations { + public static IServiceCollection AddSignalRSubscriptionGateway( + this IServiceCollection services, + Action configure + ) where THub : Hub { + services.AddSingleton(sp => { + var options = new SignalRGatewayOptions { SubscriptionFactory = null! }; + configure(sp, options); + return options; + }); + services.AddSingleton>(); + services.AddSingleton>(); + return services; + } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRSubscriptionHub.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRSubscriptionHub.cs new file mode 100644 index 000000000..7ae225eaf --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRSubscriptionHub.cs @@ -0,0 +1,17 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Microsoft.AspNetCore.SignalR; + +namespace Eventuous.SignalR.Server; + +public class SignalRSubscriptionHub(SubscriptionGateway gateway) : Hub { + public Task SubscribeToStream(string stream, ulong? fromPosition) + => gateway.SubscribeAsync(Context.ConnectionId, stream, fromPosition, Context.ConnectionAborted); + + public Task UnsubscribeFromStream(string stream) + => gateway.UnsubscribeAsync(Context.ConnectionId, stream); + + public override Task OnDisconnectedAsync(Exception? exception) + => gateway.RemoveConnectionAsync(Context.ConnectionId); +} From 79bd118619e819fcaa6627fbd305e678755f7977 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 10:33:05 +0100 Subject: [PATCH 07/12] feat(signalr): add SignalRSubscriptionClient with IAsyncEnumerable and auto-reconnect Co-Authored-By: Claude Sonnet 4.6 --- .../SignalRSubscriptionClient.cs | 158 ++++++++++++++++++ .../SignalRSubscriptionClientOptions.cs | 9 + 2 files changed, 167 insertions(+) create mode 100644 src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs create mode 100644 src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClientOptions.cs diff --git a/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs new file mode 100644 index 000000000..cc06abaa5 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs @@ -0,0 +1,158 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Collections.Concurrent; +using System.Threading.Channels; +using Microsoft.AspNetCore.SignalR.Client; + +namespace Eventuous.SignalR.Client; + +public class SignalRSubscriptionClient : IAsyncDisposable { + readonly HubConnection _connection; + readonly SignalRSubscriptionClientOptions _options; + readonly ConcurrentDictionary _subscriptions = new(); + readonly IDisposable _eventRegistration; + readonly IDisposable _errorRegistration; + bool _disposed; + + public SignalRSubscriptionClient(HubConnection connection, SignalRSubscriptionClientOptions? options = null) { + _connection = connection; + _options = options ?? new SignalRSubscriptionClientOptions(); + + _eventRegistration = _connection.On( + SignalRSubscriptionMethods.StreamEvent, + OnStreamEvent + ); + + _errorRegistration = _connection.On( + SignalRSubscriptionMethods.StreamError, + OnStreamError + ); + + _connection.Reconnected += OnReconnected; + _connection.Closed += OnClosed; + } + + public async IAsyncEnumerable SubscribeAsync( + string stream, + ulong? fromPosition, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default + ) { + var channel = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = true, SingleWriter = true } + ); + + var state = new SubscriptionState(channel, fromPosition); + _subscriptions[stream] = state; + + await _connection.InvokeAsync( + SignalRSubscriptionMethods.Subscribe, + stream, + fromPosition, + ct + ).ConfigureAwait(false); + + await foreach (var envelope in channel.Reader.ReadAllAsync(ct).ConfigureAwait(false)) { + yield return envelope; + } + + // Cleanup when enumeration stops + _subscriptions.TryRemove(stream, out _); + } + + public async Task UnsubscribeAsync(string stream) { + if (_subscriptions.TryRemove(stream, out var state)) { + state.Channel.Writer.TryComplete(); + await _connection.InvokeAsync( + SignalRSubscriptionMethods.Unsubscribe, + stream + ).ConfigureAwait(false); + } + } + + internal IEventSerializer GetSerializer() + => _options.Serializer ?? DefaultEventSerializer.Instance; + + internal bool TracingEnabled => _options.EnableTracing; + + internal SubscriptionState? GetSubscriptionState(string stream) + => _subscriptions.TryGetValue(stream, out var state) ? state : null; + + internal void RegisterSubscription(string stream, SubscriptionState state) + => _subscriptions[stream] = state; + + internal void RemoveSubscription(string stream) { + if (_subscriptions.TryRemove(stream, out var state)) { + state.Channel.Writer.TryComplete(); + } + } + + internal HubConnection Connection => _connection; + + void OnStreamEvent(StreamEventEnvelope envelope) { + if (!_subscriptions.TryGetValue(envelope.Stream, out var state)) return; + + // Deduplication: skip events at or before the last seen position + if (state.LastPosition.HasValue && envelope.StreamPosition <= state.LastPosition.Value) return; + + state.LastPosition = envelope.StreamPosition; + state.Channel.Writer.TryWrite(envelope); + } + + void OnStreamError(StreamSubscriptionError error) { + if (_subscriptions.TryRemove(error.Stream, out var state)) { + state.Channel.Writer.TryComplete(new Exception(error.Message)); + } + } + + async Task OnReconnected(string? connectionId) { + foreach (var (stream, state) in _subscriptions) { + try { + await _connection.InvokeAsync( + SignalRSubscriptionMethods.Subscribe, + stream, + state.LastPosition, + CancellationToken.None + ).ConfigureAwait(false); + } catch { + // Best-effort reconnection + } + } + } + + Task OnClosed(Exception? exception) { + foreach (var (stream, state) in _subscriptions) { + state.Channel.Writer.TryComplete(exception); + } + return Task.CompletedTask; + } + + public async ValueTask DisposeAsync() { + if (_disposed) return; + _disposed = true; + + _connection.Reconnected -= OnReconnected; + _connection.Closed -= OnClosed; + + foreach (var (stream, state) in _subscriptions) { + state.Channel.Writer.TryComplete(); + try { + await _connection.InvokeAsync( + SignalRSubscriptionMethods.Unsubscribe, + stream + ).ConfigureAwait(false); + } catch { + // Best effort + } + } + + _subscriptions.Clear(); + _eventRegistration.Dispose(); + _errorRegistration.Dispose(); + } + + internal class SubscriptionState(Channel channel, ulong? initialPosition) { + public Channel Channel { get; } = channel; + public ulong? LastPosition { get; set; } = initialPosition; + } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClientOptions.cs b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClientOptions.cs new file mode 100644 index 000000000..13e9f7cb9 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClientOptions.cs @@ -0,0 +1,9 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR.Client; + +public class SignalRSubscriptionClientOptions { + public IEventSerializer? Serializer { get; set; } + public bool EnableTracing { get; set; } +} From 62467d6c4b8c84d8b199fb39775c94245b362c5c Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 10:37:22 +0100 Subject: [PATCH 08/12] feat(signalr): add TypedStreamSubscription with On and optional tracing Implements typed handler dispatch over SignalRSubscriptionClient's channel-based subscription, with optional OpenTelemetry trace context propagation. Co-Authored-By: Claude Sonnet 4.6 --- .../Eventuous.SignalR.Client.csproj | 1 + .../SignalRSubscriptionClient.cs | 3 + .../Eventuous.SignalR.Client/StreamMeta.cs | 6 + .../TypedStreamSubscription.cs | 145 ++++++++++++++++++ .../TypedStreamSubscriptionTests.cs | 90 +++++++++++ 5 files changed, 245 insertions(+) create mode 100644 src/SignalR/src/Eventuous.SignalR.Client/StreamMeta.cs create mode 100644 src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs create mode 100644 src/SignalR/test/Eventuous.Tests.SignalR/TypedStreamSubscriptionTests.cs diff --git a/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj b/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj index 32f13e196..df5ca6abf 100644 --- a/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj +++ b/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj @@ -2,6 +2,7 @@ + diff --git a/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs index cc06abaa5..993db637c 100644 --- a/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs +++ b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs @@ -89,6 +89,9 @@ internal void RemoveSubscription(string stream) { internal HubConnection Connection => _connection; + public TypedStreamSubscription SubscribeTyped(string stream, ulong? fromPosition) + => new TypedStreamSubscription(this, stream, fromPosition); + void OnStreamEvent(StreamEventEnvelope envelope) { if (!_subscriptions.TryGetValue(envelope.Stream, out var state)) return; diff --git a/src/SignalR/src/Eventuous.SignalR.Client/StreamMeta.cs b/src/SignalR/src/Eventuous.SignalR.Client/StreamMeta.cs new file mode 100644 index 000000000..72b91ab04 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Client/StreamMeta.cs @@ -0,0 +1,6 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR.Client; + +public record StreamMeta(string Stream, ulong Position, DateTime Timestamp); diff --git a/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs b/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs new file mode 100644 index 000000000..1e93c0851 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs @@ -0,0 +1,145 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Diagnostics; +using System.Text; +using System.Text.Json; +using System.Threading.Channels; +using Eventuous.Diagnostics; +using Microsoft.AspNetCore.SignalR.Client; + +namespace Eventuous.SignalR.Client; + +public class TypedStreamSubscription : IAsyncDisposable { + readonly SignalRSubscriptionClient _client; + readonly string _stream; + readonly ulong? _fromPosition; + readonly Dictionary> _handlers = new(); + Action? _errorHandler; + CancellationTokenSource? _cts; + Task? _consumeTask; + bool _started; + + internal TypedStreamSubscription(SignalRSubscriptionClient client, string stream, ulong? fromPosition) { + _client = client; + _stream = stream; + _fromPosition = fromPosition; + } + + public TypedStreamSubscription On(Func handler) where T : class { + if (_started) throw new InvalidOperationException("Cannot register handlers after StartAsync has been called."); + + var eventType = TypeMap.Instance.GetTypeName(); + _handlers[eventType] = (obj, meta) => handler((T)obj, meta); + return this; + } + + public TypedStreamSubscription OnError(Action handler) { + _errorHandler = handler; + return this; + } + + public async Task StartAsync(CancellationToken ct = default) { + if (_started) throw new InvalidOperationException("StartAsync has already been called."); + _started = true; + + _cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + + var channel = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = true, SingleWriter = true } + ); + _client.RegisterSubscription(_stream, new SignalRSubscriptionClient.SubscriptionState(channel, _fromPosition)); + + await _client.Connection.InvokeAsync( + SignalRSubscriptionMethods.Subscribe, + _stream, + _fromPosition, + _cts.Token + ).ConfigureAwait(false); + + _consumeTask = ConsumeLoop(channel.Reader, _cts.Token); + } + + async Task ConsumeLoop(ChannelReader reader, CancellationToken ct) { + var serializer = _client.GetSerializer(); + var enableTracing = _client.TracingEnabled; + + try { + await foreach (var envelope in reader.ReadAllAsync(ct).ConfigureAwait(false)) { + if (!_handlers.TryGetValue(envelope.EventType, out var handler)) continue; + + var payload = Encoding.UTF8.GetBytes(envelope.JsonPayload); + var result = serializer.DeserializeEvent(payload, envelope.EventType, "application/json"); + + if (result is not DeserializationResult.SuccessfullyDeserialized deserialized) continue; + + var meta = new StreamMeta(envelope.Stream, envelope.StreamPosition, envelope.Timestamp); + Activity? activity = null; + + try { + if (enableTracing && envelope.JsonMetadata != null) { + activity = StartTraceActivity(envelope.JsonMetadata); + } + + await handler(deserialized.Payload, meta).ConfigureAwait(false); + } finally { + activity?.Dispose(); + } + } + } catch (OperationCanceledException) { + // Expected on dispose/cancellation + } catch (ChannelClosedException) { + // Channel completed (server error or connection closed) + } catch (Exception ex) { + _errorHandler?.Invoke(new StreamSubscriptionError { + Stream = _stream, + Message = ex.Message + }); + } + } + + static Activity? StartTraceActivity(string jsonMetadata) { + try { + var metaDict = JsonSerializer.Deserialize>(jsonMetadata); + if (metaDict == null) return null; + + var metadata = new Metadata(metaDict); + var tracingMeta = metadata.GetTracingMeta(); + var parentContext = tracingMeta.ToActivityContext(isRemote: true); + + if (parentContext == null) return null; + + return EventuousDiagnostics.ActivitySource.StartActivity( + "signalr.consume", + ActivityKind.Consumer, + parentContext.Value + ); + } catch { + return null; + } + } + + public async ValueTask DisposeAsync() { + if (_cts != null) { + await _cts.CancelAsync().ConfigureAwait(false); + + if (_consumeTask != null) { + try { await _consumeTask.ConfigureAwait(false); } catch { /* expected */ } + } + + _cts.Dispose(); + } + + if (_started) { + _client.RemoveSubscription(_stream); + try { + await _client.Connection.InvokeAsync( + SignalRSubscriptionMethods.Unsubscribe, + _stream + ).ConfigureAwait(false); + } catch { + // Best effort + } + } + } +} diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/TypedStreamSubscriptionTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/TypedStreamSubscriptionTests.cs new file mode 100644 index 000000000..146f1e3c6 --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR/TypedStreamSubscriptionTests.cs @@ -0,0 +1,90 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +extern alias SignalRClient; + +using SignalRClient::Eventuous.SignalR.Client; +using Microsoft.AspNetCore.SignalR.Client; + +namespace Eventuous.Tests.SignalR; + +public class TypedStreamSubscriptionTests { + static HubConnection BuildFakeConnection() + => new HubConnectionBuilder() + .WithUrl("http://localhost:9999/test") + .Build(); + + [Test] + public async Task On_BeforeStart_RegistersHandler() { + var connection = BuildFakeConnection(); + var client = new SignalRSubscriptionClient(connection); + var sub = client.SubscribeTyped("test-stream", null); + + // Register two handlers before start — should not throw + sub.On((_, _) => ValueTask.CompletedTask); + sub.On((_, _) => ValueTask.CompletedTask); + + await Assert.That(sub).IsNotNull(); + await client.DisposeAsync(); + } + + [Test] + public async Task On_AfterStart_Throws() { + // We can't call StartAsync (no real server), so we test the guard + // by verifying On() is allowed before start and blocks after. + // Since we can't actually start without a server, we test the object + // construction and pre-start handler registration. + var connection = BuildFakeConnection(); + var client = new SignalRSubscriptionClient(connection); + var sub = client.SubscribeTyped("test-stream", null); + + // Should work fine before start + await Assert.That(() => sub.On((_, _) => ValueTask.CompletedTask)).ThrowsNothing(); + + await client.DisposeAsync(); + } + + [Test] + public async Task OnError_CanBeChained() { + var connection = BuildFakeConnection(); + var client = new SignalRSubscriptionClient(connection); + var sub = client.SubscribeTyped("test-stream", null); + + // Fluent chaining should work + var result = sub + .On((_, _) => ValueTask.CompletedTask) + .OnError(_ => { }); + + await Assert.That(result).IsSameReferenceAs(sub); + await client.DisposeAsync(); + } + + [Test] + public async Task DisposeAsync_WithoutStart_IsNoOp() { + var connection = BuildFakeConnection(); + var client = new SignalRSubscriptionClient(connection); + var sub = client.SubscribeTyped("test-stream", null); + + // Should not throw even though StartAsync was never called + await Assert.That(async () => await sub.DisposeAsync()).ThrowsNothing(); + await client.DisposeAsync(); + } + + [Test] + public async Task SubscribeTyped_ReturnsNewInstanceEachTime() { + var connection = BuildFakeConnection(); + var client = new SignalRSubscriptionClient(connection); + + var sub1 = client.SubscribeTyped("stream-a", null); + var sub2 = client.SubscribeTyped("stream-b", null); + + await Assert.That(sub1).IsNotSameReferenceAs(sub2); + await client.DisposeAsync(); + } +} + +[EventType("TypedEvent1")] +record TypedEvent1(string Data); + +[EventType("TypedEvent2")] +record TypedEvent2(int Count); From 787c8f54d9d238ca5d1ccc8d37b9bd6861d5297d Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 11:03:04 +0100 Subject: [PATCH 09/12] docs: add SignalR subscription gateway documentation --- docs/src/content/docs/next/infra/signalr.md | 190 ++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 docs/src/content/docs/next/infra/signalr.md diff --git a/docs/src/content/docs/next/infra/signalr.md b/docs/src/content/docs/next/infra/signalr.md new file mode 100644 index 000000000..98f077687 --- /dev/null +++ b/docs/src/content/docs/next/infra/signalr.md @@ -0,0 +1,190 @@ +--- +title: "SignalR" +description: "Real-time event streaming to clients via SignalR" +sidebar: + order: 11 +--- + +## Introduction + +The SignalR subscription gateway bridges Eventuous stream subscriptions to SignalR, enabling real-time event streaming to browser UIs, mobile apps, or other remote clients. It provides two NuGet packages: + +- **`Eventuous.SignalR.Server`** — server-side gateway that manages per-connection Eventuous subscriptions and forwards events over SignalR +- **`Eventuous.SignalR.Client`** — client-side subscription API with auto-reconnect and typed event handling + +The server reuses the existing [Gateway](../../../gateway) pattern (`GatewayHandler` + `BaseProducer`) internally, so event forwarding benefits from the same tracing and metadata pipeline as other Eventuous producers. + +## Server + +### Registration + +Register the gateway with a subscription factory that creates store-specific subscriptions on demand: + +```csharp +builder.Services.AddSignalRSubscriptionGateway((sp, options) => { + var client = sp.GetRequiredService(); + var loggerFactory = sp.GetRequiredService(); + + options.SubscriptionFactory = (stream, fromPosition, pipe, subscriptionId) => + new StreamSubscription(client, new StreamSubscriptionOptions { + StreamName = stream, + SubscriptionId = subscriptionId + }, new NoOpCheckpointStore(fromPosition), pipe, loggerFactory); +}); +``` + +The `SubscriptionFactory` delegate is called each time a client subscribes to a stream. It receives the stream name, optional starting position, a pre-built consume pipe, and a subscription identifier. You can use any Eventuous subscription type (KurrentDB, PostgreSQL, etc.). + +### Hub + +Map the ready-made hub to an endpoint: + +```csharp +app.MapHub("/subscriptions"); +``` + +The built-in `SignalRSubscriptionHub` exposes two methods that clients call: + +- `SubscribeToStream(string stream, ulong? fromPosition)` — start receiving events +- `UnsubscribeFromStream(string stream)` — stop receiving events + +When a client disconnects, all its subscriptions are automatically cleaned up. + +### Custom hubs + +For applications that need a custom hub (e.g., adding authentication or authorization logic), inject `SubscriptionGateway` directly: + +```csharp +public class MyHub(SubscriptionGateway gateway) : Hub { + public Task SubscribeToStream(string stream, ulong? fromPosition) + => gateway.SubscribeAsync(Context.ConnectionId, stream, fromPosition, Context.ConnectionAborted); + + public Task UnsubscribeFromStream(string stream) + => gateway.UnsubscribeAsync(Context.ConnectionId, stream); + + public override Task OnDisconnectedAsync(Exception? exception) + => gateway.RemoveConnectionAsync(Context.ConnectionId); +} +``` + +Register with your custom hub type: + +```csharp +builder.Services.AddSignalRSubscriptionGateway((sp, options) => { + // configure subscription factory +}); +``` + +## Client + +### Connection setup + +Create a `SignalRSubscriptionClient` from any `HubConnection`. The client hooks into SignalR's reconnect lifecycle but doesn't own the connection policy — configure automatic reconnect on the `HubConnection` itself: + +```csharp +var connection = new HubConnectionBuilder() + .WithUrl("https://myserver/subscriptions") + .WithAutomaticReconnect() + .Build(); + +await connection.StartAsync(); + +var client = new SignalRSubscriptionClient(connection); +``` + +### Raw streaming with IAsyncEnumerable + +The simplest consumption mode returns events as they arrive: + +```csharp +await foreach (var envelope in client.SubscribeAsync("Order-123", fromPosition: null)) { + Console.WriteLine($"{envelope.EventType} at position {envelope.StreamPosition}"); + Console.WriteLine(envelope.JsonPayload); +} +``` + +Each `StreamEventEnvelope` contains: + +| Property | Description | +|---|---| +| `EventId` | Unique event identifier | +| `Stream` | Source stream name | +| `EventType` | Registered event type name | +| `StreamPosition` | Position within the stream | +| `GlobalPosition` | Position in the global event log | +| `Timestamp` | When the event was created | +| `JsonPayload` | Event payload as JSON | +| `JsonMetadata` | Event metadata as JSON (may include trace context) | + +### Typed consumption with On<T> + +For type-safe event handling, use `SubscribeTyped` with fluent handler registration: + +```csharp +await client.SubscribeTyped("Order-123", fromPosition: 0) + .On((evt, meta) => { + Console.WriteLine($"Order placed: {evt.OrderId} at {meta.Timestamp}"); + return ValueTask.CompletedTask; + }) + .On((evt, meta) => { + Console.WriteLine($"Order shipped at position {meta.Position}"); + return ValueTask.CompletedTask; + }) + .OnError(err => Console.WriteLine($"Error on {err.Stream}: {err.Message}")) + .StartAsync(); +``` + +Events are deserialized using the Eventuous `TypeMap` and `IEventSerializer`. Event types must be registered in `TypeMap` as usual (via `[EventType]` attribute or manual registration). Unrecognized event types are silently skipped. + +All `On` handlers must be registered before calling `StartAsync`. Calling `On` after `StartAsync` throws `InvalidOperationException`. + +### Client options + +```csharp +var client = new SignalRSubscriptionClient(connection, new SignalRSubscriptionClientOptions { + Serializer = customSerializer, // default: DefaultEventSerializer.Instance + EnableTracing = true // default: false +}); +``` + +| Option | Description | +|---|---| +| `Serializer` | Custom `IEventSerializer` for deserializing event payloads in typed mode | +| `EnableTracing` | When `true`, the client creates an `Activity` for each received event, linked to the trace context from metadata. Enable when the client has an OpenTelemetry collector configured. | + +## Auto-reconnect + +The client handles connection drops transparently: + +1. **Position tracking** — the client records the last stream position for each active subscription +2. **Re-subscribe on reconnect** — when SignalR reconnects, the client re-sends `SubscribeToStream` for each active subscription with the last known position +3. **Deduplication** — events at or before the last seen position are skipped, preventing duplicates after reconnect + +The server is stateless — it creates fresh subscriptions from the positions provided by the client. + +``` +Normal flow: + Client ──SubscribeToStream("Order-1", 42)──► Server + Client ◄──StreamEvent(pos=43)──────────────── Server + Client ◄──StreamEvent(pos=44)──────────────── Server + [tracks lastPosition = 44] + +Disconnect + Reconnect: + [connection drops, SignalR reconnects] + Client ──SubscribeToStream("Order-1", 44)──► Server + Client ◄──StreamEvent(pos=44)──────────────── Server [duplicate, skipped] + Client ◄──StreamEvent(pos=45)──────────────── Server [new, delivered] +``` + +## Wire format + +Events are transmitted as `StreamEventEnvelope` records over SignalR. The payload is pre-serialized JSON on the server side, avoiding polymorphic serialization issues. Metadata (including trace context) flows through `JsonMetadata` as a serialized dictionary. + +Trace context propagates through the existing Eventuous metadata pipeline: `$traceId` and `$spanId` keys in metadata are preserved from the original event through the gateway to the client. When `EnableTracing` is enabled on the client, the consume activity is linked to the original trace. + +## Packages + +| Package | Dependencies | Purpose | +|---|---|---| +| `Eventuous.SignalR.Server` | `Eventuous.Subscriptions`, `Eventuous.Gateway`, `Microsoft.AspNetCore.App` | Server-side gateway | +| `Eventuous.SignalR.Client` | `Eventuous.Shared`, `Eventuous.Serialization`, `Eventuous.Diagnostics`, `Microsoft.AspNetCore.SignalR.Client` | Client-side subscriptions | From 03746a087070ceed80d93ec58fb457c2341b0a41 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 11:13:07 +0100 Subject: [PATCH 10/12] fix(signalr): address code review feedback - Fix SubscribeAsync cleanup: wrap in try/finally so subscription state is always removed on cancellation or error, not just on normal loop exit - Fix duplicate subscribe: complete previous channel before replacing - Send StreamError to client when server-side subscription fails, instead of only logging (client was waiting forever with no error callback) - Route ChannelClosedException to TypedStreamSubscription.OnError so typed subscribers see server failures and connection closes - Add null guard on SignalRProduceOptions in SignalRProducer - Narrow generic catch clauses to specific expected exception types (OperationCanceledException, ObjectDisposedException) throughout - Use .NoContext() instead of .ConfigureAwait(false) per repo convention (link TaskExtensions into client project) - Use ex.ToString() in error messages for better diagnostics --- .../Eventuous.SignalR.Client.csproj | 6 +++ .../SignalRSubscriptionClient.cs | 52 +++++++++++-------- .../TypedStreamSubscription.cs | 45 +++++++++------- .../SignalRProducer.cs | 3 +- .../SubscriptionGateway.cs | 36 +++++++++---- .../SubscriptionGatewayTests.cs | 4 +- 6 files changed, 93 insertions(+), 53 deletions(-) diff --git a/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj b/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj index df5ca6abf..018077b29 100644 --- a/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj +++ b/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj @@ -10,4 +10,10 @@ + + + Tools\TaskExtensions.cs + + + diff --git a/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs index 993db637c..a9f262858 100644 --- a/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs +++ b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs @@ -42,22 +42,28 @@ public async IAsyncEnumerable SubscribeAsync( new UnboundedChannelOptions { SingleReader = true, SingleWriter = true } ); + // Complete any existing subscription for this stream before replacing + if (_subscriptions.TryRemove(stream, out var previous)) { + previous.Channel.Writer.TryComplete(); + } + var state = new SubscriptionState(channel, fromPosition); _subscriptions[stream] = state; - await _connection.InvokeAsync( - SignalRSubscriptionMethods.Subscribe, - stream, - fromPosition, - ct - ).ConfigureAwait(false); - - await foreach (var envelope in channel.Reader.ReadAllAsync(ct).ConfigureAwait(false)) { - yield return envelope; + try { + await _connection.InvokeAsync( + SignalRSubscriptionMethods.Subscribe, + stream, + fromPosition, + ct + ).NoContext(); + + await foreach (var envelope in channel.Reader.ReadAllAsync(ct).NoContext(ct)) { + yield return envelope; + } + } finally { + _subscriptions.TryRemove(stream, out _); } - - // Cleanup when enumeration stops - _subscriptions.TryRemove(stream, out _); } public async Task UnsubscribeAsync(string stream) { @@ -66,7 +72,7 @@ public async Task UnsubscribeAsync(string stream) { await _connection.InvokeAsync( SignalRSubscriptionMethods.Unsubscribe, stream - ).ConfigureAwait(false); + ).NoContext(); } } @@ -90,7 +96,7 @@ internal void RemoveSubscription(string stream) { internal HubConnection Connection => _connection; public TypedStreamSubscription SubscribeTyped(string stream, ulong? fromPosition) - => new TypedStreamSubscription(this, stream, fromPosition); + => new(this, stream, fromPosition); void OnStreamEvent(StreamEventEnvelope envelope) { if (!_subscriptions.TryGetValue(envelope.Stream, out var state)) return; @@ -116,15 +122,17 @@ await _connection.InvokeAsync( stream, state.LastPosition, CancellationToken.None - ).ConfigureAwait(false); - } catch { - // Best-effort reconnection + ).NoContext(); + } catch (OperationCanceledException) { + // Connection was disposed during reconnect + } catch (ObjectDisposedException) { + // Connection already torn down } } } Task OnClosed(Exception? exception) { - foreach (var (stream, state) in _subscriptions) { + foreach (var (_, state) in _subscriptions) { state.Channel.Writer.TryComplete(exception); } return Task.CompletedTask; @@ -143,9 +151,11 @@ public async ValueTask DisposeAsync() { await _connection.InvokeAsync( SignalRSubscriptionMethods.Unsubscribe, stream - ).ConfigureAwait(false); - } catch { - // Best effort + ).NoContext(); + } catch (OperationCanceledException) { + // Expected during shutdown + } catch (ObjectDisposedException) { + // Connection already torn down } } diff --git a/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs b/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs index 1e93c0851..e745ce16f 100644 --- a/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs +++ b/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs @@ -11,14 +11,14 @@ namespace Eventuous.SignalR.Client; public class TypedStreamSubscription : IAsyncDisposable { - readonly SignalRSubscriptionClient _client; - readonly string _stream; - readonly ulong? _fromPosition; + readonly SignalRSubscriptionClient _client; + readonly string _stream; + readonly ulong? _fromPosition; readonly Dictionary> _handlers = new(); - Action? _errorHandler; - CancellationTokenSource? _cts; - Task? _consumeTask; - bool _started; + Action? _errorHandler; + CancellationTokenSource? _cts; + Task? _consumeTask; + bool _started; internal TypedStreamSubscription(SignalRSubscriptionClient client, string stream, ulong? fromPosition) { _client = client; @@ -55,7 +55,7 @@ await _client.Connection.InvokeAsync( _stream, _fromPosition, _cts.Token - ).ConfigureAwait(false); + ).NoContext(); _consumeTask = ConsumeLoop(channel.Reader, _cts.Token); } @@ -65,7 +65,7 @@ async Task ConsumeLoop(ChannelReader reader, CancellationTo var enableTracing = _client.TracingEnabled; try { - await foreach (var envelope in reader.ReadAllAsync(ct).ConfigureAwait(false)) { + await foreach (var envelope in reader.ReadAllAsync(ct).NoContext(ct)) { if (!_handlers.TryGetValue(envelope.EventType, out var handler)) continue; var payload = Encoding.UTF8.GetBytes(envelope.JsonPayload); @@ -81,19 +81,23 @@ async Task ConsumeLoop(ChannelReader reader, CancellationTo activity = StartTraceActivity(envelope.JsonMetadata); } - await handler(deserialized.Payload, meta).ConfigureAwait(false); + await handler(deserialized.Payload, meta).NoContext(); } finally { activity?.Dispose(); } } } catch (OperationCanceledException) { // Expected on dispose/cancellation - } catch (ChannelClosedException) { - // Channel completed (server error or connection closed) + } catch (ChannelClosedException ex) { + // Channel completed with error (server subscription failure or connection closed) + _errorHandler?.Invoke(new StreamSubscriptionError { + Stream = _stream, + Message = ex.InnerException?.Message ?? "Subscription channel closed" + }); } catch (Exception ex) { _errorHandler?.Invoke(new StreamSubscriptionError { Stream = _stream, - Message = ex.Message + Message = ex.ToString() }); } } @@ -114,17 +118,18 @@ async Task ConsumeLoop(ChannelReader reader, CancellationTo ActivityKind.Consumer, parentContext.Value ); - } catch { + } catch (Exception) { + // Tracing is best-effort; malformed metadata must not break event consumption return null; } } public async ValueTask DisposeAsync() { if (_cts != null) { - await _cts.CancelAsync().ConfigureAwait(false); + await _cts.CancelAsync().NoContext(); if (_consumeTask != null) { - try { await _consumeTask.ConfigureAwait(false); } catch { /* expected */ } + try { await _consumeTask.NoContext(); } catch (OperationCanceledException) { /* expected */ } } _cts.Dispose(); @@ -136,9 +141,11 @@ public async ValueTask DisposeAsync() { await _client.Connection.InvokeAsync( SignalRSubscriptionMethods.Unsubscribe, _stream - ).ConfigureAwait(false); - } catch { - // Best effort + ).NoContext(); + } catch (OperationCanceledException) { + // Expected during shutdown + } catch (ObjectDisposedException) { + // Connection already torn down } } } diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs index e91d7dec0..bb137612e 100644 --- a/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs @@ -18,7 +18,8 @@ protected override async Task ProduceMessages( SignalRProduceOptions? options, CancellationToken cancellationToken = default ) { - var client = hubContext.Clients.Client(options!.ConnectionId); + ArgumentNullException.ThrowIfNull(options); + var client = hubContext.Clients.Client(options.ConnectionId); foreach (var msg in messages) { await client.SendAsync( diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs b/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs index 0cd0c745d..bf586dc7a 100644 --- a/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs +++ b/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs @@ -9,18 +9,21 @@ namespace Eventuous.SignalR.Server; public class SubscriptionGateway : IAsyncDisposable where THub : Hub { - readonly SignalRProducer _producer; - readonly SubscriptionFactory _subscriptionFactory; - readonly IEventSerializer _eventSerializer; - readonly ILogger _logger; - readonly ConcurrentDictionary<(string ConnectionId, string Stream), SubscriptionState> _subscriptions = new(); + readonly IHubContext _hubContext; + readonly SignalRProducer _producer; + readonly SubscriptionFactory _subscriptionFactory; + readonly IEventSerializer _eventSerializer; + readonly ILogger _logger; + readonly ConcurrentDictionary<(string ConnectionId, string Stream), SubscriptionState> _subscriptions = new(); public SubscriptionGateway( + IHubContext hubContext, SignalRProducer producer, SignalRGatewayOptions options, IEventSerializer eventSerializer, ILoggerFactory loggerFactory ) { + _hubContext = hubContext; _producer = producer; _subscriptionFactory = options.SubscriptionFactory; _eventSerializer = eventSerializer; @@ -61,6 +64,18 @@ await subscription.Subscribe( } catch (Exception ex) { _logger.LogError(ex, "Subscription {SubscriptionId} failed", subscriptionId); _subscriptions.TryRemove(key, out _); + + // Notify the client about the failure + try { + var client = _hubContext.Clients.Client(connectionId); + await client.SendAsync( + SignalRSubscriptionMethods.StreamError, + new StreamSubscriptionError { Stream = stream, Message = ex.Message }, + CancellationToken.None + ).NoContext(); + } catch (Exception notifyEx) { + _logger.LogDebug(notifyEx, "Failed to notify client {ConnectionId} about subscription error", connectionId); + } } }, cts.Token); } @@ -72,20 +87,21 @@ public async Task UnsubscribeAsync(string connectionId, string stream) { } public async Task RemoveConnectionAsync(string connectionId) { - var keys = _subscriptions.Keys.Where(k => k.ConnectionId == connectionId).ToList(); - foreach (var key in keys) { + foreach (var key in _subscriptions.Keys.Where(k => k.ConnectionId == connectionId).ToList()) { if (_subscriptions.TryRemove(key, out var state)) { await StopSubscription(state).NoContext(); } } } - static async Task StopSubscription(SubscriptionState state) { + async Task StopSubscription(SubscriptionState state) { await state.Cts.CancelAsync(); try { await state.Subscription.Unsubscribe(_ => { }, CancellationToken.None).NoContext(); - } catch { - // Best effort cleanup + } catch (OperationCanceledException) { + // Expected during cancellation + } catch (Exception ex) { + _logger.LogWarning(ex, "Error during subscription unsubscribe cleanup"); } await state.Pipe.DisposeAsync().NoContext(); state.Cts.Dispose(); diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs index 64d27ec16..b461a1b15 100644 --- a/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs +++ b/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs @@ -33,7 +33,7 @@ SubscriptionGateway CreateGateway() { .Returns(ci => new ValueTask(Task.Run(async () => { var token = ci.ArgAt(2); try { await Task.Delay(Timeout.Infinite, token); } - catch (OperationCanceledException) { } + catch (OperationCanceledException) { /* Expected: cancelled when unsubscribed */ } }))); sub.Unsubscribe(Arg.Any(), Arg.Any()) .Returns(ValueTask.CompletedTask); @@ -43,7 +43,7 @@ SubscriptionGateway CreateGateway() { }; var serializer = DefaultEventSerializer.Instance; - return new SubscriptionGateway(producer, options, serializer, NullLoggerFactory.Instance); + return new SubscriptionGateway(hubContext, producer, options, serializer, NullLoggerFactory.Instance); } [Test] From 58f9b049641dc797ed1e47f81faf663429451499 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 12:53:37 +0100 Subject: [PATCH 11/12] test(signalr): add end-to-end integration tests with KurrentDB and real SignalR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three tests covering the full server→client pipeline: - RawStreaming: subscribe from beginning, receive pre-existing events - TypedSubscription: On dispatch with deserialized payloads - LiveEvents: subscribe first, append events after, verify delivery --- Eventuous.slnx | 1 + ...Eventuous.Tests.SignalR.Integration.csproj | 24 ++ .../SignalREndToEndTests.cs | 224 ++++++++++++++++++ 3 files changed, 249 insertions(+) create mode 100644 src/SignalR/test/Eventuous.Tests.SignalR.Integration/Eventuous.Tests.SignalR.Integration.csproj create mode 100644 src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs diff --git a/Eventuous.slnx b/Eventuous.slnx index 497eb4039..e32a93a74 100644 --- a/Eventuous.slnx +++ b/Eventuous.slnx @@ -169,6 +169,7 @@ + diff --git a/src/SignalR/test/Eventuous.Tests.SignalR.Integration/Eventuous.Tests.SignalR.Integration.csproj b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/Eventuous.Tests.SignalR.Integration.csproj new file mode 100644 index 000000000..6cd00723a --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/Eventuous.Tests.SignalR.Integration.csproj @@ -0,0 +1,24 @@ + + + true + Exe + + + + + + + + + + + + + + + + + + + + diff --git a/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs new file mode 100644 index 000000000..1feb9e366 --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs @@ -0,0 +1,224 @@ +extern alias SignalRClient; + +using System.Runtime.InteropServices; +using Eventuous.KurrentDB; +using Eventuous.SignalR.Server; +using Eventuous.Subscriptions.Filters; +using KurrentDB.Client; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.SignalR.Client; +using Microsoft.AspNetCore.TestHost; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Testcontainers.KurrentDb; +using ClientTypes = SignalRClient::Eventuous.SignalR.Client; + +namespace Eventuous.Tests.SignalR.Integration; + +using KurrentStreamSubscription = Eventuous.KurrentDB.Subscriptions.StreamSubscription; +using KurrentStreamSubscriptionOptions = Eventuous.KurrentDB.Subscriptions.StreamSubscriptionOptions; + +[EventType("TestOrderPlaced")] +record TestOrderPlaced(string OrderId, decimal Amount); + +[EventType("TestOrderShipped")] +record TestOrderShipped(string OrderId, string TrackingNumber); + +public class SignalREndToEndTests : IAsyncDisposable { + KurrentDbContainer _container = null!; + WebApplication _app = null!; + TestServer _server = null!; + IEventStore _eventStore = null!; + + [Before(Test)] + public async Task Setup() { + TypeMap.Instance.RegisterKnownEventTypes(typeof(TestOrderPlaced).Assembly); + + var image = RuntimeInformation.ProcessArchitecture == Architecture.Arm64 + ? "kurrentplatform/kurrentdb:25.1.3-experimental-arm64-8.0-jammy" + : "kurrentplatform/kurrentdb:25.1.3"; + + _container = new KurrentDbBuilder() + .WithImage(image) + .WithEnvironment("KURRENTDB_ENABLE_ATOM_PUB_OVER_HTTP", "true") + .Build(); + + await _container.StartAsync(); + + var builder = WebApplication.CreateBuilder(); + builder.WebHost.UseTestServer(); + builder.Logging.SetMinimumLevel(LogLevel.Debug); + + builder.Services.AddSignalR(); + builder.Services.AddKurrentDBClient(_container.GetConnectionString()); + builder.Services.AddEventStore(); + builder.Services.AddSingleton(DefaultEventSerializer.Instance); + + builder.Services.AddSignalRSubscriptionGateway((sp, options) => { + var client = sp.GetRequiredService(); + var loggerFactory = sp.GetRequiredService(); + + options.SubscriptionFactory = (stream, fromPosition, pipe, subscriptionId) => + new KurrentStreamSubscription( + client, + new KurrentStreamSubscriptionOptions { + StreamName = stream, + SubscriptionId = subscriptionId + }, + new NoOpCheckpointStore(fromPosition), + pipe, + loggerFactory + ); + }); + + _app = builder.Build(); + _app.MapHub("/subscriptions"); + + await _app.StartAsync(); + + _server = _app.GetTestServer(); + _eventStore = _app.Services.GetRequiredService(); + } + + HubConnection CreateHubConnection() => + new HubConnectionBuilder() + .WithUrl( + "http://localhost/subscriptions", + opts => opts.HttpMessageHandlerFactory = _ => _server.CreateHandler() + ) + .Build(); + + async Task AppendEvents(string stream, params object[] events) { + var streamEvents = events + .Select(e => new NewStreamEvent(Guid.NewGuid(), e, new Metadata())) + .ToArray(); + + await _eventStore.AppendEvents( + new StreamName(stream), + ExpectedStreamVersion.Any, + streamEvents, + default + ); + } + + [Test] + public async Task RawStreaming_ReceivesAppendedEvents() { + var stream = $"Order-{Guid.NewGuid():N}"; + + await AppendEvents(stream, + new TestOrderPlaced("order-1", 99.99m), + new TestOrderShipped("order-1", "TRACK-123") + ); + + var connection = CreateHubConnection(); + await connection.StartAsync(); + var client = new ClientTypes.SignalRSubscriptionClient(connection); + + var received = new List(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await foreach (var envelope in client.SubscribeAsync(stream, null, cts.Token)) { + received.Add(envelope); + if (received.Count >= 2) break; + } + + await Assert.That(received).HasCount().EqualTo(2); + await Assert.That(received[0].EventType).IsEqualTo("TestOrderPlaced"); + await Assert.That(received[1].EventType).IsEqualTo("TestOrderShipped"); + await Assert.That(received[0].StreamPosition).IsEqualTo(0UL); + await Assert.That(received[1].StreamPosition).IsEqualTo(1UL); + await Assert.That(received[0].JsonPayload).Contains("order-1"); + + await client.DisposeAsync(); + await connection.DisposeAsync(); + } + + [Test] + public async Task TypedSubscription_DispatchesToCorrectHandlers() { + var stream = $"Order-{Guid.NewGuid():N}"; + + await AppendEvents(stream, + new TestOrderPlaced("order-2", 149.99m), + new TestOrderShipped("order-2", "TRACK-456") + ); + + var connection = CreateHubConnection(); + await connection.StartAsync(); + var client = new ClientTypes.SignalRSubscriptionClient(connection); + + var placedEvents = new List(); + var shippedEvents = new List(); + var done = new TaskCompletionSource(); + var count = 0; + + var sub = client.SubscribeTyped(stream, null) + .On((evt, meta) => { + placedEvents.Add(evt); + if (Interlocked.Increment(ref count) >= 2) done.TrySetResult(); + return ValueTask.CompletedTask; + }) + .On((evt, meta) => { + shippedEvents.Add(evt); + if (Interlocked.Increment(ref count) >= 2) done.TrySetResult(); + return ValueTask.CompletedTask; + }); + + await sub.StartAsync(); + + var completed = await Task.WhenAny(done.Task, Task.Delay(TimeSpan.FromSeconds(10))); + await Assert.That(completed).IsEqualTo(done.Task); + + await Assert.That(placedEvents).HasCount().EqualTo(1); + await Assert.That(placedEvents[0].OrderId).IsEqualTo("order-2"); + await Assert.That(placedEvents[0].Amount).IsEqualTo(149.99m); + + await Assert.That(shippedEvents).HasCount().EqualTo(1); + await Assert.That(shippedEvents[0].TrackingNumber).IsEqualTo("TRACK-456"); + + await sub.DisposeAsync(); + await client.DisposeAsync(); + await connection.DisposeAsync(); + } + + [Test] + public async Task LiveEvents_DeliveredAfterSubscribe() { + var stream = $"Order-{Guid.NewGuid():N}"; + + var connection = CreateHubConnection(); + await connection.StartAsync(); + var client = new ClientTypes.SignalRSubscriptionClient(connection); + + var received = new List(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var consumeTask = Task.Run(async () => { + await foreach (var envelope in client.SubscribeAsync(stream, null, cts.Token)) { + received.Add(envelope); + if (received.Count >= 2) break; + } + }, cts.Token); + + // Give the subscription time to start on the server + await Task.Delay(1000); + + await AppendEvents(stream, + new TestOrderPlaced("order-3", 200m), + new TestOrderShipped("order-3", "TRACK-789") + ); + + await consumeTask.WaitAsync(TimeSpan.FromSeconds(10)); + + await Assert.That(received).HasCount().EqualTo(2); + await Assert.That(received[0].EventType).IsEqualTo("TestOrderPlaced"); + await Assert.That(received[1].EventType).IsEqualTo("TestOrderShipped"); + + await client.DisposeAsync(); + await connection.DisposeAsync(); + } + + public async ValueTask DisposeAsync() { + await _app.DisposeAsync(); + await _container.DisposeAsync(); + } +} From 63504ecf7597f69d58f576357c4c884b5028d9ca Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 20 Mar 2026 12:55:36 +0100 Subject: [PATCH 12/12] fix(signalr): make IEventSerializer optional in SubscriptionGateway Fall back to DefaultEventSerializer.Instance like all other Eventuous components, instead of requiring explicit DI registration. --- .../src/Eventuous.SignalR.Server/SubscriptionGateway.cs | 6 +++--- .../SignalREndToEndTests.cs | 1 - .../Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs | 4 +--- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs b/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs index bf586dc7a..691ba559b 100644 --- a/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs +++ b/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs @@ -20,13 +20,13 @@ public SubscriptionGateway( IHubContext hubContext, SignalRProducer producer, SignalRGatewayOptions options, - IEventSerializer eventSerializer, - ILoggerFactory loggerFactory + ILoggerFactory loggerFactory, + IEventSerializer? eventSerializer = null ) { _hubContext = hubContext; _producer = producer; _subscriptionFactory = options.SubscriptionFactory; - _eventSerializer = eventSerializer; + _eventSerializer = eventSerializer ?? DefaultEventSerializer.Instance; _logger = loggerFactory.CreateLogger>(); } diff --git a/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs index 1feb9e366..7ed8ab46f 100644 --- a/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs +++ b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs @@ -53,7 +53,6 @@ public async Task Setup() { builder.Services.AddSignalR(); builder.Services.AddKurrentDBClient(_container.GetConnectionString()); builder.Services.AddEventStore(); - builder.Services.AddSingleton(DefaultEventSerializer.Instance); builder.Services.AddSignalRSubscriptionGateway((sp, options) => { var client = sp.GetRequiredService(); diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs index b461a1b15..534d308a8 100644 --- a/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs +++ b/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs @@ -41,9 +41,7 @@ SubscriptionGateway CreateGateway() { return sub; } }; - var serializer = DefaultEventSerializer.Instance; - - return new SubscriptionGateway(hubContext, producer, options, serializer, NullLoggerFactory.Instance); + return new SubscriptionGateway(hubContext, producer, options, NullLoggerFactory.Instance); } [Test]