Add Channel.estimateSize() for monitoring and observability #254
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1069,6 +1069,45 @@ private boolean hasValueToReceive(Segment segment, int i) { | |||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| // ***************** | ||||||||||||||||||||||||||||||||||||||
| // Channel state API | ||||||||||||||||||||||||||||||||||||||
| // ***************** | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||
| * Returns a best-effort estimate of the number of elements in the channel. | ||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||
| * <p>This is a non-blocking O(1) operation that computes the difference between the number of | ||||||||||||||||||||||||||||||||||||||
| * send and receive operations initiated on this channel. This approximates the number of | ||||||||||||||||||||||||||||||||||||||
| * buffered values, but may also include in-flight or waiting operations (e.g., senders blocked | ||||||||||||||||||||||||||||||||||||||
| * because the channel is at capacity). | ||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||
| * <p><b>Important caveats:</b> | ||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||
| * <ul> | ||||||||||||||||||||||||||||||||||||||
| * <li>The returned value is immediately outdated due to concurrent operations | ||||||||||||||||||||||||||||||||||||||
| * <li>For buffered channels at capacity, this includes waiting senders beyond the buffer | ||||||||||||||||||||||||||||||||||||||
| * <li>For rendezvous channels, this typically returns 0 | ||||||||||||||||||||||||||||||||||||||
| * <li>Should NOT be used for synchronization or control flow logic | ||||||||||||||||||||||||||||||||||||||
| * <li>Best suited for monitoring, metrics, and debugging | ||||||||||||||||||||||||||||||||||||||
| * </ul> | ||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||
| * <p><b>Anti-patterns (do NOT do this):</b> | ||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||
| * <ul> | ||||||||||||||||||||||||||||||||||||||
| * <li>{@code if (ch.estimateSize() > 0) ch.receive()} - race condition | ||||||||||||||||||||||||||||||||||||||
| * <li>{@code while (ch.estimateSize() < capacity) ch.send(x)} - unreliable | ||||||||||||||||||||||||||||||||||||||
| * </ul> | ||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||
| * @return A best-effort estimate of elements in the channel, based on the send/receive | ||||||||||||||||||||||||||||||||||||||
| * operation differential. Always >= 0. The value is immediately stale and should only be | ||||||||||||||||||||||||||||||||||||||
| * used for observability purposes. | ||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||
| public long estimateSize() { | ||||||||||||||||||||||||||||||||||||||
|
Member
Thinking about this, I'm having second thoughts about whether we should have this at all. The counter might not only be imprecise due to concurrency, but also inflated by, for example, interrupted sends. If the channel is full and you try to send, interrupt that, send again, interrupt again, and so on, each such operation inflates the counter. So such an "estimate" might be really misleading. Maybe we should just document why estimating the size is not possible here? Other options would include maintaining interrupt counters, but that would impact core performance just to implement this method, so it's probably not worth it.
Contributor
Author
Probably right. The current approach is a best-effort estimate chosen to avoid any performance impact, but as you said, estimates like this can be misleading rather than just imprecise. For reference, Kotlin's kotlinx.coroutines doesn't expose any size method on Channel. Their stance is that channels are communication primitives, not inspectable containers, and the lock-free segment design makes reliable size estimation fundamentally impractical. I agree: let's drop the feature and document why estimating the size is not possible.
Member
Agreed :) Maybe an ADR? |
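To make the inflation concern from this thread concrete, here is a minimal single-threaded sketch. It is a toy model, not the library's `Channel`: the class `InflatedEstimateSketch` and its methods are hypothetical, and an "interrupted send" is modelled simply as a send on a full buffer that bumps the counter and then gives up.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (an assumption for illustration); NOT the library's Channel implementation. */
public final class InflatedEstimateSketch {
    private final int capacity;
    private final Deque<Integer> buffer = new ArrayDeque<>();
    private long senders;   // incremented when a send is initiated
    private long receivers; // incremented when a receive is initiated

    public InflatedEstimateSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Models a send; returns false when the buffer is full and the sender is "interrupted". */
    public boolean sendOrInterrupt(int value) {
        senders++; // the counter moves before we know whether the send completes
        if (buffer.size() < capacity) {
            buffer.addLast(value);
            return true;
        }
        return false; // interrupted: the counter is never rolled back
    }

    /** Models a receive; returns null when nothing is buffered (a real receive would wait). */
    public Integer receive() {
        receivers++;
        return buffer.pollFirst();
    }

    /** Counter-differential estimate, clamped at zero. */
    public long estimateSize() {
        return Math.max(0, senders - receivers);
    }

    /** Ground truth, which this toy can see because it keeps a real buffer. */
    public int actualSize() {
        return buffer.size();
    }

    public static void main(String[] args) {
        InflatedEstimateSketch ch = new InflatedEstimateSketch(2);
        ch.sendOrInterrupt(1);
        ch.sendOrInterrupt(2);
        for (int i = 0; i < 5; i++) {
            ch.sendOrInterrupt(99); // five sends that are interrupted on a full buffer
        }
        // prints "actual=2 estimate=7"
        System.out.println("actual=" + ch.actualSize() + " estimate=" + ch.estimateSize());
    }
}
```

In this model every interrupted send adds one to the estimate permanently, so the gap between the estimate and the real size grows without bound. Correcting for it would need an extra counter on the interrupt path, which is the performance cost mentioned above.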
||||||||||||||||||||||||||||||||||||||
| long s = getSendersCounter(sendersAndClosedFlag); | ||||||||||||||||||||||||||||||||||||||
| long r = receivers; | ||||||||||||||||||||||||||||||||||||||
|
Comment on lines +1105 to +1107
|
||||||||||||||||||||||||||||||||||||||
| public long estimateSize() { | |
| long s = getSendersCounter(sendersAndClosedFlag); | |
| long r = receivers; | |
| // VarHandle used to read the `receivers` counter with proper (volatile) memory semantics. | |
| private static final VarHandle RECEIVERS_HANDLE = initReceiversHandle(); | |
| private static VarHandle initReceiversHandle() { | |
| try { | |
| return MethodHandles.lookup().findVarHandle(Channel.class, "receivers", long.class); | |
| } catch (ReflectiveOperationException e) { | |
| throw new ExceptionInInitializerError(e); | |
| } | |
| } | |
| public long estimateSize() { | |
| long s = getSendersCounter(sendersAndClosedFlag); | |
| long r = (long) RECEIVERS_HANDLE.getVolatile(this); |
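The suggested change above reads a plain `long` field through a `VarHandle` with volatile semantics. As a standalone illustration of that technique, here is a minimal sketch. The class `VarHandleReadSketch` and its methods are hypothetical and only mirror the field name from the suggestion; this is not the PR's code.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/** Hypothetical holder class showing a volatile read of a non-volatile field via VarHandle. */
public final class VarHandleReadSketch {
    // Plain (non-volatile) field; all access below goes through the VarHandle.
    private long receivers;

    private static final VarHandle RECEIVERS;

    static {
        try {
            // The lookup is created inside this class, so it may access the private field.
            RECEIVERS =
                MethodHandles.lookup()
                    .findVarHandle(VarHandleReadSketch.class, "receivers", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Atomically increments the counter and returns the previous value. */
    public long recordReceive() {
        return (long) RECEIVERS.getAndAdd(this, 1L);
    }

    /** Reads the counter with volatile memory semantics. */
    public long receiversVolatile() {
        return (long) RECEIVERS.getVolatile(this);
    }

    public static void main(String[] args) {
        VarHandleReadSketch sketch = new VarHandleReadSketch();
        sketch.recordReceive();
        sketch.recordReceive();
        System.out.println(sketch.receiversVolatile()); // prints 2
    }
}
```

A volatile read gives a fresh value of one counter, but the two counters are still read at different moments, so the difference between them remains an estimate.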
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -81,4 +81,204 @@ void testNullItem() throws InterruptedException { | |
| Channel<Object> ch = Channel.newBufferedChannel(4); | ||
| assertThrows(NullPointerException.class, () -> ch.send(null)); | ||
| } | ||
|
|
||
| @Test | ||
| void testEstimateSize_bufferedChannel_empty() { | ||
| Channel<Integer> ch = Channel.newBufferedChannel(10); | ||
| assertEquals(0, ch.estimateSize()); | ||
| } | ||
|
|
||
| @Test | ||
| void testEstimateSize_bufferedChannel_withValues() throws InterruptedException { | ||
| Channel<Integer> ch = Channel.newBufferedChannel(10); | ||
| assertEquals(0, ch.estimateSize()); | ||
|
|
||
| ch.send(1); | ||
| ch.send(2); | ||
| ch.send(3); | ||
| assertEquals(3, ch.estimateSize()); | ||
|
|
||
| ch.receive(); | ||
| assertEquals(2, ch.estimateSize()); | ||
|
|
||
| ch.receive(); | ||
| ch.receive(); | ||
| assertEquals(0, ch.estimateSize()); | ||
| } | ||
|
|
||
| @Test | ||
| void testEstimateSize_bufferedChannel_afterClose() throws InterruptedException { | ||
| Channel<Integer> ch = Channel.newBufferedChannel(10); | ||
| ch.send(1); | ||
| ch.send(2); | ||
| ch.send(3); | ||
|
|
||
| ch.done(); | ||
| assertEquals(3, ch.estimateSize()); | ||
|
|
||
| ch.receive(); | ||
| assertEquals(2, ch.estimateSize()); | ||
| } | ||
|
|
||
| @Test | ||
| void testEstimateSize_unlimitedChannel_growth() throws InterruptedException { | ||
| Channel<Integer> ch = Channel.newUnlimitedChannel(); | ||
|
|
||
| for (int i = 0; i < 1000; i++) { | ||
| ch.send(i); | ||
| } | ||
|
|
||
| assertEquals(1000, ch.estimateSize()); | ||
|
|
||
| // Receive some and check size decreases | ||
| for (int i = 0; i < 500; i++) { | ||
| ch.receive(); | ||
| } | ||
|
|
||
| assertEquals(500, ch.estimateSize()); | ||
| } | ||
|
|
||
| @Test | ||
| void testEstimateSize_rendezvousChannel() throws InterruptedException, ExecutionException { | ||
| Channel<Integer> ch = Channel.newRendezvousChannel(); | ||
| assertEquals(0, ch.estimateSize()); | ||
|
|
||
| scoped( | ||
| scope -> { | ||
| // Start a sender in background | ||
| forkVoid( | ||
| scope, | ||
| () -> { | ||
| Thread.sleep(10); | ||
| ch.send(1); | ||
| }); | ||
|
|
||
| // Rendezvous should have 0 or very small estimate | ||
| Thread.sleep(20); | ||
|
||
| long estimate = ch.estimateSize(); | ||
| assertTrue( | ||
| estimate <= 1, | ||
| "Rendezvous channel should have small estimate, got " + estimate); | ||
|
|
||
| // Receive the value | ||
| ch.receive(); | ||
| assertEquals(0, ch.estimateSize()); | ||
| }); | ||
|
|
||
| // Receivers waiting before senders — estimate should never be negative | ||
| Channel<Integer> ch2 = Channel.newRendezvousChannel(); | ||
| scoped( | ||
| scope -> { | ||
| // Start receivers before senders | ||
| for (int i = 0; i < 5; i++) { | ||
| forkVoid( | ||
| scope, | ||
| () -> { | ||
| Thread.sleep(10); | ||
| ch2.receive(); | ||
| }); | ||
| } | ||
|
|
||
| // Even with waiting receivers, estimate should be >= 0 | ||
| Thread.sleep(20); | ||
| assertEquals(0, ch2.estimateSize()); | ||
|
|
||
| // Send values to waiting receivers | ||
| for (int i = 0; i < 5; i++) { | ||
| ch2.send(i); | ||
| } | ||
|
|
||
| assertEquals(0, ch2.estimateSize()); | ||
| }); | ||
| } | ||
|
|
||
| @Test | ||
| void testEstimateSize_concurrentModification() throws InterruptedException, ExecutionException { | ||
| Channel<Integer> ch = Channel.newBufferedChannel(1000); | ||
|
|
||
| scoped( | ||
| scope -> { | ||
| // 10 senders | ||
| for (int i = 0; i < 10; i++) { | ||
| forkVoid( | ||
| scope, | ||
| () -> { | ||
| for (int j = 0; j < 100; j++) { | ||
| ch.send(j); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| // 5 receivers | ||
| for (int i = 0; i < 5; i++) { | ||
| forkVoid( | ||
| scope, | ||
| () -> { | ||
| for (int j = 0; j < 50; j++) { | ||
| ch.receive(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| // Check estimate is within bounds during concurrent operations | ||
| Thread.sleep(50); | ||
|
||
| long estimate = ch.estimateSize(); | ||
| assertTrue(estimate >= 0, "Estimate should be non-negative, got " + estimate); | ||
| assertTrue( | ||
| estimate <= 1000, | ||
| "Estimate should not exceed capacity significantly, got " + estimate); | ||
| }); | ||
|
|
||
| // After all operations, estimate should be around 750 (1000 sent - 250 received) | ||
| long finalEstimate = ch.estimateSize(); | ||
| assertTrue( | ||
| finalEstimate >= 700, | ||
| "Expected around 750 items after concurrent ops, got " + finalEstimate); | ||
| assertTrue( | ||
| finalEstimate <= 800, | ||
| "Expected around 750 items after concurrent ops, got " + finalEstimate); | ||
| } | ||
|
|
||
| @Test | ||
| void testEstimateSize_afterError() throws InterruptedException { | ||
| Channel<Integer> ch = Channel.newBufferedChannel(10); | ||
| ch.send(1); | ||
| ch.send(2); | ||
|
|
||
| ch.error(new RuntimeException("test error")); | ||
|
Member
Hm, after an error, with no concurrency, shouldn't it be exactly 0? |
||
|
|
||
| // Estimate should still work after error — 2 sends, 0 receives | ||
| assertEquals(2, ch.estimateSize()); | ||
| } | ||
|
|
||
| @Test | ||
| void testEstimateSize_bufferedChannel_waitingSendersBeyondCapacity() | ||
| throws InterruptedException, ExecutionException { | ||
| Channel<Integer> ch = Channel.newBufferedChannel(2); | ||
| ch.send(1); | ||
| ch.send(2); | ||
| // Buffer is now full, estimate should reflect 2 buffered values | ||
| assertEquals(2, ch.estimateSize()); | ||
|
|
||
| scoped( | ||
| scope -> { | ||
| // Start a sender that will block because the buffer is full | ||
| forkVoid(scope, () -> ch.send(3)); | ||
|
|
||
| // Give the blocked sender time to increment the senders counter | ||
| Thread.sleep(50); | ||
|
||
|
|
||
| // Estimate includes the waiting sender: documented behavior | ||
| long estimate = ch.estimateSize(); | ||
| assertTrue( | ||
| estimate >= 2, | ||
| "Expected at least 2 (buffered values), got " + estimate); | ||
| assertTrue( | ||
| estimate <= 3, | ||
| "Expected at most 3 (buffered + waiting sender), got " + estimate); | ||
|
|
||
| // Receive one to unblock the waiting sender | ||
| ch.receive(); | ||
| }); | ||
| } | ||
| } | ||
I think I'd try to make these docs a bit more concise, as they repeat some information (no need to do that). E.g. it's enough to say that this is best used for monitoring; there's no need to enumerate what monitoring actually means (dashboards, exporting metrics etc.).
On the other hand, I think one important piece of information might be missing: the count might be inflated, as `BROKEN` cells (I think that was the name) might be counted as well, when there's a concurrency conflict between sending and receiving. But maybe not ... it might also depend on the order in which the counters are read?
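On the question of read ordering, the following sketch shows how the order in which two independent counters are read can skew a differential estimate. It is a deterministic toy with hypothetical names (`ReadOrderSketch`, `completeSendAndReceive`); it does not model `BROKEN` cells or the library's segment structure, and the "concurrent" activity is simulated by a `Runnable` that runs between the two reads.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Toy model (an assumption for illustration) of reading two counters non-atomically. */
public final class ReadOrderSketch {
    private final AtomicLong senders = new AtomicLong();
    private final AtomicLong receivers = new AtomicLong();

    /** One value passes through: a send and its matching receive both complete. */
    public void completeSendAndReceive() {
        senders.incrementAndGet();
        receivers.incrementAndGet();
    }

    /** Reads senders first; 'between' simulates work by other threads between the reads. */
    public long estimateSendersFirst(Runnable between) {
        long s = senders.get();
        between.run();
        long r = receivers.get();
        return Math.max(0, s - r); // a stale 's' can make the raw difference negative
    }

    /** Reads receivers first; a stale 'r' makes the difference too large instead. */
    public long estimateReceiversFirst(Runnable between) {
        long r = receivers.get();
        between.run();
        long s = senders.get();
        return Math.max(0, s - r);
    }

    public static void main(String[] args) {
        ReadOrderSketch a = new ReadOrderSketch();
        Runnable burstA = () -> {
            for (int i = 0; i < 3; i++) a.completeSendAndReceive();
        };
        System.out.println(a.estimateSendersFirst(burstA)); // prints 0

        ReadOrderSketch b = new ReadOrderSketch();
        Runnable burstB = () -> {
            for (int i = 0; i < 3; i++) b.completeSendAndReceive();
        };
        System.out.println(b.estimateReceiversFirst(burstB)); // prints 3
    }
}
```

In both runs the modelled channel is empty before and after the burst. Reading senders first under-reports (the raw difference is -3, clamped to 0), while reading receivers first over-reports by 3, so in this model the read order decides the direction of the error without removing it.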