From 4bd2d97ef3790164a3b97dbbf5c08e2f27368ae3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20St=C3=BChmer?= Date: Mon, 13 Apr 2026 19:52:41 +0200 Subject: [PATCH] refactor: Use OutboxEventStore for IEventOutbox DI Remove direct registration and usage of provider-specific IEventOutbox implementations (EntityFrameworkOutbox, SqlServerEventOutbox, PostgreSqlEventOutbox, SQLiteEventOutbox) from the public API and DI setup. Instead, always register OutboxEventStore as the IEventOutbox implementation. Update extension method docs and tests to reflect this change. Delete provider-specific IEventOutbox classes and their unit tests. This enforces OutboxEventStore as the single entry point for event outbox operations, simplifying the API and reducing coupling to provider-specific types. --- .../EntityFrameworkExtensions.cs | 31 +- .../Outbox/EntityFrameworkOutbox.cs | 104 ------- .../Outbox/PostgreSqlEventOutbox.cs | 136 -------- .../Outbox/SQLiteEventOutbox.cs | 138 -------- .../Outbox/SqlServerEventOutbox.cs | 216 ------------- .../SqlServerExtensions.cs | 20 +- .../EntityFrameworkEventOutboxTests.cs | 119 ------- .../PostgreSql/PostgreSqlEventOutboxTests.cs | 101 ------ .../SQLite/SQLiteEventOutboxTests.cs | 294 ------------------ .../SqlServer/SqlServerEventOutboxTests.cs | 209 ------------- .../SqlServer/SqlServerExtensionsTests.cs | 4 +- 11 files changed, 24 insertions(+), 1348 deletions(-) delete mode 100644 src/NetEvolve.Pulse.EntityFramework/Outbox/EntityFrameworkOutbox.cs delete mode 100644 src/NetEvolve.Pulse.PostgreSql/Outbox/PostgreSqlEventOutbox.cs delete mode 100644 src/NetEvolve.Pulse.SQLite/Outbox/SQLiteEventOutbox.cs delete mode 100644 src/NetEvolve.Pulse.SqlServer/Outbox/SqlServerEventOutbox.cs delete mode 100644 tests/NetEvolve.Pulse.Tests.Unit/EntityFramework/EntityFrameworkEventOutboxTests.cs delete mode 100644 tests/NetEvolve.Pulse.Tests.Unit/PostgreSql/PostgreSqlEventOutboxTests.cs delete mode 100644 tests/NetEvolve.Pulse.Tests.Unit/SQLite/SQLiteEventOutboxTests.cs delete mode 100644 tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerEventOutboxTests.cs diff --git a/src/NetEvolve.Pulse.EntityFramework/EntityFrameworkExtensions.cs b/src/NetEvolve.Pulse.EntityFramework/EntityFrameworkExtensions.cs index f73c5bd4..b115b03c 100644 --- a/src/NetEvolve.Pulse.EntityFramework/EntityFrameworkExtensions.cs +++ b/src/NetEvolve.Pulse.EntityFramework/EntityFrameworkExtensions.cs @@ -29,7 +29,7 @@ public static class EntityFrameworkExtensions /// Registered Services: /// /// as (Scoped) - /// as (Scoped) + /// as (Scoped) /// as (Scoped) /// as (Scoped) /// @@ -58,23 +58,18 @@ public static IMediatorBuilder AddEntityFrameworkOutbox( { ArgumentNullException.ThrowIfNull(configurator); - var services = configurator.AddOutbox(configureOptions).Services; - - // Register the repository - _ = services.RemoveAll(); - _ = services.AddScoped>(); - - // Register the event outbox (overrides the default OutboxEventStore) - _ = services.RemoveAll(); - _ = services.AddScoped>(); - - // Register the transaction scope - _ = services.RemoveAll(); - _ = services.AddScoped>(); - - // Register the management API - _ = services.RemoveAll(); - _ = services.AddScoped>(); + _ = configurator + .AddOutbox(configureOptions) + .Services + // Register the repository + .RemoveAll() + .AddScoped>() + // Register the transaction scope + .RemoveAll() + .AddScoped>() + // Register the management API + .RemoveAll() + .AddScoped>(); return configurator; } diff --git a/src/NetEvolve.Pulse.EntityFramework/Outbox/EntityFrameworkOutbox.cs b/src/NetEvolve.Pulse.EntityFramework/Outbox/EntityFrameworkOutbox.cs deleted file mode 100644 index 90d8bea4..00000000 --- a/src/NetEvolve.Pulse.EntityFramework/Outbox/EntityFrameworkOutbox.cs +++ /dev/null @@ -1,104 +0,0 @@ -namespace NetEvolve.Pulse.Outbox; - -using System.Text.Json; -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Options; -using NetEvolve.Pulse.Extensibility; -using NetEvolve.Pulse.Extensibility.Outbox; - -/// -/// Entity Framework Core implementation of that stores events -/// using the DbContext and participates in ambient transactions. -/// -/// -/// Transaction Integration: -/// Events stored via this class automatically participate in the current DbContext transaction. -/// If the transaction is rolled back, the event is also discarded. -/// Usage Pattern: -/// Use within a DbContext scope where business operations and event storage share the same transaction. -/// -/// -/// -/// public class OrderService -/// { -/// private readonly MyDbContext _context; -/// private readonly IEventOutbox _outbox; -/// -/// public async Task CreateOrderAsync(Order order, CancellationToken ct) -/// { -/// await using var transaction = await _context.Database.BeginTransactionAsync(ct); -/// -/// _context.Orders.Add(order); -/// await _context.SaveChangesAsync(ct); -/// -/// await _outbox.StoreAsync(new OrderCreatedEvent { OrderId = order.Id }, ct); -/// -/// await transaction.CommitAsync(ct); -/// } -/// } -/// -/// -/// The DbContext type that implements . -internal sealed class EntityFrameworkOutbox : IEventOutbox - where TContext : DbContext, IOutboxDbContext -{ - /// The DbContext used for all database operations within the current scope. - private readonly TContext _context; - - /// The resolved outbox options controlling serialization and table configuration. - private readonly OutboxOptions _options; - - /// The time provider used to generate consistent creation and update timestamps. - private readonly TimeProvider _timeProvider; - - /// - /// Initializes a new instance of the class. - /// - /// The DbContext for database operations. - /// The outbox configuration options. - /// The time provider for timestamps. - public EntityFrameworkOutbox(TContext context, IOptions options, TimeProvider timeProvider) - { - ArgumentNullException.ThrowIfNull(context); - ArgumentNullException.ThrowIfNull(options); - ArgumentNullException.ThrowIfNull(timeProvider); - - _context = context; - _options = options.Value; - _timeProvider = timeProvider; - } - - /// - public async Task StoreAsync(TEvent message, CancellationToken cancellationToken = default) - where TEvent : IEvent - { - ArgumentNullException.ThrowIfNull(message); - - var messageType = message.GetType(); - - var correlationId = message.CorrelationId; - - if (correlationId is { Length: > OutboxMessageSchema.MaxLengths.CorrelationId }) - { - throw new InvalidOperationException( - $"CorrelationId exceeds the maximum length of {OutboxMessageSchema.MaxLengths.CorrelationId} characters defined by the OutboxMessage schema. " - + "Provide a shorter correlation identifier to comply with the database constraint." - ); - } - - var now = _timeProvider.GetUtcNow(); - var outboxMessage = new OutboxMessage - { - Id = message.ToOutboxId(), - EventType = messageType, - Payload = JsonSerializer.Serialize(message, messageType, _options.JsonSerializerOptions), - CorrelationId = correlationId, - CreatedAt = now, - UpdatedAt = now, - Status = OutboxMessageStatus.Pending, - }; - - _ = await _context.OutboxMessages.AddAsync(outboxMessage, cancellationToken).ConfigureAwait(false); - _ = await _context.SaveChangesAsync(cancellationToken).ConfigureAwait(false); - } -} diff --git a/src/NetEvolve.Pulse.PostgreSql/Outbox/PostgreSqlEventOutbox.cs b/src/NetEvolve.Pulse.PostgreSql/Outbox/PostgreSqlEventOutbox.cs deleted file mode 100644 index de98709e..00000000 --- a/src/NetEvolve.Pulse.PostgreSql/Outbox/PostgreSqlEventOutbox.cs +++ /dev/null @@ -1,136 +0,0 @@ -namespace NetEvolve.Pulse.Outbox; - -using System.Diagnostics.CodeAnalysis; -using System.Text.Json; -using Microsoft.Extensions.Options; -using NetEvolve.Pulse.Extensibility; -using NetEvolve.Pulse.Extensibility.Outbox; -using Npgsql; - -/// -/// PostgreSQL implementation of that stores events using ADO.NET -/// with support for enlisting in existing instances. -/// -/// -/// Transaction Integration: -/// This implementation can participate in an existing PostgreSQL transaction by: -/// -/// Accepting a with active transaction via constructor -/// Using for ambient transaction support -/// -/// Atomicity: -/// When called within a transaction, the event is stored atomically with business data. -/// If the transaction rolls back, the event is also discarded. -/// -[SuppressMessage( - "Reliability", - "CA2007:Consider calling ConfigureAwait on the awaited task", - Justification = "await using statements in library code; ConfigureAwait applied to all Task-returning awaits." -)] -[SuppressMessage( - "Security", - "CA2100:Review SQL queries for security vulnerabilities", - Justification = "SQL constructed from validated OutboxOptions properties and strongly-typed parameters, not user input." -)] -public sealed class PostgreSqlEventOutbox : IEventOutbox -{ - /// The open PostgreSQL connection used to execute insert commands. - private readonly NpgsqlConnection _connection; - - /// The optional PostgreSQL transaction to enlist with, ensuring atomicity with business operations. - private readonly NpgsqlTransaction? _transaction; - - /// The resolved outbox options controlling table name, schema, and JSON serialization. - private readonly OutboxOptions _options; - - /// The time provider used to generate consistent creation and update timestamps. - private readonly TimeProvider _timeProvider; - - /// Represents the SQL statement used for inserting data into a database table. - private readonly string _sqlInsertInto; - - /// - /// Initializes a new instance of the class. - /// - /// The PostgreSQL connection (should already be open). - /// The outbox configuration options. - /// The time provider for timestamps. - /// Optional transaction to enlist with. - public PostgreSqlEventOutbox( - NpgsqlConnection connection, - IOptions options, - TimeProvider timeProvider, - NpgsqlTransaction? transaction = null - ) - { - ArgumentNullException.ThrowIfNull(connection); - ArgumentNullException.ThrowIfNull(options); - ArgumentNullException.ThrowIfNull(timeProvider); - - _connection = connection; - _options = options.Value; - _timeProvider = timeProvider; - _transaction = transaction; - - _sqlInsertInto = $""" - INSERT INTO {_options.FullTableName} - ("{OutboxMessageSchema.Columns.Id}", - "{OutboxMessageSchema.Columns.EventType}", - "{OutboxMessageSchema.Columns.Payload}", - "{OutboxMessageSchema.Columns.CorrelationId}", - "{OutboxMessageSchema.Columns.CreatedAt}", - "{OutboxMessageSchema.Columns.UpdatedAt}", - "{OutboxMessageSchema.Columns.ProcessedAt}", - "{OutboxMessageSchema.Columns.RetryCount}", - "{OutboxMessageSchema.Columns.Error}", - "{OutboxMessageSchema.Columns.Status}") - VALUES - (@Id, @EventType, @Payload, @CorrelationId, @CreatedAt, @UpdatedAt, NULL, 0, NULL, @Status) - """; - } - - /// - public async Task StoreAsync(TEvent message, CancellationToken cancellationToken = default) - where TEvent : IEvent - { - ArgumentNullException.ThrowIfNull(message); - - var messageType = message.GetType(); - var eventType = - messageType.AssemblyQualifiedName - ?? throw new InvalidOperationException($"Cannot get assembly-qualified name for type: {messageType}"); - - if (eventType is { Length: > OutboxMessageSchema.MaxLengths.EventType }) - { - throw new InvalidOperationException( - $"Event type identifier exceeds the EventType column maximum length of {OutboxMessageSchema.MaxLengths.EventType} characters. " - + "Shorten the type identifier, increase the database column length, or use Type.FullName with a type registry." - ); - } - - var correlationId = message.CorrelationId; - - if (correlationId is { Length: > OutboxMessageSchema.MaxLengths.CorrelationId }) - { - throw new InvalidOperationException( - $"CorrelationId exceeds the maximum length of {OutboxMessageSchema.MaxLengths.CorrelationId} characters defined by the OutboxMessage schema. " - + "Provide a shorter correlation identifier to comply with the database constraint." - ); - } - - await using var command = new NpgsqlCommand(_sqlInsertInto, _connection, _transaction); - - var now = _timeProvider.GetUtcNow(); - var payload = JsonSerializer.Serialize(message, messageType, _options.JsonSerializerOptions); - - _ = command.Parameters.AddWithValue("Id", message.ToOutboxId()); - _ = command.Parameters.AddWithValue("EventType", eventType); - _ = command.Parameters.AddWithValue("Payload", payload); - _ = command.Parameters.AddWithValue("CorrelationId", (object?)correlationId ?? DBNull.Value); - _ = command.Parameters.AddWithValue("CreatedAt", now); - _ = command.Parameters.AddWithValue("UpdatedAt", now); - _ = command.Parameters.AddWithValue("Status", (int)OutboxMessageStatus.Pending); - - _ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); - } -} diff --git a/src/NetEvolve.Pulse.SQLite/Outbox/SQLiteEventOutbox.cs b/src/NetEvolve.Pulse.SQLite/Outbox/SQLiteEventOutbox.cs deleted file mode 100644 index 65f541d6..00000000 --- a/src/NetEvolve.Pulse.SQLite/Outbox/SQLiteEventOutbox.cs +++ /dev/null @@ -1,138 +0,0 @@ -namespace NetEvolve.Pulse.Outbox; - -using System.Diagnostics.CodeAnalysis; -using System.Text.Json; -using Microsoft.Data.Sqlite; -using Microsoft.Extensions.Options; -using NetEvolve.Pulse.Extensibility; -using NetEvolve.Pulse.Extensibility.Outbox; - -/// -/// SQLite implementation of that stores events using ADO.NET -/// with support for enlisting in existing instances. -/// -/// -/// Transaction Integration: -/// This implementation can participate in an existing SQLite transaction by: -/// -/// Accepting a with active transaction via constructor -/// Using for ambient transaction support -/// -/// Atomicity: -/// When called within a transaction, the event is stored atomically with business data. -/// If the transaction rolls back, the event is also discarded. -/// -[SuppressMessage( - "Reliability", - "CA2007:Consider calling ConfigureAwait on the awaited task", - Justification = "await using statements in library code; ConfigureAwait applied to all Task-returning awaits." -)] -[SuppressMessage( - "Security", - "CA2100:Review SQL queries for security vulnerabilities", - Justification = "SQL constructed from validated OutboxOptions properties and strongly-typed parameters, not user input." -)] -public sealed class SQLiteEventOutbox : IEventOutbox -{ - /// The open SQLite connection used to execute insert commands. - private readonly SqliteConnection _connection; - - /// The optional SQLite transaction to enlist with, ensuring atomicity with business operations. - private readonly SqliteTransaction? _transaction; - - /// The resolved outbox options controlling table name and JSON serialization. - private readonly OutboxOptions _options; - - /// The time provider used to generate consistent creation and update timestamps. - private readonly TimeProvider _timeProvider; - - /// The SQL statement used for inserting a new outbox message. - private readonly string _sqlInsertInto; - - /// - /// Initializes a new instance of the class. - /// - /// The SQLite connection (should already be open). - /// The outbox configuration options. - /// The time provider for timestamps. - /// Optional transaction to enlist with. - public SQLiteEventOutbox( - SqliteConnection connection, - IOptions options, - TimeProvider timeProvider, - SqliteTransaction? transaction = null - ) - { - ArgumentNullException.ThrowIfNull(connection); - ArgumentNullException.ThrowIfNull(options); - ArgumentNullException.ThrowIfNull(timeProvider); - - _connection = connection; - _options = options.Value; - _timeProvider = timeProvider; - _transaction = transaction; - - var table = _options.FullTableName; - - _sqlInsertInto = $""" - INSERT INTO {table} - ("{OutboxMessageSchema.Columns.Id}", - "{OutboxMessageSchema.Columns.EventType}", - "{OutboxMessageSchema.Columns.Payload}", - "{OutboxMessageSchema.Columns.CorrelationId}", - "{OutboxMessageSchema.Columns.CreatedAt}", - "{OutboxMessageSchema.Columns.UpdatedAt}", - "{OutboxMessageSchema.Columns.ProcessedAt}", - "{OutboxMessageSchema.Columns.RetryCount}", - "{OutboxMessageSchema.Columns.Error}", - "{OutboxMessageSchema.Columns.Status}") - VALUES - (@Id, @EventType, @Payload, @CorrelationId, @CreatedAt, @UpdatedAt, NULL, 0, NULL, @Status); - """; - } - - /// - public async Task StoreAsync(TEvent message, CancellationToken cancellationToken = default) - where TEvent : IEvent - { - ArgumentNullException.ThrowIfNull(message); - - var messageType = message.GetType(); - var eventType = - messageType.AssemblyQualifiedName - ?? throw new InvalidOperationException($"Cannot get assembly-qualified name for type: {messageType}"); - - if (eventType.Length > OutboxMessageSchema.MaxLengths.EventType) - { - throw new InvalidOperationException( - $"Event type identifier exceeds the EventType column maximum length of {OutboxMessageSchema.MaxLengths.EventType} characters. " - + "Shorten the type identifier, increase the database column length, or use Type.FullName with a type registry." - ); - } - - var correlationId = message.CorrelationId; - - if (correlationId is { Length: > OutboxMessageSchema.MaxLengths.CorrelationId }) - { - throw new InvalidOperationException( - $"CorrelationId exceeds the maximum length of {OutboxMessageSchema.MaxLengths.CorrelationId} characters defined by the OutboxMessage schema. " - + "Provide a shorter correlation identifier to comply with the database constraint." - ); - } - - await using var command = new SqliteCommand(_sqlInsertInto, _connection, _transaction); - - var now = _timeProvider.GetUtcNow(); - var payload = JsonSerializer.Serialize(message, messageType, _options.JsonSerializerOptions); - - _ = command.Parameters.AddWithValue("@Id", message.ToOutboxId().ToString()); - _ = command.Parameters.AddWithValue("@EventType", eventType); - _ = command.Parameters.AddWithValue("@Payload", payload); - _ = command.Parameters.AddWithValue("@CorrelationId", (object?)correlationId ?? DBNull.Value); - _ = command.Parameters.AddWithValue("@CreatedAt", now); - _ = command.Parameters.AddWithValue("@UpdatedAt", now); - _ = command.Parameters.AddWithValue("@Status", (int)OutboxMessageStatus.Pending); - - _ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); - } -} diff --git a/src/NetEvolve.Pulse.SqlServer/Outbox/SqlServerEventOutbox.cs b/src/NetEvolve.Pulse.SqlServer/Outbox/SqlServerEventOutbox.cs deleted file mode 100644 index 9fb26844..00000000 --- a/src/NetEvolve.Pulse.SqlServer/Outbox/SqlServerEventOutbox.cs +++ /dev/null @@ -1,216 +0,0 @@ -namespace NetEvolve.Pulse.Outbox; - -using System.Diagnostics.CodeAnalysis; -using System.Text.Json; -using Microsoft.Data.SqlClient; -using Microsoft.Extensions.Options; -using NetEvolve.Pulse.Extensibility; -using NetEvolve.Pulse.Extensibility.Outbox; - -/// -/// SQL Server implementation of that stores events using ADO.NET -/// with support for enlisting in existing instances. -/// -/// -/// Transaction Integration: -/// This implementation can participate in an existing SQL Server transaction by: -/// -/// Accepting a with active transaction via constructor -/// Using for ambient transaction support -/// -/// Atomicity: -/// When called within a transaction, the event is stored atomically with business data. -/// If the transaction rolls back, the event is also discarded. -/// -[SuppressMessage( - "Reliability", - "CA2007:Consider calling ConfigureAwait on the awaited task", - Justification = "await using statements in library code; ConfigureAwait applied to all Task-returning awaits." -)] -[SuppressMessage( - "Security", - "CA2100:Review SQL queries for security vulnerabilities", - Justification = "SQL constructed from validated OutboxOptions properties and strongly-typed parameters, not user input." -)] -public sealed class SqlServerEventOutbox : IEventOutbox -{ - /// The open SQL connection provided explicitly; null when using the DI-friendly constructor. - private readonly SqlConnection? _explicitConnection; - - /// The optional SQL transaction provided explicitly; null when using the DI-friendly constructor. - private readonly SqlTransaction? _explicitTransaction; - - /// The connection string used by the DI-friendly constructor; null when using the explicit-connection constructor. - private readonly string? _connectionString; - - /// The optional transaction scope used by the DI-friendly constructor to obtain an ambient transaction. - private readonly IOutboxTransactionScope? _transactionScope; - - /// The resolved outbox options controlling table name, schema, and JSON serialization. - private readonly OutboxOptions _options; - - /// The time provider used to generate consistent creation and update timestamps. - private readonly TimeProvider _timeProvider; - - /// Represents the SQL statement used for inserting data into a database table. - private readonly string _sqlInsertInto; - - /// - /// Initializes a new instance of the class using an explicit connection. - /// - /// The SQL connection (should already be open). - /// The outbox configuration options. - /// The time provider for timestamps. - /// Optional transaction to enlist with. - public SqlServerEventOutbox( - SqlConnection connection, - IOptions options, - TimeProvider timeProvider, - SqlTransaction? transaction = null - ) - { - ArgumentNullException.ThrowIfNull(connection); - ArgumentNullException.ThrowIfNull(options); - ArgumentNullException.ThrowIfNull(timeProvider); - - _explicitConnection = connection; - _explicitTransaction = transaction; - _options = options.Value; - _timeProvider = timeProvider; - - _sqlInsertInto = BuildInsertSql(_options); - } - - /// - /// Initializes a new instance of the class for use with dependency injection. - /// Opens its own on each call and enlists in - /// an active from when present. - /// - /// The outbox configuration options (must include a connection string). - /// The time provider for timestamps. - /// Optional ambient transaction scope; when active, the event is stored within the same transaction. - public SqlServerEventOutbox( - IOptions options, - TimeProvider timeProvider, - IOutboxTransactionScope? transactionScope = null - ) - { - ArgumentNullException.ThrowIfNull(options); - ArgumentException.ThrowIfNullOrWhiteSpace(options.Value.ConnectionString); - ArgumentNullException.ThrowIfNull(timeProvider); - - _connectionString = options.Value.ConnectionString; - _transactionScope = transactionScope; - _options = options.Value; - _timeProvider = timeProvider; - - _sqlInsertInto = BuildInsertSql(_options); - } - - /// - public async Task StoreAsync(TEvent message, CancellationToken cancellationToken = default) - where TEvent : IEvent - { - ArgumentNullException.ThrowIfNull(message); - - var messageType = message.GetType(); - var eventType = - messageType.AssemblyQualifiedName - ?? throw new InvalidOperationException($"Cannot get assembly-qualified name for type: {messageType}"); - - if (eventType.Length > OutboxMessageSchema.MaxLengths.EventType) - { - throw new InvalidOperationException( - $"Event type identifier exceeds the EventType column maximum length of {OutboxMessageSchema.MaxLengths.EventType} characters. " - + "Shorten the type identifier, increase the database column length, or use Type.FullName with a type registry." - ); - } - - var correlationId = message.CorrelationId; - - if (correlationId is { Length: > OutboxMessageSchema.MaxLengths.CorrelationId }) - { - throw new InvalidOperationException( - $"CorrelationId exceeds the maximum length of {OutboxMessageSchema.MaxLengths.CorrelationId} characters defined by the OutboxMessage schema. " - + "Provide a shorter correlation identifier to comply with the database constraint." - ); - } - - var now = _timeProvider.GetUtcNow(); - var payload = JsonSerializer.Serialize(message, messageType, _options.JsonSerializerOptions); - - if (_explicitConnection is not null) - { - await using var command = new SqlCommand(_sqlInsertInto, _explicitConnection, _explicitTransaction); - AddParameters(command, message, eventType, correlationId, now, payload); - _ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); - } - else - { - var currentTransaction = _transactionScope?.GetCurrentTransaction(); - - if (currentTransaction is not null and not SqlTransaction) - { - throw new InvalidOperationException( - $"IOutboxTransactionScope returned a transaction of type '{currentTransaction.GetType().Name}', but SqlServerEventOutbox requires a SqlTransaction." - ); - } - - var ambientTransaction = currentTransaction as SqlTransaction; - - if (ambientTransaction is not null) - { - var connection = - ambientTransaction.Connection - ?? throw new InvalidOperationException("Transaction has no associated connection."); - - await using var command = new SqlCommand(_sqlInsertInto, connection, ambientTransaction); - AddParameters(command, message, eventType, correlationId, now, payload); - _ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); - } - else - { - await using var connection = new SqlConnection(_connectionString); - await connection.OpenAsync(cancellationToken).ConfigureAwait(false); - await using var command = new SqlCommand(_sqlInsertInto, connection); - AddParameters(command, message, eventType, correlationId, now, payload); - _ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); - } - } - } - - private static string BuildInsertSql(OutboxOptions options) => - $""" - INSERT INTO {options.FullTableName} - ([{OutboxMessageSchema.Columns.Id}], - [{OutboxMessageSchema.Columns.EventType}], - [{OutboxMessageSchema.Columns.Payload}], - [{OutboxMessageSchema.Columns.CorrelationId}], - [{OutboxMessageSchema.Columns.CreatedAt}], - [{OutboxMessageSchema.Columns.UpdatedAt}], - [{OutboxMessageSchema.Columns.ProcessedAt}], - [{OutboxMessageSchema.Columns.RetryCount}], - [{OutboxMessageSchema.Columns.Error}], - [{OutboxMessageSchema.Columns.Status}]) - VALUES - (@Id, @EventType, @Payload, @CorrelationId, @CreatedAt, @UpdatedAt, NULL, 0, NULL, @Status) - """; - - private static void AddParameters( - SqlCommand command, - IEvent message, - string eventType, - string? correlationId, - DateTimeOffset now, - string payload - ) - { - _ = command.Parameters.AddWithValue("@Id", message.ToOutboxId()); - _ = command.Parameters.AddWithValue("@EventType", eventType); - _ = command.Parameters.AddWithValue("@Payload", payload); - _ = command.Parameters.AddWithValue("@CorrelationId", (object?)correlationId ?? DBNull.Value); - _ = command.Parameters.AddWithValue("@CreatedAt", now); - _ = command.Parameters.AddWithValue("@UpdatedAt", now); - _ = command.Parameters.AddWithValue("@Status", OutboxMessageStatus.Pending); - } -} diff --git a/src/NetEvolve.Pulse.SqlServer/SqlServerExtensions.cs b/src/NetEvolve.Pulse.SqlServer/SqlServerExtensions.cs index 33682eb6..993ed45c 100644 --- a/src/NetEvolve.Pulse.SqlServer/SqlServerExtensions.cs +++ b/src/NetEvolve.Pulse.SqlServer/SqlServerExtensions.cs @@ -25,7 +25,7 @@ public static class SqlServerExtensions /// database objects before using this provider. /// Registered Services: /// - /// as (Scoped) + /// as (Scoped) /// as (Scoped) /// as (Scoped) /// (Singleton, if not already registered) @@ -72,7 +72,7 @@ public static IMediatorBuilder AddSqlServerOutbox( /// database objects before using this provider. /// Registered Services: /// - /// as (Scoped) + /// as (Scoped) /// as (Scoped) /// as (Scoped) /// (Singleton, if not already registered) @@ -125,7 +125,7 @@ public static IMediatorBuilder AddSqlServerOutbox( /// database objects before using this provider. /// Registered Services: /// - /// as (Scoped) + /// as (Scoped) /// as (Scoped) /// as (Scoped) /// (Singleton, if not already registered) @@ -160,7 +160,7 @@ Action configureOptions /// /// Registers a unit-of-work type as (Scoped) so that - /// can enlist in the caller's transaction automatically. + /// can enlist in the caller's transaction automatically. /// /// /// A type that implements both the application unit-of-work contract and . @@ -169,7 +169,7 @@ Action configureOptions /// The configurator for chaining. /// /// Call this method after - /// to wire up your unit-of-work so that automatically + /// to wire up your unit-of-work so that automatically /// enlists in any active owned by the unit-of-work. /// /// @@ -194,13 +194,11 @@ public static IMediatorBuilder AddSqlServerOutboxTransactionScope(t private static IMediatorBuilder RegisterSqlServerOutboxServices(this IMediatorBuilder configurator) { // AddOutbox() uses TryAdd* internally, so this call is safe even when AddOutbox() was already invoked. - _ = configurator.AddOutbox(); - - var services = configurator.Services; - _ = services.RemoveAll(); - _ = services - .AddScoped() + _ = configurator + .AddOutbox() + .Services.RemoveAll() .AddScoped() + .RemoveAll() .AddScoped(); return configurator; diff --git a/tests/NetEvolve.Pulse.Tests.Unit/EntityFramework/EntityFrameworkEventOutboxTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/EntityFramework/EntityFrameworkEventOutboxTests.cs deleted file mode 100644 index 0c81de36..00000000 --- a/tests/NetEvolve.Pulse.Tests.Unit/EntityFramework/EntityFrameworkEventOutboxTests.cs +++ /dev/null @@ -1,119 +0,0 @@ -namespace NetEvolve.Pulse.Tests.Unit.EntityFramework; - -using System; -using System.Threading.Tasks; -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Options; -using NetEvolve.Extensions.TUnit; -using NetEvolve.Pulse.Extensibility; -using NetEvolve.Pulse.Extensibility.Outbox; -using NetEvolve.Pulse.Outbox; -using TUnit.Core; - -[TestGroup("EntityFramework")] -public sealed class EntityFrameworkEventOutboxTests -{ - [Test] - public async Task Constructor_WithNullContext_ThrowsArgumentNullException() => - _ = await Assert - .That(() => - new EntityFrameworkOutbox( - null!, - Options.Create(new OutboxOptions()), - TimeProvider.System - ) - ) - .Throws(); - - [Test] - public async Task Constructor_WithNullOptions_ThrowsArgumentNullException() - { - var options = new DbContextOptionsBuilder() - .UseInMemoryDatabase(nameof(Constructor_WithNullOptions_ThrowsArgumentNullException)) - .Options; - await using var context = new TestDbContext(options); - - _ = await Assert - .That(() => new EntityFrameworkOutbox(context, null!, TimeProvider.System)) - .Throws(); - } - - [Test] - public async Task Constructor_WithNullTimeProvider_ThrowsArgumentNullException() - { - var options = new DbContextOptionsBuilder() - .UseInMemoryDatabase(nameof(Constructor_WithNullTimeProvider_ThrowsArgumentNullException)) - .Options; - await using var context = new TestDbContext(options); - - _ = await Assert - .That(() => new EntityFrameworkOutbox(context, Options.Create(new OutboxOptions()), null!)) - .Throws(); - } - - [Test] - public async Task Constructor_WithValidArguments_CreatesInstance() - { - var options = new DbContextOptionsBuilder() - .UseInMemoryDatabase(nameof(Constructor_WithValidArguments_CreatesInstance)) - .Options; - await using var context = new TestDbContext(options); - - var outbox = new EntityFrameworkOutbox( - context, - Options.Create(new OutboxOptions()), - TimeProvider.System - ); - - _ = await Assert.That(outbox).IsNotNull(); - } - - [Test] - public async Task StoreAsync_WithNullMessage_ThrowsArgumentNullException() - { - var options = new DbContextOptionsBuilder() - .UseInMemoryDatabase(nameof(StoreAsync_WithNullMessage_ThrowsArgumentNullException)) - .Options; - await using var context = new TestDbContext(options); - var outbox = new EntityFrameworkOutbox( - context, - Options.Create(new OutboxOptions()), - TimeProvider.System - ); - - _ = await Assert - .That(async () => await outbox.StoreAsync(null!).ConfigureAwait(false)) - .Throws(); - } - - [Test] - public async Task StoreAsync_WithLongCorrelationId_ThrowsInvalidOperationException( - CancellationToken cancellationToken - ) - { - var options = new DbContextOptionsBuilder() - .UseInMemoryDatabase(nameof(StoreAsync_WithLongCorrelationId_ThrowsInvalidOperationException)) - .Options; - await using var context = new TestDbContext(options); - var outbox = new EntityFrameworkOutbox( - context, - Options.Create(new OutboxOptions()), - TimeProvider.System - ); - var message = new TestEvent - { - CorrelationId = new string('x', OutboxMessageSchema.MaxLengths.CorrelationId + 1), - }; - - _ = await Assert - .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false)) - .Throws(); - } - - private sealed record TestEvent : IEvent - { - public string? CorrelationId { get; set; } - public string Id { get; init; } = Guid.NewGuid().ToString(); - public DateTimeOffset? PublishedAt { get; set; } - } -} diff --git a/tests/NetEvolve.Pulse.Tests.Unit/PostgreSql/PostgreSqlEventOutboxTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/PostgreSql/PostgreSqlEventOutboxTests.cs deleted file mode 100644 index d16bfc9f..00000000 --- a/tests/NetEvolve.Pulse.Tests.Unit/PostgreSql/PostgreSqlEventOutboxTests.cs +++ /dev/null @@ -1,101 +0,0 @@ -namespace NetEvolve.Pulse.Tests.Unit.PostgreSql; - -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.Options; -using NetEvolve.Extensions.TUnit; -using NetEvolve.Pulse.Extensibility; -using NetEvolve.Pulse.Extensibility.Outbox; -using NetEvolve.Pulse.Outbox; -using Npgsql; -using TUnit.Core; - -[TestGroup("PostgreSql")] -public sealed class PostgreSqlEventOutboxTests -{ - [Test] - public async Task Constructor_WithNullConnection_ThrowsArgumentNullException() => - _ = await Assert - .That(() => new PostgreSqlEventOutbox(null!, Options.Create(new OutboxOptions()), TimeProvider.System)) - .Throws(); - - [Test] - public async Task Constructor_WithNullOptions_ThrowsArgumentNullException() - { - await using var connection = new NpgsqlConnection("Host=localhost;"); - - _ = await Assert - .That(() => new PostgreSqlEventOutbox(connection, null!, TimeProvider.System)) - .Throws(); - } - - [Test] - public async Task Constructor_WithNullTimeProvider_ThrowsArgumentNullException() - { - await using var connection = new NpgsqlConnection("Host=localhost;"); - - _ = await Assert - .That(() => new PostgreSqlEventOutbox(connection, Options.Create(new OutboxOptions()), null!)) - .Throws(); - } - - [Test] - public async Task Constructor_WithValidArguments_CreatesInstance() - { - await using var connection = new NpgsqlConnection("Host=localhost;"); - - var outbox = new PostgreSqlEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System); - - _ = await Assert.That(outbox).IsNotNull(); - } - - [Test] - public async Task Constructor_WithTransaction_CreatesInstance() - { - await using var connection = new NpgsqlConnection("Host=localhost;"); - - var outbox = new PostgreSqlEventOutbox( - connection, - Options.Create(new OutboxOptions()), - TimeProvider.System, - transaction: null - ); - - _ = await Assert.That(outbox).IsNotNull(); - } - - [Test] - public async Task StoreAsync_WithNullMessage_ThrowsArgumentNullException() - { - await using var connection = new NpgsqlConnection("Host=localhost;"); - var outbox = new PostgreSqlEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System); - - _ = await Assert - .That(async () => await outbox.StoreAsync(null!).ConfigureAwait(false)) - .Throws(); - } - - [Test] - public async Task StoreAsync_WithLongCorrelationId_ThrowsInvalidOperationException( - CancellationToken cancellationToken - ) - { - await using var connection = new NpgsqlConnection("Host=localhost;"); - var outbox = new PostgreSqlEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System); - var message = new TestEvent - { - CorrelationId = new string('x', OutboxMessageSchema.MaxLengths.CorrelationId + 1), - }; - - _ = await Assert - .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false)) - .Throws(); - } - - private sealed record TestEvent : IEvent - { - public string? CorrelationId { get; set; } - public string Id { get; init; } = Guid.NewGuid().ToString(); - public DateTimeOffset? PublishedAt { get; set; } - } -} diff --git a/tests/NetEvolve.Pulse.Tests.Unit/SQLite/SQLiteEventOutboxTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/SQLite/SQLiteEventOutboxTests.cs deleted file mode 100644 index 93fa6fbe..00000000 --- a/tests/NetEvolve.Pulse.Tests.Unit/SQLite/SQLiteEventOutboxTests.cs +++ /dev/null @@ -1,294 +0,0 @@ -namespace NetEvolve.Pulse.Tests.Unit.SQLite; - -using System; -using System.Reflection; -using System.Reflection.Emit; -using System.Threading.Tasks; -using Microsoft.Data.Sqlite; -using Microsoft.Extensions.Options; -using NetEvolve.Extensions.TUnit; -using NetEvolve.Pulse.Extensibility; -using NetEvolve.Pulse.Extensibility.Outbox; -using NetEvolve.Pulse.Outbox; -using TUnit.Core; - -[TestGroup("SQLite")] -public sealed class SQLiteEventOutboxTests -{ - [Test] - public async Task Constructor_WithNullConnection_ThrowsArgumentNullException() => - _ = await Assert - .That(() => new SQLiteEventOutbox(null!, Options.Create(new OutboxOptions()), TimeProvider.System)) - .Throws(); - - [Test] - public async Task Constructor_WithNullOptions_ThrowsArgumentNullException() - { - await using var connection = new SqliteConnection("Data Source=:memory:"); - - _ = await Assert - .That(() => new SQLiteEventOutbox(connection, null!, TimeProvider.System)) - .Throws(); - } - - [Test] - public async Task Constructor_WithNullTimeProvider_ThrowsArgumentNullException() - { - await using var connection = new SqliteConnection("Data Source=:memory:"); - - _ = await Assert - .That(() => new SQLiteEventOutbox(connection, Options.Create(new OutboxOptions()), null!)) - .Throws(); - } - - [Test] - public async Task Constructor_WithValidArguments_CreatesInstance() - { - await using var connection = new SqliteConnection("Data Source=:memory:"); - - var outbox = new SQLiteEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System); - - _ = await Assert.That(outbox).IsNotNull(); - } - - [Test] - public async Task Constructor_WithTransaction_CreatesInstance() - { - await using var connection = new SqliteConnection("Data Source=:memory:"); - - var outbox = new SQLiteEventOutbox( - connection, - Options.Create(new OutboxOptions()), - TimeProvider.System, - transaction: null - ); - - _ = await Assert.That(outbox).IsNotNull(); - } - - [Test] - public async Task StoreAsync_WithNullMessage_ThrowsArgumentNullException() - { - await using var connection = new SqliteConnection("Data Source=:memory:"); - var outbox = new SQLiteEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System); - - _ = await Assert - .That(async () => await outbox.StoreAsync(null!).ConfigureAwait(false)) - .Throws(); - } - - [Test] - public async Task StoreAsync_WithLongCorrelationId_ThrowsInvalidOperationException( - CancellationToken cancellationToken - ) - { - await using var connection = new SqliteConnection("Data Source=:memory:"); - var outbox = new SQLiteEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System); - var message = new TestEvent - { - CorrelationId = new string('x', OutboxMessageSchema.MaxLengths.CorrelationId + 1), - }; - - _ = await Assert - .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false)) - .Throws(); - } - - [Test] - public async Task StoreAsync_WithValidEvent_PersistsRow(CancellationToken cancellationToken) - { - var connectionString = $"Data Source=store_{Guid.NewGuid():N};Mode=Memory;Cache=Shared"; - await using var keepAlive = new SqliteConnection(connectionString); - await keepAlive.OpenAsync(cancellationToken).ConfigureAwait(false); - - await using ( - var create = new SqliteCommand( - """ - CREATE TABLE "OutboxMessage"( - "Id" TEXT NOT NULL, - "EventType" TEXT NOT NULL, - "Payload" TEXT NOT NULL, - "CorrelationId" TEXT NULL, - "CreatedAt" TEXT NOT NULL, - "UpdatedAt" TEXT NOT NULL, - "ProcessedAt" TEXT NULL, - "NextRetryAt" TEXT NULL, - "RetryCount" INTEGER NOT NULL DEFAULT 0, - "Error" TEXT NULL, - "Status" INTEGER NOT NULL DEFAULT 0, - CONSTRAINT "PK_OutboxMessage" PRIMARY KEY ("Id") - ); - """, - keepAlive - ) - ) - { - _ = await create.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); - } - - await using var connection = new SqliteConnection(connectionString); - await connection.OpenAsync(cancellationToken).ConfigureAwait(false); - - var outbox = new SQLiteEventOutbox( - connection, - Options.Create(new OutboxOptions { ConnectionString = connectionString }), - TimeProvider.System - ); - - var evt = new TestEvent { CorrelationId = "corr" }; - - await outbox.StoreAsync(evt, cancellationToken).ConfigureAwait(false); - - await using var cmd = new SqliteCommand( - "SELECT \"EventType\",\"CorrelationId\",\"Status\",\"Payload\" FROM \"OutboxMessage\" WHERE \"Id\" = @Id", - keepAlive - ); - _ = cmd.Parameters.AddWithValue("@Id", evt.Id); - await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false); - _ = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); - - using (Assert.Multiple()) - { - _ = await Assert.That(reader.GetString(0)).IsEqualTo(evt.GetType().AssemblyQualifiedName); - _ = await Assert.That(reader.GetString(1)).IsEqualTo("corr"); - _ = await Assert.That(reader.GetInt64(2)).IsEqualTo((long)OutboxMessageStatus.Pending); - _ = await Assert.That(reader.GetString(3)).IsNotNullOrWhiteSpace(); - } - } - - [Test] - public async Task StoreAsync_WithOversizedEventType_ThrowsInvalidOperationException( - CancellationToken cancellationToken - ) - { - var connectionString = $"Data Source=type_{Guid.NewGuid():N};Mode=Memory;Cache=Shared"; - await using var keepAlive = new SqliteConnection(connectionString); - await keepAlive.OpenAsync(cancellationToken).ConfigureAwait(false); - await using ( - var create = new SqliteCommand( - """ - CREATE TABLE "OutboxMessage"( - "Id" TEXT NOT NULL, - "EventType" TEXT NOT NULL, - "Payload" TEXT NOT NULL, - "CorrelationId" TEXT NULL, - "CreatedAt" TEXT NOT NULL, - "UpdatedAt" TEXT NOT NULL, - "ProcessedAt" TEXT NULL, - "NextRetryAt" TEXT NULL, - "RetryCount" INTEGER NOT NULL DEFAULT 0, - "Error" TEXT NULL, - "Status" INTEGER NOT NULL DEFAULT 0, - CONSTRAINT "PK_OutboxMessage" PRIMARY KEY ("Id") - ); - """, - keepAlive - ) - ) - { - _ = await create.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); - } - - await using var connection = new SqliteConnection(connectionString); - await connection.OpenAsync(cancellationToken).ConfigureAwait(false); - - var outbox = new SQLiteEventOutbox( - connection, - Options.Create(new OutboxOptions { ConnectionString = connectionString }), - TimeProvider.System - ); - - var longEvent = CreateLongTypeEvent(); - - _ = await Assert - .That(async () => await outbox.StoreAsync(longEvent, cancellationToken).ConfigureAwait(false)) - .Throws(); - } - - private sealed record TestEvent : IEvent - { - public string? CorrelationId { get; set; } - public string Id { get; init; } = Guid.NewGuid().ToString(); - public DateTimeOffset? PublishedAt { get; set; } - } - - private static IEvent CreateLongTypeEvent() - { - var assemblyName = new AssemblyName($"DynEvt_{Guid.NewGuid():N}"); - var assemblyBuilder = AssemblyBuilder.DefineDynamicAssembly(assemblyName, AssemblyBuilderAccess.Run); - var moduleBuilder = assemblyBuilder.DefineDynamicModule("dyn"); - var typeName = new string('E', OutboxMessageSchema.MaxLengths.EventType + 10); - var typeBuilder = moduleBuilder.DefineType( - typeName, - TypeAttributes.Public | TypeAttributes.Class | TypeAttributes.Sealed, - typeof(object), - [typeof(IEvent)] - ); - - var idField = typeBuilder.DefineField("_id", typeof(string), FieldAttributes.Private); - var corrField = typeBuilder.DefineField("_corr", typeof(string), FieldAttributes.Private); - var publishedField = typeBuilder.DefineField("_pub", typeof(DateTimeOffset?), FieldAttributes.Private); - - DefineAutoProperty(typeBuilder, idField, nameof(IEvent.Id), typeof(string)); - DefineAutoProperty(typeBuilder, corrField, nameof(IEvent.CorrelationId), typeof(string)); - DefineAutoProperty(typeBuilder, publishedField, nameof(IEvent.PublishedAt), typeof(DateTimeOffset?)); - - var ctor = typeBuilder.DefineConstructor(MethodAttributes.Public, CallingConventions.Standard, Type.EmptyTypes); - var il = ctor.GetILGenerator(); - il.Emit(OpCodes.Ldarg_0); - il.Emit(OpCodes.Call, typeof(object).GetConstructor(Type.EmptyTypes)!); - il.Emit(OpCodes.Ldarg_0); - il.Emit(OpCodes.Ldstr, Guid.NewGuid().ToString()); - il.Emit(OpCodes.Stfld, idField); - il.Emit(OpCodes.Ret); - - var eventType = typeBuilder.CreateType()!; - return (IEvent)Activator.CreateInstance(eventType)!; - } - - private static void DefineAutoProperty( - TypeBuilder typeBuilder, - FieldBuilder backingField, - string propertyName, - Type propertyType - ) - { - var propertyBuilder = typeBuilder.DefineProperty( - propertyName, - PropertyAttributes.None, - propertyType, - Type.EmptyTypes - ); - - var getMethod = typeBuilder.DefineMethod( - $"get_{propertyName}", - MethodAttributes.Public - | MethodAttributes.Virtual - | MethodAttributes.SpecialName - | MethodAttributes.HideBySig, - propertyType, - Type.EmptyTypes - ); - var getIl = getMethod.GetILGenerator(); - getIl.Emit(OpCodes.Ldarg_0); - getIl.Emit(OpCodes.Ldfld, backingField); - getIl.Emit(OpCodes.Ret); - propertyBuilder.SetGetMethod(getMethod); - - var setMethod = typeBuilder.DefineMethod( - $"set_{propertyName}", - MethodAttributes.Public - | MethodAttributes.Virtual - | MethodAttributes.SpecialName - | MethodAttributes.HideBySig, - null, - [propertyType] - ); - var setIl = setMethod.GetILGenerator(); - setIl.Emit(OpCodes.Ldarg_0); - setIl.Emit(OpCodes.Ldarg_1); - setIl.Emit(OpCodes.Stfld, backingField); - setIl.Emit(OpCodes.Ret); - propertyBuilder.SetSetMethod(setMethod); - } -} diff --git a/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerEventOutboxTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerEventOutboxTests.cs deleted file mode 100644 index 66fb573e..00000000 --- a/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerEventOutboxTests.cs +++ /dev/null @@ -1,209 +0,0 @@ -namespace NetEvolve.Pulse.Tests.Unit.SqlServer; - -using System; -using System.Threading.Tasks; -using Microsoft.Data.SqlClient; -using Microsoft.Extensions.Options; -using NetEvolve.Extensions.TUnit; -using NetEvolve.Pulse.Extensibility; -using NetEvolve.Pulse.Extensibility.Outbox; -using NetEvolve.Pulse.Outbox; -using TUnit.Core; -using TUnit.Mocks; - -[TestGroup("SqlServer")] -public sealed class SqlServerEventOutboxTests -{ - [Test] - public async Task Constructor_WithNullConnection_ThrowsArgumentNullException() => - _ = await Assert - .That(() => new SqlServerEventOutbox(null!, Options.Create(new OutboxOptions()), TimeProvider.System)) - .Throws(); - - [Test] - public async Task Constructor_WithNullOptions_ThrowsArgumentNullException() - { - await using var connection = new SqlConnection("Server=.;Encrypt=true;"); - - _ = await Assert - .That(() => new SqlServerEventOutbox(connection, null!, TimeProvider.System)) - .Throws(); - } - - [Test] - public async Task Constructor_WithNullTimeProvider_ThrowsArgumentNullException() - { - await using var connection = new SqlConnection("Server=.;Encrypt=true;"); - - _ = await Assert - .That(() => new SqlServerEventOutbox(connection, Options.Create(new OutboxOptions()), null!)) - .Throws(); - } - - [Test] - public async Task Constructor_WithValidArguments_CreatesInstance() - { - await using var connection = new SqlConnection("Server=.;Encrypt=true;"); - - var outbox = new SqlServerEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System); - - _ = await Assert.That(outbox).IsNotNull(); - } - - [Test] - public async Task Constructor_WithTransaction_CreatesInstance() - { - await using var connection = new SqlConnection("Server=.;Encrypt=true;"); - - var outbox = new SqlServerEventOutbox( - connection, - Options.Create(new OutboxOptions()), - TimeProvider.System, - transaction: null - ); - - _ = await Assert.That(outbox).IsNotNull(); - } - - [Test] - public async Task Constructor_DiConstructor_WithNullOptions_ThrowsArgumentNullException() => - _ = await Assert - .That(() => new SqlServerEventOutbox((IOptions)null!, TimeProvider.System)) - .Throws(); - - [Test] - public async Task Constructor_DiConstructor_WithNullTimeProvider_ThrowsArgumentNullException() => - _ = await Assert - .That(() => - new SqlServerEventOutbox( - Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }), - null! - ) - ) - .Throws(); - - [Test] - public async Task Constructor_DiConstructor_WithEmptyConnectionString_ThrowsArgumentException() => - _ = await Assert - .That(() => - new SqlServerEventOutbox( - Options.Create(new OutboxOptions { ConnectionString = string.Empty }), - TimeProvider.System - ) - ) - .Throws(); - - [Test] - public async Task Constructor_DiConstructor_WithValidConnectionString_CreatesInstance() - { - var outbox = new SqlServerEventOutbox( - Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }), - TimeProvider.System - ); - - _ = await Assert.That(outbox).IsNotNull(); - } - - [Test] - public async Task Constructor_DiConstructor_WithTransactionScope_CreatesInstance() - { - var transactionScope = Mock.Of(); - _ = transactionScope.GetCurrentTransaction().Returns(null!); - - var outbox = new SqlServerEventOutbox( - Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }), - TimeProvider.System, - transactionScope.Object - ); - - _ = await Assert.That(outbox).IsNotNull(); - } - - [Test] - public async Task StoreAsync_WithNullMessage_ThrowsArgumentNullException() - { - await using var connection = new SqlConnection("Server=.;Encrypt=true;"); - var outbox = new SqlServerEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System); - - _ = await Assert - .That(async () => await outbox.StoreAsync(null!).ConfigureAwait(false)) - .Throws(); - } - - [Test] - public async Task StoreAsync_WithLongCorrelationId_ThrowsInvalidOperationException( - CancellationToken cancellationToken - ) - { - await using var connection = new SqlConnection("Server=.;Encrypt=true;"); - var outbox = new SqlServerEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System); - var message = new TestEvent - { - CorrelationId = new string('x', OutboxMessageSchema.MaxLengths.CorrelationId + 1), - }; - - _ = await Assert - .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false)) - .Throws(); - } - - [Test] - public async Task StoreAsync_DiConstructor_WithNullMessage_ThrowsArgumentNullException() - { - var outbox = new SqlServerEventOutbox( - Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }), - TimeProvider.System - ); - - _ = await Assert - .That(async () => await outbox.StoreAsync(null!).ConfigureAwait(false)) - .Throws(); - } - - [Test] - public async Task StoreAsync_DiConstructor_WithLongCorrelationId_ThrowsInvalidOperationException( - CancellationToken cancellationToken - ) - { - var outbox = new SqlServerEventOutbox( - Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }), - TimeProvider.System - ); - var message = new TestEvent - { - CorrelationId = new string('x', OutboxMessageSchema.MaxLengths.CorrelationId + 1), - }; - - _ = await Assert - .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false)) - .Throws(); - } - - [Test] - public async Task StoreAsync_DiConstructor_WithInvalidTransactionType_ThrowsInvalidOperationException( - CancellationToken cancellationToken - ) - { - var transactionScope = Mock.Of(); - _ = transactionScope.GetCurrentTransaction().Returns(new object()); // not a SqlTransaction - - var outbox = new SqlServerEventOutbox( - Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }), - TimeProvider.System, - transactionScope.Object - ); - - var message = new TestEvent(); - - _ = await Assert - .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false)) - .Throws(); - } - - private sealed record TestEvent : IEvent - { - public string? CorrelationId { get; set; } - public string Id { get; init; } = Guid.NewGuid().ToString(); - public DateTimeOffset? PublishedAt { get; set; } - } -} diff --git a/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerExtensionsTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerExtensionsTests.cs index 07d04eaa..c699deed 100644 --- a/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerExtensionsTests.cs +++ b/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerExtensionsTests.cs @@ -78,7 +78,7 @@ public async Task AddSqlServerOutbox_WithValidConnectionString_RegistersEventOut { _ = await Assert.That(descriptor).IsNotNull(); _ = await Assert.That(descriptor!.Lifetime).IsEqualTo(ServiceLifetime.Scoped); - _ = await Assert.That(descriptor!.ImplementationType).IsEqualTo(typeof(SqlServerEventOutbox)); + _ = await Assert.That(descriptor!.ImplementationType).IsEqualTo(typeof(OutboxEventStore)); } } @@ -161,7 +161,7 @@ public async Task AddSqlServerOutbox_WithFactory_RegistersEventOutboxAsScoped() { _ = await Assert.That(descriptor).IsNotNull(); _ = await Assert.That(descriptor!.Lifetime).IsEqualTo(ServiceLifetime.Scoped); - _ = await Assert.That(descriptor!.ImplementationType).IsEqualTo(typeof(SqlServerEventOutbox)); + _ = await Assert.That(descriptor!.ImplementationType).IsEqualTo(typeof(OutboxEventStore)); } }