Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions channels/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,45 @@ private boolean hasValueToReceive(Segment segment, int i) {
}
}

// *****************
// Channel state API
// *****************

/**
* Returns a best-effort estimate of the number of elements in the channel.
*
* <p>This is a non-blocking O(1) operation that computes the difference between the number of
* send and receive operations initiated on this channel. This approximates the number of
* buffered values, but may also include in-flight or waiting operations (e.g., senders blocked
* because the channel is at capacity).
*
* <p><b>Important caveats:</b>
*
* <ul>
* <li>The returned value is immediately outdated due to concurrent operations
* <li>For buffered channels at capacity, this includes waiting senders beyond the buffer
* <li>For rendezvous channels, this typically returns 0
* <li>Should NOT be used for synchronization or control flow logic
* <li>Best suited for monitoring, metrics, and debugging
* </ul>
*
* <p><b>Anti-patterns (do NOT do this):</b>
*
* <ul>
* <li>{@code if (ch.estimateSize() > 0) ch.receive()} - race condition
* <li>{@code while (ch.estimateSize() < capacity) ch.send(x)} - unreliable
* </ul>
*
* @return A best-effort estimate of elements in the channel, based on the send/receive
* operation differential. Always &gt;= 0. The value is immediately stale and should only be
* used for observability purposes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd try to make these docs a bit more concise, as it repeats some information (no need to do that). E.g. it's enough to say that this is best used for monitoring, no need to enumerate what monitoring actually means (dashbords, exporting metrics etc.)

On the other hand, I think one important info might be missing - that the count might be inflated, as BROKEN cells (I think that was the name) might be counted as well - when there's a concurrency conflict when sending & receiving. But maybe not ... it also might depend on the ordering of reading the counters?

*/
public long estimateSize() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this I'm having second thoughts if we should have this at all. And that's because the counter might not only be imprecise due to concurrency, but also inflated due to e.g. interrupted sends. If the channel is full, and you'll try to send, interrupt that, send, interrupt etc., each such operation inflates the counter. So such an "estimate" might be really misleading.

Maybe we should just document, why estimating size here is not possible?

Other options would include maintaining interrupt counters, but that would impact the core performance, just to implement this method, so probably not worth it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably right, the current approach is a best-effort estimate not to impact on performance but as you said estimates like this can be misleading rather than just imprecise.

For reference Kotlin's kotlinx.coroutines doesn't expose any size method on Channel. Their stance is that channels are communication primitives, not inspectable containers, and the lock-free segment design makes reliable size estimation fundamentally impractical.

I agree - let's drop the feature and document why estimating size is not possible?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed :) Maybe an ADR?

long s = getSendersCounter(sendersAndClosedFlag);
long r = receivers;
Comment on lines +1105 to +1107
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

estimateSize() reads receivers directly while sendersAndClosedFlag is read and decoded via getSendersCounter(...). To avoid torn/stale reads on 64-bit counters and to keep the memory semantics consistent, consider reading receivers using the same mechanism used elsewhere in Channel for counter reads (e.g., VarHandle/AtomicLong accessor), or ensure receivers is volatile and read as such here.

