diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b48c010..09e8ccb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,11 +8,7 @@ on: jobs: test: - strategy: - fail-fast: false - matrix: - os: [ubuntu-latest, macos-latest] - runs-on: ${{ matrix.os }} + runs-on: ubuntu-latest steps: - name: Checkout diff --git a/README.md b/README.md index 9f1270a..40171e8 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ init() → onSlot/onEpoch() → start() → ... → stop() → join() → deinit - `init` — in-place initialization (self-referential struct, not returned by value) - `start` — spawns `runAutoLoop` fiber via `std.Io.async`. Idempotent - `stop` — signals loop to exit, aborts all pending waiters. Idempotent -- `join` — awaits loop fiber completion (workaround for Zig bug [#31307](https://codeberg.org/ziglang/zig/issues/31307)) +- `join` — awaits loop fiber completion via `Future.cancel` - `deinit` — calls `stop()` + `join()`, frees all resources **Listener API:** @@ -75,13 +75,15 @@ _ = clock.offSlot(id); ```zig var fut = try clock.waitForSlot(target_slot); +errdefer fut.cancel(); try fut.await(io); ``` - Returns immediately if already at or past target slot - Future-based API backed by `std.Io.Event` + `std.Io.Future` - Dispatched by `advanceAndDispatch` when `current_slot >= target` -- `cancelWait` available for error-path cleanup +- `fut.cancel()` available for error-path cleanup (releases `WaitState` and removes from waiters list) +- Use `errdefer fut.cancel()` between `waitForSlot` and `await` to guarantee cleanup on error - Returns `error.Aborted` on `stop()` **Catch-up semantics (matching TS `get currentSlot()`):** @@ -94,7 +96,7 @@ Pure arithmetic helpers (`slotWithFutureTolerance`, `slotWithPastTolerance`, `se **Auto-loop (`runAutoLoop`):** -- Sleeps in 500ms chunks for prompt `stop()` response (cannot cancel sleeping futures due to Zig bug #31307) +- Sleeps until next slot boundary; `stop()` cancels the sleep future directly via `Future.cancel` - Skips advancement pre-genesis (`currentSlot()` returns `null`) - Checks `stopped` flag in `advanceAndDispatch` loop (matches TS's `!this.signal.aborted`) - Falls back to `self.stop()` + `break` if `msUntilNextSlot` returns `null` (config overflow defense-in-depth) @@ -116,7 +118,7 @@ This implementation matches the Lodestar `Clock` class behavior: | `get currentSlot()` catches up events before returning | `catchUp()` called in all current-state accessors | | `onNextSlot` loop checks `!this.signal.aborted` | `advanceAndDispatch` checks `self.stopped` per event | | `waitForSlot` uses `this.currentSlot` getter (triggers catch-up) | `catchUp()` + `current_slot` fast-path | -| `setTimeout(this.onNextSlot, this.msUntilNextSlot())` | `std.Io.sleep` in 500ms chunks + `advanceAndDispatch` | +| `setTimeout(this.onNextSlot, this.msUntilNextSlot())` | `std.Io.sleep` + `Future.cancel` on stop | | `slot 0` not emitted pre-genesis | `currentSlot()` returns `null` pre-genesis, loop skips | ## Example @@ -158,6 +160,7 @@ pub fn main() !void { std.debug.print("start_slot={d}, waiting for slot {d}...\n", .{ start_slot, start_slot + 3 }); var fut = try ec.waitForSlot(start_slot + 3); + errdefer fut.cancel(); try fut.await(io); std.debug.print("done.\n", .{}); diff --git a/examples/clock_basic.zig b/examples/clock_basic.zig index d647ced..3b50006 100644 --- a/examples/clock_basic.zig +++ b/examples/clock_basic.zig @@ -34,6 +34,7 @@ pub fn main() !void { std.debug.print("start_slot={d}, waiting for slot {d}...\n", .{ start_slot, start_slot + 3 }); var fut = try ec.waitForSlot(start_slot + 3); + errdefer fut.cancel(); try fut.await(io); std.debug.print("done.\n", .{}); diff --git a/src/EventClock.zig b/src/EventClock.zig index aa5b1f2..5ea43fe 100644 --- a/src/EventClock.zig +++ b/src/EventClock.zig @@ -106,13 +106,12 @@ pub fn stop(self: *EventClock) void { self.abortAllWaiters(); } -/// WORKAROUND for Zig bug #31307: always await, never cancel sleeping futures. -/// See: https://codeberg.org/ziglang/zig/issues/31307 +/// Cancel the loop fiber and wait for it to finish. pub fn join(self: *EventClock) void { var maybe_future = self.loop_future; self.loop_future = null; if (maybe_future) |*future| { - future.await(self.io); + future.cancel(self.io); } } @@ -261,18 +260,24 @@ pub fn msFromSlot(self: *EventClock, slot: Slot, to_ms: ?slot_math.UnixMs) ?i64 /// Return type from `waitForSlot`. The caller MUST either: /// - call `await()` to wait for the target slot and release resources, OR -/// - call `EventClock.cancelWait()` to abort and release resources, OR +/// - call `cancel()` to abort and release resources, OR /// - call `stop()` on the EventClock and THEN `await()` to get `error.Aborted`. -/// Dropping a WaitForSlotResult without calling `await` or `cancelWait` leaks +/// Dropping a WaitForSlotResult without calling `await` or `cancel` leaks /// the internal WaitState. +/// +/// Idiomatic usage with `errdefer`: +/// var fut = try ec.waitForSlot(target); +/// errdefer fut.cancel(); +/// try fut.await(io); pub const WaitForSlotResult = struct { inner: std.Io.Future(Error!void), state: ?*WaitState, + clock: ?*EventClock, /// Create an immediately-resolved result (no async work needed). /// Relies on `std.Io.Future.await` returning `.result` when `.any_future == null`. fn immediate(result: Error!void) WaitForSlotResult { - return .{ .inner = .{ .any_future = null, .result = result }, .state = null }; + return .{ .inner = .{ .any_future = null, .result = result }, .state = null, .clock = null }; } pub fn await(self: *WaitForSlotResult, io: std.Io) Error!void { @@ -281,8 +286,40 @@ pub const WaitForSlotResult = struct { // where GCD still holds a reference to the event address after wake. if (self.state) |s| s.allocator.destroy(s); self.state = null; + self.clock = null; return result; } + + /// Abort a pending wait and release its resources. Idempotent — safe + /// to call on an already-awaited, already-cancelled, or immediate result. + /// + /// Typical usage: + /// var fut = try ec.waitForSlot(target); + /// errdefer fut.cancel(); + /// try fut.await(io); + pub fn cancel(self: *WaitForSlotResult) void { + const state = self.state orelse return; + // Remove from waiter queue before freeing, so abortAllWaiters + // won't dereference the freed state pointer. + if (self.clock) |clock| { + for (clock.waiters.items, 0..) |entry, i| { + if (entry.state == state) { + _ = clock.waiters.popIndex(i); + break; + } + } + } + state.aborted = true; + state.event.set(state.io); + // Must await the fiber so it finishes before we free its state. + // The fiber returns error.Aborted (expected) or {} (already dispatched). + _ = self.inner.await(state.io) catch |err| { + std.debug.assert(err == error.Aborted); + }; + state.allocator.destroy(state); + self.state = null; + self.clock = null; + } }; /// Return a future that resolves when the clock reaches `target`. @@ -317,36 +354,10 @@ pub fn waitForSlot(self: *EventClock, target: Slot) Error!WaitForSlotResult { return .{ .inner = std.Io.async(self.io, waitForSlotFutureAwait, .{state}), .state = state, + .clock = self, }; } -/// Abort a pending wait and release its resources. Use this in error -/// paths where `WaitForSlotResult.await` cannot be called. The waiter -/// is removed from the internal queue so `deinit` won't touch freed memory. -/// Idempotent — safe to call on an already-awaited or immediate result. -pub fn cancelWait(self: *EventClock, result: *WaitForSlotResult) void { - if (result.state) |state| { - // Remove from waiter queue before freeing, so abortAllWaiters - // won't dereference the freed state pointer. - for (self.waiters.items, 0..) |entry, i| { - if (entry.state == state) { - _ = self.waiters.popIndex(i); - break; - } - } - state.aborted = true; - state.event.set(state.io); - } - // Must await the fiber so it finishes before we free its state. - // (Cannot use Future.cancel due to Zig bug #31307.) - // The fiber returns error.Aborted (expected) or {} (already dispatched). - _ = result.inner.await(self.io) catch |err| { - std.debug.assert(err == error.Aborted); - }; - if (result.state) |s| s.allocator.destroy(s); - result.state = null; -} - // ── Private ── /// Ensure event-clock state is caught up to wall-clock time. @@ -438,14 +449,10 @@ fn runAutoLoop(self: *EventClock) void { self.stop(); break; }; - // Sleep in short chunks so we can check `stopped` promptly. - // Without this, join() must wait for the full sleep to finish - // (we use await, not cancel, due to Zig bug #31307). - const chunk_ms = @min(next_ms, 500); - const sleep_ms = std.math.cast(i64, @max(@as(u64, 1), chunk_ms)) orelse std.math.maxInt(i64); - - // Sleep failure (e.g., I/O shutdown, interrupt) is transient — safe to - // retry from the top of the loop, which will re-check `stopped` first. + const sleep_ms = std.math.cast(i64, @max(@as(u64, 1), next_ms)) orelse std.math.maxInt(i64); + + // Sleep failure (e.g., cancel from join()) is expected — + // re-check `stopped` flag before continuing. std.Io.sleep( self.io, std.Io.Duration.fromMilliseconds(sleep_ms), @@ -548,6 +555,7 @@ test "lifecycle: init -> register -> start -> receive events -> stop" { const start_slot = clock.currentSlotOrGenesis(); var fut = try clock.waitForSlot(start_slot + 1); + errdefer fut.cancel(); try fut.await(io_handle); try testing.expect(trace.slot_len > 0); @@ -570,6 +578,7 @@ test "waitForSlot resolves immediately when at target" { const current = clock.currentSlotOrGenesis(); var fut = try clock.waitForSlot(current); + errdefer fut.cancel(); try fut.await(io_handle); } @@ -588,6 +597,7 @@ test "waitForSlot returns aborted on stop" { defer clock.deinit(); var fut = try clock.waitForSlot(100); + errdefer fut.cancel(); clock.stop(); try testing.expectError(error.Aborted, fut.await(io_handle)); } @@ -679,8 +689,11 @@ test "multiple waiters are dispatched in target-slot order" { // Register waiters for slots 5, 3, 1 (out of order) var fut5 = try clock.waitForSlot(5); + errdefer fut5.cancel(); var fut3 = try clock.waitForSlot(3); + errdefer fut3.cancel(); var fut1 = try clock.waitForSlot(1); + errdefer fut1.cancel(); // Advance to slot 3 — should dispatch slot 1 and slot 3, NOT slot 5 clock.advanceAndDispatch(3); @@ -693,7 +706,7 @@ test "multiple waiters are dispatched in target-slot order" { try testing.expectError(error.Aborted, fut5.await(io_handle)); } -test "cancelWait releases WaitState without awaiting" { +test "cancel releases WaitState without awaiting" { var rt: TestIo = undefined; try rt.init(); defer rt.deinit(); @@ -708,9 +721,9 @@ test "cancelWait releases WaitState without awaiting" { defer clock.deinit(); // Create a waiter for a far-future slot and immediately cancel it. - // testing.allocator will detect a leak if cancelWait fails to free. + // testing.allocator will detect a leak if cancel fails to free. var fut = try clock.waitForSlot(999); - clock.cancelWait(&fut); + fut.cancel(); } // ── Real-time tests ── @@ -766,6 +779,7 @@ test "real-time: slot events fire with correct timing" { const start_slot = clock.currentSlotOrGenesis(); const before_ms = nowMsAt(io_handle); var fut = try clock.waitForSlot(start_slot + 1); + errdefer fut.cancel(); try fut.await(io_handle); const elapsed = nowMsAt(io_handle) - before_ms; @@ -799,6 +813,7 @@ test "real-time: multi-slot advancement delivers ordered events" { const start_slot = clock.currentSlotOrGenesis(); var fut = try clock.waitForSlot(start_slot + 2); + errdefer fut.cancel(); try fut.await(io_handle); // At least 2 slot events should have been emitted. @@ -809,7 +824,7 @@ test "real-time: multi-slot advancement delivers ordered events" { } } -test "real-time: stop returns within chunk window" { +test "real-time: stop+join cancels promptly" { var rt: TestIo = undefined; try rt.init(); defer rt.deinit(); @@ -825,7 +840,7 @@ test "real-time: stop returns within chunk window" { clock.start(); - // Give the loop fiber time to enter its first sleep chunk. + // Give the loop fiber time to enter its sleep. std.Io.sleep(io_handle, std.Io.Duration.fromMilliseconds(50), .awake) catch {}; const before_ms = nowMsAt(io_handle); @@ -833,8 +848,8 @@ test "real-time: stop returns within chunk window" { clock.join(); const elapsed = nowMsAt(io_handle) - before_ms; - // The 500ms chunking means join() should return within ~500ms + overhead, - // NOT after the full 12-second slot duration. + // join() cancels the sleeping future directly, so it should return + // almost immediately — NOT after the full 12-second slot duration. try testing.expect(elapsed < 1500); } @@ -862,6 +877,7 @@ test "real-time: epoch boundary event fires" { const start_slot = clock.currentSlotOrGenesis(); // Wait enough slots to guarantee crossing at least one epoch boundary. var fut = try clock.waitForSlot(start_slot + 3); + errdefer fut.cancel(); try fut.await(io_handle); try testing.expect(trace.slot_len >= 3);