-
Notifications
You must be signed in to change notification settings - Fork 0
refactor: use Future.cancel, remove 500ms chunking workaround #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a70d123
b4e3283
9476de2
edb4e32
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -106,13 +106,12 @@ pub fn stop(self: *EventClock) void { | |
| self.abortAllWaiters(); | ||
| } | ||
|
|
||
| /// WORKAROUND for Zig bug #31307: always await, never cancel sleeping futures. | ||
| /// See: https://codeberg.org/ziglang/zig/issues/31307 | ||
| /// Cancel the loop fiber and wait for it to finish. | ||
| pub fn join(self: *EventClock) void { | ||
| var maybe_future = self.loop_future; | ||
| self.loop_future = null; | ||
| if (maybe_future) |*future| { | ||
| future.await(self.io); | ||
| future.cancel(self.io); | ||
| } | ||
| } | ||
|
Comment on lines
+109
to
116
|
||
|
|
||
|
|
@@ -261,18 +260,24 @@ pub fn msFromSlot(self: *EventClock, slot: Slot, to_ms: ?slot_math.UnixMs) ?i64 | |
|
|
||
| /// Return type from `waitForSlot`. The caller MUST either: | ||
| /// - call `await()` to wait for the target slot and release resources, OR | ||
| /// - call `EventClock.cancelWait()` to abort and release resources, OR | ||
| /// - call `cancel()` to abort and release resources, OR | ||
| /// - call `stop()` on the EventClock and THEN `await()` to get `error.Aborted`. | ||
| /// Dropping a WaitForSlotResult without calling `await` or `cancelWait` leaks | ||
| /// Dropping a WaitForSlotResult without calling `await` or `cancel` leaks | ||
| /// the internal WaitState. | ||
| /// | ||
| /// Idiomatic usage with `errdefer`: | ||
| /// var fut = try ec.waitForSlot(target); | ||
| /// errdefer fut.cancel(); | ||
| /// try fut.await(io); | ||
| pub const WaitForSlotResult = struct { | ||
| inner: std.Io.Future(Error!void), | ||
| state: ?*WaitState, | ||
| clock: ?*EventClock, | ||
|
|
||
| /// Create an immediately-resolved result (no async work needed). | ||
| /// Relies on `std.Io.Future.await` returning `.result` when `.any_future == null`. | ||
| fn immediate(result: Error!void) WaitForSlotResult { | ||
| return .{ .inner = .{ .any_future = null, .result = result }, .state = null }; | ||
| return .{ .inner = .{ .any_future = null, .result = result }, .state = null, .clock = null }; | ||
| } | ||
|
|
||
| pub fn await(self: *WaitForSlotResult, io: std.Io) Error!void { | ||
|
|
@@ -281,8 +286,40 @@ pub const WaitForSlotResult = struct { | |
| // where GCD still holds a reference to the event address after wake. | ||
| if (self.state) |s| s.allocator.destroy(s); | ||
| self.state = null; | ||
| self.clock = null; | ||
| return result; | ||
| } | ||
|
|
||
| /// Abort a pending wait and release its resources. Idempotent — safe | ||
| /// to call on an already-awaited, already-cancelled, or immediate result. | ||
| /// | ||
| /// Typical usage: | ||
| /// var fut = try ec.waitForSlot(target); | ||
| /// errdefer fut.cancel(); | ||
| /// try fut.await(io); | ||
| pub fn cancel(self: *WaitForSlotResult) void { | ||
| const state = self.state orelse return; | ||
| // Remove from waiter queue before freeing, so abortAllWaiters | ||
| // won't dereference the freed state pointer. | ||
| if (self.clock) |clock| { | ||
| for (clock.waiters.items, 0..) |entry, i| { | ||
| if (entry.state == state) { | ||
| _ = clock.waiters.popIndex(i); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| state.aborted = true; | ||
| state.event.set(state.io); | ||
| // Must await the fiber so it finishes before we free its state. | ||
| // The fiber returns error.Aborted (expected) or {} (already dispatched). | ||
| _ = self.inner.await(state.io) catch |err| { | ||
| std.debug.assert(err == error.Aborted); | ||
| }; | ||
| state.allocator.destroy(state); | ||
| self.state = null; | ||
| self.clock = null; | ||
| } | ||
| }; | ||
|
|
||
| /// Return a future that resolves when the clock reaches `target`. | ||
|
|
@@ -317,36 +354,10 @@ pub fn waitForSlot(self: *EventClock, target: Slot) Error!WaitForSlotResult { | |
| return .{ | ||
| .inner = std.Io.async(self.io, waitForSlotFutureAwait, .{state}), | ||
| .state = state, | ||
| .clock = self, | ||
| }; | ||
| } | ||
|
|
||
| /// Abort a pending wait and release its resources. Use this in error | ||
| /// paths where `WaitForSlotResult.await` cannot be called. The waiter | ||
| /// is removed from the internal queue so `deinit` won't touch freed memory. | ||
| /// Idempotent — safe to call on an already-awaited or immediate result. | ||
| pub fn cancelWait(self: *EventClock, result: *WaitForSlotResult) void { | ||
| if (result.state) |state| { | ||
| // Remove from waiter queue before freeing, so abortAllWaiters | ||
| // won't dereference the freed state pointer. | ||
| for (self.waiters.items, 0..) |entry, i| { | ||
| if (entry.state == state) { | ||
| _ = self.waiters.popIndex(i); | ||
| break; | ||
| } | ||
| } | ||
| state.aborted = true; | ||
| state.event.set(state.io); | ||
| } | ||
| // Must await the fiber so it finishes before we free its state. | ||
| // (Cannot use Future.cancel due to Zig bug #31307.) | ||
| // The fiber returns error.Aborted (expected) or {} (already dispatched). | ||
| _ = result.inner.await(self.io) catch |err| { | ||
| std.debug.assert(err == error.Aborted); | ||
| }; | ||
| if (result.state) |s| s.allocator.destroy(s); | ||
| result.state = null; | ||
| } | ||
|
|
||
| // ── Private ── | ||
|
|
||
| /// Ensure event-clock state is caught up to wall-clock time. | ||
|
|
@@ -438,14 +449,10 @@ fn runAutoLoop(self: *EventClock) void { | |
| self.stop(); | ||
| break; | ||
| }; | ||
| // Sleep in short chunks so we can check `stopped` promptly. | ||
| // Without this, join() must wait for the full sleep to finish | ||
| // (we use await, not cancel, due to Zig bug #31307). | ||
| const chunk_ms = @min(next_ms, 500); | ||
| const sleep_ms = std.math.cast(i64, @max(@as(u64, 1), chunk_ms)) orelse std.math.maxInt(i64); | ||
|
|
||
| // Sleep failure (e.g., I/O shutdown, interrupt) is transient — safe to | ||
| // retry from the top of the loop, which will re-check `stopped` first. | ||
| const sleep_ms = std.math.cast(i64, @max(@as(u64, 1), next_ms)) orelse std.math.maxInt(i64); | ||
|
|
||
| // Sleep failure (e.g., cancel from join()) is expected — | ||
| // re-check `stopped` flag before continuing. | ||
| std.Io.sleep( | ||
| self.io, | ||
| std.Io.Duration.fromMilliseconds(sleep_ms), | ||
|
|
@@ -548,6 +555,7 @@ test "lifecycle: init -> register -> start -> receive events -> stop" { | |
|
|
||
| const start_slot = clock.currentSlotOrGenesis(); | ||
| var fut = try clock.waitForSlot(start_slot + 1); | ||
| errdefer fut.cancel(); | ||
| try fut.await(io_handle); | ||
|
|
||
| try testing.expect(trace.slot_len > 0); | ||
|
|
@@ -570,6 +578,7 @@ test "waitForSlot resolves immediately when at target" { | |
|
|
||
| const current = clock.currentSlotOrGenesis(); | ||
| var fut = try clock.waitForSlot(current); | ||
| errdefer fut.cancel(); | ||
| try fut.await(io_handle); | ||
| } | ||
|
|
||
|
|
@@ -588,6 +597,7 @@ test "waitForSlot returns aborted on stop" { | |
| defer clock.deinit(); | ||
|
|
||
| var fut = try clock.waitForSlot(100); | ||
| errdefer fut.cancel(); | ||
| clock.stop(); | ||
| try testing.expectError(error.Aborted, fut.await(io_handle)); | ||
| } | ||
|
|
@@ -679,8 +689,11 @@ test "multiple waiters are dispatched in target-slot order" { | |
|
|
||
| // Register waiters for slots 5, 3, 1 (out of order) | ||
| var fut5 = try clock.waitForSlot(5); | ||
| errdefer fut5.cancel(); | ||
| var fut3 = try clock.waitForSlot(3); | ||
| errdefer fut3.cancel(); | ||
| var fut1 = try clock.waitForSlot(1); | ||
| errdefer fut1.cancel(); | ||
|
|
||
| // Advance to slot 3 — should dispatch slot 1 and slot 3, NOT slot 5 | ||
| clock.advanceAndDispatch(3); | ||
|
|
@@ -693,7 +706,7 @@ test "multiple waiters are dispatched in target-slot order" { | |
| try testing.expectError(error.Aborted, fut5.await(io_handle)); | ||
| } | ||
|
|
||
| test "cancelWait releases WaitState without awaiting" { | ||
| test "cancel releases WaitState without awaiting" { | ||
| var rt: TestIo = undefined; | ||
| try rt.init(); | ||
| defer rt.deinit(); | ||
|
|
@@ -708,9 +721,9 @@ test "cancelWait releases WaitState without awaiting" { | |
| defer clock.deinit(); | ||
|
|
||
| // Create a waiter for a far-future slot and immediately cancel it. | ||
| // testing.allocator will detect a leak if cancelWait fails to free. | ||
| // testing.allocator will detect a leak if cancel fails to free. | ||
| var fut = try clock.waitForSlot(999); | ||
| clock.cancelWait(&fut); | ||
| fut.cancel(); | ||
| } | ||
|
|
||
| // ── Real-time tests ── | ||
|
|
@@ -766,6 +779,7 @@ test "real-time: slot events fire with correct timing" { | |
| const start_slot = clock.currentSlotOrGenesis(); | ||
| const before_ms = nowMsAt(io_handle); | ||
| var fut = try clock.waitForSlot(start_slot + 1); | ||
| errdefer fut.cancel(); | ||
| try fut.await(io_handle); | ||
| const elapsed = nowMsAt(io_handle) - before_ms; | ||
|
|
||
|
|
@@ -799,6 +813,7 @@ test "real-time: multi-slot advancement delivers ordered events" { | |
|
|
||
| const start_slot = clock.currentSlotOrGenesis(); | ||
| var fut = try clock.waitForSlot(start_slot + 2); | ||
| errdefer fut.cancel(); | ||
| try fut.await(io_handle); | ||
|
|
||
| // At least 2 slot events should have been emitted. | ||
|
|
@@ -809,7 +824,7 @@ test "real-time: multi-slot advancement delivers ordered events" { | |
| } | ||
| } | ||
|
|
||
| test "real-time: stop returns within chunk window" { | ||
| test "real-time: stop+join cancels promptly" { | ||
| var rt: TestIo = undefined; | ||
| try rt.init(); | ||
| defer rt.deinit(); | ||
|
|
@@ -825,16 +840,16 @@ test "real-time: stop returns within chunk window" { | |
|
|
||
| clock.start(); | ||
|
|
||
| // Give the loop fiber time to enter its first sleep chunk. | ||
| // Give the loop fiber time to enter its sleep. | ||
| std.Io.sleep(io_handle, std.Io.Duration.fromMilliseconds(50), .awake) catch {}; | ||
|
|
||
| const before_ms = nowMsAt(io_handle); | ||
| clock.stop(); | ||
| clock.join(); | ||
| const elapsed = nowMsAt(io_handle) - before_ms; | ||
|
|
||
| // The 500ms chunking means join() should return within ~500ms + overhead, | ||
| // NOT after the full 12-second slot duration. | ||
| // join() cancels the sleeping future directly, so it should return | ||
| // almost immediately — NOT after the full 12-second slot duration. | ||
| try testing.expect(elapsed < 1500); | ||
| } | ||
|
|
||
|
|
@@ -862,6 +877,7 @@ test "real-time: epoch boundary event fires" { | |
| const start_slot = clock.currentSlotOrGenesis(); | ||
| // Wait enough slots to guarantee crossing at least one epoch boundary. | ||
| var fut = try clock.waitForSlot(start_slot + 3); | ||
| errdefer fut.cancel(); | ||
| try fut.await(io_handle); | ||
|
|
||
| try testing.expect(trace.slot_len >= 3); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CI is now Linux-only (`runs-on: ubuntu-latest`), which removes macOS coverage for this repo. If this is intended to be temporary for validating `Future.cancel` on io_uring, consider keeping macOS in the workflow but marking it `continue-on-error` or gating it behind a workflow input so platform regressions aren’t silently missed.