Skip to content

Commit 90a18c2

Browse files
committed
refactor: migrate to Zig 0.16.0-dev.2676 PriorityQueue API
- Rename PriorityQueue methods: init→initContext, add→push, remove→pop, removeIndex→popIndex, deinit now takes allocator - Add self.* = undefined in EventClock.deinit for memory poisoning - Split triple compound condition in SlotClock AdvanceIterator - Update workaround comments for macOS GCD bug #31307
1 parent 35eb80a commit 90a18c2

2 files changed

Lines changed: 28 additions & 26 deletions

File tree

src/EventClock.zig

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ pub fn init(self: *EventClock, allocator: Allocator, config: Config, io_handle:
8888
.allocator = allocator,
8989
.io = io_handle,
9090
.clock = undefined,
91-
.waiters = WaiterQueue.init(allocator, {}),
91+
.waiters = WaiterQueue.initContext({}),
9292
};
9393
self.clock = SlotClock.init(config, TimeSource.fromIo(&self.io)) catch return error.InvalidConfig;
9494
}
@@ -106,8 +106,8 @@ pub fn stop(self: *EventClock) void {
106106
self.abortAllWaiters();
107107
}
108108

109-
/// WORKAROUND for Zig bug #31307: always await, never cancel sleeping futures.
110-
/// See: https://codeberg.org/ziglang/zig/issues/31307
109+
/// Await loop fiber completion. Uses `await` (not `cancel`) because
110+
/// `Future.cancel` segfaults on macOS GCD (Zig bug #31307).
111111
pub fn join(self: *EventClock) void {
112112
var maybe_future = self.loop_future;
113113
self.loop_future = null;
@@ -124,7 +124,8 @@ pub fn deinit(self: *EventClock) void {
124124
self.epoch_snapshot.deinit(self.allocator);
125125
self.slot_listeners.deinit(self.allocator);
126126
self.epoch_listeners.deinit(self.allocator);
127-
self.waiters.deinit();
127+
self.waiters.deinit(self.allocator);
128+
self.* = undefined;
128129
}
129130

130131
// ── Listener API ──
@@ -276,8 +277,8 @@ pub const WaitForSlotResult = struct {
276277

277278
pub fn await(self: *WaitForSlotResult, io: std.Io) Error!void {
278279
const result = self.inner.await(io);
279-
// Free AFTER await returns — workaround for Zig futex use-after-free
280-
// where GCD still holds a reference to the event address after wake.
280+
// Free AFTER await returns — the OS futex implementation may still
281+
// reference the Event address briefly after wake.
281282
if (self.state) |s| s.allocator.destroy(s);
282283
self.state = null;
283284
return result;
@@ -310,7 +311,7 @@ pub fn waitForSlot(self: *EventClock, target: Slot) Error!WaitForSlotResult {
310311
self.allocator.destroy(state);
311312
return WaitForSlotResult.immediate(error.Aborted);
312313
}
313-
self.waiters.add(.{ .target = target, .state = state }) catch return error.OutOfMemory;
314+
self.waiters.push(self.allocator, .{ .target = target, .state = state }) catch return error.OutOfMemory;
314315
self.dispatchWaiters(self.clock.current_slot);
315316

316317
return .{
@@ -329,15 +330,14 @@ pub fn cancelWait(self: *EventClock, result: *WaitForSlotResult) void {
329330
// won't dereference the freed state pointer.
330331
for (self.waiters.items, 0..) |entry, i| {
331332
if (entry.state == state) {
332-
_ = self.waiters.removeIndex(i);
333+
_ = self.waiters.popIndex(i);
333334
break;
334335
}
335336
}
336337
state.aborted = true;
337338
state.event.set(state.io);
338339
}
339-
// Must await the fiber so it finishes before we free its state.
340-
// (Cannot use Future.cancel due to Zig bug #31307.)
340+
// Await the fiber so it finishes before we free its state.
341341
// The fiber returns error.Aborted (expected) or {} (already dispatched).
342342
_ = result.inner.await(self.io) catch |err| {
343343
std.debug.assert(err == error.Aborted);
@@ -395,14 +395,14 @@ fn dispatchWaiters(self: *EventClock, current_slot: ?Slot) void {
395395
const slot = current_slot orelse return;
396396
while (self.waiters.peek()) |head| {
397397
if (head.target > slot) break;
398-
const waiter = self.waiters.remove();
398+
const waiter = self.waiters.pop().?;
399399
waiter.state.aborted = false;
400400
waiter.state.event.set(waiter.state.io);
401401
}
402402
}
403403

404404
fn abortAllWaiters(self: *EventClock) void {
405-
while (self.waiters.removeOrNull()) |waiter| {
405+
while (self.waiters.pop()) |waiter| {
406406
waiter.state.aborted = true;
407407
waiter.state.event.set(waiter.state.io);
408408
}
@@ -438,19 +438,17 @@ fn runAutoLoop(self: *EventClock) void {
438438
break;
439439
};
440440
// Sleep in short chunks so we can check `stopped` promptly.
441-
// Without this, join() must wait for the full sleep to finish
442-
// (we use await, not cancel, due to Zig bug #31307).
441+
// We use await (not cancel) in join() because Future.cancel
442+
// segfaults on macOS GCD (Zig bug #31307).
443443
const chunk_ms = @min(next_ms, 500);
444444
const sleep_ms = std.math.cast(i64, @max(@as(u64, 1), chunk_ms)) orelse std.math.maxInt(i64);
445445

446-
// Sleep failure (e.g., I/O shutdown, interrupt) is transient — safe to
447-
// retry from the top of the loop, which will re-check `stopped` first.
448446
std.Io.sleep(
449447
self.io,
450448
std.Io.Duration.fromMilliseconds(sleep_ms),
451449
.awake,
452-
) catch |err| {
453-
std.log.debug("EventClock: sleep failed ({s}), retrying", .{@errorName(err)});
450+
) catch {
451+
// Transient error — re-check stopped flag.
454452
continue;
455453
};
456454

@@ -465,8 +463,8 @@ fn runAutoLoop(self: *EventClock) void {
465463

466464
fn waitForSlotFutureAwait(state: *WaitState) Error!void {
467465
// NOTE: Do NOT free state here. The caller (WaitForSlotResult.await) frees
468-
// it AFTER this future completes — workaround for Zig futex use-after-free
469-
// where GCD still holds a reference to the event address after wake.
466+
// it AFTER this future completes, ensuring the Event address remains valid
467+
// until the futex wake is fully processed by the OS.
470468
state.event.waitUncancelable(state.io);
471469
if (state.aborted) return error.Aborted;
472470
}
@@ -808,7 +806,7 @@ test "real-time: multi-slot advancement delivers ordered events" {
808806
}
809807
}
810808

811-
test "real-time: stop returns within chunk window" {
809+
test "real-time: stop+join returns within chunk window" {
812810
var rt: TestIo = undefined;
813811
try rt.init();
814812
defer rt.deinit();
@@ -824,16 +822,16 @@ test "real-time: stop returns within chunk window" {
824822

825823
clock.start();
826824

827-
// Give the loop fiber time to enter its first sleep chunk.
825+
// Give the loop fiber time to enter its sleep.
828826
std.Io.sleep(io_handle, std.Io.Duration.fromMilliseconds(50), .awake) catch {};
829827

830828
const before_ms = nowMsAt(io_handle);
831829
clock.stop();
832830
clock.join();
833831
const elapsed = nowMsAt(io_handle) - before_ms;
834832

835-
// The 500ms chunking means join() should return within ~500ms + overhead,
836-
// NOT after the full 12-second slot duration.
833+
// The loop sleeps in 500ms chunks so it checks `stopped` promptly —
834+
// well under the full 12-second slot duration.
837835
try testing.expect(elapsed < 1500);
838836
}
839837

src/SlotClock.zig

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,12 @@ pub const AdvanceIterator = struct {
5252
// Check epoch boundary — epochAtSlot returns ?Epoch
5353
const prev_epoch = slot_math.epochAtSlot(self.clock.config, cur);
5454
const new_epoch = slot_math.epochAtSlot(self.clock.config, next_slot);
55-
if (prev_epoch != null and new_epoch != null and prev_epoch.? < new_epoch.?) {
56-
self.pending_epoch = new_epoch.?;
55+
if (prev_epoch) |prev_ep| {
56+
if (new_epoch) |new_ep| {
57+
if (prev_ep < new_ep) {
58+
self.pending_epoch = new_ep;
59+
}
60+
}
5761
}
5862

5963
return .{ .slot = next_slot };

0 commit comments

Comments
 (0)