Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ on:

jobs:
test:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest]
runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CI is now Linux-only (runs-on: ubuntu-latest) which removes macOS coverage for this repo. If this is intended to be temporary for validating Future.cancel on io_uring, consider keeping macOS in the workflow but marking it continue-on-error or gating it behind a workflow input so platform regressions aren’t silently missed.

Suggested change
runs-on: ubuntu-latest
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
runs-on: ${{ matrix.os }}
continue-on-error: ${{ matrix.os == 'macos-latest' }}

Copilot uses AI. Check for mistakes.

steps:
- name: Checkout
Expand Down
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ init() → onSlot/onEpoch() → start() → ... → stop() → join() → deinit
- `init` — in-place initialization (self-referential struct, not returned by value)
- `start` — spawns `runAutoLoop` fiber via `std.Io.async`. Idempotent
- `stop` — signals loop to exit, aborts all pending waiters. Idempotent
- `join` — awaits loop fiber completion (workaround for Zig bug [#31307](https://codeberg.org/ziglang/zig/issues/31307))
- `join` — awaits loop fiber completion via `Future.cancel`
- `deinit` — calls `stop()` + `join()`, frees all resources

**Listener API:**
Expand All @@ -75,13 +75,15 @@ _ = clock.offSlot(id);

```zig
var fut = try clock.waitForSlot(target_slot);
errdefer fut.cancel();
try fut.await(io);
```

- Returns immediately if already at or past target slot
- Future-based API backed by `std.Io.Event` + `std.Io.Future`
- Dispatched by `advanceAndDispatch` when `current_slot >= target`
- `cancelWait` available for error-path cleanup
- `fut.cancel()` available for error-path cleanup (releases `WaitState` and removes from waiters list)
- Use `errdefer fut.cancel()` between `waitForSlot` and `await` to guarantee cleanup on error
- Returns `error.Aborted` on `stop()`

**Catch-up semantics (matching TS `get currentSlot()`):**
Expand All @@ -94,7 +96,7 @@ Pure arithmetic helpers (`slotWithFutureTolerance`, `slotWithPastTolerance`, `se

**Auto-loop (`runAutoLoop`):**

- Sleeps in 500ms chunks for prompt `stop()` response (cannot cancel sleeping futures due to Zig bug #31307)
- Sleeps until next slot boundary; `stop()` cancels the sleep future directly via `Future.cancel`
- Skips advancement pre-genesis (`currentSlot()` returns `null`)
- Checks `stopped` flag in `advanceAndDispatch` loop (matches TS's `!this.signal.aborted`)
- Falls back to `self.stop()` + `break` if `msUntilNextSlot` returns `null` (config overflow defense-in-depth)
Expand All @@ -116,7 +118,7 @@ This implementation matches the Lodestar `Clock` class behavior:
| `get currentSlot()` catches up events before returning | `catchUp()` called in all current-state accessors |
| `onNextSlot` loop checks `!this.signal.aborted` | `advanceAndDispatch` checks `self.stopped` per event |
| `waitForSlot` uses `this.currentSlot` getter (triggers catch-up) | `catchUp()` + `current_slot` fast-path |
| `setTimeout(this.onNextSlot, this.msUntilNextSlot())` | `std.Io.sleep` in 500ms chunks + `advanceAndDispatch` |
| `setTimeout(this.onNextSlot, this.msUntilNextSlot())` | `std.Io.sleep` + `Future.cancel` on stop |
| `slot 0` not emitted pre-genesis | `currentSlot()` returns `null` pre-genesis, loop skips |

## Example
Expand Down Expand Up @@ -158,6 +160,7 @@ pub fn main() !void {
std.debug.print("start_slot={d}, waiting for slot {d}...\n", .{ start_slot, start_slot + 3 });

var fut = try ec.waitForSlot(start_slot + 3);
errdefer fut.cancel();
try fut.await(io);

std.debug.print("done.\n", .{});
Expand Down
1 change: 1 addition & 0 deletions examples/clock_basic.zig
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub fn main() !void {
std.debug.print("start_slot={d}, waiting for slot {d}...\n", .{ start_slot, start_slot + 3 });

var fut = try ec.waitForSlot(start_slot + 3);
errdefer fut.cancel();
try fut.await(io);

std.debug.print("done.\n", .{});
Expand Down
112 changes: 64 additions & 48 deletions src/EventClock.zig
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,12 @@ pub fn stop(self: *EventClock) void {
self.abortAllWaiters();
}

/// WORKAROUND for Zig bug #31307: always await, never cancel sleeping futures.
/// See: https://codeberg.org/ziglang/zig/issues/31307
/// Cancel the loop fiber and wait for it to finish.
pub fn join(self: *EventClock) void {
var maybe_future = self.loop_future;
self.loop_future = null;
if (maybe_future) |*future| {
future.await(self.io);
future.cancel(self.io);
}
}
Comment on lines +109 to 116
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

join()’s docstring says it “cancel[s] the loop fiber and wait[s] for it to finish”, but the implementation only calls future.cancel(self.io) and never awaits/completes the future explicitly. If Future.cancel is not guaranteed to block until the fiber terminates, deinit() can free self’s resources while runAutoLoop is still running (use-after-free). Please either (a) ensure join() truly waits for completion (e.g., cancel then await / otherwise drain the future), or (b) adjust the doc + deinit contract to match cancel’s actual semantics.

Copilot uses AI. Check for mistakes.

Expand Down Expand Up @@ -261,18 +260,24 @@ pub fn msFromSlot(self: *EventClock, slot: Slot, to_ms: ?slot_math.UnixMs) ?i64

/// Return type from `waitForSlot`. The caller MUST either:
/// - call `await()` to wait for the target slot and release resources, OR
/// - call `EventClock.cancelWait()` to abort and release resources, OR
/// - call `cancel()` to abort and release resources, OR
/// - call `stop()` on the EventClock and THEN `await()` to get `error.Aborted`.
/// Dropping a WaitForSlotResult without calling `await` or `cancelWait` leaks
/// Dropping a WaitForSlotResult without calling `await` or `cancel` leaks
/// the internal WaitState.
///
/// Idiomatic usage with `errdefer`:
/// var fut = try ec.waitForSlot(target);
/// errdefer fut.cancel();
/// try fut.await(io);
pub const WaitForSlotResult = struct {
inner: std.Io.Future(Error!void),
state: ?*WaitState,
clock: ?*EventClock,

/// Create an immediately-resolved result (no async work needed).
/// Relies on `std.Io.Future.await` returning `.result` when `.any_future == null`.
fn immediate(result: Error!void) WaitForSlotResult {
return .{ .inner = .{ .any_future = null, .result = result }, .state = null };
return .{ .inner = .{ .any_future = null, .result = result }, .state = null, .clock = null };
}

pub fn await(self: *WaitForSlotResult, io: std.Io) Error!void {
Expand All @@ -281,8 +286,40 @@ pub const WaitForSlotResult = struct {
// where GCD still holds a reference to the event address after wake.
if (self.state) |s| s.allocator.destroy(s);
self.state = null;
self.clock = null;
return result;
}

/// Abort a pending wait and release its resources. Idempotent — safe
/// to call on an already-awaited, already-cancelled, or immediate result.
///
/// Typical usage:
/// var fut = try ec.waitForSlot(target);
/// errdefer fut.cancel();
/// try fut.await(io);
pub fn cancel(self: *WaitForSlotResult) void {
const state = self.state orelse return;
// Remove from waiter queue before freeing, so abortAllWaiters
// won't dereference the freed state pointer.
if (self.clock) |clock| {
for (clock.waiters.items, 0..) |entry, i| {
if (entry.state == state) {
_ = clock.waiters.popIndex(i);
break;
}
}
}
state.aborted = true;
state.event.set(state.io);
// Must await the fiber so it finishes before we free its state.
// The fiber returns error.Aborted (expected) or {} (already dispatched).
_ = self.inner.await(state.io) catch |err| {
std.debug.assert(err == error.Aborted);
};
state.allocator.destroy(state);
self.state = null;
self.clock = null;
}
};

/// Return a future that resolves when the clock reaches `target`.
Expand Down Expand Up @@ -317,36 +354,10 @@ pub fn waitForSlot(self: *EventClock, target: Slot) Error!WaitForSlotResult {
return .{
.inner = std.Io.async(self.io, waitForSlotFutureAwait, .{state}),
.state = state,
.clock = self,
};
}

/// Abort a pending wait and release its resources. Use this in error
/// paths where `WaitForSlotResult.await` cannot be called. The waiter
/// is removed from the internal queue so `deinit` won't touch freed memory.
/// Idempotent — safe to call on an already-awaited or immediate result.
pub fn cancelWait(self: *EventClock, result: *WaitForSlotResult) void {
if (result.state) |state| {
// Remove from waiter queue before freeing, so abortAllWaiters
// won't dereference the freed state pointer.
for (self.waiters.items, 0..) |entry, i| {
if (entry.state == state) {
_ = self.waiters.popIndex(i);
break;
}
}
state.aborted = true;
state.event.set(state.io);
}
// Must await the fiber so it finishes before we free its state.
// (Cannot use Future.cancel due to Zig bug #31307.)
// The fiber returns error.Aborted (expected) or {} (already dispatched).
_ = result.inner.await(self.io) catch |err| {
std.debug.assert(err == error.Aborted);
};
if (result.state) |s| s.allocator.destroy(s);
result.state = null;
}

// ── Private ──

/// Ensure event-clock state is caught up to wall-clock time.
Expand Down Expand Up @@ -438,14 +449,10 @@ fn runAutoLoop(self: *EventClock) void {
self.stop();
break;
};
// Sleep in short chunks so we can check `stopped` promptly.
// Without this, join() must wait for the full sleep to finish
// (we use await, not cancel, due to Zig bug #31307).
const chunk_ms = @min(next_ms, 500);
const sleep_ms = std.math.cast(i64, @max(@as(u64, 1), chunk_ms)) orelse std.math.maxInt(i64);

// Sleep failure (e.g., I/O shutdown, interrupt) is transient — safe to
// retry from the top of the loop, which will re-check `stopped` first.
const sleep_ms = std.math.cast(i64, @max(@as(u64, 1), next_ms)) orelse std.math.maxInt(i64);

// Sleep failure (e.g., cancel from join()) is expected —
// re-check `stopped` flag before continuing.
std.Io.sleep(
self.io,
std.Io.Duration.fromMilliseconds(sleep_ms),
Expand Down Expand Up @@ -548,6 +555,7 @@ test "lifecycle: init -> register -> start -> receive events -> stop" {

const start_slot = clock.currentSlotOrGenesis();
var fut = try clock.waitForSlot(start_slot + 1);
errdefer fut.cancel();
try fut.await(io_handle);

try testing.expect(trace.slot_len > 0);
Expand All @@ -570,6 +578,7 @@ test "waitForSlot resolves immediately when at target" {

const current = clock.currentSlotOrGenesis();
var fut = try clock.waitForSlot(current);
errdefer fut.cancel();
try fut.await(io_handle);
}

Expand All @@ -588,6 +597,7 @@ test "waitForSlot returns aborted on stop" {
defer clock.deinit();

var fut = try clock.waitForSlot(100);
errdefer fut.cancel();
clock.stop();
try testing.expectError(error.Aborted, fut.await(io_handle));
}
Expand Down Expand Up @@ -679,8 +689,11 @@ test "multiple waiters are dispatched in target-slot order" {

// Register waiters for slots 5, 3, 1 (out of order)
var fut5 = try clock.waitForSlot(5);
errdefer fut5.cancel();
var fut3 = try clock.waitForSlot(3);
errdefer fut3.cancel();
var fut1 = try clock.waitForSlot(1);
errdefer fut1.cancel();

// Advance to slot 3 — should dispatch slot 1 and slot 3, NOT slot 5
clock.advanceAndDispatch(3);
Expand All @@ -693,7 +706,7 @@ test "multiple waiters are dispatched in target-slot order" {
try testing.expectError(error.Aborted, fut5.await(io_handle));
}

test "cancelWait releases WaitState without awaiting" {
test "cancel releases WaitState without awaiting" {
var rt: TestIo = undefined;
try rt.init();
defer rt.deinit();
Expand All @@ -708,9 +721,9 @@ test "cancelWait releases WaitState without awaiting" {
defer clock.deinit();

// Create a waiter for a far-future slot and immediately cancel it.
// testing.allocator will detect a leak if cancelWait fails to free.
// testing.allocator will detect a leak if cancel fails to free.
var fut = try clock.waitForSlot(999);
clock.cancelWait(&fut);
fut.cancel();
}

// ── Real-time tests ──
Expand Down Expand Up @@ -766,6 +779,7 @@ test "real-time: slot events fire with correct timing" {
const start_slot = clock.currentSlotOrGenesis();
const before_ms = nowMsAt(io_handle);
var fut = try clock.waitForSlot(start_slot + 1);
errdefer fut.cancel();
try fut.await(io_handle);
const elapsed = nowMsAt(io_handle) - before_ms;

Expand Down Expand Up @@ -799,6 +813,7 @@ test "real-time: multi-slot advancement delivers ordered events" {

const start_slot = clock.currentSlotOrGenesis();
var fut = try clock.waitForSlot(start_slot + 2);
errdefer fut.cancel();
try fut.await(io_handle);

// At least 2 slot events should have been emitted.
Expand All @@ -809,7 +824,7 @@ test "real-time: multi-slot advancement delivers ordered events" {
}
}

test "real-time: stop returns within chunk window" {
test "real-time: stop+join cancels promptly" {
var rt: TestIo = undefined;
try rt.init();
defer rt.deinit();
Expand All @@ -825,16 +840,16 @@ test "real-time: stop returns within chunk window" {

clock.start();

// Give the loop fiber time to enter its first sleep chunk.
// Give the loop fiber time to enter its sleep.
std.Io.sleep(io_handle, std.Io.Duration.fromMilliseconds(50), .awake) catch {};

const before_ms = nowMsAt(io_handle);
clock.stop();
clock.join();
const elapsed = nowMsAt(io_handle) - before_ms;

// The 500ms chunking means join() should return within ~500ms + overhead,
// NOT after the full 12-second slot duration.
// join() cancels the sleeping future directly, so it should return
// almost immediately — NOT after the full 12-second slot duration.
try testing.expect(elapsed < 1500);
}

Expand Down Expand Up @@ -862,6 +877,7 @@ test "real-time: epoch boundary event fires" {
const start_slot = clock.currentSlotOrGenesis();
// Wait enough slots to guarantee crossing at least one epoch boundary.
var fut = try clock.waitForSlot(start_slot + 3);
errdefer fut.cancel();
try fut.await(io_handle);

try testing.expect(trace.slot_len >= 3);
Expand Down