diff --git a/src/NetEvolve.Pulse.EntityFramework/EntityFrameworkExtensions.cs b/src/NetEvolve.Pulse.EntityFramework/EntityFrameworkExtensions.cs
index f73c5bd4..b115b03c 100644
--- a/src/NetEvolve.Pulse.EntityFramework/EntityFrameworkExtensions.cs
+++ b/src/NetEvolve.Pulse.EntityFramework/EntityFrameworkExtensions.cs
@@ -29,7 +29,7 @@ public static class EntityFrameworkExtensions
/// Registered Services:
///
/// - as (Scoped)
- /// - as (Scoped)
+ /// - as (Scoped)
/// - as (Scoped)
/// - as (Scoped)
///
@@ -58,23 +58,18 @@ public static IMediatorBuilder AddEntityFrameworkOutbox(
{
ArgumentNullException.ThrowIfNull(configurator);
- var services = configurator.AddOutbox(configureOptions).Services;
-
- // Register the repository
- _ = services.RemoveAll();
- _ = services.AddScoped>();
-
- // Register the event outbox (overrides the default OutboxEventStore)
- _ = services.RemoveAll();
- _ = services.AddScoped>();
-
- // Register the transaction scope
- _ = services.RemoveAll();
- _ = services.AddScoped>();
-
- // Register the management API
- _ = services.RemoveAll();
- _ = services.AddScoped>();
+ _ = configurator
+ .AddOutbox(configureOptions)
+ .Services
+ // Register the repository
+ .RemoveAll()
+ .AddScoped>()
+ // Register the transaction scope
+ .RemoveAll()
+ .AddScoped>()
+ // Register the management API
+ .RemoveAll()
+ .AddScoped>();
return configurator;
}
diff --git a/src/NetEvolve.Pulse.EntityFramework/Outbox/EntityFrameworkOutbox.cs b/src/NetEvolve.Pulse.EntityFramework/Outbox/EntityFrameworkOutbox.cs
deleted file mode 100644
index 90d8bea4..00000000
--- a/src/NetEvolve.Pulse.EntityFramework/Outbox/EntityFrameworkOutbox.cs
+++ /dev/null
@@ -1,104 +0,0 @@
-namespace NetEvolve.Pulse.Outbox;
-
-using System.Text.Json;
-using Microsoft.EntityFrameworkCore;
-using Microsoft.Extensions.Options;
-using NetEvolve.Pulse.Extensibility;
-using NetEvolve.Pulse.Extensibility.Outbox;
-
-///
-/// Entity Framework Core implementation of that stores events
-/// using the DbContext and participates in ambient transactions.
-///
-///
-/// Transaction Integration:
-/// Events stored via this class automatically participate in the current DbContext transaction.
-/// If the transaction is rolled back, the event is also discarded.
-/// Usage Pattern:
-/// Use within a DbContext scope where business operations and event storage share the same transaction.
-///
-///
-///
-/// public class OrderService
-/// {
-/// private readonly MyDbContext _context;
-/// private readonly IEventOutbox _outbox;
-///
-/// public async Task CreateOrderAsync(Order order, CancellationToken ct)
-/// {
-/// await using var transaction = await _context.Database.BeginTransactionAsync(ct);
-///
-/// _context.Orders.Add(order);
-/// await _context.SaveChangesAsync(ct);
-///
-/// await _outbox.StoreAsync(new OrderCreatedEvent { OrderId = order.Id }, ct);
-///
-/// await transaction.CommitAsync(ct);
-/// }
-/// }
-///
-///
-/// The DbContext type that implements .
-internal sealed class EntityFrameworkOutbox : IEventOutbox
- where TContext : DbContext, IOutboxDbContext
-{
- /// The DbContext used for all database operations within the current scope.
- private readonly TContext _context;
-
- /// The resolved outbox options controlling serialization and table configuration.
- private readonly OutboxOptions _options;
-
- /// The time provider used to generate consistent creation and update timestamps.
- private readonly TimeProvider _timeProvider;
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// The DbContext for database operations.
- /// The outbox configuration options.
- /// The time provider for timestamps.
- public EntityFrameworkOutbox(TContext context, IOptions options, TimeProvider timeProvider)
- {
- ArgumentNullException.ThrowIfNull(context);
- ArgumentNullException.ThrowIfNull(options);
- ArgumentNullException.ThrowIfNull(timeProvider);
-
- _context = context;
- _options = options.Value;
- _timeProvider = timeProvider;
- }
-
- ///
- public async Task StoreAsync(TEvent message, CancellationToken cancellationToken = default)
- where TEvent : IEvent
- {
- ArgumentNullException.ThrowIfNull(message);
-
- var messageType = message.GetType();
-
- var correlationId = message.CorrelationId;
-
- if (correlationId is { Length: > OutboxMessageSchema.MaxLengths.CorrelationId })
- {
- throw new InvalidOperationException(
- $"CorrelationId exceeds the maximum length of {OutboxMessageSchema.MaxLengths.CorrelationId} characters defined by the OutboxMessage schema. "
- + "Provide a shorter correlation identifier to comply with the database constraint."
- );
- }
-
- var now = _timeProvider.GetUtcNow();
- var outboxMessage = new OutboxMessage
- {
- Id = message.ToOutboxId(),
- EventType = messageType,
- Payload = JsonSerializer.Serialize(message, messageType, _options.JsonSerializerOptions),
- CorrelationId = correlationId,
- CreatedAt = now,
- UpdatedAt = now,
- Status = OutboxMessageStatus.Pending,
- };
-
- _ = await _context.OutboxMessages.AddAsync(outboxMessage, cancellationToken).ConfigureAwait(false);
- _ = await _context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
- }
-}
diff --git a/src/NetEvolve.Pulse.PostgreSql/Outbox/PostgreSqlEventOutbox.cs b/src/NetEvolve.Pulse.PostgreSql/Outbox/PostgreSqlEventOutbox.cs
deleted file mode 100644
index de98709e..00000000
--- a/src/NetEvolve.Pulse.PostgreSql/Outbox/PostgreSqlEventOutbox.cs
+++ /dev/null
@@ -1,136 +0,0 @@
-namespace NetEvolve.Pulse.Outbox;
-
-using System.Diagnostics.CodeAnalysis;
-using System.Text.Json;
-using Microsoft.Extensions.Options;
-using NetEvolve.Pulse.Extensibility;
-using NetEvolve.Pulse.Extensibility.Outbox;
-using Npgsql;
-
-///
-/// PostgreSQL implementation of that stores events using ADO.NET
-/// with support for enlisting in existing instances.
-///
-///
-/// Transaction Integration:
-/// This implementation can participate in an existing PostgreSQL transaction by:
-///
-/// - Accepting a with active transaction via constructor
-/// - Using for ambient transaction support
-///
-/// Atomicity:
-/// When called within a transaction, the event is stored atomically with business data.
-/// If the transaction rolls back, the event is also discarded.
-///
-[SuppressMessage(
- "Reliability",
- "CA2007:Consider calling ConfigureAwait on the awaited task",
- Justification = "await using statements in library code; ConfigureAwait applied to all Task-returning awaits."
-)]
-[SuppressMessage(
- "Security",
- "CA2100:Review SQL queries for security vulnerabilities",
- Justification = "SQL constructed from validated OutboxOptions properties and strongly-typed parameters, not user input."
-)]
-public sealed class PostgreSqlEventOutbox : IEventOutbox
-{
- /// The open PostgreSQL connection used to execute insert commands.
- private readonly NpgsqlConnection _connection;
-
- /// The optional PostgreSQL transaction to enlist with, ensuring atomicity with business operations.
- private readonly NpgsqlTransaction? _transaction;
-
- /// The resolved outbox options controlling table name, schema, and JSON serialization.
- private readonly OutboxOptions _options;
-
- /// The time provider used to generate consistent creation and update timestamps.
- private readonly TimeProvider _timeProvider;
-
- /// Represents the SQL statement used for inserting data into a database table.
- private readonly string _sqlInsertInto;
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// The PostgreSQL connection (should already be open).
- /// The outbox configuration options.
- /// The time provider for timestamps.
- /// Optional transaction to enlist with.
- public PostgreSqlEventOutbox(
- NpgsqlConnection connection,
- IOptions options,
- TimeProvider timeProvider,
- NpgsqlTransaction? transaction = null
- )
- {
- ArgumentNullException.ThrowIfNull(connection);
- ArgumentNullException.ThrowIfNull(options);
- ArgumentNullException.ThrowIfNull(timeProvider);
-
- _connection = connection;
- _options = options.Value;
- _timeProvider = timeProvider;
- _transaction = transaction;
-
- _sqlInsertInto = $"""
- INSERT INTO {_options.FullTableName}
- ("{OutboxMessageSchema.Columns.Id}",
- "{OutboxMessageSchema.Columns.EventType}",
- "{OutboxMessageSchema.Columns.Payload}",
- "{OutboxMessageSchema.Columns.CorrelationId}",
- "{OutboxMessageSchema.Columns.CreatedAt}",
- "{OutboxMessageSchema.Columns.UpdatedAt}",
- "{OutboxMessageSchema.Columns.ProcessedAt}",
- "{OutboxMessageSchema.Columns.RetryCount}",
- "{OutboxMessageSchema.Columns.Error}",
- "{OutboxMessageSchema.Columns.Status}")
- VALUES
- (@Id, @EventType, @Payload, @CorrelationId, @CreatedAt, @UpdatedAt, NULL, 0, NULL, @Status)
- """;
- }
-
- ///
- public async Task StoreAsync(TEvent message, CancellationToken cancellationToken = default)
- where TEvent : IEvent
- {
- ArgumentNullException.ThrowIfNull(message);
-
- var messageType = message.GetType();
- var eventType =
- messageType.AssemblyQualifiedName
- ?? throw new InvalidOperationException($"Cannot get assembly-qualified name for type: {messageType}");
-
- if (eventType is { Length: > OutboxMessageSchema.MaxLengths.EventType })
- {
- throw new InvalidOperationException(
- $"Event type identifier exceeds the EventType column maximum length of {OutboxMessageSchema.MaxLengths.EventType} characters. "
- + "Shorten the type identifier, increase the database column length, or use Type.FullName with a type registry."
- );
- }
-
- var correlationId = message.CorrelationId;
-
- if (correlationId is { Length: > OutboxMessageSchema.MaxLengths.CorrelationId })
- {
- throw new InvalidOperationException(
- $"CorrelationId exceeds the maximum length of {OutboxMessageSchema.MaxLengths.CorrelationId} characters defined by the OutboxMessage schema. "
- + "Provide a shorter correlation identifier to comply with the database constraint."
- );
- }
-
- await using var command = new NpgsqlCommand(_sqlInsertInto, _connection, _transaction);
-
- var now = _timeProvider.GetUtcNow();
- var payload = JsonSerializer.Serialize(message, messageType, _options.JsonSerializerOptions);
-
- _ = command.Parameters.AddWithValue("Id", message.ToOutboxId());
- _ = command.Parameters.AddWithValue("EventType", eventType);
- _ = command.Parameters.AddWithValue("Payload", payload);
- _ = command.Parameters.AddWithValue("CorrelationId", (object?)correlationId ?? DBNull.Value);
- _ = command.Parameters.AddWithValue("CreatedAt", now);
- _ = command.Parameters.AddWithValue("UpdatedAt", now);
- _ = command.Parameters.AddWithValue("Status", (int)OutboxMessageStatus.Pending);
-
- _ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
- }
-}
diff --git a/src/NetEvolve.Pulse.SQLite/Outbox/SQLiteEventOutbox.cs b/src/NetEvolve.Pulse.SQLite/Outbox/SQLiteEventOutbox.cs
deleted file mode 100644
index 65f541d6..00000000
--- a/src/NetEvolve.Pulse.SQLite/Outbox/SQLiteEventOutbox.cs
+++ /dev/null
@@ -1,138 +0,0 @@
-namespace NetEvolve.Pulse.Outbox;
-
-using System.Diagnostics.CodeAnalysis;
-using System.Text.Json;
-using Microsoft.Data.Sqlite;
-using Microsoft.Extensions.Options;
-using NetEvolve.Pulse.Extensibility;
-using NetEvolve.Pulse.Extensibility.Outbox;
-
-///
-/// SQLite implementation of that stores events using ADO.NET
-/// with support for enlisting in existing instances.
-///
-///
-/// Transaction Integration:
-/// This implementation can participate in an existing SQLite transaction by:
-///
-/// - Accepting a with active transaction via constructor
-/// - Using for ambient transaction support
-///
-/// Atomicity:
-/// When called within a transaction, the event is stored atomically with business data.
-/// If the transaction rolls back, the event is also discarded.
-///
-[SuppressMessage(
- "Reliability",
- "CA2007:Consider calling ConfigureAwait on the awaited task",
- Justification = "await using statements in library code; ConfigureAwait applied to all Task-returning awaits."
-)]
-[SuppressMessage(
- "Security",
- "CA2100:Review SQL queries for security vulnerabilities",
- Justification = "SQL constructed from validated OutboxOptions properties and strongly-typed parameters, not user input."
-)]
-public sealed class SQLiteEventOutbox : IEventOutbox
-{
- /// The open SQLite connection used to execute insert commands.
- private readonly SqliteConnection _connection;
-
- /// The optional SQLite transaction to enlist with, ensuring atomicity with business operations.
- private readonly SqliteTransaction? _transaction;
-
- /// The resolved outbox options controlling table name and JSON serialization.
- private readonly OutboxOptions _options;
-
- /// The time provider used to generate consistent creation and update timestamps.
- private readonly TimeProvider _timeProvider;
-
- /// The SQL statement used for inserting a new outbox message.
- private readonly string _sqlInsertInto;
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// The SQLite connection (should already be open).
- /// The outbox configuration options.
- /// The time provider for timestamps.
- /// Optional transaction to enlist with.
- public SQLiteEventOutbox(
- SqliteConnection connection,
- IOptions options,
- TimeProvider timeProvider,
- SqliteTransaction? transaction = null
- )
- {
- ArgumentNullException.ThrowIfNull(connection);
- ArgumentNullException.ThrowIfNull(options);
- ArgumentNullException.ThrowIfNull(timeProvider);
-
- _connection = connection;
- _options = options.Value;
- _timeProvider = timeProvider;
- _transaction = transaction;
-
- var table = _options.FullTableName;
-
- _sqlInsertInto = $"""
- INSERT INTO {table}
- ("{OutboxMessageSchema.Columns.Id}",
- "{OutboxMessageSchema.Columns.EventType}",
- "{OutboxMessageSchema.Columns.Payload}",
- "{OutboxMessageSchema.Columns.CorrelationId}",
- "{OutboxMessageSchema.Columns.CreatedAt}",
- "{OutboxMessageSchema.Columns.UpdatedAt}",
- "{OutboxMessageSchema.Columns.ProcessedAt}",
- "{OutboxMessageSchema.Columns.RetryCount}",
- "{OutboxMessageSchema.Columns.Error}",
- "{OutboxMessageSchema.Columns.Status}")
- VALUES
- (@Id, @EventType, @Payload, @CorrelationId, @CreatedAt, @UpdatedAt, NULL, 0, NULL, @Status);
- """;
- }
-
- ///
- public async Task StoreAsync(TEvent message, CancellationToken cancellationToken = default)
- where TEvent : IEvent
- {
- ArgumentNullException.ThrowIfNull(message);
-
- var messageType = message.GetType();
- var eventType =
- messageType.AssemblyQualifiedName
- ?? throw new InvalidOperationException($"Cannot get assembly-qualified name for type: {messageType}");
-
- if (eventType.Length > OutboxMessageSchema.MaxLengths.EventType)
- {
- throw new InvalidOperationException(
- $"Event type identifier exceeds the EventType column maximum length of {OutboxMessageSchema.MaxLengths.EventType} characters. "
- + "Shorten the type identifier, increase the database column length, or use Type.FullName with a type registry."
- );
- }
-
- var correlationId = message.CorrelationId;
-
- if (correlationId is { Length: > OutboxMessageSchema.MaxLengths.CorrelationId })
- {
- throw new InvalidOperationException(
- $"CorrelationId exceeds the maximum length of {OutboxMessageSchema.MaxLengths.CorrelationId} characters defined by the OutboxMessage schema. "
- + "Provide a shorter correlation identifier to comply with the database constraint."
- );
- }
-
- await using var command = new SqliteCommand(_sqlInsertInto, _connection, _transaction);
-
- var now = _timeProvider.GetUtcNow();
- var payload = JsonSerializer.Serialize(message, messageType, _options.JsonSerializerOptions);
-
- _ = command.Parameters.AddWithValue("@Id", message.ToOutboxId().ToString());
- _ = command.Parameters.AddWithValue("@EventType", eventType);
- _ = command.Parameters.AddWithValue("@Payload", payload);
- _ = command.Parameters.AddWithValue("@CorrelationId", (object?)correlationId ?? DBNull.Value);
- _ = command.Parameters.AddWithValue("@CreatedAt", now);
- _ = command.Parameters.AddWithValue("@UpdatedAt", now);
- _ = command.Parameters.AddWithValue("@Status", (int)OutboxMessageStatus.Pending);
-
- _ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
- }
-}
diff --git a/src/NetEvolve.Pulse.SqlServer/Outbox/SqlServerEventOutbox.cs b/src/NetEvolve.Pulse.SqlServer/Outbox/SqlServerEventOutbox.cs
deleted file mode 100644
index 9fb26844..00000000
--- a/src/NetEvolve.Pulse.SqlServer/Outbox/SqlServerEventOutbox.cs
+++ /dev/null
@@ -1,216 +0,0 @@
-namespace NetEvolve.Pulse.Outbox;
-
-using System.Diagnostics.CodeAnalysis;
-using System.Text.Json;
-using Microsoft.Data.SqlClient;
-using Microsoft.Extensions.Options;
-using NetEvolve.Pulse.Extensibility;
-using NetEvolve.Pulse.Extensibility.Outbox;
-
-///
-/// SQL Server implementation of that stores events using ADO.NET
-/// with support for enlisting in existing instances.
-///
-///
-/// Transaction Integration:
-/// This implementation can participate in an existing SQL Server transaction by:
-///
-/// - Accepting a with active transaction via constructor
-/// - Using for ambient transaction support
-///
-/// Atomicity:
-/// When called within a transaction, the event is stored atomically with business data.
-/// If the transaction rolls back, the event is also discarded.
-///
-[SuppressMessage(
- "Reliability",
- "CA2007:Consider calling ConfigureAwait on the awaited task",
- Justification = "await using statements in library code; ConfigureAwait applied to all Task-returning awaits."
-)]
-[SuppressMessage(
- "Security",
- "CA2100:Review SQL queries for security vulnerabilities",
- Justification = "SQL constructed from validated OutboxOptions properties and strongly-typed parameters, not user input."
-)]
-public sealed class SqlServerEventOutbox : IEventOutbox
-{
- /// The open SQL connection provided explicitly; null when using the DI-friendly constructor.
- private readonly SqlConnection? _explicitConnection;
-
- /// The optional SQL transaction provided explicitly; null when using the DI-friendly constructor.
- private readonly SqlTransaction? _explicitTransaction;
-
- /// The connection string used by the DI-friendly constructor; null when using the explicit-connection constructor.
- private readonly string? _connectionString;
-
- /// The optional transaction scope used by the DI-friendly constructor to obtain an ambient transaction.
- private readonly IOutboxTransactionScope? _transactionScope;
-
- /// The resolved outbox options controlling table name, schema, and JSON serialization.
- private readonly OutboxOptions _options;
-
- /// The time provider used to generate consistent creation and update timestamps.
- private readonly TimeProvider _timeProvider;
-
- /// Represents the SQL statement used for inserting data into a database table.
- private readonly string _sqlInsertInto;
-
- ///
- /// Initializes a new instance of the class using an explicit connection.
- ///
- /// The SQL connection (should already be open).
- /// The outbox configuration options.
- /// The time provider for timestamps.
- /// Optional transaction to enlist with.
- public SqlServerEventOutbox(
- SqlConnection connection,
- IOptions options,
- TimeProvider timeProvider,
- SqlTransaction? transaction = null
- )
- {
- ArgumentNullException.ThrowIfNull(connection);
- ArgumentNullException.ThrowIfNull(options);
- ArgumentNullException.ThrowIfNull(timeProvider);
-
- _explicitConnection = connection;
- _explicitTransaction = transaction;
- _options = options.Value;
- _timeProvider = timeProvider;
-
- _sqlInsertInto = BuildInsertSql(_options);
- }
-
- ///
- /// Initializes a new instance of the class for use with dependency injection.
- /// Opens its own on each call and enlists in
- /// an active from when present.
- ///
- /// The outbox configuration options (must include a connection string).
- /// The time provider for timestamps.
- /// Optional ambient transaction scope; when active, the event is stored within the same transaction.
- public SqlServerEventOutbox(
- IOptions options,
- TimeProvider timeProvider,
- IOutboxTransactionScope? transactionScope = null
- )
- {
- ArgumentNullException.ThrowIfNull(options);
- ArgumentException.ThrowIfNullOrWhiteSpace(options.Value.ConnectionString);
- ArgumentNullException.ThrowIfNull(timeProvider);
-
- _connectionString = options.Value.ConnectionString;
- _transactionScope = transactionScope;
- _options = options.Value;
- _timeProvider = timeProvider;
-
- _sqlInsertInto = BuildInsertSql(_options);
- }
-
- ///
- public async Task StoreAsync(TEvent message, CancellationToken cancellationToken = default)
- where TEvent : IEvent
- {
- ArgumentNullException.ThrowIfNull(message);
-
- var messageType = message.GetType();
- var eventType =
- messageType.AssemblyQualifiedName
- ?? throw new InvalidOperationException($"Cannot get assembly-qualified name for type: {messageType}");
-
- if (eventType.Length > OutboxMessageSchema.MaxLengths.EventType)
- {
- throw new InvalidOperationException(
- $"Event type identifier exceeds the EventType column maximum length of {OutboxMessageSchema.MaxLengths.EventType} characters. "
- + "Shorten the type identifier, increase the database column length, or use Type.FullName with a type registry."
- );
- }
-
- var correlationId = message.CorrelationId;
-
- if (correlationId is { Length: > OutboxMessageSchema.MaxLengths.CorrelationId })
- {
- throw new InvalidOperationException(
- $"CorrelationId exceeds the maximum length of {OutboxMessageSchema.MaxLengths.CorrelationId} characters defined by the OutboxMessage schema. "
- + "Provide a shorter correlation identifier to comply with the database constraint."
- );
- }
-
- var now = _timeProvider.GetUtcNow();
- var payload = JsonSerializer.Serialize(message, messageType, _options.JsonSerializerOptions);
-
- if (_explicitConnection is not null)
- {
- await using var command = new SqlCommand(_sqlInsertInto, _explicitConnection, _explicitTransaction);
- AddParameters(command, message, eventType, correlationId, now, payload);
- _ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
- }
- else
- {
- var currentTransaction = _transactionScope?.GetCurrentTransaction();
-
- if (currentTransaction is not null and not SqlTransaction)
- {
- throw new InvalidOperationException(
- $"IOutboxTransactionScope returned a transaction of type '{currentTransaction.GetType().Name}', but SqlServerEventOutbox requires a SqlTransaction."
- );
- }
-
- var ambientTransaction = currentTransaction as SqlTransaction;
-
- if (ambientTransaction is not null)
- {
- var connection =
- ambientTransaction.Connection
- ?? throw new InvalidOperationException("Transaction has no associated connection.");
-
- await using var command = new SqlCommand(_sqlInsertInto, connection, ambientTransaction);
- AddParameters(command, message, eventType, correlationId, now, payload);
- _ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
- }
- else
- {
- await using var connection = new SqlConnection(_connectionString);
- await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
- await using var command = new SqlCommand(_sqlInsertInto, connection);
- AddParameters(command, message, eventType, correlationId, now, payload);
- _ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
- }
- }
- }
-
- private static string BuildInsertSql(OutboxOptions options) =>
- $"""
- INSERT INTO {options.FullTableName}
- ([{OutboxMessageSchema.Columns.Id}],
- [{OutboxMessageSchema.Columns.EventType}],
- [{OutboxMessageSchema.Columns.Payload}],
- [{OutboxMessageSchema.Columns.CorrelationId}],
- [{OutboxMessageSchema.Columns.CreatedAt}],
- [{OutboxMessageSchema.Columns.UpdatedAt}],
- [{OutboxMessageSchema.Columns.ProcessedAt}],
- [{OutboxMessageSchema.Columns.RetryCount}],
- [{OutboxMessageSchema.Columns.Error}],
- [{OutboxMessageSchema.Columns.Status}])
- VALUES
- (@Id, @EventType, @Payload, @CorrelationId, @CreatedAt, @UpdatedAt, NULL, 0, NULL, @Status)
- """;
-
- private static void AddParameters(
- SqlCommand command,
- IEvent message,
- string eventType,
- string? correlationId,
- DateTimeOffset now,
- string payload
- )
- {
- _ = command.Parameters.AddWithValue("@Id", message.ToOutboxId());
- _ = command.Parameters.AddWithValue("@EventType", eventType);
- _ = command.Parameters.AddWithValue("@Payload", payload);
- _ = command.Parameters.AddWithValue("@CorrelationId", (object?)correlationId ?? DBNull.Value);
- _ = command.Parameters.AddWithValue("@CreatedAt", now);
- _ = command.Parameters.AddWithValue("@UpdatedAt", now);
- _ = command.Parameters.AddWithValue("@Status", OutboxMessageStatus.Pending);
- }
-}
diff --git a/src/NetEvolve.Pulse.SqlServer/SqlServerExtensions.cs b/src/NetEvolve.Pulse.SqlServer/SqlServerExtensions.cs
index 33682eb6..993ed45c 100644
--- a/src/NetEvolve.Pulse.SqlServer/SqlServerExtensions.cs
+++ b/src/NetEvolve.Pulse.SqlServer/SqlServerExtensions.cs
@@ -25,7 +25,7 @@ public static class SqlServerExtensions
/// database objects before using this provider.
/// Registered Services:
///
- /// - as (Scoped)
+ /// - as (Scoped)
/// - as (Scoped)
/// - as (Scoped)
/// - (Singleton, if not already registered)
@@ -72,7 +72,7 @@ public static IMediatorBuilder AddSqlServerOutbox(
/// database objects before using this provider.
/// Registered Services:
///
- /// - as (Scoped)
+ /// - as (Scoped)
/// - as (Scoped)
/// - as (Scoped)
/// - (Singleton, if not already registered)
@@ -125,7 +125,7 @@ public static IMediatorBuilder AddSqlServerOutbox(
/// database objects before using this provider.
/// Registered Services:
///
- /// - as (Scoped)
+ /// - as (Scoped)
/// - as (Scoped)
/// - as (Scoped)
/// - (Singleton, if not already registered)
@@ -160,7 +160,7 @@ Action configureOptions
///
/// Registers a unit-of-work type as (Scoped) so that
- /// can enlist in the caller's transaction automatically.
+ /// can enlist in the caller's transaction automatically.
///
///
/// A type that implements both the application unit-of-work contract and .
@@ -169,7 +169,7 @@ Action configureOptions
/// The configurator for chaining.
///
/// Call this method after
- /// to wire up your unit-of-work so that automatically
+ /// to wire up your unit-of-work so that automatically
/// enlists in any active owned by the unit-of-work.
///
///
@@ -194,13 +194,11 @@ public static IMediatorBuilder AddSqlServerOutboxTransactionScope(t
private static IMediatorBuilder RegisterSqlServerOutboxServices(this IMediatorBuilder configurator)
{
// AddOutbox() uses TryAdd* internally, so this call is safe even when AddOutbox() was already invoked.
- _ = configurator.AddOutbox();
-
- var services = configurator.Services;
- _ = services.RemoveAll();
- _ = services
- .AddScoped()
+ _ = configurator
+ .AddOutbox()
+ .Services.RemoveAll()
.AddScoped()
+ .RemoveAll()
.AddScoped();
return configurator;
diff --git a/tests/NetEvolve.Pulse.Tests.Unit/EntityFramework/EntityFrameworkEventOutboxTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/EntityFramework/EntityFrameworkEventOutboxTests.cs
deleted file mode 100644
index 0c81de36..00000000
--- a/tests/NetEvolve.Pulse.Tests.Unit/EntityFramework/EntityFrameworkEventOutboxTests.cs
+++ /dev/null
@@ -1,119 +0,0 @@
-namespace NetEvolve.Pulse.Tests.Unit.EntityFramework;
-
-using System;
-using System.Threading.Tasks;
-using Microsoft.EntityFrameworkCore;
-using Microsoft.Extensions.Options;
-using NetEvolve.Extensions.TUnit;
-using NetEvolve.Pulse.Extensibility;
-using NetEvolve.Pulse.Extensibility.Outbox;
-using NetEvolve.Pulse.Outbox;
-using TUnit.Core;
-
-[TestGroup("EntityFramework")]
-public sealed class EntityFrameworkEventOutboxTests
-{
- [Test]
- public async Task Constructor_WithNullContext_ThrowsArgumentNullException() =>
- _ = await Assert
- .That(() =>
- new EntityFrameworkOutbox(
- null!,
- Options.Create(new OutboxOptions()),
- TimeProvider.System
- )
- )
- .Throws();
-
- [Test]
- public async Task Constructor_WithNullOptions_ThrowsArgumentNullException()
- {
- var options = new DbContextOptionsBuilder()
- .UseInMemoryDatabase(nameof(Constructor_WithNullOptions_ThrowsArgumentNullException))
- .Options;
- await using var context = new TestDbContext(options);
-
- _ = await Assert
- .That(() => new EntityFrameworkOutbox(context, null!, TimeProvider.System))
- .Throws();
- }
-
- [Test]
- public async Task Constructor_WithNullTimeProvider_ThrowsArgumentNullException()
- {
- var options = new DbContextOptionsBuilder()
- .UseInMemoryDatabase(nameof(Constructor_WithNullTimeProvider_ThrowsArgumentNullException))
- .Options;
- await using var context = new TestDbContext(options);
-
- _ = await Assert
- .That(() => new EntityFrameworkOutbox(context, Options.Create(new OutboxOptions()), null!))
- .Throws();
- }
-
- [Test]
- public async Task Constructor_WithValidArguments_CreatesInstance()
- {
- var options = new DbContextOptionsBuilder()
- .UseInMemoryDatabase(nameof(Constructor_WithValidArguments_CreatesInstance))
- .Options;
- await using var context = new TestDbContext(options);
-
- var outbox = new EntityFrameworkOutbox(
- context,
- Options.Create(new OutboxOptions()),
- TimeProvider.System
- );
-
- _ = await Assert.That(outbox).IsNotNull();
- }
-
- [Test]
- public async Task StoreAsync_WithNullMessage_ThrowsArgumentNullException()
- {
- var options = new DbContextOptionsBuilder()
- .UseInMemoryDatabase(nameof(StoreAsync_WithNullMessage_ThrowsArgumentNullException))
- .Options;
- await using var context = new TestDbContext(options);
- var outbox = new EntityFrameworkOutbox(
- context,
- Options.Create(new OutboxOptions()),
- TimeProvider.System
- );
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(null!).ConfigureAwait(false))
- .Throws();
- }
-
- [Test]
- public async Task StoreAsync_WithLongCorrelationId_ThrowsInvalidOperationException(
- CancellationToken cancellationToken
- )
- {
- var options = new DbContextOptionsBuilder()
- .UseInMemoryDatabase(nameof(StoreAsync_WithLongCorrelationId_ThrowsInvalidOperationException))
- .Options;
- await using var context = new TestDbContext(options);
- var outbox = new EntityFrameworkOutbox(
- context,
- Options.Create(new OutboxOptions()),
- TimeProvider.System
- );
- var message = new TestEvent
- {
- CorrelationId = new string('x', OutboxMessageSchema.MaxLengths.CorrelationId + 1),
- };
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false))
- .Throws();
- }
-
- private sealed record TestEvent : IEvent
- {
- public string? CorrelationId { get; set; }
- public string Id { get; init; } = Guid.NewGuid().ToString();
- public DateTimeOffset? PublishedAt { get; set; }
- }
-}
diff --git a/tests/NetEvolve.Pulse.Tests.Unit/PostgreSql/PostgreSqlEventOutboxTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/PostgreSql/PostgreSqlEventOutboxTests.cs
deleted file mode 100644
index d16bfc9f..00000000
--- a/tests/NetEvolve.Pulse.Tests.Unit/PostgreSql/PostgreSqlEventOutboxTests.cs
+++ /dev/null
@@ -1,101 +0,0 @@
-namespace NetEvolve.Pulse.Tests.Unit.PostgreSql;
-
-using System;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Options;
-using NetEvolve.Extensions.TUnit;
-using NetEvolve.Pulse.Extensibility;
-using NetEvolve.Pulse.Extensibility.Outbox;
-using NetEvolve.Pulse.Outbox;
-using Npgsql;
-using TUnit.Core;
-
-[TestGroup("PostgreSql")]
-public sealed class PostgreSqlEventOutboxTests
-{
- [Test]
- public async Task Constructor_WithNullConnection_ThrowsArgumentNullException() =>
- _ = await Assert
- .That(() => new PostgreSqlEventOutbox(null!, Options.Create(new OutboxOptions()), TimeProvider.System))
- .Throws();
-
- [Test]
- public async Task Constructor_WithNullOptions_ThrowsArgumentNullException()
- {
- await using var connection = new NpgsqlConnection("Host=localhost;");
-
- _ = await Assert
- .That(() => new PostgreSqlEventOutbox(connection, null!, TimeProvider.System))
- .Throws();
- }
-
- [Test]
- public async Task Constructor_WithNullTimeProvider_ThrowsArgumentNullException()
- {
- await using var connection = new NpgsqlConnection("Host=localhost;");
-
- _ = await Assert
- .That(() => new PostgreSqlEventOutbox(connection, Options.Create(new OutboxOptions()), null!))
- .Throws();
- }
-
- [Test]
- public async Task Constructor_WithValidArguments_CreatesInstance()
- {
- await using var connection = new NpgsqlConnection("Host=localhost;");
-
- var outbox = new PostgreSqlEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System);
-
- _ = await Assert.That(outbox).IsNotNull();
- }
-
- [Test]
- public async Task Constructor_WithTransaction_CreatesInstance()
- {
- await using var connection = new NpgsqlConnection("Host=localhost;");
-
- var outbox = new PostgreSqlEventOutbox(
- connection,
- Options.Create(new OutboxOptions()),
- TimeProvider.System,
- transaction: null
- );
-
- _ = await Assert.That(outbox).IsNotNull();
- }
-
- [Test]
- public async Task StoreAsync_WithNullMessage_ThrowsArgumentNullException()
- {
- await using var connection = new NpgsqlConnection("Host=localhost;");
- var outbox = new PostgreSqlEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System);
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(null!).ConfigureAwait(false))
- .Throws();
- }
-
- [Test]
- public async Task StoreAsync_WithLongCorrelationId_ThrowsInvalidOperationException(
- CancellationToken cancellationToken
- )
- {
- await using var connection = new NpgsqlConnection("Host=localhost;");
- var outbox = new PostgreSqlEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System);
- var message = new TestEvent
- {
- CorrelationId = new string('x', OutboxMessageSchema.MaxLengths.CorrelationId + 1),
- };
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false))
- .Throws();
- }
-
- private sealed record TestEvent : IEvent
- {
- public string? CorrelationId { get; set; }
- public string Id { get; init; } = Guid.NewGuid().ToString();
- public DateTimeOffset? PublishedAt { get; set; }
- }
-}
diff --git a/tests/NetEvolve.Pulse.Tests.Unit/SQLite/SQLiteEventOutboxTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/SQLite/SQLiteEventOutboxTests.cs
deleted file mode 100644
index 93fa6fbe..00000000
--- a/tests/NetEvolve.Pulse.Tests.Unit/SQLite/SQLiteEventOutboxTests.cs
+++ /dev/null
@@ -1,294 +0,0 @@
-namespace NetEvolve.Pulse.Tests.Unit.SQLite;
-
-using System;
-using System.Reflection;
-using System.Reflection.Emit;
-using System.Threading.Tasks;
-using Microsoft.Data.Sqlite;
-using Microsoft.Extensions.Options;
-using NetEvolve.Extensions.TUnit;
-using NetEvolve.Pulse.Extensibility;
-using NetEvolve.Pulse.Extensibility.Outbox;
-using NetEvolve.Pulse.Outbox;
-using TUnit.Core;
-
-[TestGroup("SQLite")]
-public sealed class SQLiteEventOutboxTests
-{
- [Test]
- public async Task Constructor_WithNullConnection_ThrowsArgumentNullException() =>
- _ = await Assert
- .That(() => new SQLiteEventOutbox(null!, Options.Create(new OutboxOptions()), TimeProvider.System))
- .Throws();
-
- [Test]
- public async Task Constructor_WithNullOptions_ThrowsArgumentNullException()
- {
- await using var connection = new SqliteConnection("Data Source=:memory:");
-
- _ = await Assert
- .That(() => new SQLiteEventOutbox(connection, null!, TimeProvider.System))
- .Throws();
- }
-
- [Test]
- public async Task Constructor_WithNullTimeProvider_ThrowsArgumentNullException()
- {
- await using var connection = new SqliteConnection("Data Source=:memory:");
-
- _ = await Assert
- .That(() => new SQLiteEventOutbox(connection, Options.Create(new OutboxOptions()), null!))
- .Throws();
- }
-
- [Test]
- public async Task Constructor_WithValidArguments_CreatesInstance()
- {
- await using var connection = new SqliteConnection("Data Source=:memory:");
-
- var outbox = new SQLiteEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System);
-
- _ = await Assert.That(outbox).IsNotNull();
- }
-
- [Test]
- public async Task Constructor_WithTransaction_CreatesInstance()
- {
- await using var connection = new SqliteConnection("Data Source=:memory:");
-
- var outbox = new SQLiteEventOutbox(
- connection,
- Options.Create(new OutboxOptions()),
- TimeProvider.System,
- transaction: null
- );
-
- _ = await Assert.That(outbox).IsNotNull();
- }
-
- [Test]
- public async Task StoreAsync_WithNullMessage_ThrowsArgumentNullException()
- {
- await using var connection = new SqliteConnection("Data Source=:memory:");
- var outbox = new SQLiteEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System);
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(null!).ConfigureAwait(false))
- .Throws();
- }
-
- [Test]
- public async Task StoreAsync_WithLongCorrelationId_ThrowsInvalidOperationException(
- CancellationToken cancellationToken
- )
- {
- await using var connection = new SqliteConnection("Data Source=:memory:");
- var outbox = new SQLiteEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System);
- var message = new TestEvent
- {
- CorrelationId = new string('x', OutboxMessageSchema.MaxLengths.CorrelationId + 1),
- };
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false))
- .Throws();
- }
-
- [Test]
- public async Task StoreAsync_WithValidEvent_PersistsRow(CancellationToken cancellationToken)
- {
- var connectionString = $"Data Source=store_{Guid.NewGuid():N};Mode=Memory;Cache=Shared";
- await using var keepAlive = new SqliteConnection(connectionString);
- await keepAlive.OpenAsync(cancellationToken).ConfigureAwait(false);
-
- await using (
- var create = new SqliteCommand(
- """
- CREATE TABLE "OutboxMessage"(
- "Id" TEXT NOT NULL,
- "EventType" TEXT NOT NULL,
- "Payload" TEXT NOT NULL,
- "CorrelationId" TEXT NULL,
- "CreatedAt" TEXT NOT NULL,
- "UpdatedAt" TEXT NOT NULL,
- "ProcessedAt" TEXT NULL,
- "NextRetryAt" TEXT NULL,
- "RetryCount" INTEGER NOT NULL DEFAULT 0,
- "Error" TEXT NULL,
- "Status" INTEGER NOT NULL DEFAULT 0,
- CONSTRAINT "PK_OutboxMessage" PRIMARY KEY ("Id")
- );
- """,
- keepAlive
- )
- )
- {
- _ = await create.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
- }
-
- await using var connection = new SqliteConnection(connectionString);
- await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
-
- var outbox = new SQLiteEventOutbox(
- connection,
- Options.Create(new OutboxOptions { ConnectionString = connectionString }),
- TimeProvider.System
- );
-
- var evt = new TestEvent { CorrelationId = "corr" };
-
- await outbox.StoreAsync(evt, cancellationToken).ConfigureAwait(false);
-
- await using var cmd = new SqliteCommand(
- "SELECT \"EventType\",\"CorrelationId\",\"Status\",\"Payload\" FROM \"OutboxMessage\" WHERE \"Id\" = @Id",
- keepAlive
- );
- _ = cmd.Parameters.AddWithValue("@Id", evt.Id);
- await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
- _ = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
-
- using (Assert.Multiple())
- {
- _ = await Assert.That(reader.GetString(0)).IsEqualTo(evt.GetType().AssemblyQualifiedName);
- _ = await Assert.That(reader.GetString(1)).IsEqualTo("corr");
- _ = await Assert.That(reader.GetInt64(2)).IsEqualTo((long)OutboxMessageStatus.Pending);
- _ = await Assert.That(reader.GetString(3)).IsNotNullOrWhiteSpace();
- }
- }
-
- [Test]
- public async Task StoreAsync_WithOversizedEventType_ThrowsInvalidOperationException(
- CancellationToken cancellationToken
- )
- {
- var connectionString = $"Data Source=type_{Guid.NewGuid():N};Mode=Memory;Cache=Shared";
- await using var keepAlive = new SqliteConnection(connectionString);
- await keepAlive.OpenAsync(cancellationToken).ConfigureAwait(false);
- await using (
- var create = new SqliteCommand(
- """
- CREATE TABLE "OutboxMessage"(
- "Id" TEXT NOT NULL,
- "EventType" TEXT NOT NULL,
- "Payload" TEXT NOT NULL,
- "CorrelationId" TEXT NULL,
- "CreatedAt" TEXT NOT NULL,
- "UpdatedAt" TEXT NOT NULL,
- "ProcessedAt" TEXT NULL,
- "NextRetryAt" TEXT NULL,
- "RetryCount" INTEGER NOT NULL DEFAULT 0,
- "Error" TEXT NULL,
- "Status" INTEGER NOT NULL DEFAULT 0,
- CONSTRAINT "PK_OutboxMessage" PRIMARY KEY ("Id")
- );
- """,
- keepAlive
- )
- )
- {
- _ = await create.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
- }
-
- await using var connection = new SqliteConnection(connectionString);
- await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
-
- var outbox = new SQLiteEventOutbox(
- connection,
- Options.Create(new OutboxOptions { ConnectionString = connectionString }),
- TimeProvider.System
- );
-
- var longEvent = CreateLongTypeEvent();
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(longEvent, cancellationToken).ConfigureAwait(false))
- .Throws();
- }
-
- private sealed record TestEvent : IEvent
- {
- public string? CorrelationId { get; set; }
- public string Id { get; init; } = Guid.NewGuid().ToString();
- public DateTimeOffset? PublishedAt { get; set; }
- }
-
- private static IEvent CreateLongTypeEvent()
- {
- var assemblyName = new AssemblyName($"DynEvt_{Guid.NewGuid():N}");
- var assemblyBuilder = AssemblyBuilder.DefineDynamicAssembly(assemblyName, AssemblyBuilderAccess.Run);
- var moduleBuilder = assemblyBuilder.DefineDynamicModule("dyn");
- var typeName = new string('E', OutboxMessageSchema.MaxLengths.EventType + 10);
- var typeBuilder = moduleBuilder.DefineType(
- typeName,
- TypeAttributes.Public | TypeAttributes.Class | TypeAttributes.Sealed,
- typeof(object),
- [typeof(IEvent)]
- );
-
- var idField = typeBuilder.DefineField("_id", typeof(string), FieldAttributes.Private);
- var corrField = typeBuilder.DefineField("_corr", typeof(string), FieldAttributes.Private);
- var publishedField = typeBuilder.DefineField("_pub", typeof(DateTimeOffset?), FieldAttributes.Private);
-
- DefineAutoProperty(typeBuilder, idField, nameof(IEvent.Id), typeof(string));
- DefineAutoProperty(typeBuilder, corrField, nameof(IEvent.CorrelationId), typeof(string));
- DefineAutoProperty(typeBuilder, publishedField, nameof(IEvent.PublishedAt), typeof(DateTimeOffset?));
-
- var ctor = typeBuilder.DefineConstructor(MethodAttributes.Public, CallingConventions.Standard, Type.EmptyTypes);
- var il = ctor.GetILGenerator();
- il.Emit(OpCodes.Ldarg_0);
- il.Emit(OpCodes.Call, typeof(object).GetConstructor(Type.EmptyTypes)!);
- il.Emit(OpCodes.Ldarg_0);
- il.Emit(OpCodes.Ldstr, Guid.NewGuid().ToString());
- il.Emit(OpCodes.Stfld, idField);
- il.Emit(OpCodes.Ret);
-
- var eventType = typeBuilder.CreateType()!;
- return (IEvent)Activator.CreateInstance(eventType)!;
- }
-
- private static void DefineAutoProperty(
- TypeBuilder typeBuilder,
- FieldBuilder backingField,
- string propertyName,
- Type propertyType
- )
- {
- var propertyBuilder = typeBuilder.DefineProperty(
- propertyName,
- PropertyAttributes.None,
- propertyType,
- Type.EmptyTypes
- );
-
- var getMethod = typeBuilder.DefineMethod(
- $"get_{propertyName}",
- MethodAttributes.Public
- | MethodAttributes.Virtual
- | MethodAttributes.SpecialName
- | MethodAttributes.HideBySig,
- propertyType,
- Type.EmptyTypes
- );
- var getIl = getMethod.GetILGenerator();
- getIl.Emit(OpCodes.Ldarg_0);
- getIl.Emit(OpCodes.Ldfld, backingField);
- getIl.Emit(OpCodes.Ret);
- propertyBuilder.SetGetMethod(getMethod);
-
- var setMethod = typeBuilder.DefineMethod(
- $"set_{propertyName}",
- MethodAttributes.Public
- | MethodAttributes.Virtual
- | MethodAttributes.SpecialName
- | MethodAttributes.HideBySig,
- null,
- [propertyType]
- );
- var setIl = setMethod.GetILGenerator();
- setIl.Emit(OpCodes.Ldarg_0);
- setIl.Emit(OpCodes.Ldarg_1);
- setIl.Emit(OpCodes.Stfld, backingField);
- setIl.Emit(OpCodes.Ret);
- propertyBuilder.SetSetMethod(setMethod);
- }
-}
diff --git a/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerEventOutboxTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerEventOutboxTests.cs
deleted file mode 100644
index 66fb573e..00000000
--- a/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerEventOutboxTests.cs
+++ /dev/null
@@ -1,209 +0,0 @@
-namespace NetEvolve.Pulse.Tests.Unit.SqlServer;
-
-using System;
-using System.Threading.Tasks;
-using Microsoft.Data.SqlClient;
-using Microsoft.Extensions.Options;
-using NetEvolve.Extensions.TUnit;
-using NetEvolve.Pulse.Extensibility;
-using NetEvolve.Pulse.Extensibility.Outbox;
-using NetEvolve.Pulse.Outbox;
-using TUnit.Core;
-using TUnit.Mocks;
-
-[TestGroup("SqlServer")]
-public sealed class SqlServerEventOutboxTests
-{
- [Test]
- public async Task Constructor_WithNullConnection_ThrowsArgumentNullException() =>
- _ = await Assert
- .That(() => new SqlServerEventOutbox(null!, Options.Create(new OutboxOptions()), TimeProvider.System))
- .Throws();
-
- [Test]
- public async Task Constructor_WithNullOptions_ThrowsArgumentNullException()
- {
- await using var connection = new SqlConnection("Server=.;Encrypt=true;");
-
- _ = await Assert
- .That(() => new SqlServerEventOutbox(connection, null!, TimeProvider.System))
- .Throws();
- }
-
- [Test]
- public async Task Constructor_WithNullTimeProvider_ThrowsArgumentNullException()
- {
- await using var connection = new SqlConnection("Server=.;Encrypt=true;");
-
- _ = await Assert
- .That(() => new SqlServerEventOutbox(connection, Options.Create(new OutboxOptions()), null!))
- .Throws();
- }
-
- [Test]
- public async Task Constructor_WithValidArguments_CreatesInstance()
- {
- await using var connection = new SqlConnection("Server=.;Encrypt=true;");
-
- var outbox = new SqlServerEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System);
-
- _ = await Assert.That(outbox).IsNotNull();
- }
-
- [Test]
- public async Task Constructor_WithTransaction_CreatesInstance()
- {
- await using var connection = new SqlConnection("Server=.;Encrypt=true;");
-
- var outbox = new SqlServerEventOutbox(
- connection,
- Options.Create(new OutboxOptions()),
- TimeProvider.System,
- transaction: null
- );
-
- _ = await Assert.That(outbox).IsNotNull();
- }
-
- [Test]
- public async Task Constructor_DiConstructor_WithNullOptions_ThrowsArgumentNullException() =>
- _ = await Assert
- .That(() => new SqlServerEventOutbox((IOptions)null!, TimeProvider.System))
- .Throws();
-
- [Test]
- public async Task Constructor_DiConstructor_WithNullTimeProvider_ThrowsArgumentNullException() =>
- _ = await Assert
- .That(() =>
- new SqlServerEventOutbox(
- Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }),
- null!
- )
- )
- .Throws();
-
- [Test]
- public async Task Constructor_DiConstructor_WithEmptyConnectionString_ThrowsArgumentException() =>
- _ = await Assert
- .That(() =>
- new SqlServerEventOutbox(
- Options.Create(new OutboxOptions { ConnectionString = string.Empty }),
- TimeProvider.System
- )
- )
- .Throws();
-
- [Test]
- public async Task Constructor_DiConstructor_WithValidConnectionString_CreatesInstance()
- {
- var outbox = new SqlServerEventOutbox(
- Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }),
- TimeProvider.System
- );
-
- _ = await Assert.That(outbox).IsNotNull();
- }
-
- [Test]
- public async Task Constructor_DiConstructor_WithTransactionScope_CreatesInstance()
- {
- var transactionScope = Mock.Of();
- _ = transactionScope.GetCurrentTransaction().Returns(null!);
-
- var outbox = new SqlServerEventOutbox(
- Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }),
- TimeProvider.System,
- transactionScope.Object
- );
-
- _ = await Assert.That(outbox).IsNotNull();
- }
-
- [Test]
- public async Task StoreAsync_WithNullMessage_ThrowsArgumentNullException()
- {
- await using var connection = new SqlConnection("Server=.;Encrypt=true;");
- var outbox = new SqlServerEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System);
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(null!).ConfigureAwait(false))
- .Throws();
- }
-
- [Test]
- public async Task StoreAsync_WithLongCorrelationId_ThrowsInvalidOperationException(
- CancellationToken cancellationToken
- )
- {
- await using var connection = new SqlConnection("Server=.;Encrypt=true;");
- var outbox = new SqlServerEventOutbox(connection, Options.Create(new OutboxOptions()), TimeProvider.System);
- var message = new TestEvent
- {
- CorrelationId = new string('x', OutboxMessageSchema.MaxLengths.CorrelationId + 1),
- };
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false))
- .Throws();
- }
-
- [Test]
- public async Task StoreAsync_DiConstructor_WithNullMessage_ThrowsArgumentNullException()
- {
- var outbox = new SqlServerEventOutbox(
- Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }),
- TimeProvider.System
- );
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(null!).ConfigureAwait(false))
- .Throws();
- }
-
- [Test]
- public async Task StoreAsync_DiConstructor_WithLongCorrelationId_ThrowsInvalidOperationException(
- CancellationToken cancellationToken
- )
- {
- var outbox = new SqlServerEventOutbox(
- Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }),
- TimeProvider.System
- );
- var message = new TestEvent
- {
- CorrelationId = new string('x', OutboxMessageSchema.MaxLengths.CorrelationId + 1),
- };
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false))
- .Throws();
- }
-
- [Test]
- public async Task StoreAsync_DiConstructor_WithInvalidTransactionType_ThrowsInvalidOperationException(
- CancellationToken cancellationToken
- )
- {
- var transactionScope = Mock.Of();
- _ = transactionScope.GetCurrentTransaction().Returns(new object()); // not a SqlTransaction
-
- var outbox = new SqlServerEventOutbox(
- Options.Create(new OutboxOptions { ConnectionString = "Server=.;Encrypt=true;" }),
- TimeProvider.System,
- transactionScope.Object
- );
-
- var message = new TestEvent();
-
- _ = await Assert
- .That(async () => await outbox.StoreAsync(message, cancellationToken).ConfigureAwait(false))
- .Throws();
- }
-
- private sealed record TestEvent : IEvent
- {
- public string? CorrelationId { get; set; }
- public string Id { get; init; } = Guid.NewGuid().ToString();
- public DateTimeOffset? PublishedAt { get; set; }
- }
-}
diff --git a/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerExtensionsTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerExtensionsTests.cs
index 07d04eaa..c699deed 100644
--- a/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerExtensionsTests.cs
+++ b/tests/NetEvolve.Pulse.Tests.Unit/SqlServer/SqlServerExtensionsTests.cs
@@ -78,7 +78,7 @@ public async Task AddSqlServerOutbox_WithValidConnectionString_RegistersEventOut
{
_ = await Assert.That(descriptor).IsNotNull();
_ = await Assert.That(descriptor!.Lifetime).IsEqualTo(ServiceLifetime.Scoped);
- _ = await Assert.That(descriptor!.ImplementationType).IsEqualTo(typeof(SqlServerEventOutbox));
+ _ = await Assert.That(descriptor!.ImplementationType).IsEqualTo(typeof(OutboxEventStore));
}
}
@@ -161,7 +161,7 @@ public async Task AddSqlServerOutbox_WithFactory_RegistersEventOutboxAsScoped()
{
_ = await Assert.That(descriptor).IsNotNull();
_ = await Assert.That(descriptor!.Lifetime).IsEqualTo(ServiceLifetime.Scoped);
- _ = await Assert.That(descriptor!.ImplementationType).IsEqualTo(typeof(SqlServerEventOutbox));
+ _ = await Assert.That(descriptor!.ImplementationType).IsEqualTo(typeof(OutboxEventStore));
}
}