diff --git a/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java b/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java new file mode 100644 index 0000000..f162bbb --- /dev/null +++ b/channels-fray-tests/src/test/java/com/softwaremill/jox/fray/FrayTrySendReceiveTest.java @@ -0,0 +1,236 @@ +package com.softwaremill.jox.fray; + +import static com.softwaremill.jox.fray.Config.CHANNEL_SIZE; + +import java.util.ArrayList; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.pastalab.fray.junit.junit5.FrayTestExtension; +import org.pastalab.fray.junit.junit5.annotations.ConcurrencyTest; + +import com.softwaremill.jox.Channel; +import com.softwaremill.jox.ChannelDone; + +@ExtendWith(FrayTestExtension.class) +public class FrayTrySendReceiveTest { + + // trySend | receive + + @ConcurrencyTest + public void trySendReceiveTest() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); + + Fork f1 = + Fork.newNoResult( + () -> { + if (!ch.trySend(10)) { + ch.send(10); + } + }); + Fork f2 = Fork.newWithResult(ch::receive); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // send | tryReceive + + @ConcurrencyTest + public void sendTryReceiveTest() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); + + Fork f1 = Fork.newNoResult(() -> ch.send(10)); + Fork f2 = + Fork.newWithResult( + () -> { + Integer result = ch.tryReceive(); + if (result != null) return result; + return ch.receive(); + }); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // trySend | tryReceive + + @ConcurrencyTest + public void trySendTryReceiveTest() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); + + Fork f1 = Fork.newWithResult(() -> ch.trySend(10)); + Fork f2 = Fork.newWithResult(ch::tryReceive); + + Fork.startAll(f1, f2); + boolean sent = f1.join(); + Integer received = f2.join(); + + if (received != null) { + assert sent; + assert (received == 10); + } + } + + // multiple trySend | multiple tryReceive + + @ConcurrencyTest + public void multiTrySendMultiTryReceiveTest() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); + + int concurrency = 4; + + var sendForks = new ArrayList>(); + var receiveForks = new ArrayList>(); + + for (int i = 0; i < concurrency; i++) { + final var finalI = i; + sendForks.add( + Fork.newNoResult( + () -> { + if (!ch.trySend(finalI)) { + ch.send(finalI); + } + })); + receiveForks.add( + Fork.newWithResult( + () -> { + Integer result = ch.tryReceive(); + if (result != null) return result; + return ch.receive(); + })); + } + + Fork.startAll(sendForks.toArray(new Fork[0])); + Fork.startAll(receiveForks.toArray(new Fork[0])); + + for (Fork sendFork : sendForks) { + sendFork.join(); + } + + var result = 0; + for (Fork receiveFork : receiveForks) { + result += receiveFork.join(); + } + + assert (result == concurrency * (concurrency - 1) / 2); + } + + // trySend | tryReceive (rendezvous) + + @ConcurrencyTest + public void trySendTryReceive_rendezvousTest() throws InterruptedException { + Channel ch = Channel.newRendezvousChannel(); + + Fork f1 = + Fork.newNoResult( + () -> { + if (!ch.trySend(10)) { + ch.send(10); + } + }); + Fork f2 = + Fork.newWithResult( + () -> { + Integer result = ch.tryReceive(); + if (result != null) return result; + return ch.receive(); + }); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // trySend | receive (rendezvous) + + @ConcurrencyTest + public void trySendReceive_rendezvousTest() throws InterruptedException { + Channel ch = Channel.newRendezvousChannel(); + + Fork f1 = + Fork.newNoResult( + () -> { + if (!ch.trySend(10)) { + ch.send(10); + } + }); + Fork f2 = Fork.newWithResult(ch::receive); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // send | tryReceive (rendezvous) + + @ConcurrencyTest + public void sendTryReceive_rendezvousTest() throws InterruptedException { + Channel ch = Channel.newRendezvousChannel(); + + Fork f1 = Fork.newNoResult(() -> ch.send(10)); + Fork f2 = + Fork.newWithResult( + () -> { + Integer result = ch.tryReceive(); + if (result != null) return result; + return ch.receive(); + }); + + Fork.startAll(f1, f2); + f1.join(); + + assert (f2.join() == 10); + } + + // trySend | tryReceive (unlimited) + + @ConcurrencyTest + public void trySendTryReceive_unlimitedTest() throws InterruptedException { + Channel ch = Channel.newUnlimitedChannel(); + + Fork f1 = Fork.newWithResult(() -> ch.trySend(10)); + Fork f2 = Fork.newWithResult(ch::tryReceive); + + Fork.startAll(f1, f2); + boolean sent = f1.join(); + Integer received = f2.join(); + + assert sent; + assert received == null || (received == 10); + } + + // trySend | close | tryReceive + + @ConcurrencyTest + public void trySendCloseTryReceiveTest() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(CHANNEL_SIZE); + + Fork f1 = + Fork.newWithResult( + () -> { + Object r = ch.trySendOrClosed(10); + return r == null; + }); + + Fork f2 = Fork.newNoResult(ch::done); + + Fork f3 = Fork.newWithResult(ch::tryReceiveOrClosed); + + Fork.startAll(f1, f2, f3); + + boolean sent = f1.join(); + f2.join(); + Object received = f3.join(); + + if (received != null && !(received instanceof ChannelDone)) { + assert (received.equals(10)); + assert sent; + } + } +} diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index 19387d4..515ea8b 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -263,11 +263,19 @@ public Object sendOrClosed(T value) throws InterruptedException { return doSend(value, null, null); } - // used by Sink.trySend + // used by Sink.trySend (Select-based fallback) static final Object DEFAULT_NOT_SENT_VALUE = new Object(); static final DefaultClause DEFAULT_NOT_SENT_CLAUSE = new DefaultClauseValue<>(DEFAULT_NOT_SENT_VALUE); + // used by Source.tryReceive (Select-based fallback) + static final Object DEFAULT_NOT_RECEIVED_VALUE = new Object(); + static final DefaultClause DEFAULT_NOT_RECEIVED_CLAUSE = + new DefaultClauseValue<>(DEFAULT_NOT_RECEIVED_VALUE); + + // sentinel value returned by trySendOrClosed() to indicate "not sent" + static final Object TRY_SEND_NOT_SENT = new Object(); + /** * @return If {@code select} & {@code selectClause} is {@code null}: {@code null} when the value * was sent, or {@link ChannelClosed}, when the channel is closed. Otherwise, might also @@ -348,6 +356,252 @@ private Object doSend(T value, SelectInstance select, SelectClause selectClau } } + // ************* + // Non-blocking send + // ************* + + @Override + public Object trySendOrClosed(T value) { + if (value == null) { + throw new NullPointerException(); + } + while (true) { + var segment = sendSegment; + var scf = sendersAndClosedFlag; + var s = getSendersCounter(scf); + + if (isClosed(scf)) { + return closedReason; + } + + if (capacity >= 0) { + var bufEnd = bufferEnd; + var r = receivers; + if (capacity == 0) { + if (s >= r) return TRY_SEND_NOT_SENT; + } else { + if (s >= bufEnd && s >= r) return TRY_SEND_NOT_SENT; + } + } + + if (!SENDERS_AND_CLOSE_FLAG.compareAndSet(this, scf, scf + 1)) { + continue; + } + + var id = s / Segment.SEGMENT_SIZE; + var i = (int) (s % Segment.SEGMENT_SIZE); + + if (segment.getId() != id) { + segment = findAndMoveForward(SEND_SEGMENT, this, segment, id); + if (segment == null) { + return closedReason; + } + + if (segment.getId() != id) { + SENDERS_AND_CLOSE_FLAG.compareAndSet( + this, s + 1, segment.getId() * Segment.SEGMENT_SIZE); + continue; + } + } + + var sendResult = tryUpdateCellSend(segment, i, s, value); + if (sendResult == SendResult.BUFFERED) { + return null; + } else if (sendResult == SendResult.RESUMED) { + segment.cleanPrev(); + return null; + } else if (sendResult == SendResult.FAILED) { + segment.cleanPrev(); + continue; + } else if (sendResult == SendResult.CLOSED) { + return closedReason; + } else if (sendResult == TRY_SEND_NOT_SENT) { + return TRY_SEND_NOT_SENT; + } else { + throw new IllegalStateException( + "Unexpected result: " + sendResult + " in channel: " + this); + } + } + } + + /** + * Non-suspending variant of {@code updateCellSend}. Where the normal send would suspend (no + * receiver, not in buffer), this marks the cell as INTERRUPTED_SEND and returns the + * TRY_SEND_NOT_SENT sentinel. + */ + private Object tryUpdateCellSend(Segment segment, int i, long s, T value) { + while (true) { + var state = segment.getCell(i); + + if (state == null) { + if (capacity >= 0 && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) { + // cell is empty, and no receiver, not in buffer -> interrupt + if (segment.casCell(i, null, INTERRUPTED_SEND)) { + segment.cellInterruptedSender(); + return TRY_SEND_NOT_SENT; + } + } else { + // cell is empty, but a receiver is in progress, or in buffer -> elimination + if (segment.casCell(i, null, value)) { + return SendResult.BUFFERED; + } + } + } else if (state == IN_BUFFER) { + if (segment.casCell(i, IN_BUFFER, value)) { + return SendResult.BUFFERED; + } + } else if (state instanceof Continuation c) { + // a receiver is waiting -> trying to resume + if (c.tryResume(value)) { + segment.setCell(i, DONE); + return SendResult.RESUMED; + } else { + return SendResult.FAILED; + } + } else if (state instanceof StoredSelectClause ss) { + ss.setPayload(value); + // a select clause is waiting -> trying to resume + if (ss.getSelect().trySelect(ss)) { + segment.setCell(i, DONE); + return SendResult.RESUMED; + } else { + return SendResult.FAILED; + } + } else if (state == INTERRUPTED_RECEIVE || state == BROKEN) { + // cell interrupted or poisoned -> trying with a new one + return SendResult.FAILED; + } else if (state == CLOSED) { + return SendResult.CLOSED; + } else { + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); + } + } + } + + // ************* + // Non-blocking receive + // ************* + + @Override + public Object tryReceiveOrClosed() { + while (true) { + var scf = sendersAndClosedFlag; + var s = getSendersCounter(scf); + var r = receivers; + + if (s <= r) { + if (isClosed(scf)) return closedForReceive(); + return null; + } + + var segment = receiveSegment; + + if (!RECEIVERS.compareAndSet(this, r, r + 1)) { + continue; + } + + var id = r / Segment.SEGMENT_SIZE; + var i = (int) (r % Segment.SEGMENT_SIZE); + + if (segment.getId() != id) { + segment = findAndMoveForward(RECEIVE_SEGMENT, this, segment, id); + if (segment == null) { + return closedReason; + } + + if (segment.getId() != id) { + RECEIVERS.compareAndSet(this, r + 1, segment.getId() * Segment.SEGMENT_SIZE); + continue; + } + } + + var result = tryUpdateCellReceive(segment, i, r); + if (result == ReceiveResult.CLOSED) { + return closedReason; + } else if (result == ReceiveResult.FAILED) { + segment.cleanPrev(); + continue; + } else if (result == null) { + return null; + } else { + segment.cleanPrev(); + return result; + } + } + } + + /** + * Non-suspending variant of {@code updateCellReceive}. Where the normal receive would suspend + * (no sender available), this marks the cell as INTERRUPTED_RECEIVE and returns null. + */ + private Object tryUpdateCellReceive(Segment segment, int i, long r) { + while (true) { + var state = segment.getCell(i); + + if (state == null || state == IN_BUFFER) { + if (r >= getSendersCounter(sendersAndClosedFlag)) { + // cell is empty, no sender -> interrupt + if (segment.casCell(i, state, INTERRUPTED_RECEIVE)) { + segment.cellInterruptedReceiver(); + expandBuffer(); + return null; + } + } else { + // sender in progress -> mark as broken + if (segment.casCell(i, state, BROKEN)) { + expandBuffer(); + return ReceiveResult.FAILED; + } + } + } else if (state instanceof Continuation c) { + if (segment.casCell(i, state, RESUMING)) { + // a sender is waiting -> trying to resume + if (c.tryResume(0)) { + segment.setCell(i, DONE); + expandBuffer(); + return c.getPayload(); + } else { + // the state will be set to INTERRUPTED_SEND by the continuation, meanwhile + // everybody else will observe RESUMING + return ReceiveResult.FAILED; + } + } + } else if (state instanceof StoredSelectClause ss) { + if (segment.casCell(i, state, RESUMING)) { + // a send clause is waiting -> trying to resume + if (ss.getSelect().trySelect(ss)) { + segment.setCell(i, DONE); + expandBuffer(); + return ss.getPayload(); + } else { + // the state will be set to INTERRUPTED_SEND by the cleanup, meanwhile + // everybody else will observe RESUMING + return ReceiveResult.FAILED; + } + } + } else if (state instanceof CellState) { + switch (state) { + case CellState.INTERRUPTED_SEND -> { + return ReceiveResult.FAILED; + } + case CellState.RESUMING -> Thread.onSpinWait(); + case CellState.CLOSED -> { + return ReceiveResult.CLOSED; + } + default -> + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); + } + } else { + // buffered value + segment.setCell(i, DONE); + expandBuffer(); + return state; + } + } + } + /** * @param segment The segment which stores the cell's state. * @param i The index within the {@code segment}. diff --git a/channels/src/main/java/com/softwaremill/jox/Sink.java b/channels/src/main/java/com/softwaremill/jox/Sink.java index b955885..12e49b0 100644 --- a/channels/src/main/java/com/softwaremill/jox/Sink.java +++ b/channels/src/main/java/com/softwaremill/jox/Sink.java @@ -27,19 +27,50 @@ public interface Sink extends CloseableChannel { /** * Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. * + *

This method never blocks or suspends the calling thread. It completes in bounded time. + * Safe to call from platform threads, including NIO event loop threads. + * + *

May return {@code false} even when space is available, due to contention with concurrent + * operations. Should not be used as a substitute for {@link #send(Object)} in a spin loop. + * * @param value The value to send. Not {@code null}. * @return {@code true} if the value was sent, {@code false} otherwise. * @throws ChannelClosedException When the channel is closed. */ default boolean trySend(T value) { + Object r = trySendOrClosed(value); + if (r instanceof ChannelClosed c) throw c.toException(); + return r == null; // null = sent, sentinel = not sent + } + + /** + * Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. + * Doesn't throw exceptions when the channel is closed but returns a value. + * + *

This method never blocks or suspends the calling thread. It completes in bounded time. + * Safe to call from platform threads, including NIO event loop threads. + * + *

May fail even when space is available, due to contention with concurrent operations. + * Should not be used as a substitute for {@link #sendOrClosed(Object)} in a spin loop. + * + * @param value The value to send. Not {@code null}. + * @return Either {@code null} when the value was sent successfully, {@link ChannelClosed} when + * the channel is closed, or a sentinel value indicating the value was not sent (check using + * {@code result == null} for success, {@code result instanceof ChannelClosed} for closed). + */ + default Object trySendOrClosed(T value) { + // Select-based fallback for binary compatibility Object sent; try { sent = Select.select(sendClause(value), Channel.DEFAULT_NOT_SENT_CLAUSE); } catch (InterruptedException e) { throw new IllegalStateException( - "Interrupted during trySend, which should not be possible", e); + "Interrupted during trySendOrClosed, which should not be possible", e); } - return sent != Channel.DEFAULT_NOT_SENT_VALUE; + if (sent == Channel.DEFAULT_NOT_SENT_VALUE) { + return Channel.TRY_SEND_NOT_SENT; + } + return null; // sent successfully } /** diff --git a/channels/src/main/java/com/softwaremill/jox/Source.java b/channels/src/main/java/com/softwaremill/jox/Source.java index bbc91c9..2c06bd3 100644 --- a/channels/src/main/java/com/softwaremill/jox/Source.java +++ b/channels/src/main/java/com/softwaremill/jox/Source.java @@ -26,6 +26,53 @@ public interface Source extends CloseableChannel { */ Object receiveOrClosed() throws InterruptedException; + /** + * Attempt to receive a value from the channel if one is immediately available. + * + *

This method never blocks or suspends the calling thread. It completes in bounded time. + * Safe to call from platform threads, including NIO event loop threads. + * + *

May return {@code null} even when a value is available, due to contention with concurrent + * operations. Should not be used as a substitute for {@link #receive()} in a spin loop. + * + * @return The received value, or {@code null} if no value is immediately available. + * @throws ChannelClosedException When the channel is closed. + */ + default T tryReceive() { + Object r = tryReceiveOrClosed(); + if (r instanceof ChannelClosed c) throw c.toException(); + //noinspection unchecked + return (T) r; // null means nothing available + } + + /** + * Attempt to receive a value from the channel if one is immediately available. Doesn't throw + * exceptions when the channel is closed, but returns a value. + * + *

This method never blocks or suspends the calling thread. It completes in bounded time. + * Safe to call from platform threads, including NIO event loop threads. + * + *

May return {@code null} even when a value is available, due to contention with concurrent + * operations. Should not be used as a substitute for {@link #receiveOrClosed()} in a spin loop. + * + * @return The received value of type {@code T}, {@link ChannelClosed} when the channel is + * closed, or {@code null} if no value is immediately available. + */ + default Object tryReceiveOrClosed() { + // Select-based fallback for binary compatibility + Object received; + try { + received = Select.select(receiveClause(), Channel.DEFAULT_NOT_RECEIVED_CLAUSE); + } catch (InterruptedException e) { + throw new IllegalStateException( + "Interrupted during tryReceiveOrClosed, which should not be possible", e); + } + if (received == Channel.DEFAULT_NOT_RECEIVED_VALUE) { + return null; // nothing available + } + return received; + } + /** * Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will * receive a value from the current channel. diff --git a/channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java b/channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java new file mode 100644 index 0000000..70dfd31 --- /dev/null +++ b/channels/src/test/java/com/softwaremill/jox/ChannelTrySendReceiveTest.java @@ -0,0 +1,527 @@ +package com.softwaremill.jox; + +import static com.softwaremill.jox.TestUtil.*; +import static org.junit.jupiter.api.Assertions.*; + +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +public class ChannelTrySendReceiveTest { + + // ******** + // trySend + // ******** + + @Test + void trySend_buffered_shouldSendWhenBufferHasSpace() { + Channel ch = Channel.newBufferedChannel(2); + assertTrue(ch.trySend("a")); + assertTrue(ch.trySend("b")); + } + + @Test + void trySend_buffered_shouldReturnFalseWhenBufferFull() { + Channel ch = Channel.newBufferedChannel(1); + assertTrue(ch.trySend("a")); + assertFalse(ch.trySend("b")); // buffer full, no receiver + } + + @Test + void trySend_rendezvous_shouldReturnFalseWhenNoReceiver() { + Channel ch = Channel.newRendezvousChannel(); + assertFalse(ch.trySend("a")); + } + + @Test + void trySend_rendezvous_shouldSendWhenReceiverWaiting() + throws InterruptedException, ExecutionException { + Channel ch = Channel.newRendezvousChannel(); + scoped( + scope -> { + var f = fork(scope, ch::receive); + Thread.sleep(50); + assertTrue(ch.trySend("x")); + assertEquals("x", f.get()); + }); + } + + @Test + void trySend_unlimited_shouldAlwaysSend() { + Channel ch = Channel.newUnlimitedChannel(); + for (int i = 0; i < 1000; i++) { + assertTrue(ch.trySend("v" + i)); + } + } + + @Test + void trySend_closedDone_shouldThrow() { + Channel ch = Channel.newBufferedChannel(1); + ch.done(); + assertThrows(ChannelDoneException.class, () -> ch.trySend("x")); + } + + @Test + void trySend_closedError_shouldThrow() { + Channel ch = Channel.newBufferedChannel(1); + ch.error(new RuntimeException("boom")); + assertThrows(ChannelErrorException.class, () -> ch.trySend("x")); + } + + @Test + void trySend_nullValue_shouldThrowNPE() { + Channel ch = Channel.newBufferedChannel(1); + assertThrows(NullPointerException.class, () -> ch.trySend(null)); + } + + // **************** + // trySendOrClosed + // **************** + + @Test + void trySendOrClosed_buffered_shouldReturnNullOnSuccess() { + Channel ch = Channel.newBufferedChannel(2); + assertNull(ch.trySendOrClosed("a")); + } + + @Test + void trySendOrClosed_buffered_shouldReturnSentinelWhenFull() { + Channel ch = Channel.newBufferedChannel(1); + assertNull(ch.trySendOrClosed("a")); // success + Object result = ch.trySendOrClosed("b"); + assertNotNull(result); + assertFalse(result instanceof ChannelClosed); + } + + @Test + void trySendOrClosed_closedDone_shouldReturnChannelDone() { + Channel ch = Channel.newBufferedChannel(1); + ch.done(); + assertInstanceOf(ChannelDone.class, ch.trySendOrClosed("x")); + } + + @Test + void trySendOrClosed_closedError_shouldReturnChannelError() { + Channel ch = Channel.newBufferedChannel(1); + ch.error(new RuntimeException("boom")); + assertInstanceOf(ChannelError.class, ch.trySendOrClosed("x")); + } + + // ********** + // tryReceive + // ********** + + @Test + void tryReceive_buffered_shouldReceiveBufferedValue() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(2); + ch.send("a"); + ch.send("b"); + assertEquals("a", ch.tryReceive()); + assertEquals("b", ch.tryReceive()); + } + + @Test + void tryReceive_buffered_shouldReturnNullWhenEmpty() { + Channel ch = Channel.newBufferedChannel(2); + assertNull(ch.tryReceive()); + } + + @Test + void tryReceive_rendezvous_shouldReturnNullWhenNoSender() { + Channel ch = Channel.newRendezvousChannel(); + assertNull(ch.tryReceive()); + } + + @Test + void tryReceive_rendezvous_shouldReceiveWhenSenderWaiting() + throws InterruptedException, ExecutionException { + Channel ch = Channel.newRendezvousChannel(); + scoped( + scope -> { + forkVoid(scope, () -> ch.send("x")); + Thread.sleep(50); + assertEquals("x", ch.tryReceive()); + }); + } + + @Test + void tryReceive_unlimited_shouldReceiveBufferedValues() throws InterruptedException { + Channel ch = Channel.newUnlimitedChannel(); + ch.send("a"); + ch.send("b"); + assertEquals("a", ch.tryReceive()); + assertEquals("b", ch.tryReceive()); + assertNull(ch.tryReceive()); + } + + @Test + void tryReceive_closedDone_noValues_shouldThrow() { + Channel ch = Channel.newBufferedChannel(1); + ch.done(); + assertThrows(ChannelDoneException.class, () -> ch.tryReceive()); + } + + @Test + void tryReceive_closedError_shouldThrow() { + Channel ch = Channel.newBufferedChannel(1); + ch.error(new RuntimeException("boom")); + assertThrows(ChannelErrorException.class, () -> ch.tryReceive()); + } + + // ******************** + // tryReceiveOrClosed + // ******************** + + @Test + void tryReceiveOrClosed_shouldReturnNullWhenEmpty() { + Channel ch = Channel.newBufferedChannel(2); + assertNull(ch.tryReceiveOrClosed()); + } + + @Test + void tryReceiveOrClosed_shouldReturnValueWhenAvailable() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(2); + ch.send("hello"); + assertEquals("hello", ch.tryReceiveOrClosed()); + } + + @Test + void tryReceiveOrClosed_closedDone_noValues_shouldReturnChannelDone() { + Channel ch = Channel.newBufferedChannel(1); + ch.done(); + assertInstanceOf(ChannelDone.class, ch.tryReceiveOrClosed()); + } + + @Test + void tryReceiveOrClosed_closedError_shouldReturnChannelError() { + Channel ch = Channel.newBufferedChannel(1); + ch.error(new RuntimeException("boom")); + assertInstanceOf(ChannelError.class, ch.tryReceiveOrClosed()); + } + + @Test + void tryReceiveOrClosed_closedDone_withBufferedValues_shouldReturnValuesThenDone() + throws InterruptedException { + Channel ch = Channel.newBufferedChannel(3); + ch.send("a"); + ch.send("b"); + ch.done(); + + assertEquals("a", ch.tryReceiveOrClosed()); + assertEquals("b", ch.tryReceiveOrClosed()); + assertInstanceOf(ChannelDone.class, ch.tryReceiveOrClosed()); + } + + @Test + void tryReceiveOrClosed_closedError_withBufferedValues_shouldReturnError() + throws InterruptedException { + Channel ch = Channel.newBufferedChannel(3); + ch.send("a"); + ch.send("b"); + ch.error(new RuntimeException("boom")); + + // error() discards buffered values — should return ChannelError immediately + assertInstanceOf(ChannelError.class, ch.tryReceiveOrClosed()); + } + + // ************************* + // Mixed send/receive tests + // ************************* + + @Test + void trySend_thenTryReceive_roundTrip_buffered() { + Channel ch = Channel.newBufferedChannel(5); + for (int i = 0; i < 5; i++) { + assertTrue(ch.trySend(i)); + } + for (int i = 0; i < 5; i++) { + assertEquals(i, ch.tryReceive()); + } + assertNull(ch.tryReceive()); + } + + @Test + void trySend_thenTryReceive_roundTrip_unlimited() { + Channel ch = Channel.newUnlimitedChannel(); + for (int i = 0; i < 100; i++) { + assertTrue(ch.trySend(i)); + } + for (int i = 0; i < 100; i++) { + assertEquals(i, ch.tryReceive()); + } + assertNull(ch.tryReceive()); + } + + @Test + void trySend_mixedWithRegularReceive() throws InterruptedException, ExecutionException { + Channel ch = Channel.newBufferedChannel(2); + assertTrue(ch.trySend("a")); + assertTrue(ch.trySend("b")); + assertEquals("a", ch.receive()); + assertEquals("b", ch.receive()); + } + + @Test + void regularSend_thenTryReceive() throws InterruptedException { + Channel ch = Channel.newBufferedChannel(2); + ch.send("a"); + ch.send("b"); + assertEquals("a", ch.tryReceive()); + assertEquals("b", ch.tryReceive()); + } + + // ************* + // Stress tests + // ************* + + @Test + void concurrentTrySendAndTryReceive_noValueLossOrDuplication() + throws InterruptedException, ExecutionException { + Channel ch = Channel.newBufferedChannel(64); + int numProducers = 4; + int numConsumers = 4; + int itemsPerProducer = 10_000; + + var sentCount = new AtomicInteger(0); + var receivedValues = new ConcurrentSkipListSet(); + + scoped( + scope -> { + // producers: trySend in a loop + for (int p = 0; p < numProducers; p++) { + int producerBase = p * itemsPerProducer; + forkVoid( + scope, + () -> { + for (int i = 0; i < itemsPerProducer; i++) { + int val = producerBase + i; + while (!ch.trySend(val)) { + Thread.yield(); + } + sentCount.incrementAndGet(); + } + }); + } + + // consumers: tryReceive in a loop + int totalItems = numProducers * itemsPerProducer; + for (int c = 0; c < numConsumers; c++) { + forkVoid( + scope, + () -> { + while (receivedValues.size() < totalItems) { + Integer v = ch.tryReceive(); + if (v != null) { + receivedValues.add(v); + } else { + Thread.yield(); + } + } + }); + } + }); + + int totalItems = numProducers * itemsPerProducer; + assertEquals(totalItems, sentCount.get()); + assertEquals(totalItems, receivedValues.size()); + } + + @Test + void concurrentTrySendWithRegularReceive() throws InterruptedException, ExecutionException { + Channel ch = Channel.newBufferedChannel(16); + int total = 10_000; + var received = new ConcurrentSkipListSet(); + + scoped( + scope -> { + // producer: trySend + forkVoid( + scope, + () -> { + for (int i = 0; i < total; i++) { + while (!ch.trySend(i)) { + Thread.yield(); + } + } + ch.done(); + }); + + // consumer: regular receive + forkVoid( + scope, + () -> { + while (true) { + var r = ch.receiveOrClosed(); + if (r instanceof ChannelDone) break; + received.add((Integer) r); + } + }) + .get(); + }); + + assertEquals(total, received.size()); + } + + @Test + void concurrentRegularSendWithTryReceive() throws InterruptedException, ExecutionException { + Channel ch = Channel.newBufferedChannel(16); + int total = 10_000; + var received = new ConcurrentSkipListSet(); + + scoped( + scope -> { + // producer: regular send + forkVoid( + scope, + () -> { + for (int i = 0; i < total; i++) { + ch.send(i); + } + ch.done(); + }); + + // consumer: tryReceive + forkVoid( + scope, + () -> { + while (true) { + var r = ch.tryReceiveOrClosed(); + if (r instanceof ChannelDone) break; + if (r != null) { + received.add((Integer) r); + } else { + Thread.yield(); + } + } + }) + .get(); + }); + + assertEquals(total, received.size()); + } + + @Test + void trySend_rendezvous_concurrentStress() throws InterruptedException, ExecutionException { + Channel ch = Channel.newRendezvousChannel(); + int total = 1000; + var received = new ConcurrentSkipListSet(); + + scoped( + scope -> { + // sender: trySend + forkVoid( + scope, + () -> { + for (int i = 0; i < total; i++) { + while (!ch.trySend(i)) { + Thread.yield(); + } + } + ch.done(); + }); + + // receiver: regular receive + forkVoid( + scope, + () -> { + while (true) { + var r = ch.receiveOrClosed(); + if (r instanceof ChannelDone) break; + received.add((Integer) r); + } + }) + .get(); + }); + + assertEquals(total, received.size()); + } + + @Test + void tryReceive_rendezvous_concurrentStress() throws InterruptedException, ExecutionException { + Channel ch = Channel.newRendezvousChannel(); + int total = 1000; + var received = new ConcurrentSkipListSet(); + + scoped( + scope -> { + // sender: regular send + forkVoid( + scope, + () -> { + for (int i = 0; i < total; i++) { + ch.send(i); + } + ch.done(); + }); + + // receiver: tryReceive + forkVoid( + scope, + () -> { + while (true) { + var r = ch.tryReceiveOrClosed(); + if (r instanceof ChannelDone) break; + if (r != null) { + received.add((Integer) r); + } else { + Thread.yield(); + } + } + }) + .get(); + }); + + assertEquals(total, received.size()); + } + + @Test + void segmentCleanup_afterManyTrySendFailures() throws InterruptedException, ExecutionException { + // trySend on a rendezvous channel with no receiver: should not leak segments + Channel ch = Channel.newRendezvousChannel(); + for (int i = 0; i < 10_000; i++) { + assertFalse(ch.trySend(i)); + } + // channel should still be functional + scoped( + scope -> { + forkVoid(scope, () -> ch.send(42)); + Thread.sleep(50); + assertEquals(42, ch.tryReceive()); + }); + } + + @Test + void segmentCleanup_afterManyTryReceiveFailures() throws InterruptedException { + // tryReceive on an empty channel: should not leak segments + Channel ch = Channel.newBufferedChannel(2); + for (int i = 0; i < 10_000; i++) { + assertNull(ch.tryReceive()); + } + // channel should still be functional + ch.send(42); + assertEquals(42, ch.tryReceive()); + } + + @Test + void trySendTryReceive_bufferedCapacity1() { + Channel ch = Channel.newBufferedChannel(1); + assertTrue(ch.trySend(1)); + assertEquals(1, ch.tryReceive()); + } + + @Test + void trySendTryReceive_bufferedCapacity10() { + Channel ch = Channel.newBufferedChannel(10); + assertTrue(ch.trySend(1)); + assertEquals(1, ch.tryReceive()); + } + + @Test + void trySendTryReceive_unlimited() { + Channel ch = Channel.newUnlimitedChannel(); + assertTrue(ch.trySend(1)); + assertEquals(1, ch.tryReceive()); + } +} diff --git a/docs/channels.md b/docs/channels.md index 98e1fdc..40b3b1f 100644 --- a/docs/channels.md +++ b/docs/channels.md @@ -116,6 +116,24 @@ class Demo3 { } ``` +## Non-blocking operations + +For integration with non-blocking frameworks (e.g. Netty, Vert.x), `trySend` and `tryReceive` methods are provided. They never block or suspend and are safe to call from platform threads. + +```java +var ch = Channel.newBufferedChannel(1); + +// returns true if sent, false if the buffer is full or no receiver is waiting +boolean s1 = ch.trySend(1); + +// returns the value, or null if none is immediately available +Integer r1 = ch.tryReceive(); +``` + +Both have `OrClosed` variants which return a `ChannelClosed` value instead of throwing an exception if the channel is closed. + +Note: under contention, `trySend`/`tryReceive` may return `false`/`null` even when space or values are available. If you need guaranteed delivery, use `send()`/`receive()` instead. + ## Selecting from multiple channels The `select` method selects exactly one clause to complete. For example, you can receive a value from exactly one @@ -213,7 +231,7 @@ import static com.softwaremill.jox.Select.selectWithin; class Demo7 { public static void main(String[] args) throws InterruptedException { var ch1 = Channel.newBufferedChannel(3); - var ch2 = Channel.newBufferedChannel(3); + var ch2 = Channel.newBufferedChannel(3); try { // Wait up to 500 milliseconds for a value to be available @@ -241,7 +259,7 @@ class Demo8 { var ch1 = Channel.newBufferedChannel(3); var ch2 = Channel.newBufferedChannel(3); - var result = selectOrClosedWithin(Duration.ofMillis(500), "TIMEOUT", + var result = selectOrClosedWithin(Duration.ofMillis(500), "TIMEOUT", ch1.receiveClause(), ch2.receiveClause()); if (result.equals("TIMEOUT")) { diff --git a/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java b/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java index b3b818c..292d1eb 100644 --- a/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java @@ -206,11 +206,7 @@ private record FlowSubscription(Sink signals) implements Flow.Subscripti // 3.15: the signals channel is never closed @Override public void request(long n) { - try { - signals.send(new Request(n)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + signals.trySend(new Request(n)); } // 3.5: as above for 3.2 @@ -218,11 +214,7 @@ public void request(long n) { // 3.16: as above for 3.15 @Override public void cancel() { - try { - signals.send(new Cancel()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + signals.trySend(new Cancel()); } // 3.10, 3.11: no synchronous calls in this implementation diff --git a/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaDrain.java b/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaDrain.java index f3e1173..b53ace4 100644 --- a/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaDrain.java +++ b/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaDrain.java @@ -63,11 +63,7 @@ public static void runPublish( if (exception != null) { logger.error( "Exception when sending record", exception); - try { - producerExceptions.sendOrClosed(exception); - } catch (InterruptedException e) { - // ignore - } + producerExceptions.trySendOrClosed(exception); } }); } diff --git a/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaStage.java b/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaStage.java index 806f4e9..3c78341 100644 --- a/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaStage.java +++ b/kafka/src/main/java/com/softwaremill/jox/kafka/KafkaStage.java @@ -202,24 +202,12 @@ private static void sendPacket( toSend, (m, e) -> { if (e != null) { - try { - exceptions.sendOrClosed(e); - } catch (InterruptedException ex) { - // ignore - } + exceptions.trySendOrClosed(e); } else { if (commitOffsets && leftToSend.decrementAndGet() == 0) { - try { - toCommit.send(packet); - } catch (InterruptedException ex) { - // ignore - } - } - try { - metadata.send(new Pair<>(sequenceNo, m)); - } catch (InterruptedException ex) { - // ignore + toCommit.trySendOrClosed(packet); } + metadata.trySendOrClosed(new Pair<>(sequenceNo, m)); } }); }