From 6aa8fd2a16ffe0799f2a67007bc079fa016fad38 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Sun, 28 Jun 2026 19:25:13 -0400 Subject: [PATCH] feat(indexing): support checkpoint storage Signed-off-by: Yordis Prieto --- .../InMemoryIndexCheckpointStoreTests.cs | 68 +++++++++++++++++++ .../Storage/Indexing/IIndexCheckpointStore.cs | 11 +++ .../Indexing/InMemoryIndexCheckpointStore.cs | 32 +++++++++ 3 files changed, 111 insertions(+) create mode 100644 src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexCheckpointStoreTests.cs create mode 100644 src/EventStore.Core/Services/Storage/Indexing/IIndexCheckpointStore.cs create mode 100644 src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexCheckpointStore.cs diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexCheckpointStoreTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexCheckpointStoreTests.cs new file mode 100644 index 000000000..f28d016fd --- /dev/null +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/Indexing/InMemoryIndexCheckpointStoreTests.cs @@ -0,0 +1,68 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Services.Storage.Indexing; +using Xunit; + +namespace EventStore.Core.XUnit.Tests.Services.Storage.Indexing; + +public class InMemoryIndexCheckpointStoreTests +{ + [Fact] + public async Task read_returns_null_when_empty() + { + var store = new InMemoryIndexCheckpointStore(); + + var checkpoint = await store.Read(CancellationToken.None); + + Assert.Null(checkpoint); + } + + [Fact] + public async Task write_and_read_round_trip() + { + var store = new InMemoryIndexCheckpointStore(); + var expected = new IndexCheckpoint(10, 5); + + await store.Write(expected, CancellationToken.None); + var checkpoint = await store.Read(CancellationToken.None); + + Assert.Equal(expected, checkpoint); + } + + [Fact] + public async Task write_overwrites_stored_checkpoint() + { + var store = new InMemoryIndexCheckpointStore(); + var first = new IndexCheckpoint(10, 5); + var second = new IndexCheckpoint(20, 15); + + await store.Write(first, CancellationToken.None); + await store.Write(second, CancellationToken.None); + var checkpoint = await store.Read(CancellationToken.None); + + Assert.Equal(second, checkpoint); + } + + [Fact] + public async Task read_honors_cancelled_token() + { + var store = new InMemoryIndexCheckpointStore(); + using var cancellation = new CancellationTokenSource(); + await cancellation.CancelAsync(); + + await Assert.ThrowsAsync(() => + store.Read(cancellation.Token).AsTask()); + } + + [Fact] + public async Task write_honors_cancelled_token() + { + var store = new InMemoryIndexCheckpointStore(); + using var cancellation = new CancellationTokenSource(); + await cancellation.CancelAsync(); + + await Assert.ThrowsAsync(() => + store.Write(new IndexCheckpoint(10, 5), cancellation.Token).AsTask()); + } +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/IIndexCheckpointStore.cs b/src/EventStore.Core/Services/Storage/Indexing/IIndexCheckpointStore.cs new file mode 100644 index 000000000..fc21fc505 --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/IIndexCheckpointStore.cs @@ -0,0 +1,11 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace EventStore.Core.Services.Storage.Indexing; + +public interface IIndexCheckpointStore +{ + ValueTask Read(CancellationToken token); + + ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token); +} diff --git a/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexCheckpointStore.cs b/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexCheckpointStore.cs new file mode 100644 index 000000000..2867f0d55 --- /dev/null +++ b/src/EventStore.Core/Services/Storage/Indexing/InMemoryIndexCheckpointStore.cs @@ -0,0 +1,32 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace EventStore.Core.Services.Storage.Indexing; + +public sealed class InMemoryIndexCheckpointStore : IIndexCheckpointStore +{ + private readonly object _lock = new(); + private IndexCheckpoint? _checkpoint; + + public ValueTask Read(CancellationToken token) + { + token.ThrowIfCancellationRequested(); + + lock (_lock) + { + return ValueTask.FromResult(_checkpoint); + } + } + + public ValueTask Write(IndexCheckpoint checkpoint, CancellationToken token) + { + token.ThrowIfCancellationRequested(); + + lock (_lock) + { + _checkpoint = checkpoint; + } + + return ValueTask.CompletedTask; + } +}