Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 194 additions & 2 deletions src/NetEvolve.Pulse.SQLite/SQLiteExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace NetEvolve.Pulse;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using NetEvolve.Pulse.Extensibility;
using NetEvolve.Pulse.Extensibility.Outbox;
using NetEvolve.Pulse.Outbox;
Expand All @@ -11,6 +12,184 @@ namespace NetEvolve.Pulse;
/// </summary>
public static class SQLiteExtensions
{
/// <summary>
/// Adds SQLite outbox persistence using ADO.NET.
/// </summary>
/// <param name="configurator">The mediator configurator.</param>
/// <param name="connectionString">The SQLite connection string (e.g., <c>"Data Source=outbox.db"</c>).</param>
/// <param name="configureOptions">Optional action to further configure <see cref="OutboxOptions"/>.</param>
/// <returns>The configurator for chaining.</returns>
/// <remarks>
/// <para><strong>Prerequisites:</strong></para>
/// Execute the schema script from <c>Scripts/001_CreateOutboxTable.sql</c> to create the required
/// database objects before using this provider.
/// <para><strong>Registered Services:</strong></para>
/// <list type="bullet">
/// <item><description><see cref="IEventOutbox"/> as <see cref="OutboxEventStore"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxRepository"/> as <see cref="SQLiteOutboxRepository"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxManagement"/> as <see cref="SQLiteOutboxManagement"/> (Scoped)</description></item>
/// <item><description><see cref="TimeProvider"/> (Singleton, if not already registered)</description></item>
/// </list>
/// <para><strong>Note:</strong></para>
/// Core outbox services are registered automatically; calling
/// <see cref="OutboxExtensions.AddOutbox"/> before this method is optional but harmless.
/// </remarks>
/// <example>
/// <code>
/// services.AddPulse(config => config
/// .AddSQLiteOutbox("Data Source=outbox.db")
/// );
/// </code>
/// </example>
public static IMediatorBuilder AddSQLiteOutbox(
this IMediatorBuilder configurator,
string connectionString,
Action<OutboxOptions>? configureOptions = null
)
{
ArgumentNullException.ThrowIfNull(configurator);
ArgumentException.ThrowIfNullOrWhiteSpace(connectionString);

return configurator.AddSQLiteOutbox(opts =>
{
opts.ConnectionString = connectionString;
configureOptions?.Invoke(opts);
});
}

/// <summary>
/// Adds SQLite outbox persistence with a connection string provider factory.
/// </summary>
/// <param name="configurator">The mediator configurator.</param>
/// <param name="connectionStringFactory">Factory function to resolve the connection string from the <see cref="IServiceProvider"/>.</param>
/// <param name="configureOptions">Optional action to configure additional <see cref="OutboxOptions"/> settings.</param>
/// <returns>The configurator for chaining.</returns>
/// <remarks>
/// Use this overload when the connection string needs to be resolved from configuration
/// or other services at runtime.
/// <para><strong>Prerequisites:</strong></para>
/// Execute the schema script from <c>Scripts/001_CreateOutboxTable.sql</c> to create the required
/// database objects before using this provider.
/// <para><strong>Registered Services:</strong></para>
/// <list type="bullet">
/// <item><description><see cref="IEventOutbox"/> as <see cref="OutboxEventStore"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxRepository"/> as <see cref="SQLiteOutboxRepository"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxManagement"/> as <see cref="SQLiteOutboxManagement"/> (Scoped)</description></item>
/// <item><description><see cref="TimeProvider"/> (Singleton, if not already registered)</description></item>
/// </list>
/// <para><strong>Note:</strong></para>
/// Core outbox services are registered automatically; calling
/// <see cref="OutboxExtensions.AddOutbox"/> before this method is optional but harmless.
/// </remarks>
/// <example>
/// <code>
/// services.AddPulse(config => config
/// .AddSQLiteOutbox(
/// sp => sp.GetRequiredService&lt;IConfiguration&gt;().GetConnectionString("Outbox")!,
/// options => options.EnableWalMode = false)
/// );
/// </code>
/// </example>
public static IMediatorBuilder AddSQLiteOutbox(
this IMediatorBuilder configurator,
Func<IServiceProvider, string> connectionStringFactory,
Action<OutboxOptions>? configureOptions = null
)
{
ArgumentNullException.ThrowIfNull(configurator);
ArgumentNullException.ThrowIfNull(connectionStringFactory);

var services = configurator.Services;

if (configureOptions is not null)
{
_ = services.Configure(configureOptions);
}

_ = services.AddSingleton<IConfigureOptions<OutboxOptions>>(sp => new ConfigureOptions<OutboxOptions>(o =>
o.ConnectionString = connectionStringFactory(sp)
));

return configurator.RegisterSQLiteOutboxServices();
}

/// <summary>
/// Adds SQLite outbox persistence using ADO.NET with a full options configuration action.
/// </summary>
/// <param name="configurator">The mediator configurator.</param>
/// <param name="configureOptions">Action to configure <see cref="OutboxOptions"/>.</param>
/// <returns>The configurator for chaining.</returns>
/// <remarks>
/// <para><strong>Prerequisites:</strong></para>
/// Execute the schema script from <c>Scripts/001_CreateOutboxTable.sql</c> to create the required
/// database objects before using this provider.
/// <para><strong>Registered Services:</strong></para>
/// <list type="bullet">
/// <item><description><see cref="IEventOutbox"/> as <see cref="OutboxEventStore"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxRepository"/> as <see cref="SQLiteOutboxRepository"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxManagement"/> as <see cref="SQLiteOutboxManagement"/> (Scoped)</description></item>
/// <item><description><see cref="TimeProvider"/> (Singleton, if not already registered)</description></item>
/// </list>
/// <para><strong>Note:</strong></para>
/// Core outbox services are registered automatically; calling
/// <see cref="OutboxExtensions.AddOutbox"/> before this method is optional but harmless.
/// </remarks>
/// <example>
/// <code>
/// services.AddPulse(config => config
/// .AddSQLiteOutbox(opts =>
/// {
/// opts.ConnectionString = "Data Source=outbox.db";
/// opts.EnableWalMode = true;
/// })
/// );
/// </code>
/// </example>
public static IMediatorBuilder AddSQLiteOutbox(
this IMediatorBuilder configurator,
Action<OutboxOptions> configureOptions
)
{
ArgumentNullException.ThrowIfNull(configurator);
ArgumentNullException.ThrowIfNull(configureOptions);

_ = configurator.Services.Configure(configureOptions);

return configurator.RegisterSQLiteOutboxServices();
}

/// <summary>
/// Registers a unit-of-work type as <see cref="IOutboxTransactionScope"/> (Scoped) so that
/// <see cref="OutboxEventStore"/> can enlist in the caller's transaction automatically.
/// </summary>
/// <typeparam name="TUnitOfWork">
/// A type that implements both the application unit-of-work contract and <see cref="IOutboxTransactionScope"/>.
/// </typeparam>
/// <param name="configurator">The mediator configurator.</param>
/// <returns>The configurator for chaining.</returns>
/// <remarks>
/// Call this method after <see cref="AddSQLiteOutbox(IMediatorBuilder, string, Action{OutboxOptions}?)"/>
/// to wire up your unit-of-work so that <see cref="OutboxEventStore"/> automatically
/// enlists in any active <see cref="Microsoft.Data.Sqlite.SqliteTransaction"/> owned by the unit-of-work.
/// </remarks>
/// <example>
/// <code>
/// services.AddPulse(config => config
/// .AddSQLiteOutbox("Data Source=outbox.db")
/// .AddSQLiteOutboxTransactionScope&lt;MyUnitOfWork&gt;()
/// );
/// </code>
/// </example>
public static IMediatorBuilder AddSQLiteOutboxTransactionScope<TUnitOfWork>(this IMediatorBuilder configurator)
where TUnitOfWork : class, IOutboxTransactionScope
{
ArgumentNullException.ThrowIfNull(configurator);

_ = configurator.Services.AddScoped<IOutboxTransactionScope, TUnitOfWork>();

return configurator;
}

/// <summary>
/// Adds SQLite outbox persistence using ADO.NET with a connection string shorthand.
/// </summary>
Expand Down Expand Up @@ -106,7 +285,7 @@ Action<OutboxOptions> configureOptions
// Register the repository
_ = services.AddScoped<IOutboxRepository>(sp =>
{
var options = sp.GetRequiredService<Microsoft.Extensions.Options.IOptions<OutboxOptions>>();
var options = sp.GetRequiredService<IOptions<OutboxOptions>>();
var timeProvider = sp.GetRequiredService<TimeProvider>();
var transactionScope = sp.GetService<IOutboxTransactionScope>();

Expand All @@ -116,12 +295,25 @@ Action<OutboxOptions> configureOptions
// Register the management API
_ = services.AddScoped<IOutboxManagement>(sp =>
{
var options = sp.GetRequiredService<Microsoft.Extensions.Options.IOptions<OutboxOptions>>();
var options = sp.GetRequiredService<IOptions<OutboxOptions>>();
var timeProvider = sp.GetRequiredService<TimeProvider>();

return new SQLiteOutboxManagement(options, timeProvider);
});

return configurator;
}

private static IMediatorBuilder RegisterSQLiteOutboxServices(this IMediatorBuilder configurator)
{
// AddOutbox() uses TryAdd* internally, so this call is safe even when AddOutbox() was already invoked.
_ = configurator
.AddOutbox()
.Services.RemoveAll<IOutboxRepository>()
.AddScoped<IOutboxRepository, SQLiteOutboxRepository>()
.RemoveAll<IOutboxManagement>()
.AddScoped<IOutboxManagement, SQLiteOutboxManagement>();

return configurator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
namespace NetEvolve.Pulse.Tests.Integration.Internals;

using System.Diagnostics.CodeAnalysis;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using NetEvolve.Pulse.Extensibility;
using NetEvolve.Pulse.Extensibility.Outbox;
using NetEvolve.Pulse.Outbox;

[SuppressMessage(
"Security",
"CA2100:Review SQL queries for security vulnerabilities",
Justification = "SQL is constructed from validated OutboxOptions.TableName property, not user input."
)]
public sealed class SQLiteAdoNetInitializer : IDatabaseInitializer
{
public void Configure(IMediatorBuilder mediatorBuilder, IDatabaseServiceFixture databaseService)
{
ArgumentNullException.ThrowIfNull(mediatorBuilder);
ArgumentNullException.ThrowIfNull(databaseService);
_ = mediatorBuilder.AddSQLiteOutbox(databaseService.ConnectionString);
}

public async ValueTask CreateDatabaseAsync(IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(serviceProvider);
var options = serviceProvider.GetRequiredService<IOptions<OutboxOptions>>().Value;

var connectionString =
options.ConnectionString
?? throw new InvalidOperationException("OutboxOptions.ConnectionString is not configured.");

var tableName = string.IsNullOrWhiteSpace(options.TableName)
? OutboxMessageSchema.DefaultTableName
: options.TableName;

var quotedTable = $"\"{tableName}\"";
var quotedPk = $"\"PK_{tableName}\"";
var quotedIdx1 = $"\"IX_{tableName}_Status_CreatedAt\"";
var quotedIdx2 = $"\"IX_{tableName}_Status_ProcessedAt\"";

var createTableSql = $"""
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS {quotedTable} (
"Id" TEXT NOT NULL,
"EventType" TEXT NOT NULL,
"Payload" TEXT NOT NULL,
"CorrelationId" TEXT NULL,
"CreatedAt" TEXT NOT NULL,
"UpdatedAt" TEXT NOT NULL,
"ProcessedAt" TEXT NULL,
"NextRetryAt" TEXT NULL,
"RetryCount" INTEGER NOT NULL DEFAULT 0,
"Error" TEXT NULL,
"Status" INTEGER NOT NULL DEFAULT 0,
CONSTRAINT {quotedPk} PRIMARY KEY ("Id")
);
CREATE INDEX IF NOT EXISTS {quotedIdx1}
ON {quotedTable} ("Status", "CreatedAt") WHERE "Status" IN (0, 3);
CREATE INDEX IF NOT EXISTS {quotedIdx2}
ON {quotedTable} ("Status", "ProcessedAt") WHERE "Status" = 2;
""";

await using var connection = new SqliteConnection(connectionString);
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);

await using var command = new SqliteCommand(createTableSql, connection);
_ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}

public void Initialize(IServiceCollection services, IDatabaseServiceFixture databaseService) { }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace NetEvolve.Pulse.Tests.Integration.Outbox;

using NetEvolve.Extensions.TUnit;
using NetEvolve.Pulse.Tests.Integration.Internals;

[ClassDataSource<SQLiteDatabaseServiceFixture, SQLiteAdoNetInitializer>(Shared = [SharedType.None, SharedType.None])]
[TestGroup("SQLite")]
[TestGroup("AdoNet")]
[InheritsTests]
public class SQLiteAdoNetOutboxTests(
IDatabaseServiceFixture databaseServiceFixture,
IDatabaseInitializer databaseInitializer
) : OutboxTestsBase(databaseServiceFixture, databaseInitializer);
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[
{
CreatedAt: DateTimeOffset_1,
EventType: OutboxTestsBase.TestEvent,
Id: Guid_1,
Payload: {"CorrelationId":null,"Id":"Test000","PublishedAt":"2025-01-01T12:00:00+00:00"},
Status: Processing,
UpdatedAt: DateTimeOffset_1
},
{
CreatedAt: DateTimeOffset_1,
EventType: OutboxTestsBase.TestEvent,
Id: Guid_2,
Payload: {"CorrelationId":null,"Id":"Test001","PublishedAt":"2025-01-01T12:00:00+00:00"},
Status: Processing,
UpdatedAt: DateTimeOffset_1
},
{
CreatedAt: DateTimeOffset_1,
EventType: OutboxTestsBase.TestEvent,
Id: Guid_3,
Payload: {"CorrelationId":null,"Id":"Test002","PublishedAt":"2025-01-01T12:00:00+00:00"},
Status: Processing,
UpdatedAt: DateTimeOffset_1
}
]
Loading
Loading