From 974ad4855b36e59dd4d82b50a29c0887db9ed4b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Mon, 9 Feb 2026 08:52:46 +0100 Subject: [PATCH 1/8] Add Channel.estimateSize() for monitoring and observability (#189) Add a non-blocking O(1) method that returns a best-effort estimate of the number of elements in a channel, computed as the difference between send and receive operations initiated. Designed for monitoring, metrics export, and debugging - not for control flow. Includes comprehensive Javadoc with anti-patterns, recommended usage, and a monitoring code example. Tests cover buffered, unlimited, and rendezvous channels, concurrent modification, closed/error states, and waiting senders beyond buffer capacity. --- .../java/com/softwaremill/jox/Channel.java | 72 ++++++ .../com/softwaremill/jox/ChannelTest.java | 223 ++++++++++++++++++ 2 files changed, 295 insertions(+) diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index 19387d4..9fa6dde 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -1069,6 +1069,78 @@ private boolean hasValueToReceive(Segment segment, int i) { } } + // ***************** + // Channel state API + // ***************** + + /** + * Returns a best-effort estimate of the number of elements in the channel. + * + *

This is a non-blocking O(1) operation that computes the difference between the number of + * send and receive operations initiated on this channel. This approximates the number of + * buffered values, but may also include in-flight or waiting operations (e.g., senders blocked + * because the channel is at capacity). + * + *

Important caveats: + * + *

+ * + *

Use cases: + * + *

+ * + *

Anti-patterns (do NOT do this): + * + *

+ * + *

Recommended patterns: + * + *

+ * + *

Example - Monitoring: + * + *

{@code
+     * var ch = Channel.newUnlimitedChannel();
+     *
+     * // Background thread for metrics
+     * Thread.startVirtualThread(() -> {
+     *     while (!ch.closedForSend()) {
+     *         long estimate = ch.estimateSize();
+     *         metricsRegistry.gauge("channel.size", estimate);
+     *         Thread.sleep(Duration.ofSeconds(10));
+     *     }
+     * });
+     * }
+ * + * @return A best-effort estimate of elements in the channel, based on the send/receive + * operation differential. Always >= 0. The value is immediately stale and should only be + * used for observability purposes. + */ + public long estimateSize() { + long s = getSendersCounter(sendersAndClosedFlag); + long r = receivers; + return Math.max(0, s - r); + } + // ************** // Select clauses // ************** diff --git a/channels/src/test/java/com/softwaremill/jox/ChannelTest.java b/channels/src/test/java/com/softwaremill/jox/ChannelTest.java index bf7783c..870f18f 100644 --- a/channels/src/test/java/com/softwaremill/jox/ChannelTest.java +++ b/channels/src/test/java/com/softwaremill/jox/ChannelTest.java @@ -81,4 +81,227 @@ void testNullItem() throws InterruptedException { Channel ch = Channel.newBufferedChannel(4); assertThrows(NullPointerException.class, () -> ch.send(null)); } + + @Test + void testEstimateSize_bufferedChannel_empty() { + Channel ch = Channel.newBufferedChannel(10); + assertEquals(0, ch.estimateSize()); + } + + @Test + void testEstimateSize_bufferedChannel_withValues() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(10); + assertEquals(0, ch.estimateSize()); + + ch.send(1); + ch.send(2); + ch.send(3); + assertTrue(ch.estimateSize() >= 3, "Expected at least 3 items, got " + ch.estimateSize()); + + ch.receive(); + assertTrue( + ch.estimateSize() >= 2, + "Expected at least 2 items after receive, got " + ch.estimateSize()); + + ch.receive(); + ch.receive(); + assertEquals(0, ch.estimateSize()); + } + + @Test + void testEstimateSize_bufferedChannel_afterClose() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(10); + ch.send(1); + ch.send(2); + ch.send(3); + + ch.done(); + assertTrue( + ch.estimateSize() >= 3, + "Estimate should still work after close, got " + ch.estimateSize()); + + ch.receive(); + assertTrue( + ch.estimateSize() >= 2, + "Estimate should reflect remaining buffered values, got " + ch.estimateSize()); + } + + @Test + void testEstimateSize_unlimitedChannel_growth() throws InterruptedException { + Channel ch = Channel.newUnlimitedChannel(); + + for (int i = 0; i < 1000; i++) { + ch.send(i); + } + + long estimate = ch.estimateSize(); + assertTrue( + estimate >= 900, + "Expected at least 900 items in unlimited channel, got " + estimate); + assertTrue(estimate <= 1000, "Expected at most 1000 items, got " + estimate); + + // Receive some and check size decreases + for (int i = 0; i < 500; i++) { + ch.receive(); + } + + estimate = ch.estimateSize(); + assertTrue( + estimate >= 400, + "Expected at least 400 items after receiving 500, got " + estimate); + assertTrue(estimate <= 500, "Expected at most 500 items, got " + estimate); + } + + @Test + void testEstimateSize_rendezvousChannel() throws InterruptedException, ExecutionException { + Channel ch = Channel.newRendezvousChannel(); + assertEquals(0, ch.estimateSize()); + + scoped( + scope -> { + // Start a sender in background + forkVoid( + scope, + () -> { + try { + Thread.sleep(10); + ch.send(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + // Rendezvous should have 0 or very small estimate + Thread.sleep(20); + long estimate = ch.estimateSize(); + assertTrue( + estimate <= 1, + "Rendezvous channel should have small estimate, got " + estimate); + + // Receive the value + ch.receive(); + assertEquals(0, ch.estimateSize()); + }); + } + + @Test + void testEstimateSize_concurrentModification() throws InterruptedException, ExecutionException { + Channel ch = Channel.newBufferedChannel(1000); + + scoped( + scope -> { + // 10 senders + for (int i = 0; i < 10; i++) { + forkVoid( + scope, + () -> { + for (int j = 0; j < 100; j++) { + ch.send(j); + } + }); + } + + // 5 receivers + for (int i = 0; i < 5; i++) { + forkVoid( + scope, + () -> { + for (int j = 0; j < 50; j++) { + ch.receive(); + } + }); + } + + // Check estimate is within bounds during concurrent operations + Thread.sleep(50); + long estimate = ch.estimateSize(); + assertTrue(estimate >= 0, "Estimate should be non-negative, got " + estimate); + assertTrue( + estimate <= 1000, + "Estimate should not exceed capacity significantly, got " + estimate); + }); + + // After all operations, estimate should be around 750 (1000 sent - 250 received) + long finalEstimate = ch.estimateSize(); + assertTrue( + finalEstimate >= 700, + "Expected around 750 items after concurrent ops, got " + finalEstimate); + assertTrue( + finalEstimate <= 800, + "Expected around 750 items after concurrent ops, got " + finalEstimate); + } + + @Test + void testEstimateSize_neverNegative() throws InterruptedException, ExecutionException { + Channel ch = Channel.newRendezvousChannel(); + + scoped( + scope -> { + // Start receivers before senders + for (int i = 0; i < 5; i++) { + forkVoid( + scope, + () -> { + Thread.sleep(10); + ch.receive(); + }); + } + + // Even with waiting receivers, estimate should be >= 0 + Thread.sleep(20); + long estimate = ch.estimateSize(); + assertTrue(estimate >= 0, "Estimate should never be negative, got " + estimate); + + // Send values to waiting receivers + for (int i = 0; i < 5; i++) { + ch.send(i); + } + + assertEquals(0, ch.estimateSize()); + }); + } + + @Test + void testEstimateSize_afterError() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(10); + ch.send(1); + ch.send(2); + + ch.error(new RuntimeException("test error")); + + // Estimate should still work after error + long estimate = ch.estimateSize(); + assertTrue(estimate >= 0, "Estimate should work after error, got " + estimate); + } + + @Test + void testEstimateSize_bufferedChannel_waitingSendersBeyondCapacity() + throws InterruptedException, ExecutionException { + Channel ch = Channel.newBufferedChannel(2); + ch.send(1); + ch.send(2); + // Buffer is now full, estimate should reflect 2 buffered values + assertEquals(2, ch.estimateSize()); + + scoped( + scope -> { + // Start a sender that will block because the buffer is full + forkVoid(scope, () -> ch.send(3)); + + // Give the blocked sender time to increment the senders counter + Thread.sleep(50); + + // Estimate includes the waiting sender: documented behavior + long estimate = ch.estimateSize(); + assertTrue( + estimate >= 2, + "Expected at least 2 (buffered values), got " + estimate); + assertTrue( + estimate <= 3, + "Expected at most 3 (buffered + waiting sender), got " + estimate); + + // Receive one to unblock the waiting sender + ch.receive(); + }); + } } From 0e75dd9f7cc195c9de299aa5bc9e11fece7cddc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Tue, 17 Feb 2026 12:25:56 +0100 Subject: [PATCH 2/8] Implement non-blocking trySend and tryReceive methods (#187) Add non-blocking operations to the Channel implementation and the Source/Sink interfaces. These methods are essential for integration with event loops (e.g., Netty, Vert.x) where blocking the thread is prohibited. - Added 'trySend(T)' and 'tryReceive()' to Channel, Source, and Sink. - Implemented high-performance, lock-free versions in Channel.java. - Provided default implementations in interfaces using 'Select' fallback for binary compatibility. - Updated documentation with usage examples and clear guidance on non-blocking scenarios. - Refactored 'flows' and 'kafka' modules to use the new API in critical non-blocking callbacks, removing unnecessary 'InterruptedException' handling. --- .../java/com/softwaremill/jox/Channel.java | 265 ++++++++- .../main/java/com/softwaremill/jox/Sink.java | 35 +- .../java/com/softwaremill/jox/Source.java | 47 ++ .../jox/ChannelTrySendReceiveTest.java | 515 ++++++++++++++++++ docs/channels.md | 99 ++-- .../jox/flows/FromFlowPublisher.java | 12 +- .../softwaremill/jox/kafka/KafkaDrain.java | 6 +- .../softwaremill/jox/kafka/KafkaStage.java | 18 +- 8 files changed, 900 insertions(+), 97 deletions(-) create mode 100644 channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index 9fa6dde..b4515b4 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -263,11 +263,19 @@ public Object sendOrClosed(T value) throws InterruptedException { return doSend(value, null, null); } - // used by Sink.trySend + // used by Sink.trySend (Select-based fallback) static final Object DEFAULT_NOT_SENT_VALUE = new Object(); static final DefaultClause DEFAULT_NOT_SENT_CLAUSE = new DefaultClauseValue<>(DEFAULT_NOT_SENT_VALUE); + // used by Source.tryReceive (Select-based fallback) + static final Object DEFAULT_NOT_RECEIVED_VALUE = new Object(); + static final DefaultClause DEFAULT_NOT_RECEIVED_CLAUSE = + new DefaultClauseValue<>(DEFAULT_NOT_RECEIVED_VALUE); + + // sentinel value returned by trySendOrClosed() to indicate "not sent" + static final Object TRY_SEND_NOT_SENT = new Object(); + /** * @return If {@code select} & {@code selectClause} is {@code null}: {@code null} when the value * was sent, or {@link ChannelClosed}, when the channel is closed. Otherwise, might also @@ -348,6 +356,261 @@ private Object doSend(T value, SelectInstance select, SelectClause selectClau } } + // ************* + // Non-blocking send + // ************* + + @Override + public Object trySendOrClosed(T value) { + if (value == null) { + throw new NullPointerException(); + } + while (true) { + // reading the segment before the counter increment + var segment = sendSegment; + var scf = sendersAndClosedFlag; + var s = getSendersCounter(scf); + + if (isClosed(scf)) { + return closedReason; + } + + // pre-check if immediate completion is possible + if (capacity >= 0) { + var bufEnd = bufferEnd; + var r = receivers; + if (capacity == 0) { + if (s >= r) return TRY_SEND_NOT_SENT; + } else { + if (s >= bufEnd && s >= r) return TRY_SEND_NOT_SENT; + } + } + + // reserving cell s + if (!SENDERS_AND_CLOSE_FLAG.compareAndSet(this, scf, scf + 1)) { + continue; + } + + var id = s / Segment.SEGMENT_SIZE; + var i = (int) (s % Segment.SEGMENT_SIZE); + + if (segment.getId() != id) { + segment = findAndMoveForward(SEND_SEGMENT, this, segment, id); + if (segment == null) { + return closedReason; + } + + if (segment.getId() != id) { + SENDERS_AND_CLOSE_FLAG.compareAndSet( + this, s + 1, segment.getId() * Segment.SEGMENT_SIZE); + continue; + } + } + + // process cell (never suspends) + var sendResult = tryUpdateCellSend(segment, i, s, value); + if (sendResult == SendResult.BUFFERED) { + return null; + } else if (sendResult == SendResult.RESUMED) { + segment.cleanPrev(); + return null; + } else if (sendResult == SendResult.FAILED) { + segment.cleanPrev(); + continue; + } else if (sendResult == SendResult.CLOSED) { + return closedReason; + } else if (sendResult == TRY_SEND_NOT_SENT) { + return TRY_SEND_NOT_SENT; + } else { + throw new IllegalStateException( + "Unexpected result: " + sendResult + " in channel: " + this); + } + } + } + + /** + * Non-suspending variant of {@code updateCellSend}. Where the normal send would suspend (no + * receiver, not in buffer), this marks the cell as INTERRUPTED_SEND and returns the + * TRY_SEND_NOT_SENT sentinel. + */ + private Object tryUpdateCellSend(Segment segment, int i, long s, T value) { + while (true) { + var state = segment.getCell(i); + + if (state == null) { + if (capacity >= 0 && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) { + // cell is empty, no receiver, not in buffer -> would suspend + // roll back: mark as interrupted send + if (segment.casCell(i, null, INTERRUPTED_SEND)) { + segment.cellInterruptedSender(); + return TRY_SEND_NOT_SENT; + } + // CAS unsuccessful, repeat + } else { + // cell is empty, but a receiver is in progress, or in buffer -> elimination + if (segment.casCell(i, null, value)) { + return SendResult.BUFFERED; + } + // CAS unsuccessful, repeat + } + } else if (state == IN_BUFFER) { + if (segment.casCell(i, IN_BUFFER, value)) { + return SendResult.BUFFERED; + } + // CAS unsuccessful, repeat + } else if (state instanceof Continuation c) { + // a receiver is waiting -> trying to resume + if (c.tryResume(value)) { + segment.setCell(i, DONE); + return SendResult.RESUMED; + } else { + return SendResult.FAILED; + } + } else if (state instanceof StoredSelectClause ss) { + ss.setPayload(value); + if (ss.getSelect().trySelect(ss)) { + segment.setCell(i, DONE); + return SendResult.RESUMED; + } else { + return SendResult.FAILED; + } + } else if (state == INTERRUPTED_RECEIVE || state == BROKEN) { + return SendResult.FAILED; + } else if (state == CLOSED) { + return SendResult.CLOSED; + } else { + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); + } + } + } + + // ************* + // Non-blocking receive + // ************* + + @Override + public Object tryReceiveOrClosed() { + while (true) { + var scf = sendersAndClosedFlag; + var s = getSendersCounter(scf); + var r = receivers; + + if (s <= r) { + if (isClosed(scf)) return closedForReceive(); + return null; + } + + // reading the segment before the counter increment + var segment = receiveSegment; + + // reserving cell r + if (!RECEIVERS.compareAndSet(this, r, r + 1)) { + continue; + } + + var id = r / Segment.SEGMENT_SIZE; + var i = (int) (r % Segment.SEGMENT_SIZE); + + if (segment.getId() != id) { + segment = findAndMoveForward(RECEIVE_SEGMENT, this, segment, id); + if (segment == null) { + return closedReason; + } + + if (segment.getId() != id) { + RECEIVERS.compareAndSet(this, r + 1, segment.getId() * Segment.SEGMENT_SIZE); + continue; + } + } + + // process cell (never suspends) + var result = tryUpdateCellReceive(segment, i, r); + if (result == ReceiveResult.CLOSED) { + return closedReason; + } else if (result == ReceiveResult.FAILED) { + segment.cleanPrev(); + continue; + } else if (result == null) { + return null; + } else { + segment.cleanPrev(); + return result; + } + } + } + + /** + * Non-suspending variant of {@code updateCellReceive}. Where the normal receive would suspend + * (no sender available), this marks the cell as INTERRUPTED_RECEIVE and returns null. + */ + private Object tryUpdateCellReceive(Segment segment, int i, long r) { + while (true) { + var state = segment.getCell(i); + + if (state == null || state == IN_BUFFER) { + if (r >= getSendersCounter(sendersAndClosedFlag)) { + // cell is empty, no sender -> would suspend + // roll back: mark as interrupted receive + if (segment.casCell(i, state, INTERRUPTED_RECEIVE)) { + segment.cellInterruptedReceiver(); + expandBuffer(); + return null; // nothing available + } + // CAS unsuccessful, repeat + } else { + // sender in progress -> mark as broken + if (segment.casCell(i, state, BROKEN)) { + expandBuffer(); + return ReceiveResult.FAILED; // retry + } + // CAS unsuccessful, repeat + } + } else if (state instanceof Continuation c) { + // resolving a potential race with expandBuffer + if (segment.casCell(i, state, RESUMING)) { + if (c.tryResume(0)) { + segment.setCell(i, DONE); + expandBuffer(); + return c.getPayload(); + } else { + return ReceiveResult.FAILED; + } + } + // CAS unsuccessful, repeat + } else if (state instanceof StoredSelectClause ss) { + if (segment.casCell(i, state, RESUMING)) { + if (ss.getSelect().trySelect(ss)) { + segment.setCell(i, DONE); + expandBuffer(); + return ss.getPayload(); + } else { + return ReceiveResult.FAILED; + } + } + // CAS unsuccessful, repeat + } else if (state instanceof CellState) { + switch (state) { + case CellState.INTERRUPTED_SEND -> { + return ReceiveResult.FAILED; + } + case CellState.RESUMING -> Thread.onSpinWait(); + case CellState.CLOSED -> { + return ReceiveResult.CLOSED; + } + default -> + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); + } + } else { + // buffered value + segment.setCell(i, DONE); + expandBuffer(); + return state; + } + } + } + /** * @param segment The segment which stores the cell's state. * @param i The index within the {@code segment}. diff --git a/channels/src/main/java/com/softwaremill/jox/Sink.java b/channels/src/main/java/com/softwaremill/jox/Sink.java index b955885..d9fc356 100644 --- a/channels/src/main/java/com/softwaremill/jox/Sink.java +++ b/channels/src/main/java/com/softwaremill/jox/Sink.java @@ -27,19 +27,50 @@ public interface Sink extends CloseableChannel { /** * Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. * + *

This method never blocks or suspends the calling thread. It completes in bounded time. + * Safe to call from platform threads, including NIO event loop threads. + * + *

May spuriously return {@code false} under contention. Should not be used as a substitute + * for {@link #send(Object)} in a spin loop. + * * @param value The value to send. Not {@code null}. * @return {@code true} if the value was sent, {@code false} otherwise. * @throws ChannelClosedException When the channel is closed. */ default boolean trySend(T value) { + Object r = trySendOrClosed(value); + if (r instanceof ChannelClosed c) throw c.toException(); + return r == null; // null = sent, sentinel = not sent + } + + /** + * Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. + * Doesn't throw exceptions when the channel is closed but returns a value. + * + *

This method never blocks or suspends the calling thread. It completes in bounded time. + * Safe to call from platform threads, including NIO event loop threads. + * + *

May spuriously fail under contention. Should not be used as a substitute for {@link + * #sendOrClosed(Object)} in a spin loop. + * + * @param value The value to send. Not {@code null}. + * @return Either {@code null} when the value was sent successfully, {@link ChannelClosed} when + * the channel is closed, or a sentinel value indicating the value was not sent (check using + * {@code result == null} for success, {@code result instanceof ChannelClosed} for closed). + */ + default Object trySendOrClosed(T value) { + // Select-based fallback for binary compatibility Object sent; try { sent = Select.select(sendClause(value), Channel.DEFAULT_NOT_SENT_CLAUSE); } catch (InterruptedException e) { throw new IllegalStateException( - "Interrupted during trySend, which should not be possible", e); + "Interrupted during trySendOrClosed, which should not be possible", e); } - return sent != Channel.DEFAULT_NOT_SENT_VALUE; + if (sent == Channel.DEFAULT_NOT_SENT_VALUE) { + return Channel.TRY_SEND_NOT_SENT; + } + return null; // sent successfully } /** diff --git a/channels/src/main/java/com/softwaremill/jox/Source.java b/channels/src/main/java/com/softwaremill/jox/Source.java index bbc91c9..392d8bc 100644 --- a/channels/src/main/java/com/softwaremill/jox/Source.java +++ b/channels/src/main/java/com/softwaremill/jox/Source.java @@ -26,6 +26,53 @@ public interface Source extends CloseableChannel { */ Object receiveOrClosed() throws InterruptedException; + /** + * Attempt to receive a value from the channel if one is immediately available. + * + *

This method never blocks or suspends the calling thread. It completes in bounded time. + * Safe to call from platform threads, including NIO event loop threads. + * + *

May spuriously return {@code null} under contention. Should not be used as a substitute + * for {@link #receive()} in a spin loop. + * + * @return The received value, or {@code null} if no value is immediately available. + * @throws ChannelClosedException When the channel is closed. + */ + default T tryReceive() { + Object r = tryReceiveOrClosed(); + if (r instanceof ChannelClosed c) throw c.toException(); + //noinspection unchecked + return (T) r; // null means nothing available + } + + /** + * Attempt to receive a value from the channel if one is immediately available. Doesn't throw + * exceptions when the channel is closed, but returns a value. + * + *

This method never blocks or suspends the calling thread. It completes in bounded time. + * Safe to call from platform threads, including NIO event loop threads. + * + *

May spuriously return {@code null} under contention. Should not be used as a substitute + * for {@link #receiveOrClosed()} in a spin loop. + * + * @return The received value of type {@code T}, {@link ChannelClosed} when the channel is + * closed, or {@code null} if no value is immediately available. + */ + default Object tryReceiveOrClosed() { + // Select-based fallback for binary compatibility + Object received; + try { + received = Select.select(receiveClause(), Channel.DEFAULT_NOT_RECEIVED_CLAUSE); + } catch (InterruptedException e) { + throw new IllegalStateException( + "Interrupted during tryReceiveOrClosed, which should not be possible", e); + } + if (received == Channel.DEFAULT_NOT_RECEIVED_VALUE) { + return null; // nothing available + } + return received; + } + /** * Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will * receive a value from the current channel. diff --git a/channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java b/channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java new file mode 100644 index 0000000..f4e20e5 --- /dev/null +++ b/channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java @@ -0,0 +1,515 @@ +package com.softwaremill.jox; + +import static com.softwaremill.jox.TestUtil.*; +import static org.junit.jupiter.api.Assertions.*; + +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +public class ChannelTrySendReceiveTest { + + // ******** + // trySend + // ******** + + @Test + void trySend_buffered_shouldSendWhenBufferHasSpace() { + Channel ch = Channel.newBufferedChannel(2); + assertTrue(ch.trySend("a")); + assertTrue(ch.trySend("b")); + } + + @Test + void trySend_buffered_shouldReturnFalseWhenBufferFull() { + Channel ch = Channel.newBufferedChannel(1); + assertTrue(ch.trySend("a")); + assertFalse(ch.trySend("b")); // buffer full, no receiver + } + + @Test + void trySend_rendezvous_shouldReturnFalseWhenNoReceiver() { + Channel ch = Channel.newRendezvousChannel(); + assertFalse(ch.trySend("a")); + } + + @Test + void trySend_rendezvous_shouldSendWhenReceiverWaiting() + throws InterruptedException, ExecutionException { + Channel ch = Channel.newRendezvousChannel(); + scoped( + scope -> { + var f = fork(scope, ch::receive); + Thread.sleep(50); + assertTrue(ch.trySend("x")); + assertEquals("x", f.get()); + }); + } + + @Test + void trySend_unlimited_shouldAlwaysSend() { + Channel ch = Channel.newUnlimitedChannel(); + for (int i = 0; i < 1000; i++) { + assertTrue(ch.trySend("v" + i)); + } + } + + @Test + void trySend_closedDone_shouldThrow() { + Channel ch = Channel.newBufferedChannel(1); + ch.done(); + assertThrows(ChannelDoneException.class, () -> ch.trySend("x")); + } + + @Test + void trySend_closedError_shouldThrow() { + Channel ch = Channel.newBufferedChannel(1); + ch.error(new RuntimeException("boom")); + assertThrows(ChannelErrorException.class, () -> ch.trySend("x")); + } + + @Test + void trySend_nullValue_shouldThrowNPE() { + Channel ch = Channel.newBufferedChannel(1); + assertThrows(NullPointerException.class, () -> ch.trySend(null)); + } + + // **************** + // trySendOrClosed + // **************** + + @Test + void trySendOrClosed_buffered_shouldReturnNullOnSuccess() { + Channel ch = Channel.newBufferedChannel(2); + assertNull(ch.trySendOrClosed("a")); + } + + @Test + void trySendOrClosed_buffered_shouldReturnSentinelWhenFull() { + Channel ch = Channel.newBufferedChannel(1); + assertNull(ch.trySendOrClosed("a")); // success + Object result = ch.trySendOrClosed("b"); + assertNotNull(result); + assertFalse(result instanceof ChannelClosed); + } + + @Test + void trySendOrClosed_closedDone_shouldReturnChannelDone() { + Channel ch = Channel.newBufferedChannel(1); + ch.done(); + assertInstanceOf(ChannelDone.class, ch.trySendOrClosed("x")); + } + + @Test + void trySendOrClosed_closedError_shouldReturnChannelError() { + Channel ch = Channel.newBufferedChannel(1); + ch.error(new RuntimeException("boom")); + assertInstanceOf(ChannelError.class, ch.trySendOrClosed("x")); + } + + // ********** + // tryReceive + // ********** + + @Test + void tryReceive_buffered_shouldReceiveBufferedValue() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(2); + ch.send("a"); + ch.send("b"); + assertEquals("a", ch.tryReceive()); + assertEquals("b", ch.tryReceive()); + } + + @Test + void tryReceive_buffered_shouldReturnNullWhenEmpty() { + Channel ch = Channel.newBufferedChannel(2); + assertNull(ch.tryReceive()); + } + + @Test + void tryReceive_rendezvous_shouldReturnNullWhenNoSender() { + Channel ch = Channel.newRendezvousChannel(); + assertNull(ch.tryReceive()); + } + + @Test + void tryReceive_rendezvous_shouldReceiveWhenSenderWaiting() + throws InterruptedException, ExecutionException { + Channel ch = Channel.newRendezvousChannel(); + scoped( + scope -> { + forkVoid(scope, () -> ch.send("x")); + Thread.sleep(50); + assertEquals("x", ch.tryReceive()); + }); + } + + @Test + void tryReceive_unlimited_shouldReceiveBufferedValues() throws InterruptedException { + Channel ch = Channel.newUnlimitedChannel(); + ch.send("a"); + ch.send("b"); + assertEquals("a", ch.tryReceive()); + assertEquals("b", ch.tryReceive()); + assertNull(ch.tryReceive()); + } + + @Test + void tryReceive_closedDone_noValues_shouldThrow() { + Channel ch = Channel.newBufferedChannel(1); + ch.done(); + assertThrows(ChannelDoneException.class, () -> ch.tryReceive()); + } + + @Test + void tryReceive_closedError_shouldThrow() { + Channel ch = Channel.newBufferedChannel(1); + ch.error(new RuntimeException("boom")); + assertThrows(ChannelErrorException.class, () -> ch.tryReceive()); + } + + // ******************** + // tryReceiveOrClosed + // ******************** + + @Test + void tryReceiveOrClosed_shouldReturnNullWhenEmpty() { + Channel ch = Channel.newBufferedChannel(2); + assertNull(ch.tryReceiveOrClosed()); + } + + @Test + void tryReceiveOrClosed_shouldReturnValueWhenAvailable() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(2); + ch.send("hello"); + assertEquals("hello", ch.tryReceiveOrClosed()); + } + + @Test + void tryReceiveOrClosed_closedDone_noValues_shouldReturnChannelDone() { + Channel ch = Channel.newBufferedChannel(1); + ch.done(); + assertInstanceOf(ChannelDone.class, ch.tryReceiveOrClosed()); + } + + @Test + void tryReceiveOrClosed_closedError_shouldReturnChannelError() { + Channel ch = Channel.newBufferedChannel(1); + ch.error(new RuntimeException("boom")); + assertInstanceOf(ChannelError.class, ch.tryReceiveOrClosed()); + } + + @Test + void tryReceiveOrClosed_closedDone_withBufferedValues_shouldReturnValuesThenDone() + throws InterruptedException { + Channel ch = Channel.newBufferedChannel(3); + ch.send("a"); + ch.send("b"); + ch.done(); + + assertEquals("a", ch.tryReceiveOrClosed()); + assertEquals("b", ch.tryReceiveOrClosed()); + assertInstanceOf(ChannelDone.class, ch.tryReceiveOrClosed()); + } + + // ************************* + // Mixed send/receive tests + // ************************* + + @Test + void trySend_thenTryReceive_roundTrip_buffered() { + Channel ch = Channel.newBufferedChannel(5); + for (int i = 0; i < 5; i++) { + assertTrue(ch.trySend(i)); + } + for (int i = 0; i < 5; i++) { + assertEquals(i, ch.tryReceive()); + } + assertNull(ch.tryReceive()); + } + + @Test + void trySend_thenTryReceive_roundTrip_unlimited() { + Channel ch = Channel.newUnlimitedChannel(); + for (int i = 0; i < 100; i++) { + assertTrue(ch.trySend(i)); + } + for (int i = 0; i < 100; i++) { + assertEquals(i, ch.tryReceive()); + } + assertNull(ch.tryReceive()); + } + + @Test + void trySend_mixedWithRegularReceive() throws InterruptedException, ExecutionException { + Channel ch = Channel.newBufferedChannel(2); + assertTrue(ch.trySend("a")); + assertTrue(ch.trySend("b")); + assertEquals("a", ch.receive()); + assertEquals("b", ch.receive()); + } + + @Test + void regularSend_thenTryReceive() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(2); + ch.send("a"); + ch.send("b"); + assertEquals("a", ch.tryReceive()); + assertEquals("b", ch.tryReceive()); + } + + // ************* + // Stress tests + // ************* + + @Test + void concurrentTrySendAndTryReceive_noValueLossOrDuplication() + throws InterruptedException, ExecutionException { + Channel ch = Channel.newBufferedChannel(64); + int numProducers = 4; + int numConsumers = 4; + int itemsPerProducer = 10_000; + + var sentCount = new AtomicInteger(0); + var receivedValues = new ConcurrentSkipListSet(); + + scoped( + scope -> { + // producers: trySend in a loop + for (int p = 0; p < numProducers; p++) { + int producerBase = p * itemsPerProducer; + forkVoid( + scope, + () -> { + for (int i = 0; i < itemsPerProducer; i++) { + int val = producerBase + i; + while (!ch.trySend(val)) { + Thread.yield(); + } + sentCount.incrementAndGet(); + } + }); + } + + // consumers: tryReceive in a loop + int totalItems = numProducers * itemsPerProducer; + for (int c = 0; c < numConsumers; c++) { + forkVoid( + scope, + () -> { + while (receivedValues.size() < totalItems) { + Integer v = ch.tryReceive(); + if (v != null) { + receivedValues.add(v); + } else { + Thread.yield(); + } + } + }); + } + }); + + int totalItems = numProducers * itemsPerProducer; + assertEquals(totalItems, sentCount.get()); + assertEquals(totalItems, receivedValues.size()); + } + + @Test + void concurrentTrySendWithRegularReceive() throws InterruptedException, ExecutionException { + Channel ch = Channel.newBufferedChannel(16); + int total = 10_000; + var received = new ConcurrentSkipListSet(); + + scoped( + scope -> { + // producer: trySend + forkVoid( + scope, + () -> { + for (int i = 0; i < total; i++) { + while (!ch.trySend(i)) { + Thread.yield(); + } + } + ch.done(); + }); + + // consumer: regular receive + forkVoid( + scope, + () -> { + while (true) { + var r = ch.receiveOrClosed(); + if (r instanceof ChannelDone) break; + received.add((Integer) r); + } + }) + .get(); + }); + + assertEquals(total, received.size()); + } + + @Test + void concurrentRegularSendWithTryReceive() throws InterruptedException, ExecutionException { + Channel ch = Channel.newBufferedChannel(16); + int total = 10_000; + var received = new ConcurrentSkipListSet(); + + scoped( + scope -> { + // producer: regular send + forkVoid( + scope, + () -> { + for (int i = 0; i < total; i++) { + ch.send(i); + } + ch.done(); + }); + + // consumer: tryReceive + forkVoid( + scope, + () -> { + while (true) { + var r = ch.tryReceiveOrClosed(); + if (r instanceof ChannelDone) break; + if (r != null) { + received.add((Integer) r); + } else { + Thread.yield(); + } + } + }) + .get(); + }); + + assertEquals(total, received.size()); + } + + @Test + void trySend_rendezvous_concurrentStress() throws InterruptedException, ExecutionException { + Channel ch = Channel.newRendezvousChannel(); + int total = 1000; + var received = new ConcurrentSkipListSet(); + + scoped( + scope -> { + // sender: trySend + forkVoid( + scope, + () -> { + for (int i = 0; i < total; i++) { + while (!ch.trySend(i)) { + Thread.yield(); + } + } + ch.done(); + }); + + // receiver: regular receive + forkVoid( + scope, + () -> { + while (true) { + var r = ch.receiveOrClosed(); + if (r instanceof ChannelDone) break; + received.add((Integer) r); + } + }) + .get(); + }); + + assertEquals(total, received.size()); + } + + @Test + void tryReceive_rendezvous_concurrentStress() throws InterruptedException, ExecutionException { + Channel ch = Channel.newRendezvousChannel(); + int total = 1000; + var received = new ConcurrentSkipListSet(); + + scoped( + scope -> { + // sender: regular send + forkVoid( + scope, + () -> { + for (int i = 0; i < total; i++) { + ch.send(i); + } + ch.done(); + }); + + // receiver: tryReceive + forkVoid( + scope, + () -> { + while (true) { + var r = ch.tryReceiveOrClosed(); + if (r instanceof ChannelDone) break; + if (r != null) { + received.add((Integer) r); + } else { + Thread.yield(); + } + } + }) + .get(); + }); + + assertEquals(total, received.size()); + } + + @Test + void segmentCleanup_afterManyTrySendFailures() throws InterruptedException, ExecutionException { + // trySend on a rendezvous channel with no receiver: should not leak segments + Channel ch = Channel.newRendezvousChannel(); + for (int i = 0; i < 10_000; i++) { + assertFalse(ch.trySend(i)); + } + // channel should still be functional + scoped( + scope -> { + forkVoid(scope, () -> ch.send(42)); + Thread.sleep(50); + assertEquals(42, ch.tryReceive()); + }); + } + + @Test + void segmentCleanup_afterManyTryReceiveFailures() throws InterruptedException { + // tryReceive on an empty channel: should not leak segments + Channel ch = Channel.newBufferedChannel(2); + for (int i = 0; i < 10_000; i++) { + assertNull(ch.tryReceive()); + } + // channel should still be functional + ch.send(42); + assertEquals(42, ch.tryReceive()); + } + + @Test + void trySendTryReceive_bufferedCapacity1() { + Channel ch = Channel.newBufferedChannel(1); + assertTrue(ch.trySend(1)); + assertEquals(1, ch.tryReceive()); + } + + @Test + void trySendTryReceive_bufferedCapacity10() { + Channel ch = Channel.newBufferedChannel(10); + assertTrue(ch.trySend(1)); + assertEquals(1, ch.tryReceive()); + } + + @Test + void trySendTryReceive_unlimited() { + Channel ch = Channel.newUnlimitedChannel(); + assertTrue(ch.trySend(1)); + assertEquals(1, ch.tryReceive()); + } +} diff --git a/docs/channels.md b/docs/channels.md index 98e1fdc..c64a9c0 100644 --- a/docs/channels.md +++ b/docs/channels.md @@ -95,27 +95,39 @@ the sink processes the received values. use the `sendOrClosed()` and `receiveOrClosed()` methods, which return either a `ChannelClosed` value (reason of closure), or `null` / the received value. -Channels can also be inspected whether they are closed, using the `isClosedForReceive()` and `isClosedForSend()`. - ```java import com.softwaremill.jox.Channel; class Demo3 { public static void main(String[] args) throws InterruptedException { var ch = Channel.newBufferedChannel(3); - - // send()-s won't block ch.send(1); ch.done(); - // prints: Received: 1 - System.out.println("Received: " + ch.receiveOrClosed()); - // prints: Received: ChannelDone[] - System.out.println("Received: " + ch.receiveOrClosed()); + System.out.println("Received: " + ch.receiveOrClosed()); // 1 + System.out.println("Received: " + ch.receiveOrClosed()); // ChannelDone[] } } ``` +## Non-blocking operations + +For integration with non-blocking frameworks (e.g. Netty, Vert.x), `trySend` and `tryReceive` methods are provided. They never block or suspend and are safe to call from platform threads. + +```java +var ch = Channel.newBufferedChannel(1); + +// returns true if sent, false if the buffer is full or no receiver is waiting +boolean s1 = ch.trySend(1); + +// returns the value, or null if none is immediately available +Integer r1 = ch.tryReceive(); +``` + +Both have `OrClosed` variants which return a `ChannelClosed` value instead of throwing an exception if the channel is closed. + +Channels can also be inspected using `isClosedForReceive()` and `isClosedForSend()`. + ## Selecting from multiple channels The `select` method selects exactly one clause to complete. For example, you can receive a value from exactly one @@ -123,41 +135,27 @@ channel: ```java import com.softwaremill.jox.Channel; - import static com.softwaremill.jox.Select.select; class Demo4 { public static void main(String[] args) throws InterruptedException { - // creates a buffered channel (buffer of size 3) var ch1 = Channel.newBufferedChannel(3); var ch2 = Channel.newBufferedChannel(3); - var ch3 = Channel.newBufferedChannel(3); - // send a value to two channels ch2.send(29); - ch3.send(32); - var received = - select(ch1.receiveClause(), ch2.receiveClause(), ch3.receiveClause()); - - // prints: Received: 29 - System.out.println("Received: " + received); - // ch3 still holds a value that can be received + var received = select(ch1.receiveClause(), ch2.receiveClause()); + System.out.println("Received: " + received); // 29 } } ``` -The received value can be optionally transformed by a provided function. - -`select` is biased: if a couple of the clauses can be completed immediately, the one that appears first will be -selected. +The received value can be optionally transformed by a provided function. `select` is biased: if multiple clauses can be completed immediately, the one that appears first is selected. -Similarly, you can select from a send clause to complete. Apart from the `Channel.sendClause()` method, there's also a -variant which runs a callback, once the clause is selected: +Similarly, you can select from a send clause. Apart from `sendClause(value)`, there's also a variant which runs a callback once selected: ```java import com.softwaremill.jox.Channel; - import static com.softwaremill.jox.Select.select; class Demo5 { @@ -165,91 +163,64 @@ class Demo5 { var ch1 = Channel.newBufferedChannel(1); var ch2 = Channel.newBufferedChannel(1); - ch1.send(12); // buffer is now full + ch1.send(12); // full var sent = select(ch1.sendClause(13, () -> "1st"), ch2.sendClause(25, () -> "2nd")); - - // prints: Sent: second - System.out.println("Sent: " + sent); + System.out.println("Sent: " + sent); // 2nd } } ``` -Optionally, you can also provide a default clause, which will be selected if none of the other clauses can be completed -immediately: +Optionally, a `defaultClause` can be provided, which is selected if no other clause is available immediately: ```java import com.softwaremill.jox.Channel; - import static com.softwaremill.jox.Select.defaultClause; import static com.softwaremill.jox.Select.select; class Demo6 { public static void main(String[] args) throws InterruptedException { var ch1 = Channel.newBufferedChannel(3); - var ch2 = Channel.newBufferedChannel(3); - - var received = select(ch1.receiveClause(), ch2.receiveClause(), defaultClause(52)); - - // prints: Received: 52 - System.out.println("Received: " + received); + var received = select(ch1.receiveClause(), defaultClause(52)); + System.out.println("Received: " + received); // 52 } } ``` ### Select with timeout -You can also select from multiple channels with a timeout using `selectWithin`. If none of the clauses can be completed -within the specified timeout, a `TimeoutException` is thrown: +`selectWithin` throws `TimeoutException` if no clause completes within the given duration: ```java import com.softwaremill.jox.Channel; - import java.time.Duration; import java.util.concurrent.TimeoutException; - import static com.softwaremill.jox.Select.selectWithin; class Demo7 { public static void main(String[] args) throws InterruptedException { var ch1 = Channel.newBufferedChannel(3); - var ch2 = Channel.newBufferedChannel(3); - try { - // Wait up to 500 milliseconds for a value to be available - var received = selectWithin(Duration.ofMillis(500), ch1.receiveClause(), ch2.receiveClause()); - System.out.println("Received: " + received); + selectWithin(Duration.ofMillis(500), ch1.receiveClause()); } catch (TimeoutException e) { - // prints: Select timed out after 500 ms - System.out.println("Select timed out after 500 ms"); + System.out.println("Select timed out"); } } } ``` -Alternatively, you can use `selectOrClosedWithin` which returns a timeout value instead of throwing an exception: +Alternatively, `selectOrClosedWithin` returns a timeout value: ```java import com.softwaremill.jox.Channel; - import java.time.Duration; - import static com.softwaremill.jox.Select.selectOrClosedWithin; class Demo8 { public static void main(String[] args) throws InterruptedException { var ch1 = Channel.newBufferedChannel(3); - var ch2 = Channel.newBufferedChannel(3); - - var result = selectOrClosedWithin(Duration.ofMillis(500), "TIMEOUT", - ch1.receiveClause(), ch2.receiveClause()); - - if (result.equals("TIMEOUT")) { - // prints: Select timed out - System.out.println("Select timed out"); - } else { - System.out.println("Received: " + result); - } + var result = selectOrClosedWithin(Duration.ofMillis(500), "TIMEOUT", ch1.receiveClause()); + System.out.println("Result: " + result); // TIMEOUT } } ``` diff --git a/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java b/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java index b3b818c..292d1eb 100644 --- a/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java @@ -206,11 +206,7 @@ private record FlowSubscription(Sink signals) implements Flow.Subscripti // 3.15: the signals channel is never closed @Override public void request(long n) { - try { - signals.send(new Request(n)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + signals.trySend(new Request(n)); } // 3.5: as above for 3.2 @@ -218,11 +214,7 @@ public void request(long n) { // 3.16: as above for 3.15 @Override public void cancel() { - try { - signals.send(new Cancel()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + signals.trySend(new Cancel()); } // 3.10, 3.11: no synchronous calls in this implementation diff --git a/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaDrain.java b/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaDrain.java index f3e1173..b53ace4 100644 --- a/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaDrain.java +++ b/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaDrain.java @@ -63,11 +63,7 @@ public static void runPublish( if (exception != null) { logger.error( "Exception when sending record", exception); - try { - producerExceptions.sendOrClosed(exception); - } catch (InterruptedException e) { - // ignore - } + producerExceptions.trySendOrClosed(exception); } }); } diff --git a/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaStage.java b/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaStage.java index 806f4e9..3c78341 100644 --- a/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaStage.java +++ b/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaStage.java @@ -202,24 +202,12 @@ private static void sendPacket( toSend, (m, e) -> { if (e != null) { - try { - exceptions.sendOrClosed(e); - } catch (InterruptedException ex) { - // ignore - } + exceptions.trySendOrClosed(e); } else { if (commitOffsets && leftToSend.decrementAndGet() == 0) { - try { - toCommit.send(packet); - } catch (InterruptedException ex) { - // ignore - } - } - try { - metadata.send(new Pair<>(sequenceNo, m)); - } catch (InterruptedException ex) { - // ignore + toCommit.trySendOrClosed(packet); } + metadata.trySendOrClosed(new Pair<>(sequenceNo, m)); } }); } From f2d26853231cfff2eb1b4154edf9a73cf3300721 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Tue, 17 Feb 2026 14:44:50 +0100 Subject: [PATCH 3/8] Add RESUMING state comments in tryUpdateCellReceive and clean up channels.md Add explanatory comments for the transient RESUMING state in tryUpdateCellReceive, matching the existing comments in the blocking updateCellReceive variant. Revert unrelated cosmetic changes to channels.md from the trySend/tryReceive PR, keeping only the new non-blocking operations section and a contention caveat note. --- .../jox/fray/FrayTrySendReceiveTest.java | 284 ++++++++++++++++++ .../java/com/softwaremill/jox/Channel.java | 4 + docs/channels.md | 87 ++++-- 3 files changed, 355 insertions(+), 20 deletions(-) create mode 100644 channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java diff --git a/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java b/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java new file mode 100644 index 0000000..8ee8738 --- /dev/null +++ b/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java @@ -0,0 +1,284 @@ +package com.softwaremill.jox.fray; + +import static com.softwaremill.jox.fray.Config.CHANNEL_SIZE; + +import java.util.ArrayList; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.pastalab.fray.junit.junit5.FrayTestExtension; +import org.pastalab.fray.junit.junit5.annotations.ConcurrencyTest; + +import com.softwaremill.jox.Channel; +import com.softwaremill.jox.ChannelDone; + +@ExtendWith(FrayTestExtension.class) +public class FrayTrySendReceiveTest { + + // trySend | receive + + @ConcurrencyTest + public void trySendReceiveTest() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); + + Fork f1 = + Fork.newNoResult( + () -> { + while (!ch.trySend(10)) { + Thread.yield(); + } + }); + Fork f2 = Fork.newWithResult(ch::receive); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // send | tryReceive + + @ConcurrencyTest + public void sendTryReceiveTest() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); + + Fork f1 = Fork.newNoResult(() -> ch.send(10)); + Fork f2 = + Fork.newWithResult( + () -> { + Integer result; + while ((result = ch.tryReceive()) == null) { + Thread.yield(); + } + return result; + }); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // trySend | tryReceive + + @ConcurrencyTest + public void trySendTryReceiveTest() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); + + Fork f1 = + Fork.newNoResult( + () -> { + while (!ch.trySend(10)) { + Thread.yield(); + } + }); + Fork f2 = + Fork.newWithResult( + () -> { + Integer result; + while ((result = ch.tryReceive()) == null) { + Thread.yield(); + } + return result; + }); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // multiple trySend | multiple tryReceive + + @ConcurrencyTest + public void multiTrySendMultiTryReceiveTest() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); + + int concurrency = 4; + + var sendForks = new ArrayList>(); + var receiveForks = new ArrayList>(); + + for (int i = 0; i < concurrency; i++) { + final var finalI = i; + sendForks.add( + Fork.newNoResult( + () -> { + while (!ch.trySend(finalI)) { + Thread.yield(); + } + })); + receiveForks.add( + Fork.newWithResult( + () -> { + Integer result; + while ((result = ch.tryReceive()) == null) { + Thread.yield(); + } + return result; + })); + } + + Fork.startAll(sendForks.toArray(new Fork[0])); + Fork.startAll(receiveForks.toArray(new Fork[0])); + + for (Fork sendFork : sendForks) { + sendFork.join(); + } + + var result = 0; + for (Fork receiveFork : receiveForks) { + result += receiveFork.join(); + } + + assert (result == concurrency * (concurrency - 1) / 2); + } + + // trySend | tryReceive (rendezvous) — both try simultaneously, exactly one pair succeeds + + @ConcurrencyTest + public void trySendTryReceive_rendezvousTest() throws InterruptedException { + Channel ch = Channel.newRendezvousChannel(); + + Fork f1 = + Fork.newWithResult( + () -> { + while (!ch.trySend(10)) { + Thread.yield(); + } + return true; + }); + Fork f2 = + Fork.newWithResult( + () -> { + Integer result; + while ((result = ch.tryReceive()) == null) { + Thread.yield(); + } + return result; + }); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // trySend | receive (rendezvous) — trySend spins, blocking receive waits + + @ConcurrencyTest + public void trySendReceive_rendezvousTest() throws InterruptedException { + Channel ch = Channel.newRendezvousChannel(); + + Fork f1 = + Fork.newNoResult( + () -> { + while (!ch.trySend(10)) { + Thread.yield(); + } + }); + Fork f2 = Fork.newWithResult(ch::receive); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // send | tryReceive (rendezvous) — blocking send waits, tryReceive spins + + @ConcurrencyTest + public void sendTryReceive_rendezvousTest() throws InterruptedException { + Channel ch = Channel.newRendezvousChannel(); + + Fork f1 = Fork.newNoResult(() -> ch.send(10)); + Fork f2 = + Fork.newWithResult( + () -> { + Integer result; + while ((result = ch.tryReceive()) == null) { + Thread.yield(); + } + return result; + }); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // trySend | tryReceive (unlimited) + + @ConcurrencyTest + public void trySendTryReceive_unlimitedTest() throws InterruptedException { + Channel ch = Channel.newUnlimitedChannel(); + + Fork f1 = + Fork.newNoResult( + () -> { + while (!ch.trySend(10)) { + Thread.yield(); + } + }); + Fork f2 = + Fork.newWithResult( + () -> { + Integer result; + while ((result = ch.tryReceive()) == null) { + Thread.yield(); + } + return result; + }); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // trySend | close | tryReceive + + @ConcurrencyTest + public void trySendCloseTryReceiveTest() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); + + Fork f1 = + Fork.newWithResult( + () -> { + Object r = ch.trySendOrClosed(10); + return r == null; // true if sent + }); + + Fork f2 = Fork.newNoResult(ch::done); + + Fork f3 = + Fork.newWithResult( + () -> { + // keep trying until we get a value or the channel is done + while (true) { + Object r = ch.tryReceiveOrClosed(); + if (r instanceof ChannelDone) { + return r; + } + if (r != null) { + return r; + } + Thread.yield(); + } + }); + + Fork.startAll(f1, f2, f3); + + boolean sent = f1.join(); + f2.join(); + Object received = f3.join(); + + if (sent) { + // if the value was sent, it must have been received + assert (received.equals(10)); + } else { + // if not sent, receiver must see channel done + assert (received instanceof ChannelDone); + } + } +} diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index b4515b4..380a061 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -574,6 +574,8 @@ private Object tryUpdateCellReceive(Segment segment, int i, long r) { expandBuffer(); return c.getPayload(); } else { + // the state will be set to INTERRUPTED_SEND by the continuation, meanwhile + // everybody else will observe RESUMING return ReceiveResult.FAILED; } } @@ -585,6 +587,8 @@ private Object tryUpdateCellReceive(Segment segment, int i, long r) { expandBuffer(); return ss.getPayload(); } else { + // the state will be set to INTERRUPTED_SEND by the cleanup, meanwhile + // everybody else will observe RESUMING return ReceiveResult.FAILED; } } diff --git a/docs/channels.md b/docs/channels.md index c64a9c0..40b3b1f 100644 --- a/docs/channels.md +++ b/docs/channels.md @@ -95,17 +95,23 @@ the sink processes the received values. use the `sendOrClosed()` and `receiveOrClosed()` methods, which return either a `ChannelClosed` value (reason of closure), or `null` / the received value. +Channels can also be inspected whether they are closed, using the `isClosedForReceive()` and `isClosedForSend()`. + ```java import com.softwaremill.jox.Channel; class Demo3 { public static void main(String[] args) throws InterruptedException { var ch = Channel.newBufferedChannel(3); + + // send()-s won't block ch.send(1); ch.done(); - System.out.println("Received: " + ch.receiveOrClosed()); // 1 - System.out.println("Received: " + ch.receiveOrClosed()); // ChannelDone[] + // prints: Received: 1 + System.out.println("Received: " + ch.receiveOrClosed()); + // prints: Received: ChannelDone[] + System.out.println("Received: " + ch.receiveOrClosed()); } } ``` @@ -118,15 +124,15 @@ For integration with non-blocking frameworks (e.g. Netty, Vert.x), `trySend` and var ch = Channel.newBufferedChannel(1); // returns true if sent, false if the buffer is full or no receiver is waiting -boolean s1 = ch.trySend(1); +boolean s1 = ch.trySend(1); // returns the value, or null if none is immediately available -Integer r1 = ch.tryReceive(); +Integer r1 = ch.tryReceive(); ``` Both have `OrClosed` variants which return a `ChannelClosed` value instead of throwing an exception if the channel is closed. -Channels can also be inspected using `isClosedForReceive()` and `isClosedForSend()`. +Note: under contention, `trySend`/`tryReceive` may return `false`/`null` even when space or values are available. If you need guaranteed delivery, use `send()`/`receive()` instead. ## Selecting from multiple channels @@ -135,27 +141,41 @@ channel: ```java import com.softwaremill.jox.Channel; + import static com.softwaremill.jox.Select.select; class Demo4 { public static void main(String[] args) throws InterruptedException { + // creates a buffered channel (buffer of size 3) var ch1 = Channel.newBufferedChannel(3); var ch2 = Channel.newBufferedChannel(3); + var ch3 = Channel.newBufferedChannel(3); + // send a value to two channels ch2.send(29); + ch3.send(32); - var received = select(ch1.receiveClause(), ch2.receiveClause()); - System.out.println("Received: " + received); // 29 + var received = + select(ch1.receiveClause(), ch2.receiveClause(), ch3.receiveClause()); + + // prints: Received: 29 + System.out.println("Received: " + received); + // ch3 still holds a value that can be received } } ``` -The received value can be optionally transformed by a provided function. `select` is biased: if multiple clauses can be completed immediately, the one that appears first is selected. +The received value can be optionally transformed by a provided function. + +`select` is biased: if a couple of the clauses can be completed immediately, the one that appears first will be +selected. -Similarly, you can select from a send clause. Apart from `sendClause(value)`, there's also a variant which runs a callback once selected: +Similarly, you can select from a send clause to complete. Apart from the `Channel.sendClause()` method, there's also a +variant which runs a callback, once the clause is selected: ```java import com.softwaremill.jox.Channel; + import static com.softwaremill.jox.Select.select; class Demo5 { @@ -163,64 +183,91 @@ class Demo5 { var ch1 = Channel.newBufferedChannel(1); var ch2 = Channel.newBufferedChannel(1); - ch1.send(12); // full + ch1.send(12); // buffer is now full var sent = select(ch1.sendClause(13, () -> "1st"), ch2.sendClause(25, () -> "2nd")); - System.out.println("Sent: " + sent); // 2nd + + // prints: Sent: second + System.out.println("Sent: " + sent); } } ``` -Optionally, a `defaultClause` can be provided, which is selected if no other clause is available immediately: +Optionally, you can also provide a default clause, which will be selected if none of the other clauses can be completed +immediately: ```java import com.softwaremill.jox.Channel; + import static com.softwaremill.jox.Select.defaultClause; import static com.softwaremill.jox.Select.select; class Demo6 { public static void main(String[] args) throws InterruptedException { var ch1 = Channel.newBufferedChannel(3); - var received = select(ch1.receiveClause(), defaultClause(52)); - System.out.println("Received: " + received); // 52 + var ch2 = Channel.newBufferedChannel(3); + + var received = select(ch1.receiveClause(), ch2.receiveClause(), defaultClause(52)); + + // prints: Received: 52 + System.out.println("Received: " + received); } } ``` ### Select with timeout -`selectWithin` throws `TimeoutException` if no clause completes within the given duration: +You can also select from multiple channels with a timeout using `selectWithin`. If none of the clauses can be completed +within the specified timeout, a `TimeoutException` is thrown: ```java import com.softwaremill.jox.Channel; + import java.time.Duration; import java.util.concurrent.TimeoutException; + import static com.softwaremill.jox.Select.selectWithin; class Demo7 { public static void main(String[] args) throws InterruptedException { var ch1 = Channel.newBufferedChannel(3); + var ch2 = Channel.newBufferedChannel(3); + try { - selectWithin(Duration.ofMillis(500), ch1.receiveClause()); + // Wait up to 500 milliseconds for a value to be available + var received = selectWithin(Duration.ofMillis(500), ch1.receiveClause(), ch2.receiveClause()); + System.out.println("Received: " + received); } catch (TimeoutException e) { - System.out.println("Select timed out"); + // prints: Select timed out after 500 ms + System.out.println("Select timed out after 500 ms"); } } } ``` -Alternatively, `selectOrClosedWithin` returns a timeout value: +Alternatively, you can use `selectOrClosedWithin` which returns a timeout value instead of throwing an exception: ```java import com.softwaremill.jox.Channel; + import java.time.Duration; + import static com.softwaremill.jox.Select.selectOrClosedWithin; class Demo8 { public static void main(String[] args) throws InterruptedException { var ch1 = Channel.newBufferedChannel(3); - var result = selectOrClosedWithin(Duration.ofMillis(500), "TIMEOUT", ch1.receiveClause()); - System.out.println("Result: " + result); // TIMEOUT + var ch2 = Channel.newBufferedChannel(3); + + var result = selectOrClosedWithin(Duration.ofMillis(500), "TIMEOUT", + ch1.receiveClause(), ch2.receiveClause()); + + if (result.equals("TIMEOUT")) { + // prints: Select timed out + System.out.println("Select timed out"); + } else { + System.out.println("Received: " + result); + } } } ``` From c4ec274b4f208a42170f30ed38a4ace760155335 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Tue, 17 Feb 2026 14:51:12 +0100 Subject: [PATCH 4/8] Improve Javadoc wording for contention caveat and add error-with-buffered-values test Clarify Sink/Source Javadoc: replace "spuriously" with explicit explanation of what happens under contention. Add test verifying that tryReceiveOrClosed returns ChannelError immediately when error() is called on a channel with buffered values. --- .../src/main/java/com/softwaremill/jox/Sink.java | 8 ++++---- .../src/main/java/com/softwaremill/jox/Source.java | 8 ++++---- .../softwaremill/jox/ChannelTrySendReceiveTest.java | 12 ++++++++++++ 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/channels/src/main/java/com/softwaremill/jox/Sink.java b/channels/src/main/java/com/softwaremill/jox/Sink.java index d9fc356..12e49b0 100644 --- a/channels/src/main/java/com/softwaremill/jox/Sink.java +++ b/channels/src/main/java/com/softwaremill/jox/Sink.java @@ -30,8 +30,8 @@ public interface Sink extends CloseableChannel { *

This method never blocks or suspends the calling thread. It completes in bounded time. * Safe to call from platform threads, including NIO event loop threads. * - *

May spuriously return {@code false} under contention. Should not be used as a substitute - * for {@link #send(Object)} in a spin loop. + *

May return {@code false} even when space is available, due to contention with concurrent + * operations. Should not be used as a substitute for {@link #send(Object)} in a spin loop. * * @param value The value to send. Not {@code null}. * @return {@code true} if the value was sent, {@code false} otherwise. @@ -50,8 +50,8 @@ default boolean trySend(T value) { *

This method never blocks or suspends the calling thread. It completes in bounded time. * Safe to call from platform threads, including NIO event loop threads. * - *

May spuriously fail under contention. Should not be used as a substitute for {@link - * #sendOrClosed(Object)} in a spin loop. + *

May fail even when space is available, due to contention with concurrent operations. + * Should not be used as a substitute for {@link #sendOrClosed(Object)} in a spin loop. * * @param value The value to send. Not {@code null}. * @return Either {@code null} when the value was sent successfully, {@link ChannelClosed} when diff --git a/channels/src/main/java/com/softwaremill/jox/Source.java b/channels/src/main/java/com/softwaremill/jox/Source.java index 392d8bc..2c06bd3 100644 --- a/channels/src/main/java/com/softwaremill/jox/Source.java +++ b/channels/src/main/java/com/softwaremill/jox/Source.java @@ -32,8 +32,8 @@ public interface Source extends CloseableChannel { *

This method never blocks or suspends the calling thread. It completes in bounded time. * Safe to call from platform threads, including NIO event loop threads. * - *

May spuriously return {@code null} under contention. Should not be used as a substitute - * for {@link #receive()} in a spin loop. + *

May return {@code null} even when a value is available, due to contention with concurrent + * operations. Should not be used as a substitute for {@link #receive()} in a spin loop. * * @return The received value, or {@code null} if no value is immediately available. * @throws ChannelClosedException When the channel is closed. @@ -52,8 +52,8 @@ default T tryReceive() { *

This method never blocks or suspends the calling thread. It completes in bounded time. * Safe to call from platform threads, including NIO event loop threads. * - *

May spuriously return {@code null} under contention. Should not be used as a substitute - * for {@link #receiveOrClosed()} in a spin loop. + *

May return {@code null} even when a value is available, due to contention with concurrent + * operations. Should not be used as a substitute for {@link #receiveOrClosed()} in a spin loop. * * @return The received value of type {@code T}, {@link ChannelClosed} when the channel is * closed, or {@code null} if no value is immediately available. diff --git a/channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java b/channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java index f4e20e5..70dfd31 100644 --- a/channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java +++ b/channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java @@ -214,6 +214,18 @@ void tryReceiveOrClosed_closedDone_withBufferedValues_shouldReturnValuesThenDone assertInstanceOf(ChannelDone.class, ch.tryReceiveOrClosed()); } + @Test + void tryReceiveOrClosed_closedError_withBufferedValues_shouldReturnError() + throws InterruptedException { + Channel ch = Channel.newBufferedChannel(3); + ch.send("a"); + ch.send("b"); + ch.error(new RuntimeException("boom")); + + // error() discards buffered values — should return ChannelError immediately + assertInstanceOf(ChannelError.class, ch.tryReceiveOrClosed()); + } + // ************************* // Mixed send/receive tests // ************************* From fbd3b089ec28b0614c6b307f75860a871f9caf34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Tue, 17 Feb 2026 15:00:38 +0100 Subject: [PATCH 5/8] Strip redundant comments from non-blocking send/receive methods Remove narrating comments (CAS unsuccessful, reserving cell, etc.) that just restate what the code does. Keep comments that explain why, matching the style of the blocking variants. --- .../java/com/softwaremill/jox/Channel.java | 29 +++++-------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index 380a061..1657b8c 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -366,7 +366,6 @@ public Object trySendOrClosed(T value) { throw new NullPointerException(); } while (true) { - // reading the segment before the counter increment var segment = sendSegment; var scf = sendersAndClosedFlag; var s = getSendersCounter(scf); @@ -375,7 +374,6 @@ public Object trySendOrClosed(T value) { return closedReason; } - // pre-check if immediate completion is possible if (capacity >= 0) { var bufEnd = bufferEnd; var r = receivers; @@ -386,7 +384,6 @@ public Object trySendOrClosed(T value) { } } - // reserving cell s if (!SENDERS_AND_CLOSE_FLAG.compareAndSet(this, scf, scf + 1)) { continue; } @@ -407,7 +404,6 @@ public Object trySendOrClosed(T value) { } } - // process cell (never suspends) var sendResult = tryUpdateCellSend(segment, i, s, value); if (sendResult == SendResult.BUFFERED) { return null; @@ -439,25 +435,21 @@ private Object tryUpdateCellSend(Segment segment, int i, long s, T value) { if (state == null) { if (capacity >= 0 && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) { - // cell is empty, no receiver, not in buffer -> would suspend - // roll back: mark as interrupted send + // cell is empty, and no receiver, not in buffer -> interrupt if (segment.casCell(i, null, INTERRUPTED_SEND)) { segment.cellInterruptedSender(); return TRY_SEND_NOT_SENT; } - // CAS unsuccessful, repeat } else { // cell is empty, but a receiver is in progress, or in buffer -> elimination if (segment.casCell(i, null, value)) { return SendResult.BUFFERED; } - // CAS unsuccessful, repeat } } else if (state == IN_BUFFER) { if (segment.casCell(i, IN_BUFFER, value)) { return SendResult.BUFFERED; } - // CAS unsuccessful, repeat } else if (state instanceof Continuation c) { // a receiver is waiting -> trying to resume if (c.tryResume(value)) { @@ -468,6 +460,7 @@ private Object tryUpdateCellSend(Segment segment, int i, long s, T value) { } } else if (state instanceof StoredSelectClause ss) { ss.setPayload(value); + // a select clause is waiting -> trying to resume if (ss.getSelect().trySelect(ss)) { segment.setCell(i, DONE); return SendResult.RESUMED; @@ -475,6 +468,7 @@ private Object tryUpdateCellSend(Segment segment, int i, long s, T value) { return SendResult.FAILED; } } else if (state == INTERRUPTED_RECEIVE || state == BROKEN) { + // cell interrupted or poisoned -> trying with a new one return SendResult.FAILED; } else if (state == CLOSED) { return SendResult.CLOSED; @@ -501,10 +495,8 @@ public Object tryReceiveOrClosed() { return null; } - // reading the segment before the counter increment var segment = receiveSegment; - // reserving cell r if (!RECEIVERS.compareAndSet(this, r, r + 1)) { continue; } @@ -524,7 +516,6 @@ public Object tryReceiveOrClosed() { } } - // process cell (never suspends) var result = tryUpdateCellReceive(segment, i, r); if (result == ReceiveResult.CLOSED) { return closedReason; @@ -550,25 +541,22 @@ private Object tryUpdateCellReceive(Segment segment, int i, long r) { if (state == null || state == IN_BUFFER) { if (r >= getSendersCounter(sendersAndClosedFlag)) { - // cell is empty, no sender -> would suspend - // roll back: mark as interrupted receive + // cell is empty, no sender -> interrupt if (segment.casCell(i, state, INTERRUPTED_RECEIVE)) { segment.cellInterruptedReceiver(); expandBuffer(); - return null; // nothing available + return null; } - // CAS unsuccessful, repeat } else { // sender in progress -> mark as broken if (segment.casCell(i, state, BROKEN)) { expandBuffer(); - return ReceiveResult.FAILED; // retry + return ReceiveResult.FAILED; } - // CAS unsuccessful, repeat } } else if (state instanceof Continuation c) { - // resolving a potential race with expandBuffer if (segment.casCell(i, state, RESUMING)) { + // a sender is waiting -> trying to resume if (c.tryResume(0)) { segment.setCell(i, DONE); expandBuffer(); @@ -579,9 +567,9 @@ private Object tryUpdateCellReceive(Segment segment, int i, long r) { return ReceiveResult.FAILED; } } - // CAS unsuccessful, repeat } else if (state instanceof StoredSelectClause ss) { if (segment.casCell(i, state, RESUMING)) { + // a send clause is waiting -> trying to resume if (ss.getSelect().trySelect(ss)) { segment.setCell(i, DONE); expandBuffer(); @@ -592,7 +580,6 @@ private Object tryUpdateCellReceive(Segment segment, int i, long r) { return ReceiveResult.FAILED; } } - // CAS unsuccessful, repeat } else if (state instanceof CellState) { switch (state) { case CellState.INTERRUPTED_SEND -> { From e3b89dd6d39e2f6dd00b4e52b421eafca060bfb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 18 Feb 2026 13:10:39 +0100 Subject: [PATCH 6/8] fix: Simplify non-blocking send/receive test logic and remove redundant spinning loops --- .../jox/fray/FrayTrySendReceiveTest.java | 139 ++++++------------ 1 file changed, 43 insertions(+), 96 deletions(-) diff --git a/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java b/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java index 8ee8738..6610740 100644 --- a/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java +++ b/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java @@ -23,8 +23,8 @@ public void trySendReceiveTest() throws InterruptedException { Fork f1 = Fork.newNoResult( () -> { - while (!ch.trySend(10)) { - Thread.yield(); + if (!ch.trySend(10)) { + ch.send(10); } }); Fork f2 = Fork.newWithResult(ch::receive); @@ -42,20 +42,13 @@ public void sendTryReceiveTest() throws InterruptedException { Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); Fork f1 = Fork.newNoResult(() -> ch.send(10)); - Fork f2 = - Fork.newWithResult( - () -> { - Integer result; - while ((result = ch.tryReceive()) == null) { - Thread.yield(); - } - return result; - }); + Fork f2 = Fork.newWithResult(ch::tryReceive); Fork.startAll(f1, f2); f1.join(); + Integer received = f2.join(); - assert (f2.join() == 10); + assert (received == null || received == 10); } // trySend | tryReceive @@ -64,27 +57,17 @@ public void sendTryReceiveTest() throws InterruptedException { public void trySendTryReceiveTest() throws InterruptedException { Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); - Fork f1 = - Fork.newNoResult( - () -> { - while (!ch.trySend(10)) { - Thread.yield(); - } - }); - Fork f2 = - Fork.newWithResult( - () -> { - Integer result; - while ((result = ch.tryReceive()) == null) { - Thread.yield(); - } - return result; - }); + Fork f1 = Fork.newWithResult(() -> ch.trySend(10)); + Fork f2 = Fork.newWithResult(ch::tryReceive); Fork.startAll(f1, f2); - f1.join(); + boolean sent = f1.join(); + Integer received = f2.join(); - assert (f2.join() == 10); + if (received != null) { + assert sent; + assert (received == 10); + } } // multiple trySend | multiple tryReceive @@ -103,18 +86,16 @@ public void multiTrySendMultiTryReceiveTest() throws InterruptedException { sendForks.add( Fork.newNoResult( () -> { - while (!ch.trySend(finalI)) { - Thread.yield(); + if (!ch.trySend(finalI)) { + ch.send(finalI); } })); receiveForks.add( Fork.newWithResult( () -> { - Integer result; - while ((result = ch.tryReceive()) == null) { - Thread.yield(); - } - return result; + Integer result = ch.tryReceive(); + if (result != null) return result; + return ch.receive(); })); } @@ -133,28 +114,25 @@ public void multiTrySendMultiTryReceiveTest() throws InterruptedException { assert (result == concurrency * (concurrency - 1) / 2); } - // trySend | tryReceive (rendezvous) — both try simultaneously, exactly one pair succeeds + // trySend | tryReceive (rendezvous) @ConcurrencyTest public void trySendTryReceive_rendezvousTest() throws InterruptedException { Channel ch = Channel.newRendezvousChannel(); - Fork f1 = - Fork.newWithResult( + Fork f1 = + Fork.newNoResult( () -> { - while (!ch.trySend(10)) { - Thread.yield(); + if (!ch.trySend(10)) { + ch.send(10); } - return true; }); Fork f2 = Fork.newWithResult( () -> { - Integer result; - while ((result = ch.tryReceive()) == null) { - Thread.yield(); - } - return result; + Integer result = ch.tryReceive(); + if (result != null) return result; + return ch.receive(); }); Fork.startAll(f1, f2); @@ -163,7 +141,7 @@ public void trySendTryReceive_rendezvousTest() throws InterruptedException { assert (f2.join() == 10); } - // trySend | receive (rendezvous) — trySend spins, blocking receive waits + // trySend | receive (rendezvous) @ConcurrencyTest public void trySendReceive_rendezvousTest() throws InterruptedException { @@ -172,8 +150,8 @@ public void trySendReceive_rendezvousTest() throws InterruptedException { Fork f1 = Fork.newNoResult( () -> { - while (!ch.trySend(10)) { - Thread.yield(); + if (!ch.trySend(10)) { + ch.send(10); } }); Fork f2 = Fork.newWithResult(ch::receive); @@ -184,7 +162,7 @@ public void trySendReceive_rendezvousTest() throws InterruptedException { assert (f2.join() == 10); } - // send | tryReceive (rendezvous) — blocking send waits, tryReceive spins + // send | tryReceive (rendezvous) @ConcurrencyTest public void sendTryReceive_rendezvousTest() throws InterruptedException { @@ -194,11 +172,9 @@ public void sendTryReceive_rendezvousTest() throws InterruptedException { Fork f2 = Fork.newWithResult( () -> { - Integer result; - while ((result = ch.tryReceive()) == null) { - Thread.yield(); - } - return result; + Integer result = ch.tryReceive(); + if (result != null) return result; + return ch.receive(); }); Fork.startAll(f1, f2); @@ -213,27 +189,15 @@ public void sendTryReceive_rendezvousTest() throws InterruptedException { public void trySendTryReceive_unlimitedTest() throws InterruptedException { Channel ch = Channel.newUnlimitedChannel(); - Fork f1 = - Fork.newNoResult( - () -> { - while (!ch.trySend(10)) { - Thread.yield(); - } - }); - Fork f2 = - Fork.newWithResult( - () -> { - Integer result; - while ((result = ch.tryReceive()) == null) { - Thread.yield(); - } - return result; - }); + Fork f1 = Fork.newWithResult(() -> ch.trySend(10)); + Fork f2 = Fork.newWithResult(ch::tryReceive); Fork.startAll(f1, f2); - f1.join(); + boolean sent = f1.join(); + Integer received = f2.join(); - assert (f2.join() == 10); + assert sent; + assert received == null || (received == 10); } // trySend | close | tryReceive @@ -246,26 +210,12 @@ public void trySendCloseTryReceiveTest() throws InterruptedException { Fork.newWithResult( () -> { Object r = ch.trySendOrClosed(10); - return r == null; // true if sent + return r == null; }); Fork f2 = Fork.newNoResult(ch::done); - Fork f3 = - Fork.newWithResult( - () -> { - // keep trying until we get a value or the channel is done - while (true) { - Object r = ch.tryReceiveOrClosed(); - if (r instanceof ChannelDone) { - return r; - } - if (r != null) { - return r; - } - Thread.yield(); - } - }); + Fork f3 = Fork.newWithResult(ch::tryReceiveOrClosed); Fork.startAll(f1, f2, f3); @@ -273,12 +223,9 @@ public void trySendCloseTryReceiveTest() throws InterruptedException { f2.join(); Object received = f3.join(); - if (sent) { - // if the value was sent, it must have been received + if (received != null && !(received instanceof ChannelDone)) { assert (received.equals(10)); - } else { - // if not sent, receiver must see channel done - assert (received instanceof ChannelDone); + assert sent; } } } From 00509e204c441d956c1dcb9d06e60e1982fc6c25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 18 Feb 2026 13:32:22 +0100 Subject: [PATCH 7/8] fix: Update non-blocking send/receive test to fallback to blocking receive when result is null --- .../softwaremill/jox/fray/FrayTrySendReceiveTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java b/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java index 6610740..f162bbb 100644 --- a/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java +++ b/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java @@ -42,13 +42,18 @@ public void sendTryReceiveTest() throws InterruptedException { Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); Fork f1 = Fork.newNoResult(() -> ch.send(10)); - Fork f2 = Fork.newWithResult(ch::tryReceive); + Fork f2 = + Fork.newWithResult( + () -> { + Integer result = ch.tryReceive(); + if (result != null) return result; + return ch.receive(); + }); Fork.startAll(f1, f2); f1.join(); - Integer received = f2.join(); - assert (received == null || received == 10); + assert (f2.join() == 10); } // trySend | tryReceive From 0f040ccd69c49db632549a1a63ae10dcf1650f71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 19 Feb 2026 11:41:22 +0100 Subject: [PATCH 8/8] Remove estimateSize() from Channel The team decided against channel size estimation (ADR #257). Remove the method and its tests to clean up the PR #262 diff. --- .../java/com/softwaremill/jox/Channel.java | 72 ------ .../com/softwaremill/jox/ChannelTest.java | 223 ------------------ 2 files changed, 295 deletions(-) diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index 1657b8c..515ea8b 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -1323,78 +1323,6 @@ private boolean hasValueToReceive(Segment segment, int i) { } } - // ***************** - // Channel state API - // ***************** - - /** - * Returns a best-effort estimate of the number of elements in the channel. - * - *

This is a non-blocking O(1) operation that computes the difference between the number of - * send and receive operations initiated on this channel. This approximates the number of - * buffered values, but may also include in-flight or waiting operations (e.g., senders blocked - * because the channel is at capacity). - * - *

Important caveats: - * - *

    - *
  • The returned value is immediately outdated due to concurrent operations - *
  • For buffered channels at capacity, this includes waiting senders beyond the buffer - *
  • For rendezvous channels, this typically returns 0 - *
  • Should NOT be used for synchronization or control flow logic - *
  • Best suited for monitoring, metrics, and debugging - *
- * - *

Use cases: - * - *

    - *
  • Monitoring unlimited channels for unbounded growth - *
  • Exporting metrics to monitoring systems (Prometheus, Micrometer, etc.) - *
  • Dashboard visualization of channel state - *
  • Alert thresholds (e.g., "channel buffered >5000 items") - *
  • Capacity planning and performance analysis - *
- * - *

Anti-patterns (do NOT do this): - * - *

    - *
  • {@code if (ch.estimateSize() > 0) ch.receive()} - race condition - *
  • {@code while (ch.estimateSize() < capacity) ch.send(x)} - unreliable - *
- * - *

Recommended patterns: - * - *

    - *
  • Log estimate periodically for trending analysis - *
  • Export to metrics system for alerting on thresholds - *
  • Use in tests to verify approximate behavior - *
- * - *

Example - Monitoring: - * - *

{@code
-     * var ch = Channel.newUnlimitedChannel();
-     *
-     * // Background thread for metrics
-     * Thread.startVirtualThread(() -> {
-     *     while (!ch.closedForSend()) {
-     *         long estimate = ch.estimateSize();
-     *         metricsRegistry.gauge("channel.size", estimate);
-     *         Thread.sleep(Duration.ofSeconds(10));
-     *     }
-     * });
-     * }
- * - * @return A best-effort estimate of elements in the channel, based on the send/receive - * operation differential. Always >= 0. The value is immediately stale and should only be - * used for observability purposes. - */ - public long estimateSize() { - long s = getSendersCounter(sendersAndClosedFlag); - long r = receivers; - return Math.max(0, s - r); - } - // ************** // Select clauses // ************** diff --git a/channels/src/test/java/com/softwaremill/jox/ChannelTest.java b/channels/src/test/java/com/softwaremill/jox/ChannelTest.java index 870f18f..bf7783c 100644 --- a/channels/src/test/java/com/softwaremill/jox/ChannelTest.java +++ b/channels/src/test/java/com/softwaremill/jox/ChannelTest.java @@ -81,227 +81,4 @@ void testNullItem() throws InterruptedException { Channel ch = Channel.newBufferedChannel(4); assertThrows(NullPointerException.class, () -> ch.send(null)); } - - @Test - void testEstimateSize_bufferedChannel_empty() { - Channel ch = Channel.newBufferedChannel(10); - assertEquals(0, ch.estimateSize()); - } - - @Test - void testEstimateSize_bufferedChannel_withValues() throws InterruptedException { - Channel ch = Channel.newBufferedChannel(10); - assertEquals(0, ch.estimateSize()); - - ch.send(1); - ch.send(2); - ch.send(3); - assertTrue(ch.estimateSize() >= 3, "Expected at least 3 items, got " + ch.estimateSize()); - - ch.receive(); - assertTrue( - ch.estimateSize() >= 2, - "Expected at least 2 items after receive, got " + ch.estimateSize()); - - ch.receive(); - ch.receive(); - assertEquals(0, ch.estimateSize()); - } - - @Test - void testEstimateSize_bufferedChannel_afterClose() throws InterruptedException { - Channel ch = Channel.newBufferedChannel(10); - ch.send(1); - ch.send(2); - ch.send(3); - - ch.done(); - assertTrue( - ch.estimateSize() >= 3, - "Estimate should still work after close, got " + ch.estimateSize()); - - ch.receive(); - assertTrue( - ch.estimateSize() >= 2, - "Estimate should reflect remaining buffered values, got " + ch.estimateSize()); - } - - @Test - void testEstimateSize_unlimitedChannel_growth() throws InterruptedException { - Channel ch = Channel.newUnlimitedChannel(); - - for (int i = 0; i < 1000; i++) { - ch.send(i); - } - - long estimate = ch.estimateSize(); - assertTrue( - estimate >= 900, - "Expected at least 900 items in unlimited channel, got " + estimate); - assertTrue(estimate <= 1000, "Expected at most 1000 items, got " + estimate); - - // Receive some and check size decreases - for (int i = 0; i < 500; i++) { - ch.receive(); - } - - estimate = ch.estimateSize(); - assertTrue( - estimate >= 400, - "Expected at least 400 items after receiving 500, got " + estimate); - assertTrue(estimate <= 500, "Expected at most 500 items, got " + estimate); - } - - @Test - void testEstimateSize_rendezvousChannel() throws InterruptedException, ExecutionException { - Channel ch = Channel.newRendezvousChannel(); - assertEquals(0, ch.estimateSize()); - - scoped( - scope -> { - // Start a sender in background - forkVoid( - scope, - () -> { - try { - Thread.sleep(10); - ch.send(1); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - - // Rendezvous should have 0 or very small estimate - Thread.sleep(20); - long estimate = ch.estimateSize(); - assertTrue( - estimate <= 1, - "Rendezvous channel should have small estimate, got " + estimate); - - // Receive the value - ch.receive(); - assertEquals(0, ch.estimateSize()); - }); - } - - @Test - void testEstimateSize_concurrentModification() throws InterruptedException, ExecutionException { - Channel ch = Channel.newBufferedChannel(1000); - - scoped( - scope -> { - // 10 senders - for (int i = 0; i < 10; i++) { - forkVoid( - scope, - () -> { - for (int j = 0; j < 100; j++) { - ch.send(j); - } - }); - } - - // 5 receivers - for (int i = 0; i < 5; i++) { - forkVoid( - scope, - () -> { - for (int j = 0; j < 50; j++) { - ch.receive(); - } - }); - } - - // Check estimate is within bounds during concurrent operations - Thread.sleep(50); - long estimate = ch.estimateSize(); - assertTrue(estimate >= 0, "Estimate should be non-negative, got " + estimate); - assertTrue( - estimate <= 1000, - "Estimate should not exceed capacity significantly, got " + estimate); - }); - - // After all operations, estimate should be around 750 (1000 sent - 250 received) - long finalEstimate = ch.estimateSize(); - assertTrue( - finalEstimate >= 700, - "Expected around 750 items after concurrent ops, got " + finalEstimate); - assertTrue( - finalEstimate <= 800, - "Expected around 750 items after concurrent ops, got " + finalEstimate); - } - - @Test - void testEstimateSize_neverNegative() throws InterruptedException, ExecutionException { - Channel ch = Channel.newRendezvousChannel(); - - scoped( - scope -> { - // Start receivers before senders - for (int i = 0; i < 5; i++) { - forkVoid( - scope, - () -> { - Thread.sleep(10); - ch.receive(); - }); - } - - // Even with waiting receivers, estimate should be >= 0 - Thread.sleep(20); - long estimate = ch.estimateSize(); - assertTrue(estimate >= 0, "Estimate should never be negative, got " + estimate); - - // Send values to waiting receivers - for (int i = 0; i < 5; i++) { - ch.send(i); - } - - assertEquals(0, ch.estimateSize()); - }); - } - - @Test - void testEstimateSize_afterError() throws InterruptedException { - Channel ch = Channel.newBufferedChannel(10); - ch.send(1); - ch.send(2); - - ch.error(new RuntimeException("test error")); - - // Estimate should still work after error - long estimate = ch.estimateSize(); - assertTrue(estimate >= 0, "Estimate should work after error, got " + estimate); - } - - @Test - void testEstimateSize_bufferedChannel_waitingSendersBeyondCapacity() - throws InterruptedException, ExecutionException { - Channel ch = Channel.newBufferedChannel(2); - ch.send(1); - ch.send(2); - // Buffer is now full, estimate should reflect 2 buffered values - assertEquals(2, ch.estimateSize()); - - scoped( - scope -> { - // Start a sender that will block because the buffer is full - forkVoid(scope, () -> ch.send(3)); - - // Give the blocked sender time to increment the senders counter - Thread.sleep(50); - - // Estimate includes the waiting sender: documented behavior - long estimate = ch.estimateSize(); - assertTrue( - estimate >= 2, - "Expected at least 2 (buffered values), got " + estimate); - assertTrue( - estimate <= 3, - "Expected at most 3 (buffered + waiting sender), got " + estimate); - - // Receive one to unblock the waiting sender - ch.receive(); - }); - } }