Suggested change
public long estimateSize() {
long s = getSendersCounter(sendersAndClosedFlag);
long r = receivers;
// VarHandle used to read the `receivers` counter with proper (volatile) memory semantics.
private static final VarHandle RECEIVERS_HANDLE = initReceiversHandle();
private static VarHandle initReceiversHandle() {
try {
return MethodHandles.lookup().findVarHandle(Channel.class, "receivers", long.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
public long estimateSize() {
long s = getSendersCounter(sendersAndClosedFlag);
long r = (long) RECEIVERS_HANDLE.getVolatile(this);

Copilot uses AI. Check for mistakes.
return Math.max(0, s - r);
}

// **************
// Select clauses
// **************
Expand Down
200 changes: 200 additions & 0 deletions channels/src/test/java/com/softwaremill/jox/ChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,204 @@ void testNullItem() throws InterruptedException {
Channel<Object> ch = Channel.newBufferedChannel(4);
assertThrows(NullPointerException.class, () -> ch.send(null));
}

@Test
void testEstimateSize_bufferedChannel_empty() {
Channel<Integer> ch = Channel.newBufferedChannel(10);
assertEquals(0, ch.estimateSize());
}

@Test
void testEstimateSize_bufferedChannel_withValues() throws InterruptedException {
Channel<Integer> ch = Channel.newBufferedChannel(10);
assertEquals(0, ch.estimateSize());

ch.send(1);
ch.send(2);
ch.send(3);
assertEquals(3, ch.estimateSize());

ch.receive();
assertEquals(2, ch.estimateSize());

ch.receive();
ch.receive();
assertEquals(0, ch.estimateSize());
}

@Test
void testEstimateSize_bufferedChannel_afterClose() throws InterruptedException {
Channel<Integer> ch = Channel.newBufferedChannel(10);
ch.send(1);
ch.send(2);
ch.send(3);

ch.done();
assertEquals(3, ch.estimateSize());

ch.receive();
assertEquals(2, ch.estimateSize());
}

@Test
void testEstimateSize_unlimitedChannel_growth() throws InterruptedException {
Channel<Integer> ch = Channel.newUnlimitedChannel();

for (int i = 0; i < 1000; i++) {
ch.send(i);
}

assertEquals(1000, ch.estimateSize());

// Receive some and check size decreases
for (int i = 0; i < 500; i++) {
ch.receive();
}

assertEquals(500, ch.estimateSize());
}

@Test
void testEstimateSize_rendezvousChannel() throws InterruptedException, ExecutionException {
Channel<Integer> ch = Channel.newRendezvousChannel();
assertEquals(0, ch.estimateSize());

scoped(
scope -> {
// Start a sender in background
forkVoid(
scope,
() -> {
Thread.sleep(10);
ch.send(1);
});

// Rendezvous should have 0 or very small estimate
Thread.sleep(20);
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests rely on small Thread.sleep(...) timings for cross-thread coordination, which is prone to flakiness under load/CI variance. Prefer deterministic synchronization (e.g., CountDownLatch, Phaser, or a barrier) to wait until a sender/receiver is actually blocked or has started, and only then assert on estimateSize().

Copilot uses AI. Check for mistakes.
long estimate = ch.estimateSize();
assertTrue(
estimate <= 1,
"Rendezvous channel should have small estimate, got " + estimate);

// Receive the value
ch.receive();
assertEquals(0, ch.estimateSize());
});

// Receivers waiting before senders — estimate should never be negative
Channel<Integer> ch2 = Channel.newRendezvousChannel();
scoped(
scope -> {
// Start receivers before senders
for (int i = 0; i < 5; i++) {
forkVoid(
scope,
() -> {
Thread.sleep(10);
ch2.receive();
});
}

// Even with waiting receivers, estimate should be >= 0
Thread.sleep(20);
assertEquals(0, ch2.estimateSize());

// Send values to waiting receivers
for (int i = 0; i < 5; i++) {
ch2.send(i);
}

assertEquals(0, ch2.estimateSize());
});
}

@Test
void testEstimateSize_concurrentModification() throws InterruptedException, ExecutionException {
Channel<Integer> ch = Channel.newBufferedChannel(1000);

scoped(
scope -> {
// 10 senders
for (int i = 0; i < 10; i++) {
forkVoid(
scope,
() -> {
for (int j = 0; j < 100; j++) {
ch.send(j);
}
});
}

// 5 receivers
for (int i = 0; i < 5; i++) {
forkVoid(
scope,
() -> {
for (int j = 0; j < 50; j++) {
ch.receive();
}
});
}

// Check estimate is within bounds during concurrent operations
Thread.sleep(50);
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests rely on small Thread.sleep(...) timings for cross-thread coordination, which is prone to flakiness under load/CI variance. Prefer deterministic synchronization (e.g., CountDownLatch, Phaser, or a barrier) to wait until a sender/receiver is actually blocked or has started, and only then assert on estimateSize().

Copilot uses AI. Check for mistakes.
long estimate = ch.estimateSize();
assertTrue(estimate >= 0, "Estimate should be non-negative, got " + estimate);
assertTrue(
estimate <= 1000,
"Estimate should not exceed capacity significantly, got " + estimate);
});

// After all operations, estimate should be around 750 (1000 sent - 250 received)
long finalEstimate = ch.estimateSize();
assertTrue(
finalEstimate >= 700,
"Expected around 750 items after concurrent ops, got " + finalEstimate);
assertTrue(
finalEstimate <= 800,
"Expected around 750 items after concurrent ops, got " + finalEstimate);
}

@Test
void testEstimateSize_afterError() throws InterruptedException {
Channel<Integer> ch = Channel.newBufferedChannel(10);
ch.send(1);
ch.send(2);

ch.error(new RuntimeException("test error"));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, after an error, with no concurrency, shouldn't it be exactly 0?


// Estimate should still work after error — 2 sends, 0 receives
assertEquals(2, ch.estimateSize());
}

@Test
void testEstimateSize_bufferedChannel_waitingSendersBeyondCapacity()
throws InterruptedException, ExecutionException {
Channel<Integer> ch = Channel.newBufferedChannel(2);
ch.send(1);
ch.send(2);
// Buffer is now full, estimate should reflect 2 buffered values
assertEquals(2, ch.estimateSize());

scoped(
scope -> {
// Start a sender that will block because the buffer is full
forkVoid(scope, () -> ch.send(3));

// Give the blocked sender time to increment the senders counter
Thread.sleep(50);
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests rely on small Thread.sleep(...) timings for cross-thread coordination, which is prone to flakiness under load/CI variance. Prefer deterministic synchronization (e.g., CountDownLatch, Phaser, or a barrier) to wait until a sender/receiver is actually blocked or has started, and only then assert on estimateSize().

Copilot uses AI. Check for mistakes.

// Estimate includes the waiting sender: documented behavior
long estimate = ch.estimateSize();
assertTrue(
estimate >= 2,
"Expected at least 2 (buffered values), got " + estimate);
assertTrue(
estimate <= 3,
"Expected at most 3 (buffered + waiting sender), got " + estimate);

// Receive one to unblock the waiting sender
ch.receive();
});
}
}
Loading