Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ internal sealed class PostgreSqlOutboxRepository : IOutboxRepository
/// <summary>Cached SQL for inserting into Outbox Table.</summary>
private readonly string _insertSql;

/// <summary>Cached SQL for counting pending outbox messages.</summary>
private readonly string _getPendingCountSql;

/// <summary>
/// Initializes a new instance of the <see cref="PostgreSqlOutboxRepository"/> class.
/// </summary>
Expand All @@ -90,11 +93,14 @@ public PostgreSqlOutboxRepository(
: options.Value.Schema;
_getPendingSql = $"SELECT * FROM \"{schema}\".get_pending_outbox_messages(@batch_size)";
_getFailedForRetrySql =
$"SELECT * FROM \"{schema}\".get_failed_outbox_messages_for_retry(@max_retry_count, @batch_size)";
_markCompletedSql = $"SELECT \"{schema}\".mark_outbox_message_completed(@message_id)";
$"SELECT * FROM \"{schema}\".get_failed_outbox_messages_for_retry(@max_retry_count, @batch_size, @now_utc)";
_markCompletedSql =
$"SELECT \"{schema}\".mark_outbox_message_completed(@message_id, @processed_at, @updated_at)";
_markFailedSql = $"SELECT \"{schema}\".mark_outbox_message_failed(@message_id, @error, @next_retry_at)";
_markDeadLetterSql = $"SELECT \"{schema}\".mark_outbox_message_dead_letter(@message_id, @error)";
_deleteCompletedSql = $"SELECT \"{schema}\".delete_completed_outbox_messages(@older_than_utc)";
_getPendingCountSql =
$"SELECT COUNT(*) FROM {options.Value.FullTableName} WHERE \"{OutboxMessageSchema.Columns.Status}\" = 0";

_insertSql = $"""
INSERT INTO {options.Value.FullTableName}
Expand Down Expand Up @@ -156,33 +162,71 @@ public async Task<IReadOnlyList<OutboxMessage>> GetPendingAsync(
return await ReadMessagesAsync(command, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task<long> GetPendingCountAsync(CancellationToken cancellationToken = default)
{
await using var connection = await CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(_getPendingCountSql, connection);

var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
return result is long count
? count
: Convert.ToInt64(result, System.Globalization.CultureInfo.InvariantCulture);
}

/// <inheritdoc />
public async Task<IReadOnlyList<OutboxMessage>> GetFailedForRetryAsync(
int maxRetryCount,
int batchSize,
CancellationToken cancellationToken = default
)
{
var now = _timeProvider.GetUtcNow();

await using var connection = await CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(_getFailedForRetrySql, connection);

_ = command.Parameters.AddWithValue("max_retry_count", maxRetryCount);
_ = command.Parameters.AddWithValue("batch_size", batchSize);
_ = command.Parameters.AddWithValue("now_utc", now);

return await ReadMessagesAsync(command, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task MarkAsCompletedAsync(Guid messageId, CancellationToken cancellationToken = default)
{
var now = _timeProvider.GetUtcNow();

await using var connection = await CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(_markCompletedSql, connection);

_ = command.Parameters.AddWithValue("message_id", messageId);
_ = command.Parameters.AddWithValue("processed_at", now);
_ = command.Parameters.AddWithValue("updated_at", now);

_ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task MarkAsCompletedAsync(
IReadOnlyCollection<Guid> messageIds,
CancellationToken cancellationToken = default
)
{
var now = _timeProvider.GetUtcNow();

await using var connection = await CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
foreach (var messageId in messageIds)
{
await using var command = new NpgsqlCommand(_markCompletedSql, connection);
_ = command.Parameters.AddWithValue("message_id", messageId);
_ = command.Parameters.AddWithValue("processed_at", now);
_ = command.Parameters.AddWithValue("updated_at", now);
_ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}

/// <inheritdoc />
public async Task MarkAsFailedAsync(
Guid messageId,
Expand Down Expand Up @@ -223,14 +267,18 @@ public async Task MarkAsFailedAsync(
IReadOnlyCollection<Guid> messageIds,
string errorMessage,
CancellationToken cancellationToken = default
) =>
await Parallel
.ForEachAsync(
messageIds,
cancellationToken,
async (id, token) => await MarkAsFailedAsync(id, errorMessage, token).ConfigureAwait(false)
)
.ConfigureAwait(false);
)
{
await using var connection = await CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
foreach (var messageId in messageIds)
{
await using var command = new NpgsqlCommand(_markFailedSql, connection);
_ = command.Parameters.AddWithValue("message_id", messageId);
_ = command.Parameters.AddWithValue("error", (object?)errorMessage ?? DBNull.Value);
_ = command.Parameters.AddWithValue("next_retry_at", DBNull.Value);
_ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}

/// <inheritdoc />
public async Task MarkAsDeadLetterAsync(
Expand Down
44 changes: 40 additions & 4 deletions src/NetEvolve.Pulse.PostgreSql/PostgreSqlExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public static class PostgreSqlExtensions
/// database objects before using this provider.
/// <para><strong>Registered Services:</strong></para>
/// <list type="bullet">
/// <item><description><see cref="IEventOutbox"/> as <see cref="OutboxEventStore"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxRepository"/> as <see cref="PostgreSqlOutboxRepository"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxManagement"/> as <see cref="PostgreSqlOutboxManagement"/> (Scoped)</description></item>
/// <item><description><see cref="TimeProvider"/> (Singleton, if not already registered)</description></item>
Expand Down Expand Up @@ -72,6 +73,7 @@ public static IMediatorBuilder AddPostgreSqlOutbox(
/// database objects before using this provider.
/// <para><strong>Registered Services:</strong></para>
/// <list type="bullet">
/// <item><description><see cref="IEventOutbox"/> as <see cref="OutboxEventStore"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxRepository"/> as <see cref="PostgreSqlOutboxRepository"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxManagement"/> as <see cref="PostgreSqlOutboxManagement"/> (Scoped)</description></item>
/// <item><description><see cref="TimeProvider"/> (Singleton, if not already registered)</description></item>
Expand Down Expand Up @@ -125,6 +127,7 @@ public static IMediatorBuilder AddPostgreSqlOutbox(
/// database objects before using this provider.
/// <para><strong>Registered Services:</strong></para>
/// <list type="bullet">
/// <item><description><see cref="IEventOutbox"/> as <see cref="OutboxEventStore"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxRepository"/> as <see cref="PostgreSqlOutboxRepository"/> (Scoped)</description></item>
/// <item><description><see cref="IOutboxManagement"/> as <see cref="PostgreSqlOutboxManagement"/> (Scoped)</description></item>
/// <item><description><see cref="TimeProvider"/> (Singleton, if not already registered)</description></item>
Expand Down Expand Up @@ -158,14 +161,47 @@ Action<OutboxOptions> configureOptions
return configurator.RegisterPostgreSqlOutboxServices();
}

private static IMediatorBuilder RegisterPostgreSqlOutboxServices(this IMediatorBuilder configurator)
/// <summary>
/// Registers a unit-of-work type as <see cref="IOutboxTransactionScope"/> (Scoped) so that
/// <see cref="OutboxEventStore"/> can enlist in the caller's transaction automatically.
/// </summary>
/// <typeparam name="TUnitOfWork">
/// A type that implements both the application unit-of-work contract and <see cref="IOutboxTransactionScope"/>.
/// </typeparam>
/// <param name="configurator">The mediator configurator.</param>
/// <returns>The configurator for chaining.</returns>
/// <remarks>
/// Call this method after <see cref="AddPostgreSqlOutbox(IMediatorBuilder, string, Action{OutboxOptions}?)"/>
/// to wire up your unit-of-work so that <see cref="OutboxEventStore"/> automatically
/// enlists in any active <see cref="Npgsql.NpgsqlTransaction"/> owned by the unit-of-work.
/// </remarks>
/// <example>
/// <code>
/// services.AddPulse(config => config
/// .AddOutbox()
/// .AddPostgreSqlOutbox("Host=localhost;Database=MyDb;Username=postgres;Password=secret;")
/// .AddPostgreSqlOutboxTransactionScope&lt;MyUnitOfWork&gt;()
/// );
/// </code>
/// </example>
public static IMediatorBuilder AddPostgreSqlOutboxTransactionScope<TUnitOfWork>(this IMediatorBuilder configurator)
where TUnitOfWork : class, IOutboxTransactionScope
{
var services = configurator.Services;
ArgumentNullException.ThrowIfNull(configurator);

services.TryAddSingleton(TimeProvider.System);
_ = configurator.Services.AddScoped<IOutboxTransactionScope, TUnitOfWork>();

_ = services
return configurator;
}

private static IMediatorBuilder RegisterPostgreSqlOutboxServices(this IMediatorBuilder configurator)
{
// AddOutbox() uses TryAdd* internally, so this call is safe even when AddOutbox() was already invoked.
_ = configurator
.AddOutbox()
.Services.RemoveAll<IOutboxRepository>()
.AddScoped<IOutboxRepository, PostgreSqlOutboxRepository>()
.RemoveAll<IOutboxManagement>()
.AddScoped<IOutboxManagement, PostgreSqlOutboxManagement>();

return configurator;
Expand Down
12 changes: 8 additions & 4 deletions src/NetEvolve.Pulse.PostgreSql/Scripts/OutboxMessage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ $$;
-- get_failed_outbox_messages_for_retry: Retrieves failed messages eligible for retry
CREATE OR REPLACE FUNCTION ":schema_name".get_failed_outbox_messages_for_retry(
max_retry_count INTEGER,
batch_size INTEGER
batch_size INTEGER,
now_utc TIMESTAMPTZ
)
RETURNS TABLE (
"Id" UUID,
Expand All @@ -130,6 +131,7 @@ BEGIN
FROM ":schema_name".":table_name" om
WHERE om."Status" = 3 -- Failed
AND om."RetryCount" < max_retry_count
AND (om."NextRetryAt" IS NULL OR om."NextRetryAt" <= now_utc)
ORDER BY om."UpdatedAt"
LIMIT batch_size
FOR UPDATE SKIP LOCKED
Expand Down Expand Up @@ -157,7 +159,9 @@ $$;

-- mark_outbox_message_completed: Marks a message as successfully processed
CREATE OR REPLACE FUNCTION ":schema_name".mark_outbox_message_completed(
message_id UUID
message_id UUID,
processed_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
)
RETURNS VOID
LANGUAGE plpgsql
Expand All @@ -166,8 +170,8 @@ BEGIN
UPDATE ":schema_name".":table_name"
SET
"Status" = 2, -- Completed
"ProcessedAt" = NOW(),
"UpdatedAt" = NOW()
"ProcessedAt" = processed_at,
"UpdatedAt" = updated_at
WHERE "Id" = message_id
AND "Status" = 1; -- Processing
END;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
namespace NetEvolve.Pulse.Tests.Integration.Internals;

using System.Diagnostics.CodeAnalysis;
using System.Text.RegularExpressions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using NetEvolve.Pulse;
using NetEvolve.Pulse.Extensibility;
using NetEvolve.Pulse.Extensibility.Outbox;
using NetEvolve.Pulse.Outbox;
using Npgsql;

[SuppressMessage(
"Security",
"CA2100:Review SQL queries for security vulnerabilities",
Justification = "SQL is read from a script file with schema and table names substituted from validated OutboxOptions properties."
)]
public sealed partial class PostgreSqlAdoNetInitializer : IDatabaseInitializer
{
private static readonly string _scriptPath = Path.Combine(
AppContext.BaseDirectory,
"Scripts",
"PostgreSql",
"OutboxMessage.sql"
);

public void Configure(IMediatorBuilder mediatorBuilder, IDatabaseServiceFixture databaseService)
{
ArgumentNullException.ThrowIfNull(databaseService);
_ = mediatorBuilder.AddPostgreSqlOutbox(databaseService.ConnectionString);
}

public async ValueTask CreateDatabaseAsync(IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
var options = serviceProvider.GetRequiredService<IOptions<OutboxOptions>>().Value;

var connectionString =
options.ConnectionString
?? throw new InvalidOperationException("OutboxOptions.ConnectionString is not configured.");

var schema = string.IsNullOrWhiteSpace(options.Schema) ? OutboxMessageSchema.DefaultSchema : options.Schema;

var tableName = string.IsNullOrWhiteSpace(options.TableName)
? OutboxMessageSchema.DefaultTableName
: options.TableName;

var script = await File.ReadAllTextAsync(_scriptPath, cancellationToken).ConfigureAwait(false);

// Remove psql-specific variable declarations (not valid SQL)
script = SearchSetVar().Replace(script, string.Empty);

// Substitute psql variables with actual values.
// PostgreSQL script uses :schema_name and :table_name as placeholders.
// The placeholders appear both unquoted (e.g., CREATE SCHEMA :schema_name)
// and within quotes (e.g., ":schema_name".":table_name").
// We replace all occurrences with the actual values directly.
script = script
.Replace(":schema_name", schema, StringComparison.Ordinal)
.Replace(":table_name", tableName, StringComparison.Ordinal);

await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);

await using var command = new NpgsqlCommand(script, connection);
_ = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}

public void Initialize(IServiceCollection services, IDatabaseServiceFixture databaseService)
{
// No additional service initialization required for ADO.NET outbox tests.
// The Configure method handles all necessary service registrations.
}

[GeneratedRegex(@"^\\set\s+\w+\s+.*$", RegexOptions.Multiline)]
private static partial Regex SearchSetVar();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public sealed class PostgreSqlContainerFixture : IAsyncDisposable, IAsyncInitial
/*dockerimage*/"postgres:18.3"
)
.WithLogger(NullLogger.Instance)
.WithCommand("-c", "max_connections=500") // Raised for parallel integration tests; each test creates its own unique database/pool.
.Build();

public string ConnectionString => _container.GetConnectionString() + ";Include Error Detail=true;";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,52 @@
namespace NetEvolve.Pulse.Tests.Integration.Internals;

using Npgsql;

public sealed class PostgreSqlDatabaseServiceFixture : IDatabaseServiceFixture
{
[ClassDataSource<PostgreSqlContainerFixture>(Shared = SharedType.PerTestSession)]
public PostgreSqlContainerFixture Container { get; set; } = default!;

public string ConnectionString =>
Container.ConnectionString.Replace("Database=postgres;", $"Database={DatabaseName};", StringComparison.Ordinal);
public string ConnectionString
{
get
{
// Use NpgsqlConnectionStringBuilder to safely set the test database name and
// constrain the pool. Each test has a unique connection string, so without these
// limits the per-test pools accumulate physical connections and exhaust
// PostgreSQL's max_connections during parallel runs.
var builder = new NpgsqlConnectionStringBuilder(Container.ConnectionString)
{
Database = DatabaseName,
MinPoolSize = 0,
MaxPoolSize = 5,
ConnectionIdleLifetime = 15, // Must be >= ConnectionPruningInterval (default 10 s).
};
return builder.ToString();
}
}

internal string DatabaseName { get; } = $"{TestHelper.TargetFramework}{Guid.NewGuid():N}";

public DatabaseType DatabaseType => DatabaseType.PostgreSQL;

public ValueTask DisposeAsync() => ValueTask.CompletedTask;
public ValueTask DisposeAsync()
{
// Eagerly clear the Npgsql pool for this test's unique connection string so that
// physical connections are returned to the server immediately instead of sitting
// idle for ConnectionIdleLifetime seconds. Without this, completed-test pools
// accumulate and exhaust PostgreSQL's max_connections during parallel runs.
using var conn = new NpgsqlConnection(ConnectionString);
NpgsqlConnection.ClearPool(conn);
return ValueTask.CompletedTask;
}

public async Task InitializeAsync()
{
try
{
// Create temporary database to ensure the container is fully initialized and ready to accept connections
await using var con = new Npgsql.NpgsqlConnection(Container.ConnectionString);
await using var con = new NpgsqlConnection(Container.ConnectionString);
await con.OpenAsync();

await using var cmd = con.CreateCommand();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace NetEvolve.Pulse.Tests.Integration.Outbox;

using NetEvolve.Extensions.TUnit;
using NetEvolve.Pulse.Tests.Integration.Internals;

[ClassDataSource<PostgreSqlDatabaseServiceFixture, PostgreSqlAdoNetInitializer>(
Shared = [SharedType.None, SharedType.None]
)]
[TestGroup("PostgreSql")]
[TestGroup("AdoNet")]
[InheritsTests]
public class PostgreSqlAdoNetOutboxTests(
IDatabaseServiceFixture databaseServiceFixture,
IDatabaseInitializer databaseInitializer
) : OutboxTestsBase(databaseServiceFixture, databaseInitializer);
Loading
Loading