diff --git a/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java b/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java index 94413fec39..616d021e5a 100644 --- a/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java +++ b/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java @@ -15,8 +15,30 @@ import java.util.concurrent.CompletionStage; -import io.reactivex.rxjava4.disposables.Disposable; +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.disposables.*; -public record CompletionStageDisposable(CompletionStage stage, Disposable disposable) { +/** + * Consist of a terminal stage and a disposable to be able to cancel a sequence. + * @param the return and element type of the various stages + * @param stage the embedded stage to work with + * @param disposable the way to cancel the stage concurrently + * @since 4.0.0 + */ +public record CompletionStageDisposable(@NonNull CompletionStage stage, @NonNull Disposable disposable) { + + /** + * Await the completion of the current stage. + */ + public void await() { + Streamer.await(stage); + } + /** + * Await the completion of the current stage. + * @param canceller the canceller link + */ + public void await(DisposableContainer canceller) { + Streamer.await(stage, canceller); + } } diff --git a/src/main/java/io/reactivex/rxjava4/core/Flowable.java b/src/main/java/io/reactivex/rxjava4/core/Flowable.java index ea3758f9c1..8528267845 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava4/core/Flowable.java @@ -32,6 +32,7 @@ import io.reactivex.rxjava4.internal.operators.mixed.*; import io.reactivex.rxjava4.internal.operators.observable.ObservableFromPublisher; import io.reactivex.rxjava4.internal.operators.single.SingleToFlowable; +import io.reactivex.rxjava4.internal.operators.streamable.StreamableFromPublisher; import io.reactivex.rxjava4.internal.schedulers.ImmediateThinScheduler; import io.reactivex.rxjava4.internal.subscribers.*; import io.reactivex.rxjava4.internal.util.*; @@ -16023,7 +16024,7 @@ public final void subscribe(@NonNull FlowableSubscriber subscriber) { // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); - NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); + var npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } @@ -20928,7 +20929,7 @@ public final Stream blockingStream(int prefetch) { * {@code ExecutorService} uses virtual threads, such as the one returned by * {@link Executors#newVirtualThreadPerTaskExecutor()}. * @param the downstream element type - * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} + * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)} * is invoked for each upstream item * @param executor the target {@code ExecutorService} to use for running the callback * @return the new {@code Flowable} instance @@ -20961,7 +20962,7 @@ public final Stream blockingStream(int prefetch) { * {@link Scheduler} uses virtual threads, such as the one returned by * {@link Executors#newVirtualThreadPerTaskExecutor()}. * @param the downstream element type - * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} + * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)} * is invoked for each upstream item * @return the new {@code Flowable} instance * @throws NullPointerException if {@code transformer} is {@code null} @@ -20992,7 +20993,7 @@ public final Stream blockingStream(int prefetch) { * {@code Scheduler} uses virtual threads, such as the one returned by * {@link Schedulers#virtual()}. * @param the downstream element type - * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} + * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)} * is invoked for each upstream item * @param scheduler the target {@code Scheduler} to use for running the callback * @param prefetch the number of items to fetch from the upstream. @@ -21030,7 +21031,7 @@ public final Stream blockingStream(int prefetch) { * {@code ExecutorService} uses virtual threads, such as the one returned by * {@link Executors#newVirtualThreadPerTaskExecutor()}. * @param the downstream element type - * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} + * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)} * is invoked for each upstream item * @param executor the target {@code ExecutorService} to use for running the callback * @param prefetch the number of items to fetch from the upstream. @@ -21051,4 +21052,54 @@ public final Stream blockingStream(int prefetch) { return new FlowableVirtualTransformExecutor<>(this, transformer, executor, null, prefetch); } + /** + * Converts this {@code Flowable} into a {@link Streamable} instance, + * transparently relaying signals between the two async representations of a sequence. + *

+ *

+ *
Backpressure:
+ *
This operator requests from the upstream in a bounded manner and + * relays the values one-by-one to the {@link Streamer } view of the sequence. + *
+ *
Scheduler:
+ *
The operator by design runs on the default virtual executor of the system.
+ *
+ *

+ * @return the new {@code Streamable} instance + * @since 4.0.0 + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.VIRTUAL) + @NonNull + public final Streamable toStreamable() { + return toStreamable(Executors.newVirtualThreadPerTaskExecutor()); + } + + /** + * Converts this {@code Flowable} into a {@link Streamable} instance, + * transparently relaying signals between the two async representations of a sequence. + *

+ *

+ *
Backpressure:
+ *
This operator requests from the upstream in a bounded manner and + * relays the values one-by-one to the {@link Streamer } view of the sequence. + *
+ *
Scheduler:
+ *
The operator runs on the provided {@link ExecutorService}.
+ *
+ *

+ * @param executor where the coordination will happen + * @return the new {@code Streamable} instance + * @throws NullPointerException if {@code executor} is {@code null} + * @since 4.0.0 + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Streamable toStreamable(ExecutorService executor) { + Objects.requireNonNull(executor, "executor is null"); + return new StreamableFromPublisher<>(this, executor); + } } diff --git a/src/main/java/io/reactivex/rxjava4/core/Scheduler.java b/src/main/java/io/reactivex/rxjava4/core/Scheduler.java index 8a4ead7afb..3529e6e6db 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Scheduler.java +++ b/src/main/java/io/reactivex/rxjava4/core/Scheduler.java @@ -14,7 +14,8 @@ package io.reactivex.rxjava4.core; import java.util.Objects; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; import io.reactivex.rxjava4.annotations.*; import io.reactivex.rxjava4.disposables.Disposable; @@ -381,6 +382,29 @@ public S when(@NonNull Function()); + } + /** * Represents an isolated, sequential worker of a parent Scheduler for executing {@code Runnable} tasks on * an underlying task-execution scheme (such as custom Threads, event loop, {@link java.util.concurrent.Executor Executor} or Actor system). @@ -572,6 +596,35 @@ public Runnable getWrappedRunnable() { return this.decoratedRun; } } + + /** + * A stateless Worker that reports itself as shutdown and doesn't do anything. + * @since 4.0.0 + */ + @NonNull + public static final Worker SHUTDOWN = new ShutdownWorker(); + } + + /** + * Implementation of a stateless, shutdown worker. For cleanup and termination purposes. + * @since 4.0.0 + */ + static final class ShutdownWorker extends Worker { + + @Override + public void dispose() { + } + + @Override + public boolean isDisposed() { + return true; + } + + @Override + public @NonNull Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { + return Disposable.disposed(); + } + } static final class PeriodicDirectTask diff --git a/src/main/java/io/reactivex/rxjava4/core/Streamable.java b/src/main/java/io/reactivex/rxjava4/core/Streamable.java index 68b336cb88..3c9f9e9846 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Streamable.java +++ b/src/main/java/io/reactivex/rxjava4/core/Streamable.java @@ -17,45 +17,63 @@ import java.util.Objects; import java.util.concurrent.*; -import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.annotations.*; import io.reactivex.rxjava4.disposables.*; import io.reactivex.rxjava4.exceptions.Exceptions; -import io.reactivex.rxjava4.functions.Consumer; +import io.reactivex.rxjava4.functions.*; import io.reactivex.rxjava4.internal.operators.streamable.*; +import io.reactivex.rxjava4.schedulers.Schedulers; +import io.reactivex.rxjava4.subscribers.TestSubscriber; /** - * The {@code IAsyncEnumerable} of the Java world. + * The holographically emergent {@code IAsyncEnumerable} of the Java world. * Runs best with Virtual Threads. * TODO proper docs * @param the element type of the stream. * @since 4.0.0 */ -public abstract class Streamable<@NonNull T> { +public interface Streamable<@NonNull T> { + + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + // API + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo /** * Realizes the stream and returns an interface that let's one consume it. * @param cancellation where to register and listen for cancellation calls. * @return the Streamer instance to consume. */ + @CheckReturnValue @NonNull - public abstract Streamer stream(@NonNull DisposableContainer cancellation); + Streamer stream(@NonNull DisposableContainer cancellation); + + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + // HELPERS + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + // TODO, why no public final so it is not unnecessarily reimplemented, Java? /** * Realizes the stream and returns an interface that let's one consume it. * @return the Streamer instance to consume. */ + @CheckReturnValue @NonNull - public final Streamer stream() { + default Streamer stream() { return stream(new CompositeDisposable()); // FIXME, use a practically no-op disposable container instead } + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + // Data sources and wrappers + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + /** * Returns an empty {@code Streamable} that never produces an item and just completes. * @param the element type * @return the {@code Streamable} instance */ + @CheckReturnValue @NonNull - public static <@NonNull T> Streamable empty() { + static <@NonNull T> Streamable empty() { return new StreamableEmpty<>(); } @@ -65,18 +83,171 @@ public final Streamer stream() { * @param item the constant item to produce * @return the {@code Streamable} instance */ - public static <@NonNull T> Streamable just(@NonNull T item) { + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable just(@NonNull T item) { Objects.requireNonNull(item, "item is null"); return new StreamableJust<>(item); } + /** + * Convert any Flow.Publisher into a Streamable sequence. + * @param the element type + * @param source Flow.Publisher to convert + * @return the new Streamable instance + */ + @CheckReturnValue + @NonNull + static Streamable fromPublisher(@NonNull Flow.Publisher source) { + Objects.requireNonNull(source, "source is null"); + return fromPublisher(source, Executors.newVirtualThreadPerTaskExecutor()); + } + + /** + * Convert any Flow.Publisher into a Streamable sequence. + * @param the element type + * @param source Flow.Publisher to convert + * @param executor where the conversion will run + * @return the new Streamable instance + */ + @CheckReturnValue + @NonNull + static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNull ExecutorService executor) { + Objects.requireNonNull(source, "source is null"); + return new StreamableFromPublisher(source, executor); + } + + /** + * Generate a sequence of values via a virtual generator callback (yielder) + * which is free to block and is natively backpressured. + *

+ * Runs on the {@link Schedulers#virtual()} scheduler. + * @param the element type + * @param generator the generator to use + * @return the streamable instance + */ + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable create(@NonNull VirtualGenerator generator) { + // FIXME native implementation + return Flowable.virtualCreate(generator) + .toStreamable(); + } + + /** + * Generate a sequence of values via a virtual generator callback (yielder) + * which is free to block and is natively backpressured. + *

+ * Runs on the given scheduler. + * @param the element type + * @param generator the generator to use + * @param scheduler the scheduler to run the virtual generator on + * @return the streamable instance + */ + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable create(@NonNull VirtualGenerator generator, @NonNull Scheduler scheduler) { + // FIXME native implementation + return Flowable.virtualCreate(generator, scheduler) + .toStreamable(); + } + + /** + * Generate a sequence of values via a virtual generator callback (yielder) + * which is free to block and is natively backpressured. + *

+ * Runs on the given executor service. + * @param the element type + * @param generator the generator to use + * @param executor the executor to run the virtual generator on + * @return the streamable instance + */ + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable create(@NonNull VirtualGenerator generator, @NonNull ExecutorService executor) { + // FIXME native implementation + return Flowable.virtualCreate(generator, executor) + .toStreamable(); + } + + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + // Operators + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + + /** + * Converts the streamable into a Flowable representation, running + * on the default Executors.newVirtualThreadPerTaskExecutor() virtual thread. + * @return the new Flowable instance + */ + @CheckReturnValue + @NonNull + default Flowable toFlowable() { + return toFlowable(Executors.newVirtualThreadPerTaskExecutor()); + } + + /** + * Converts the streamable into a Flowable representation, running + * on the provided executor service. + * @param executor the executor to use + * @return the new Flowable instance + */ + @CheckReturnValue + @NonNull + default Flowable toFlowable(@NonNull ExecutorService executor) { + Objects.requireNonNull(executor, "executir is null"); + var me = this; + return Flowable.virtualCreate(emitter -> { + me.forEach(emitter::emit).await(emitter.canceller()); + }, executor); + } + + /** + * Transforms the upstream sequence into zero or more elements for the downstream. + * @param the result element type + * @param transformer the interface to implement the transforming logic + * @return the new Streamable instance + */ + @CheckReturnValue + @NonNull + default <@NonNull R> Streamable transform(@NonNull VirtualTransformer transformer) { + return transform(transformer, Executors.newVirtualThreadPerTaskExecutor()); + } + + /** + * Transforms the upstream sequence into zero or more elements for the downstream. + * @param the result element type + * @param transformer the interface to implement the transforming logic + * @param executor where to run the transform and blocking operations + * @return the new Streamable instance + */ + @CheckReturnValue + @NonNull + default <@NonNull R> Streamable transform(@NonNull VirtualTransformer transformer, + @NonNull ExecutorService executor) { + Objects.requireNonNull(transformer, "transformer is null"); + Objects.requireNonNull(executor, "executor is null"); + var me = this; + return create(emitter -> { + me.forEach((item, stopper) -> { + // System.out.println("item " + item); + transformer.transform(item, emitter, stopper); + }, emitter.canceller(), executor) + .await(emitter.canceller()); + }, executor); + } + + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + // Consumption methods and outgoing converters + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + /** * Consumes elements from this {@code Streamable} via the provided executor service. * @param consumer the callback that gets the elements until completion * @return a Disposable that let's one cancel the sequence asynchronously. */ + @CheckReturnValue @NonNull - public final CompletionStageDisposable forEach(@NonNull Consumer consumer) { + default CompletionStageDisposable forEach(@NonNull Consumer consumer) { CompositeDisposable canceller = new CompositeDisposable(); return forEach(consumer, canceller, Executors.newVirtualThreadPerTaskExecutor()); } @@ -87,7 +258,9 @@ public final CompletionStageDisposable forEach(@NonNull Consumer forEach(@NonNull Consumer consumer, @NonNull DisposableContainer canceller) { + @CheckReturnValue + @NonNull + default CompletionStageDisposable forEach(@NonNull Consumer consumer, @NonNull DisposableContainer canceller) { return forEach(consumer, canceller, Executors.newVirtualThreadPerTaskExecutor()); } @@ -97,8 +270,9 @@ public final CompletionStageDisposable forEach(@NonNull Consumer forEach(@NonNull Consumer consumer, @NonNull ExecutorService executor) { + default CompletionStageDisposable forEach(@NonNull Consumer consumer, @NonNull ExecutorService executor) { CompositeDisposable canceller = new CompositeDisposable(); return forEach(consumer, canceller, executor); } @@ -110,8 +284,10 @@ public final CompletionStageDisposable forEach(@NonNull Consumer forEach(@NonNull Consumer consumer, @NonNull DisposableContainer canceller, @NonNull ExecutorService executor) { + default CompletionStageDisposable forEach(@NonNull Consumer consumer, @NonNull DisposableContainer canceller, @NonNull ExecutorService executor) { Objects.requireNonNull(consumer, "consumer is null"); Objects.requireNonNull(canceller, "canceller is null"); Objects.requireNonNull(executor, "executor is null"); @@ -119,14 +295,19 @@ public final CompletionStageDisposable forEach(@NonNull Consumer { try (var str = me.stream(canceller)) { while (!canceller.isDisposed()) { - if (str.next().toCompletableFuture().join()) { + if (str.awaitNext(canceller)) { + // System.out.println("Received " + str.current()); consumer.accept(Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!")); } else { + // System.out.println("EOF "); break; } } + // System.out.println("Canceller status after loop: " + canceller.isDisposed()); } catch (final Throwable crash) { Exceptions.throwIfFatal(crash); + // System.out.println("Canceller status in error: " + canceller.isDisposed()); + crash.printStackTrace(); if (crash instanceof RuntimeException ex) { throw ex; } @@ -141,12 +322,111 @@ public final CompletionStageDisposable forEach(@NonNull Consumer(StreamableHelper.toCompletionStage((Future)(Future)future), canceller); } + /** + * Consumes elements from this {@code Streamable} via the provided executor service. + * @param consumer the callback that gets the elements until completion + * @param canceller the container to trigger cancellation of the sequence + * @param executor the service that hosts the blocking waits. + * @return the {@code CompletionStage} that gets notified when the sequence ends + */ + @CheckReturnValue + @NonNull + @SuppressWarnings("unchecked") + default CompletionStageDisposable forEach( + @NonNull BiConsumer consumer, + @NonNull DisposableContainer canceller, + @NonNull ExecutorService executor) { + Objects.requireNonNull(consumer, "consumer is null"); + Objects.requireNonNull(canceller, "canceller is null"); + Objects.requireNonNull(executor, "executor is null"); + final Streamable me = this; + var future = executor.submit(() -> { + try (var str = me.stream(canceller)) { + var stopper = Disposable.empty(); + while (!canceller.isDisposed() && !stopper.isDisposed()) { + if (str.awaitNext(canceller)) { + // System.out.println("Received " + str.current()); + var v = Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!"); + consumer.accept(v, stopper); + } else { + // System.out.println("EOF "); + break; + } + } + // System.out.println("Canceller status after loop: " + canceller.isDisposed()); + } catch (final Throwable crash) { + Exceptions.throwIfFatal(crash); + // System.out.println("Canceller status in error: " + canceller.isDisposed()); + // crash.printStackTrace(); + if (crash instanceof RuntimeException ex) { + throw ex; + } + if (crash instanceof Exception ex) { + throw ex; + } + throw new InvocationTargetException(crash); + } + return null; + }); + canceller.add(Disposable.fromFuture(future)); + return new CompletionStageDisposable( + StreamableHelper.toCompletionStage((Future)(Future)future), canceller); + } + /** * Consume this {@code Streamable} via the given flow-reactive-streams subscriber. * @param subscriber the subscriber to consume with. * @param executor the service that hosts the blocking waits. */ - public final void subscribe(@NonNull Flow.Subscriber subscriber, @NonNull ExecutorService executor) { + default void subscribe(@NonNull Flow.Subscriber subscriber, @NonNull ExecutorService executor) { + final Streamable me = this; + Flowable.virtualCreate(emitter -> { + // System.out.println("subscribe::virtualCreate"); + me.forEach(v -> { + // System.out.println("subscribe::virtualCreate::forEach::emit"); + emitter.emit(v); + }).await(emitter.canceller()); + }, executor) + .subscribe(subscriber); + } + /** + * Consume this {@code Streamable} via the given flow-reactive-streams subscriber. + * @param subscriber the subscriber to consume with. + */ + default void subscribe(@NonNull Flow.Subscriber subscriber) { + final Streamable me = this; + Flowable.virtualCreate(emitter -> { + me.forEach(v -> { + // System.out.println("Emitting " + v); + emitter.emit(v); + }).await(emitter.canceller()); + }) + .subscribe(subscriber); + } + + /** + * Creates a new {@link TestSubscriber} and subscribes it to this {@code Streamable}. + * @return the created test subscriber + */ + @CheckReturnValue + @NonNull + default TestSubscriber test() { + var ts = new TestSubscriber(); + subscribe(ts); + return ts; + } + + /** + * Creates a new {@link TestSubscriber} and subscribes it to this {@code Streamable}. + * @param executor the executor to use + * @return the created test subscriber + */ + @CheckReturnValue + @NonNull + default TestSubscriber test(@NonNull ExecutorService executor) { + var ts = new TestSubscriber(); + subscribe(ts, executor); + return ts; } } diff --git a/src/main/java/io/reactivex/rxjava4/core/Streamer.java b/src/main/java/io/reactivex/rxjava4/core/Streamer.java index 2f49dd8c8b..efa8dd97c9 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Streamer.java +++ b/src/main/java/io/reactivex/rxjava4/core/Streamer.java @@ -13,26 +13,37 @@ package io.reactivex.rxjava4.core; -import java.util.NoSuchElementException; -import java.util.concurrent.CompletionStage; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Function; -import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.annotations.*; +import io.reactivex.rxjava4.disposables.*; /** * A realized stream which can then be consumed asynchronously in steps. * Think of it as the {@IAsyncEnumerator} of the Java world. Runs best on Virtual Threads. + *

+ * To make sure you can run finish, use {@link DisposableContainer#clear()} or {@link DisposableContainer#reset()} + * to get rid of all previous registered disposables. finish() will create its own, and if that + * gets stuck, just call clear()/dispose() on the container to get rid of this sequence for good. * @param the element type. * TODO proper docs * @since 4.0.0 */ public interface Streamer<@NonNull T> extends AutoCloseable { + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + // API + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + /** * Determine if there are more elements available from the source. + * @param cancellation ability to perform cancellation on a per-virtual-pull request. * @return eventually true or false, indicating availability or termination */ @NonNull - CompletionStage next(); + CompletionStage next(@NonNull DisposableContainer cancellation); /** * Returns the current element if {@link #next()} yielded {@code true}. @@ -46,15 +57,217 @@ public interface Streamer<@NonNull T> extends AutoCloseable { /** * Called when the stream ends or gets cancelled. Should be always invoked. * TODO, this is inherited from {@code IAsyncDisposable} in C#... + * @param cancellation to cancel a stuck finish operation, just in case. * @return the stage you can await to cleanups to happen */ @NonNull - CompletionStage cancel(); + CompletionStage finish(@NonNull DisposableContainer cancellation); + + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + // HELPERS + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + + /** + * Determine if there are more elements available from the source. + * Uses a default, individual {@link CompositeDisposable} to manage cancellation. + * @return eventually true or false, indicating availability or termination + */ + @NonNull + default CompletionStage next() { + return next(new CompositeDisposable()); + } /** - * Make this Streamer a resource and a Closeable. + * Make this Streamer a resource and a Closeable, allowing virtually blocking closing. */ + @Override default void close() { - cancel().toCompletableFuture().join(); + awaitFinish(); + } + + /** + * Augments the streamer so that the given canceller is injected into the various + * lifecycle await calls. + * @param canceller the canceller to inject + * @return the augmented streamer + */ + default Streamer finishVia(@NonNull DisposableContainer canceller) { + Objects.requireNonNull(canceller, "canceller is null"); + if (this instanceof StreamerFinishViaDisposableContainerCanceller augment) { + if (augment.streamer == this && augment.canceller == canceller) { + // DO not rewrap! + return this; + } + } + + return new StreamerFinishViaDisposableContainerCanceller<>(this, canceller); + } + + /** + * Augments the base streamer with a canceller so that it can be injected at the various await calls. + * @param the element type of the stream + */ + static record StreamerFinishViaDisposableContainerCanceller( + @NonNull Streamer streamer, @NonNull DisposableContainer canceller) + implements Streamer { + + @Override + public @NonNull CompletionStage next(@NonNull DisposableContainer cancellation) { + // TODO Auto-generated method stub + return streamer.next(cancellation); + } + + @Override + public @NonNull T current() { + return streamer.current(); + } + + @Override + public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { + return streamer.finish(cancellation); + } + + } + + /** + * Hides the identity of this Streamer for debug or deoptimization purposes. + * @return the augmented streamer, always unique. + */ + default Streamer hide() { + return new HiddenStreamer<>(this); + } + + /** + * Hides the identity of the Streamer for debug or deoptimization purposes. + * @param the element type of the streamer + */ + static record HiddenStreamer(@NonNull Streamer streamer) implements Streamer { + + @Override + public @NonNull CompletionStage next(@NonNull DisposableContainer cancellation) { + return streamer.next(cancellation); + } + + @Override + public @NonNull T current() { + return streamer.current(); + } + + @Override + public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { + return streamer.finish(cancellation); + } + + } + + /** + * Moves and awaits the sequence's next element, returns false if there are no more + * data. + * @return true if the next element via {@link #current()} can be read, or false if + * the stream ended. + */ + default boolean awaitNext() { + return await(next()); + } + + /** + * Moves and awaits the sequence's next element, returns false if there are no more + * data. + * @param cancellation to efficiently cancel this await if necessary + * @return true if the next element via {@link #current()} can be read, or false if + * the stream ended. + */ + default boolean awaitNext(@NonNull DisposableContainer cancellation) { + return await(next(cancellation), cancellation); + } + + /** + * Finish and cleanup the sequence after its completion or cancellation. + */ + default void awaitFinish() { + await(finish(DisposableContainer.NEVER), DisposableContainer.NEVER); + } + + /** + * Who cancels the cancellation attempt? Another cancellation attempt! + * @param cancellation the token to cancel and ongoing cancel attempt + */ + default void awaitFinish(@NonNull DisposableContainer cancellation) { + await(finish(cancellation), cancellation); + } + + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + // ASYNC/AWAIT "Language" keyword implementations + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo + + /** + * The {@code await} keyword for async/await. + * @param the type of the returned value if any. + * @param stage the stage to await virtual-blockingly + * @return the awaited value + */ + @Nullable + static T await(@NonNull CompletionStage stage) { + return await(stage, null); + } + + /** + * The cancellable {@code await} keyword for async/await. + * @param the type of the returned value if any. + * @param stage the stage to await virtual-blockingly + * @param canceller the container that can trigger a cancellation on demand + * @return the awaited value + */ + @Nullable + static T await(@NonNull CompletionStage stage, @Nullable DisposableContainer canceller) { + var f = stage.toCompletableFuture(); + if (canceller == null) { + return f.join(); + } + var d = Disposable.fromFuture(f, true); + try (var _ = canceller.subscribe(d)) { + return f.join(); + } + } + + /** + * Runs a function while turning it into a CompletionStage with a canceller supplied too. + * @param the return type of the function + * @param function the function to apply + * @param canceller the canceller to use + * @param executor the executor to use + * @return the new stage + */ + static CompletionStage runStage(Function function, + DisposableContainer canceller, Executor executor) { + var loopback = new SerialDisposable(); + canceller.add(loopback); + + // new Exception().printStackTrace(); + + var f = CompletableFuture.supplyAsync(() -> { + try { + return function.apply(canceller); + } finally { + canceller.delete(loopback); + } + }, executor); + + var d = Disposable.fromFuture(f, true); + loopback.replace(d); + + return f; + } + + /** + * Runs a function while turning it into a CompletionStage with a canceller supplied too. + * @param the return type of the function + * @param function the function to apply + * @param canceller the canceller to use + * @return the new stage + */ + static CompletionStage runStage(Function function, + DisposableContainer canceller) { + return runStage(function, canceller, Executors.newVirtualThreadPerTaskExecutor()); } } diff --git a/src/main/java/io/reactivex/rxjava4/core/VirtualEmitter.java b/src/main/java/io/reactivex/rxjava4/core/VirtualEmitter.java index cf66193896..97a3e6bccf 100644 --- a/src/main/java/io/reactivex/rxjava4/core/VirtualEmitter.java +++ b/src/main/java/io/reactivex/rxjava4/core/VirtualEmitter.java @@ -13,12 +13,13 @@ package io.reactivex.rxjava4.core; +import io.reactivex.rxjava4.disposables.*; + /** * Interface handed to user code in {@link Flowable#virtualCreate(VirtualGenerator, java.util.concurrent.ExecutorService)} callback. * @param the element type to emit * @since 4.0.0 */ -@FunctionalInterface public interface VirtualEmitter { /** @@ -27,4 +28,10 @@ public interface VirtualEmitter { * @throws Throwable an arbitrary exception if the downstream cancelled */ void emit(T item) throws Throwable; + + /** + * Returns a disposable container to relay cancellation notifications while awaiting the run. + * @return a new Disposable Container instance + */ + DisposableContainer canceller(); } \ No newline at end of file diff --git a/src/main/java/io/reactivex/rxjava4/core/VirtualTransformer.java b/src/main/java/io/reactivex/rxjava4/core/VirtualTransformer.java index dbb397ecf9..733b92061c 100644 --- a/src/main/java/io/reactivex/rxjava4/core/VirtualTransformer.java +++ b/src/main/java/io/reactivex/rxjava4/core/VirtualTransformer.java @@ -12,6 +12,9 @@ */ package io.reactivex.rxjava4.core; + +import io.reactivex.rxjava4.disposables.Disposable; + /** * Interface called by the {@link Flowable#virtualTransform(VirtualTransformer, java.util.concurrent.ExecutorService)} * operator to generate any number of output values based of the current input of the upstream. @@ -29,7 +32,8 @@ public interface VirtualTransformer { * * @param value the upstream value * @param emitter the emitter to use to generate result value(s) + * @param stopper call to stop the upstream * @throws Throwable signaled as {@code onError} for the downstream. */ - void transform(T value, VirtualEmitter emitter) throws Throwable; + void transform(T value, VirtualEmitter emitter, Disposable stopper) throws Throwable; } \ No newline at end of file diff --git a/src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java b/src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java index 9d9066a245..df5be23c65 100644 --- a/src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java +++ b/src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java @@ -255,4 +255,17 @@ void dispose(@Nullable OpenHashSet set) { throw new CompositeException(errors); } } + + @Override + public void reset() { + if (disposed) { + return; + } + synchronized (this) { + if (disposed) { + return; + } + resources = null; + } + } } diff --git a/src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java b/src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java index 85497e5af7..5586be4b03 100644 --- a/src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java +++ b/src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java @@ -42,4 +42,96 @@ public interface DisposableContainer extends Disposable { * @return true if the operation was successful */ boolean delete(Disposable d); + + /** + * Removes all contained {@link Disposable}s without disposing them, making this + * container fresh. + * @since 4.0.0 + */ + void reset(); + + /** + * Removes and disposes all contained {@link Disposable}s, making this container fresh + * without disposing the entire container. + */ + void clear(); + + /** + * Registers a {@link Disposable} with this container so that it can be removed and disposed + * via a simple {@link #dispose()} call to the returned Disposable. + * @param d the disposable to register + * @return the Disposable to trigger a {@link #remove(Disposable)} + * @see #subscribe(Disposable) for non-disposing removal. + * @since 4.0.0 + */ + default Disposable register(Disposable d) { + add(d); + return Disposable.fromRunnable(() -> remove(d)); + } + + /** + * Registers a {@link Disposable} with this container so that it can be deleted, not disposed + * via a simple {@link #dispose()} call to the returned Disposable. + * @param d the disposable to register + * @return the Disposable to trigger a {@link #remove(Disposable)} + * @see #subscribe(Disposable) for non-disposing removal. + * @since 4.0.0 + */ + default Disposable subscribe(Disposable d) { + add(d); + return Disposable.fromRunnable(() -> delete(d)); + } + + /** + * The container implementation that just ignores everything, for + * cases where the dispose signal has no side-effects to work with. + * @since 4.0.0 + */ + static DisposableContainer NEVER = new NeverDisposableContainer(); + + /** + * Implementation of a never disposable container. + * @since 4.0.0 + */ + static record NeverDisposableContainer() implements DisposableContainer { + + @Override + public void dispose() { + // Deliberately empty + } + + @Override + public boolean isDisposed() { + // Who cares? + return false; + } + + @Override + public boolean add(Disposable d) { + // Who cares? + return false; + } + + @Override + public boolean remove(Disposable d) { + // Who cares? + return false; + } + + @Override + public boolean delete(Disposable d) { + // Who cares? + return false; + } + + @Override + public void reset() { + // Who cares? + } + + @Override + public void clear() { + // Who cares? + } + } } diff --git a/src/main/java/io/reactivex/rxjava4/functions/Consumer3.java b/src/main/java/io/reactivex/rxjava4/functions/Consumer3.java new file mode 100644 index 0000000000..ca6fa13580 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/functions/Consumer3.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.functions; + +import io.reactivex.rxjava4.annotations.NonNull; + +/** + * A functional interface (callback) that accepts two values (of possibly different types). + * @param the first value type + * @param the second value type + * @param the third value type + * @since 4.0.0 + */ +@FunctionalInterface +public interface Consumer3<@NonNull T1, @NonNull T2, @NonNull T3> { + + /** + * Performs an operation on the given values. + * @param t1 the first value + * @param t2 the second value + * @param t3 the third value + * @throws Throwable if the implementation wishes to throw any type of exception + */ + void accept(T1 t1, T2 t2, T3 t3) throws Throwable; +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java b/src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java index cd2220dee8..a64faa29df 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java +++ b/src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java @@ -184,4 +184,18 @@ void dispose(List set) { throw new CompositeException(errors); } } + + @Override + public void reset() { + if (disposed) { + return; + } + synchronized (this) { + if (disposed) { + return; + } + resources = null; + } + } + } diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java index 81ab52053b..ed681ead22 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java @@ -18,9 +18,9 @@ import io.reactivex.rxjava4.annotations.NonNull; import io.reactivex.rxjava4.core.*; -import io.reactivex.rxjava4.disposables.DisposableContainer; +import io.reactivex.rxjava4.disposables.*; -public final class StreamableEmpty extends Streamable { +public final class StreamableEmpty implements Streamable { @Override public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) { @@ -30,7 +30,7 @@ public final class StreamableEmpty extends Streamable { static final class EmptyStreamer implements Streamer { @Override - public @NonNull CompletionStage next() { + public @NonNull CompletionStage next(DisposableContainer cancellation) { return CompletableFuture.completedStage(false); // TODO would constant stages work here or is that contention? } @@ -40,7 +40,7 @@ static final class EmptyStreamer implements Streamer { } @Override - public @NonNull CompletionStage cancel() { + public @NonNull CompletionStage finish(DisposableContainer canceller) { return CompletableFuture.completedStage(null); // TODO would constant stages work here or is that contention? } } diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java new file mode 100644 index 0000000000..3e1c3fe2de --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.concurrent.*; +import java.util.concurrent.Flow.*; +import java.util.concurrent.atomic.*; + +import io.reactivex.rxjava4.annotations.*; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.DisposableContainer; +import io.reactivex.rxjava4.internal.fuseable.HasUpstreamPublisher; +import io.reactivex.rxjava4.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava4.internal.util.ExceptionHelper; +import io.reactivex.rxjava4.internal.virtual.VirtualResumable; + +public record StreamableFromPublisher(@NonNull Publisher source, + @Nullable Executor executor) +implements Streamable, HasUpstreamPublisher { + + @Override + public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) { + + var flow = Flowable.fromPublisher(source); + var streamer = new FlowableStreamer( + cancellation, + new AtomicReference<>(), + new AtomicLong(), + new AtomicReference<>(), + new AtomicReference<>(), + new VirtualResumable(), + executor + ); + flow.subscribe(streamer); + return streamer; + } + + record FlowableStreamer( + DisposableContainer cancellation, + AtomicReference upstream, + AtomicLong requester, + AtomicReference item, + AtomicReference error, + VirtualResumable resumer, + Executor executor) + implements Flow.Subscriber, Streamer { + + @Override + public void onSubscribe(Subscription subscription) { + // System.out.println("onSubscribe | " + subscription); + SubscriptionHelper.deferredSetOnce(upstream, requester, subscription); + } + + @Override + public void onNext(T item) { + // System.out.println("onNext | " + item); + this.item.getAndSet(item); + resumer.resume(); + // System.out.println("Got " + item + " resume signalled"); + } + + @Override + public void onError(Throwable throwable) { + // System.out.println("onError | " + throwable); + error.getAndSet(throwable); + upstream.set(SubscriptionHelper.CANCELLED); + resumer.resume(); + } + + @Override + public void onComplete() { + // System.out.println("onComplete |"); + error.compareAndSet(null, ExceptionHelper.TERMINATED); + upstream.set(SubscriptionHelper.CANCELLED); + resumer.resume(); + } + + @Override + public @NonNull CompletionStage next(@NonNull DisposableContainer canceller) { + // System.out.println("next()"); + return Streamer.runStage(_ -> { + item.lazySet(null); + // System.out.println("Requesting the next item"); + SubscriptionHelper.deferredRequest(upstream, requester, 1); + + // System.out.println("waiting for it"); + + var e = error.get(); + var v = item.get(); + + do { + // System.out.println("1"); + if (e != null || v != null) { + break; + } + + resumer.await(); + + e = error.get(); + v = item.get(); + + // System.out.println("Loop | Value: " + v + ", Error: " + e); + + } while (!canceller.isDisposed()); + + // Because Eclipse craps itself when trying to debug virtual threads + // FU whoever said debugging in virtual threads is straightforward + // System.out.println("Value: " + v + ", Error: " + e); + // if (e != null) { + // e.printStackTrace(); + // } + + if (v == null) { + if (e != null) { + if (e == ExceptionHelper.TERMINATED) { + return false; + } + throw ExceptionHelper.wrapOrThrow(e); + } + throw new IllegalStateException("null current item and null current error? How?"); + } + // System.out.println("Returning true"); + return true; + }, canceller, executor); + } + + @Override + public @NonNull T current() { + return item.get(); + } + + @Override + public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { + // new Exception("StreamableFromPublisher::finish").printStackTrace(); + return Streamer.runStage(_ -> { + SubscriptionHelper.cancel(upstream); + return null; + }, cancellation, executor); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java index 03bcf901b7..24c1285565 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java @@ -13,19 +13,17 @@ package io.reactivex.rxjava4.internal.operators.streamable; -import java.util.NoSuchElementException; +import java.util.*; import java.util.concurrent.*; import io.reactivex.rxjava4.annotations.NonNull; import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.disposables.*; -public final class StreamableJust extends Streamable { - - final T item; +public record StreamableJust(@NonNull T item) implements Streamable { public StreamableJust(T item) { - this.item = item; + this.item = Objects.requireNonNull(item, "item is null");; } @Override @@ -48,7 +46,7 @@ static final class JustStreamer implements Streamer, Disposable { } @Override - public @NonNull CompletionStage next() { + public @NonNull CompletionStage next(DisposableContainer cancellation) { if (stage == 0) { stage = 1; return CompletableFuture.completedStage(true); @@ -72,7 +70,7 @@ static final class JustStreamer implements Streamer, Disposable { } @Override - public @NonNull CompletionStage cancel() { + public @NonNull CompletionStage finish(DisposableContainer canceller) { item = null; cancellation = null; stage = 2; diff --git a/src/main/java/io/reactivex/rxjava4/internal/schedulers/SchedulerToExecutorService.java b/src/main/java/io/reactivex/rxjava4/internal/schedulers/SchedulerToExecutorService.java new file mode 100644 index 0000000000..729e0604fb --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/schedulers/SchedulerToExecutorService.java @@ -0,0 +1,242 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.schedulers; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.Scheduler; +import io.reactivex.rxjava4.core.Scheduler.Worker; +import io.reactivex.rxjava4.exceptions.Exceptions; + +/** + * Represents the state for a Scheduler -> ExecutorService interface. + * @param scheduler the scheduler to use + * @param workerStore hosts the worker state + * @since 4.0.0 + */ +public record SchedulerToExecutorService(@NonNull Scheduler scheduler, + @NonNull AtomicReference workerStore) implements ExecutorService { + + @Override + public void execute(Runnable command) { + if (workerStore.get() instanceof Worker w) { + w.schedule(command); + } else { + scheduler.scheduleDirect(command); + } + } + + @Override + public void shutdown() { + if (workerStore.get() instanceof Worker w) { + w.dispose(); + } else { + // FIXME, generally we don't want to shut down RxJava schedulers like this! + // scheduler.shutdown(); + var w = workerStore.getAndSet(Scheduler.Worker.SHUTDOWN); + if (w != null) { + w.dispose(); + } + } + } + + @Override + public List shutdownNow() { + if (workerStore.get() instanceof Worker w) { + w.dispose(); + } else { + // FIXME, generally we don't want to shut down RxJava schedulers like this! + // scheduler.shutdown(); + var w = workerStore.getAndSet(Scheduler.Worker.SHUTDOWN); + if (w != null) { + w.dispose(); + } + } + return List.of(); + } + + @Override + public boolean isShutdown() { + var w = workerStore.get(); + return w != null && w.isDisposed(); + } + + @Override + public boolean isTerminated() { + return isShutdown(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + // FIXME no idea how to passively wait, not really applicable in Rx + long totalTime = unit.convert(timeout, TimeUnit.MILLISECONDS); + + while (!isTerminated() && totalTime > 0) { + totalTime--; + Thread.sleep(1); + } + return totalTime > 0; + } + + @Override + public Future submit(Callable task) { + return CompletableFuture.supplyAsync(() -> { + try { + return task.call(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw Exceptions.propagate(ex); + } + }, this::execute); + } + + @Override + public Future submit(Runnable task, T result) { + return CompletableFuture.supplyAsync(() -> { + try { + task.run(); + return result; + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw Exceptions.propagate(ex); + } + }, this::execute); + } + + @Override + public Future submit(Runnable task) { + return CompletableFuture.runAsync(() -> { + try { + task.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw Exceptions.propagate(ex); + } + }, this::execute); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + var result = new ArrayList>(); + for (var task : tasks) { + result.add(submit(task)); + } + for (var f : result) { + try { + f.get(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + } + } + return result; + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + var result = new ArrayList>(); + for (var task : tasks) { + result.add(submit(task)); + } + + // FIXME how to wait in aggregate without spinning??? + long totalTime = unit.convert(timeout, TimeUnit.MILLISECONDS); + + while (!isTerminated() && totalTime > 0) { + totalTime--; + + int done = 0; + for (var f : result) { + if (f.isDone()) { + done++; + } + } + + if (done == result.size()) { + break; + } + + Thread.sleep(1); + } + + return result; + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + if (tasks.size() == 0) { + throw new IllegalArgumentException("The tasks parameter should contain at least one callable!"); + } + + var result = new ArrayList>(); + for (var task : tasks) { + result.add(submit(task)); + } + + while (!isTerminated()) { + for (var f : result) { + if (f.isDone() && !f.isCancelled() && f.exceptionNow() == null) { + + var v = f.resultNow(); + + for (var g : result) { + g.cancel(true); + } + + return v; + } + } + Thread.sleep(1); + } + return null; // Practically unreachable + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + if (tasks.size() == 0) { + throw new IllegalArgumentException("The tasks parameter should contain at least one callable!"); + } + + var result = new ArrayList>(); + for (var task : tasks) { + result.add(submit(task)); + } + + // FIXME how to wait in aggregate without spinning??? + long totalTime = unit.convert(timeout, TimeUnit.MILLISECONDS); + + while (!isTerminated() && totalTime > 0) { + totalTime--; + + for (var f : result) { + if (f.isDone() && !f.isCancelled() && f.exceptionNow() == null) { + + var v = f.resultNow(); + + for (var g : result) { + g.cancel(true); + } + + return v; + } + } + } + throw new TimeoutException("None of the tasks produced a clean result in the time allotted."); + } + +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualCreateExecutor.java b/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualCreateExecutor.java index c619e6b14a..a3002d2d2c 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualCreateExecutor.java +++ b/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualCreateExecutor.java @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicLong; import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.*; import io.reactivex.rxjava4.exceptions.Exceptions; import io.reactivex.rxjava4.internal.util.BackpressureHelper; @@ -49,7 +50,11 @@ protected void subscribeActual(Subscriber s) { s.onSubscribe(parent); if (executor != null) { - executor.submit((Callable)parent); + try { + executor.submit((Callable)parent); + } catch (RejectedExecutionException ex) { + s.onError(ex); + } } else { var worker = scheduler.createWorker(); parent.worker = worker; @@ -76,10 +81,13 @@ static final class ExecutorVirtualCreateSubscription extends AtomicLong Scheduler.Worker worker; + final DisposableContainer canceller; + ExecutorVirtualCreateSubscription(Subscriber downstream, VirtualGenerator generator) { this.downstream = downstream; this.generator = generator; this.consumerReady = new VirtualResumable(); + this.canceller = new CompositeDisposable(); } @Override @@ -122,6 +130,7 @@ public void request(long n) { @Override public void cancel() { cancelled = true; + canceller.dispose(); request(1); } @@ -140,5 +149,10 @@ public void emit(T item) throws Throwable { downstream.onNext(item); produced = p + 1; } + + @Override + public DisposableContainer canceller() { + return canceller; + } } } \ No newline at end of file diff --git a/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualTransformExecutor.java b/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualTransformExecutor.java index 806e072f22..adefc82b09 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualTransformExecutor.java +++ b/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualTransformExecutor.java @@ -20,6 +20,7 @@ import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.core.Scheduler.Worker; +import io.reactivex.rxjava4.disposables.*; import io.reactivex.rxjava4.exceptions.Exceptions; import io.reactivex.rxjava4.internal.util.BackpressureHelper; import io.reactivex.rxjava4.operators.SpscArrayQueue; @@ -62,7 +63,7 @@ protected void subscribeActual(Subscriber s) { } static final class ExecutorVirtualTransformSubscriber extends AtomicLong - implements FlowableSubscriber, Subscription, VirtualEmitter, Callable, Runnable { + implements FlowableSubscriber, Subscription, VirtualEmitter, Callable, Runnable, Disposable { private static final long serialVersionUID = -4702456711290258571L; @@ -93,6 +94,10 @@ static final class ExecutorVirtualTransformSubscriber extends AtomicLong Worker worker; + final DisposableContainer canceller; + + volatile boolean stopped; + ExecutorVirtualTransformSubscriber(Subscriber downstream, VirtualTransformer transformer, int prefetch) { @@ -103,6 +108,7 @@ static final class ExecutorVirtualTransformSubscriber extends AtomicLong this.producerReady = new VirtualResumable(); this.consumerReady = new VirtualResumable(); this.queue = new SpscArrayQueue<>(prefetch); + this.canceller = new CompositeDisposable(); } @Override @@ -160,18 +166,21 @@ public void request(long n) { @Override public void cancel() { - cancelled = true; - upstream.cancel(); - // cleanup(); don't kill the worker - - var w = worker; - worker = null; - if (w != null) { - w.close(); + try { + cancelled = true; + upstream.cancel(); + canceller.dispose(); + // cleanup(); don't kill the worker + + var w = worker; + worker = null; + if (w != null) { + w.close(); + } + } finally { + producerReady.resume(); + consumerReady.resume(); } - - producerReady.resume(); - consumerReady.resume(); } @Override @@ -209,7 +218,7 @@ public Void call() { upstream.request(limit); } - transformer.transform(v, this); + transformer.transform(v, this, this); continue; } @@ -233,5 +242,21 @@ public Void call() { } return null; } + + @Override + public DisposableContainer canceller() { + return canceller; + } + + @Override + public void dispose() { + stopped = true; + upstream.cancel(); + } + + @Override + public boolean isDisposed() { + return stopped; + } } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/rxjava4/completable/CompletableIsolatedTest.java b/src/test/java/io/reactivex/rxjava4/completable/CompletableIsolatedTest.java new file mode 100644 index 0000000000..9373ec659f --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/completable/CompletableIsolatedTest.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.completable; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import org.junit.Assert; +import org.junit.jupiter.api.parallel.Isolated; + +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.Disposable; +import io.reactivex.rxjava4.schedulers.Schedulers; + +/** + * Test Completable methods and operators. + */ +@Isolated +public class CompletableIsolatedTest extends RxJavaTest { + + @org.junit.jupiter.api.Test + public void repeatNormal() { + final AtomicReference err = new AtomicReference<>(); + final AtomicInteger calls = new AtomicInteger(); + + Completable c = Completable.fromCallable(() -> { + calls.getAndIncrement(); + Thread.sleep(200); + return null; + }).repeat(); + + c.subscribe(new CompletableObserver() { + @Override + public void onSubscribe(final Disposable d) { + Schedulers.single().scheduleDirect(() -> d.dispose(), 1100, TimeUnit.MILLISECONDS); + } + + @Override + public void onError(Throwable e) { + err.set(e); + } + + @Override + public void onComplete() { + + } + }); + + Assert.assertEquals("calls count mismatch", 6, calls.get()); + Assert.assertNull("error present", err.get()); + } +} diff --git a/src/test/java/io/reactivex/rxjava4/completable/CompletableTest.java b/src/test/java/io/reactivex/rxjava4/completable/CompletableTest.java index bcc49b3f04..cd27007b75 100644 --- a/src/test/java/io/reactivex/rxjava4/completable/CompletableTest.java +++ b/src/test/java/io/reactivex/rxjava4/completable/CompletableTest.java @@ -1983,46 +1983,6 @@ public Completable apply(Throwable v) { c.blockingAwait(); } - @org.junit.jupiter.api.Test - public void repeatNormal() { - final AtomicReference err = new AtomicReference<>(); - final AtomicInteger calls = new AtomicInteger(); - - Completable c = Completable.fromCallable(new Callable() { - @Override - public Object call() throws Exception { - calls.getAndIncrement(); - Thread.sleep(100); - return null; - } - }).repeat(); - - c.subscribe(new CompletableObserver() { - @Override - public void onSubscribe(final Disposable d) { - Schedulers.single().scheduleDirect(new Runnable() { - @Override - public void run() { - d.dispose(); - } - }, 550, TimeUnit.MILLISECONDS); - } - - @Override - public void onError(Throwable e) { - err.set(e); - } - - @Override - public void onComplete() { - - } - }); - - Assert.assertEquals(6, calls.get()); - Assert.assertNull(err.get()); - } - @Test(expected = TestException.class) public void repeatError() { Completable c = error.completable.repeat(); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableReplayEagerTruncateTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableReplayEagerTruncateTest.java index e1f7d4ba2d..89fa6a6ffd 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableReplayEagerTruncateTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableReplayEagerTruncateTest.java @@ -698,7 +698,7 @@ public static Worker workerSpy(final Disposable mockDisposable) { } private static class InprocessWorker extends Worker { - private final Disposable mockDisposable; + private Disposable mockDisposable; public boolean unsubscribed; InprocessWorker(Disposable mockDisposable) { diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableReplayTest.java index 31cb87c16e..36b6b5e034 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableReplayTest.java @@ -701,7 +701,7 @@ public static Worker workerSpy(final Disposable mockDisposable) { } private static class InprocessWorker extends Worker { - private final Disposable mockDisposable; + private Disposable mockDisposable; public boolean unsubscribed; InprocessWorker(Disposable mockDisposable) { diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableScanTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableScanTest.java index bd0f648e80..d7070cf1e2 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableScanTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableScanTest.java @@ -19,11 +19,12 @@ import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow.*; import java.util.concurrent.atomic.*; import org.junit.*; -import static java.util.concurrent.Flow.*; +import io.reactivex.rxjava4.annotations.*; import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.exceptions.*; import io.reactivex.rxjava4.flowable.*; @@ -42,14 +43,7 @@ public void scanIntegersWithInitialValue() { Flowable flowable = Flowable.just(1, 2, 3); - Flowable m = flowable.scan("", new BiFunction() { - - @Override - public String apply(String s, Integer n) { - return s + n.toString(); - } - - }); + Flowable m = flowable.scan("", (s, n) -> s + n.toString()); m.subscribe(subscriber); verify(subscriber, never()).onError(any(Throwable.class)); @@ -68,14 +62,7 @@ public void scanIntegersWithoutInitialValue() { Flowable flowable = Flowable.just(1, 2, 3); - Flowable m = flowable.scan(new BiFunction() { - - @Override - public Integer apply(Integer t1, Integer t2) { - return t1 + t2; - } - - }); + Flowable m = flowable.scan((t1, t2) -> t1 + t2); m.subscribe(subscriber); verify(subscriber, never()).onError(any(Throwable.class)); @@ -94,14 +81,7 @@ public void scanIntegersWithoutInitialValueAndOnlyOneValue() { Flowable flowable = Flowable.just(1); - Flowable m = flowable.scan(new BiFunction() { - - @Override - public Integer apply(Integer t1, Integer t2) { - return t1 + t2; - } - - }); + Flowable m = flowable.scan((t1, t2) -> t1 + t2); m.subscribe(subscriber); verify(subscriber, never()).onError(any(Throwable.class)); @@ -115,22 +95,10 @@ public Integer apply(Integer t1, Integer t2) { @Test public void shouldNotEmitUntilAfterSubscription() { TestSubscriber ts = new TestSubscriber<>(); - Flowable.range(1, 100).scan(0, new BiFunction() { - - @Override - public Integer apply(Integer t1, Integer t2) { - return t1 + t2; - } - - }).filter(new Predicate() { - - @Override - public boolean test(Integer t1) { - // this will cause request(1) when 0 is emitted - return t1 > 0; - } - - }).subscribe(ts); + Flowable.range(1, 100) + .scan(0, (t1, t2) -> t1 + t2) + .filter(t1 -> t1 > 0) + .subscribe(ts); assertEquals(100, ts.values().size()); } @@ -139,14 +107,7 @@ public boolean test(Integer t1) { public void backpressureWithInitialValue() { final AtomicInteger count = new AtomicInteger(); Flowable.range(1, 100) - .scan(0, new BiFunction() { - - @Override - public Integer apply(Integer t1, Integer t2) { - return t1 + t2; - } - - }) + .scan(0, (t1, t2) -> t1 + t2) .subscribe(new DefaultSubscriber() { @Override @@ -180,14 +141,7 @@ public void onNext(Integer t) { public void backpressureWithoutInitialValue() { final AtomicInteger count = new AtomicInteger(); Flowable.range(1, 100) - .scan(new BiFunction() { - - @Override - public Integer apply(Integer t1, Integer t2) { - return t1 + t2; - } - - }) + .scan((t1, t2) -> t1 + t2) .subscribe(new DefaultSubscriber() { @Override @@ -221,14 +175,7 @@ public void onNext(Integer t) { public void noBackpressureWithInitialValue() { final AtomicInteger count = new AtomicInteger(); Flowable.range(1, 100) - .scan(0, new BiFunction() { - - @Override - public Integer apply(Integer t1, Integer t2) { - return t1 + t2; - } - - }) + .scan(0, (t1, t2) -> t1 + t2) .subscribe(new DefaultSubscriber() { @Override @@ -259,14 +206,7 @@ public void onNext(Integer t) { @Test public void seedFactory() { Single> o = Flowable.range(1, 10) - .collect(new Supplier>() { - - @Override - public List get() { - return new ArrayList<>(); - } - - }, new BiConsumer, Integer>() { + .collect(() -> new ArrayList<>(), new BiConsumer, Integer>() { @Override public void accept(List list, Integer t2) { @@ -285,21 +225,9 @@ public void accept(List list, Integer t2) { @Test public void seedFactoryFlowable() { Flowable> f = Flowable.range(1, 10) - .collect(new Supplier>() { - - @Override - public List get() { - return new ArrayList<>(); - } - - }, new BiConsumer, Integer>() { - - @Override - public void accept(List list, Integer t2) { - list.add(t2); - } - - }).toFlowable().takeLast(1); + .collect(() -> (List)new ArrayList(), (list, item) -> list.add(item)) + .toFlowable() + .takeLast(1); assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), f.blockingSingle()); assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), f.blockingSingle()); @@ -307,14 +235,10 @@ public void accept(List list, Integer t2) { @Test public void scanWithRequestOne() { - Flowable f = Flowable.just(1, 2).scan(0, new BiFunction() { + Flowable f = Flowable.just(1, 2) + .scan(0, (t1, t2) -> t1 + t2) + .take(1); - @Override - public Integer apply(Integer t1, Integer t2) { - return t1 + t2; - } - - }).take(1); TestSubscriberEx subscriber = new TestSubscriberEx<>(); f.subscribe(subscriber); subscriber.assertValue(0); @@ -322,40 +246,71 @@ public Integer apply(Integer t1, Integer t2) { subscriber.assertNoErrors(); } + /** + * Turns the Subscription methods into lambda callbacks. + * @param the element type of the subscriber + */ + static class SubscriptionDelegate implements Subscription { + + @NonNull Subscriber subscriber; + @NonNull Consumer3, Long, U> onRequest; + @NonNull BiConsumer, U> onCancel; + @Nullable U data; + + SubscriptionDelegate( + @NonNull Subscriber subscriber, + @NonNull Consumer3, Long, U> onRequest, + @NonNull BiConsumer, U> onCancel, + @Nullable U data + ) { + this.subscriber = subscriber; + this.onRequest = onRequest; + this.onCancel = onCancel; + this.data = data; + } + + @Override + public void request(long n) { + try { + onRequest.accept(subscriber, n, data); + } catch(Throwable ex) { + throw Exceptions.propagate(ex); + } + } + + @Override + public void cancel() { + try { + onCancel.accept(subscriber, data); + } catch(Throwable ex) { + throw Exceptions.propagate(ex); + } + } + } + @Test public void scanShouldNotRequestZero() { final AtomicReference producer = new AtomicReference<>(); Flowable f = Flowable.unsafeCreate(new Publisher() { @Override - public void subscribe(final Subscriber subscriber) { - Subscription p = spy(new Subscription() { + public void subscribe(Subscriber subscriber) { - private AtomicBoolean requested = new AtomicBoolean(false); + var requested = new AtomicBoolean(false); - @Override - public void request(long n) { - if (requested.compareAndSet(false, true)) { - subscriber.onNext(1); - subscriber.onComplete(); + var subber = new SubscriptionDelegate(subscriber, (sub, _, data) -> { + if (data.compareAndSet(false, true)) { + sub.onNext(1); + sub.onComplete(); } - } - - @Override - public void cancel() { + }, (_, _) -> { }, + requested + ); - } - }); + Subscription p = spy(subber); producer.set(p); subscriber.onSubscribe(p); } - }).scan(100, new BiFunction() { - - @Override - public Integer apply(Integer t1, Integer t2) { - return t1 + t2; - } - - }); + }).scan(100, (t1, t2) -> t1 + t2); f.subscribe(new TestSubscriber(1L) { @@ -371,19 +326,10 @@ public void onNext(Integer integer) { @Test public void dispose() { - TestHelper.checkDisposed(PublishProcessor.create().scan(new BiFunction() { - @Override - public Object apply(Object a, Object b) throws Exception { - return a; - } - })); + TestHelper.checkDisposed(PublishProcessor.create().scan((a, _) -> a)); - TestHelper.checkDisposed(PublishProcessor.create().scan(0, new BiFunction() { - @Override - public Integer apply(Integer a, Integer b) throws Exception { - return a + b; - } - })); + TestHelper.checkDisposed(PublishProcessor.create() + .scan(0, (a, b) -> a + b)); } @Test @@ -391,37 +337,18 @@ public void doubleOnSubscribe() { TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { @Override public Flowable apply(Flowable f) throws Exception { - return f.scan(new BiFunction() { - @Override - public Object apply(Object a, Object b) throws Exception { - return a; - } - }); + return f.scan((a, _) -> a); } }); - TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) throws Exception { - return f.scan(0, new BiFunction() { - @Override - public Object apply(Object a, Object b) throws Exception { - return a; - } - }); - } - }); + TestHelper.checkDoubleOnSubscribeFlowable((Function, Flowable>) f -> + f.scan(0, (a, _) -> a)); } @Test public void error() { Flowable.error(new TestException()) - .scan(new BiFunction() { - @Override - public Object apply(Object a, Object b) throws Exception { - return a; - } - }) + .scan((a, _) -> a) .test() .assertFailure(TestException.class); } @@ -429,12 +356,7 @@ public Object apply(Object a, Object b) throws Exception { @Test public void neverSource() { Flowable.never() - .scan(0, new BiFunction() { - @Override - public Integer apply(Integer a, Integer b) throws Exception { - return a + b; - } - }) + .scan(0, (a, b) -> a + b) .test() .assertValue(0) .assertNoErrors() @@ -453,12 +375,7 @@ public HashMap apply(HashMap accum, Event perIns } }) .take(10) - .blockingForEach(new Consumer>() { - @Override - public void accept(HashMap v) { - System.out.println(v); - } - }); + .blockingForEach(System.out::println); } @Test diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableWindowWithSizeIsolatedTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableWindowWithSizeIsolatedTest.java index 1d82a8cf5f..19762d5847 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableWindowWithSizeIsolatedTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableWindowWithSizeIsolatedTest.java @@ -22,7 +22,6 @@ import io.reactivex.rxjava4.core.Observable; import io.reactivex.rxjava4.core.RxJavaTest; -import io.reactivex.rxjava4.functions.Consumer; import io.reactivex.rxjava4.schedulers.Schedulers; import io.reactivex.rxjava4.testsupport.*; @@ -35,30 +34,25 @@ public void windowUnsubscribeNonOverlappingAsyncSource() { final AtomicInteger count = new AtomicInteger(); Observable.merge(Observable.range(1, 100000) - .doOnNext(new Consumer() { - - @Override - public void accept(Integer t1) { - if (count.incrementAndGet() == 500000) { - // give it a small break halfway through - try { - Thread.sleep(50); - } catch (InterruptedException ex) { - // ignored - } + .doOnNext(_ -> { + if (count.incrementAndGet() == 50000) { + // give it a small break halfway through + try { + Thread.sleep(75); + } catch (InterruptedException _) { + // ignored } } - }) .observeOn(Schedulers.computation()) .window(5) .take(2)) .subscribe(to); - to.awaitDone(500, TimeUnit.MILLISECONDS); + to.awaitDone(1000, TimeUnit.MILLISECONDS); to.assertTerminated(); to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); // make sure we don't emit all values ... the unsubscribe should propagate - assertTrue(count.get() < 100000); + assertTrue(count.get() < 100000, "count: " + count.get()); } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java index 4fb59761db..f06f62189a 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java @@ -13,16 +13,22 @@ package io.reactivex.rxjava4.internal.operators.streamable; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertEquals; + import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.parallel.Isolated; -import io.reactivex.rxjava4.core.Streamable; +import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.exceptions.TestException; -import io.reactivex.rxjava4.internal.subscriptions.*; +import io.reactivex.rxjava4.internal.subscriptions.EmptySubscription; import io.reactivex.rxjava4.subscribers.TestSubscriber; import io.reactivex.rxjava4.testsupport.TestHelper; +@Isolated public class StreamableTest { @Test @@ -39,6 +45,9 @@ public void empty() throws Throwable { ts .awaitDone(5, TimeUnit.SECONDS) .assertResult(); + + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); }); } @@ -56,6 +65,198 @@ public void just() throws Throwable { ts .awaitDone(5, TimeUnit.SECONDS) .assertResult(1); + + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + }); + } + + @RepeatedTest(100) + public void fromFlowable() throws Throwable { + TestHelper.withVirtual(exec -> { + Flowable.range(1, 10) + .toStreamable(exec) + .test(exec) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + ; + + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + }); + } + + @RepeatedTest(100) + public void fromFlowableToStreamableToFlowable() throws Throwable { + TestHelper.withVirtual(exec -> { + Flowable.range(1, 10) + .toStreamable(exec) + .toFlowable(exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + ; + + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + }); + } + + @RepeatedTest(100) + public void createAndTransform() throws Throwable { + TestHelper.onVirtual(exec -> { + Streamable.create(emitter -> { + for (int i = 1; i < 11; i++) { + emitter.emit(i); + } + }, exec) + .transform((item, emitter, _) -> { + emitter.emit(-item - 1); + }, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(-2, -3, -4, -5, -6, -7, -8, -9, -10, -11); + + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + }); + } + + @RepeatedTest(100) + public void flowableRangeAndTransform() throws Throwable { + TestHelper.onVirtual(exec -> { + Flowable.range(1, 10) + .toStreamable(exec) + .transform((item, emitter, _) -> { + emitter.emit(-item - 1); + }, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(-2, -3, -4, -5, -6, -7, -8, -9, -10, -11); + + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + }); + } + + @Test + public void flowableRangeAndTransform1() throws Throwable { + TestHelper.onVirtual(exec -> { + System.out.println(">> START"); + Flowable.range(1, 10) + .doOnSubscribe(_ -> System.out.println("Flowable::doOnSubscribe")) + .doOnRequest(v -> System.out.println("Flowable::doOnRequest " + v)) + .doOnCancel(() -> { + System.out.println("Flowable::doOnCancel"); + new Exception().printStackTrace(); + }) + .doOnNext(v -> System.out.println("Flowable::doOnNext " + v)) + .toStreamable(exec) + .transform((item, emitter, _) -> { + emitter.emit(-item - 1); + }, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(-2, -3, -4, -5, -6, -7, -8, -9, -10, -11); + System.out.println(">> END"); + + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + }); + } + + @Test + public void flowableRangeAndTransformFlowable1() throws Throwable { + TestHelper.onVirtual(exec -> { + System.out.println(">> START"); + Flowable.range(1, 10) + .doOnSubscribe(_ -> System.out.println("Flowable::doOnSubscribe")) + .doOnRequest(v -> System.out.println("Flowable::doOnRequest " + v)) + .doOnCancel(() -> { + System.out.println("Flowable::doOnCancel"); + new Exception().printStackTrace(); + }) + .doOnNext(v -> System.out.println("Flowable::doOnNext " + v)) + .toStreamable(exec) + .toFlowable() + .virtualTransform((item, emitter, _) -> { + System.out.println("Transform " + item); + emitter.emit(-item - 1); + }, exec) + .toStreamable() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(-2, -3, -4, -5, -6, -7, -8, -9, -10, -11); + System.out.println(">> END"); + + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + }); + } + + @Test + public void flowableRangeAndTransform2() throws Throwable { + TestHelper.withCachedExecutor(exec -> { + System.out.println(">> START"); + var ts = Flowable.range(1, 10) + .doOnSubscribe(_ -> System.out.println("Flowable::doOnSubscribe")) + .doOnRequest(v -> System.out.println("Flowable::doOnRequest " + v)) + .doOnCancel(() -> { + System.out.println("Flowable::doOnCancel"); + new Exception().printStackTrace(); + }) + .doOnNext(v -> System.out.println("Flowable::doOnNext " + v)) + .toStreamable(exec) + .transform((item, emitter, _) -> { + System.out.println("Transform " + item); + emitter.emit(-item - 1); + }, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + ; + System.out.println(">> CHECK"); + ts.assertResult(-2, -3, -4, -5, -6, -7, -8, -9, -10, -11); + System.out.println(">> END"); + + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + }); + } + + @Test + public void rangeTransformFilter() throws Throwable { + TestHelper.withVirtual(exec -> { + Flowable.range(1, 10) + .toStreamable(exec) + .transform((item, emitter, _) -> { + if ((item & 1) == 0) { + emitter.emit(item); + } + }, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(2, 4, 6, 8, 10); + }); + } + + @Test + public void rangeTransformTake() throws Throwable { + TestHelper.withVirtual(exec -> { + var cancelled = new AtomicInteger(); + Flowable.range(1, 10) + .doOnCancel(() -> cancelled.incrementAndGet()) + .toStreamable(exec) + .transform((item, emitter, stopper) -> { + if (item == 5) { + stopper.dispose(); + } + emitter.emit(item); + }, exec) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, cancelled.get(), "Cancellation count "); }); } } diff --git a/src/test/java/io/reactivex/rxjava4/internal/virtual/VirtualTransformTest.java b/src/test/java/io/reactivex/rxjava4/internal/virtual/VirtualTransformTest.java index 5f7e1f5311..179b8c23ed 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/virtual/VirtualTransformTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/virtual/VirtualTransformTest.java @@ -33,7 +33,7 @@ public void checkIsInsideVirtualThread() { var cancelled = new AtomicBoolean(); Flowable.range(1, 5) .doOnCancel(() -> cancelled.set(true)) - .virtualTransform((v, emitter) -> emitter.emit(v), scope) + .virtualTransform((v, emitter, _) -> emitter.emit(v), scope) .take(1) .test() .awaitDone(5, TimeUnit.SECONDS) @@ -47,7 +47,7 @@ public void checkIsInsideVirtualThread() { public void errorUpstream() throws Throwable { TestHelper.withVirtual(exec -> { Flowable.error(new IOException()) - .virtualTransform((v, e) -> e.emit(v), exec) + .virtualTransform((v, e, _) -> e.emit(v), exec) .test() .awaitDone(5, TimeUnit.SECONDS) .assertError(IOException.class) @@ -59,7 +59,7 @@ public void errorUpstream() throws Throwable { public void errorTransform() throws Throwable { TestHelper.withVirtual(exec -> { Flowable.range(1, 5) - .virtualTransform((_, _) -> { throw new IOException(); }, exec) + .virtualTransform((_, _, _) -> { throw new IOException(); }, exec) .test() .awaitDone(5, TimeUnit.SECONDS) .assertError(IOException.class) @@ -71,7 +71,7 @@ public void errorTransform() throws Throwable { public void take() throws Throwable { TestHelper.withVirtual(exec -> { Flowable.range(1, 5) - .virtualTransform((v, e) -> e.emit(v), exec) + .virtualTransform((v, e, _) -> e.emit(v), exec) .take(2) .test() .awaitDone(5, TimeUnit.SECONDS) @@ -84,7 +84,7 @@ public void take() throws Throwable { public void observeOn() throws Throwable { TestHelper.withVirtual(exec -> { Flowable.range(1, 10000) - .virtualTransform((v, e) -> e.emit(v), exec) + .virtualTransform((v, e, _) -> e.emit(v), exec) .observeOn(Schedulers.single(), false, 2) .test() .awaitDone(5, TimeUnit.SECONDS) @@ -97,7 +97,7 @@ public void observeOn() throws Throwable { public void empty() throws Throwable { TestHelper.withVirtual(exec -> { Flowable.empty() - .virtualTransform((v, e) -> e.emit(v), exec) + .virtualTransform((v, e, _) -> e.emit(v), exec) .test() .awaitDone(5, TimeUnit.SECONDS) .assertResult() @@ -109,7 +109,7 @@ public void empty() throws Throwable { public void emptyNever() throws Throwable { TestHelper.withVirtual(exec -> { Flowable.just(1).concatWith(Flowable.never()) - .virtualTransform((v, e) -> e.emit(v), exec) + .virtualTransform((v, e, _) -> e.emit(v), exec) .test() .awaitDone(1, TimeUnit.SECONDS) .assertValues(1) diff --git a/src/test/java/io/reactivex/rxjava4/internal/virtual/VirtualTransformVirtualTest.java b/src/test/java/io/reactivex/rxjava4/internal/virtual/VirtualTransformVirtualTest.java index 9e891ed5dd..9c4b1e7153 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/virtual/VirtualTransformVirtualTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/virtual/VirtualTransformVirtualTest.java @@ -31,7 +31,7 @@ public void checkIsInsideVirtualThread() { var cancelled = new AtomicBoolean(); Flowable.range(1, 5) .doOnCancel(() -> cancelled.set(true)) - .virtualTransform((v, emitter) -> emitter.emit(v)) + .virtualTransform((v, emitter, _) -> emitter.emit(v)) .take(1) .test() .awaitDone(5, TimeUnit.SECONDS) @@ -43,7 +43,7 @@ public void checkIsInsideVirtualThread() { @Test public void errorUpstream() throws Throwable { Flowable.error(new IOException()) - .virtualTransform((v, e) -> e.emit(v)) + .virtualTransform((v, e, _) -> e.emit(v)) .test() .awaitDone(5, TimeUnit.SECONDS) .assertError(IOException.class) @@ -53,7 +53,7 @@ public void errorUpstream() throws Throwable { @Test public void errorTransform() throws Throwable { Flowable.range(1, 5) - .virtualTransform((_, _) -> { throw new IOException(); }) + .virtualTransform((_, _, _) -> { throw new IOException(); }) .test() .awaitDone(5, TimeUnit.SECONDS) .assertError(IOException.class) @@ -63,7 +63,7 @@ public void errorTransform() throws Throwable { @Test public void take() throws Throwable { Flowable.range(1, 5) - .virtualTransform((v, e) -> e.emit(v)) + .virtualTransform((v, e, _) -> e.emit(v)) .take(2) .test() .awaitDone(5, TimeUnit.SECONDS) @@ -74,7 +74,7 @@ public void take() throws Throwable { @Test public void observeOn() throws Throwable { Flowable.range(1, 10000) - .virtualTransform((v, e) -> e.emit(v)) + .virtualTransform((v, e, _) -> e.emit(v)) .observeOn(Schedulers.single(), false, 2) .test() .awaitDone(5, TimeUnit.SECONDS) @@ -85,7 +85,7 @@ public void observeOn() throws Throwable { @Test public void empty() throws Throwable { Flowable.empty() - .virtualTransform((v, e) -> e.emit(v)) + .virtualTransform((v, e, _) -> e.emit(v)) .test() .awaitDone(5, TimeUnit.SECONDS) .assertResult() @@ -95,7 +95,7 @@ public void empty() throws Throwable { @Test public void emptyNever() throws Throwable { Flowable.just(1).concatWith(Flowable.never()) - .virtualTransform((v, e) -> e.emit(v)) + .virtualTransform((v, e, _) -> e.emit(v)) .test() .awaitDone(1, TimeUnit.SECONDS) .assertValues(1) diff --git a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor1TckTest.java b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor1TckTest.java index f0c9344e39..64df518ad7 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor1TckTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor1TckTest.java @@ -29,7 +29,7 @@ public Publisher createFlowPublisher(final long elements) { var half = elements >> 1; var rest = elements - half; return Flowable.rangeLong(0, rest) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); if (v < rest - 1 || half == rest) { emitter.emit(v); @@ -40,7 +40,7 @@ public Publisher createFlowPublisher(final long elements) { @Override public Publisher createFailedFlowPublisher() { return Flowable.just(1) - .virtualTransform((_, _) -> { + .virtualTransform((_, _, _) -> { throw new IOException(); }, service, 1); } diff --git a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor2TckTest.java b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor2TckTest.java index 44e1f1715b..cf38083dc5 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor2TckTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor2TckTest.java @@ -31,7 +31,7 @@ public Publisher createFlowPublisher(final long elements) { var half = elements >> 1; var rest = elements - half; return Flowable.rangeLong(0, rest) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); if (v < rest - 1 || half == rest) { emitter.emit(v); @@ -42,7 +42,7 @@ public Publisher createFlowPublisher(final long elements) { @Override public Publisher createFailedFlowPublisher() { return Flowable.just(1) - .virtualTransform((_, _) -> { + .virtualTransform((_, _, _) -> { throw new IOException(); }, service, 2); } @@ -56,7 +56,7 @@ public void slowProducer() { Thread.sleep(10); return v; }) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); }, service, 2) .test() diff --git a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor3TckTest.java b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor3TckTest.java index 068569504f..657bca3603 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor3TckTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutor3TckTest.java @@ -29,7 +29,7 @@ public Publisher createFlowPublisher(final long elements) { var half = elements >> 1; var rest = elements - half; return Flowable.rangeLong(0, rest) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); if (v < rest - 1 || half == rest) { emitter.emit(v); @@ -40,7 +40,7 @@ public Publisher createFlowPublisher(final long elements) { @Override public Publisher createFailedFlowPublisher() { return Flowable.error(new IOException()) - .virtualTransform((_, _) -> { + .virtualTransform((_, _, _) -> { }, service); } } diff --git a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutorTckTest.java b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutorTckTest.java index b43b12b5b3..07ccfbdc2c 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutorTckTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformExecutorTckTest.java @@ -32,7 +32,7 @@ public Publisher createFlowPublisher(final long elements) { var half = elements >> 1; var rest = elements - half; return Flowable.rangeLong(0, rest) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); if (v < rest - 1 || half == rest) { emitter.emit(v); @@ -43,7 +43,7 @@ public Publisher createFlowPublisher(final long elements) { @Override public Publisher createFailedFlowPublisher() { return Flowable.just(1) - .virtualTransform((_, _) -> { + .virtualTransform((_, _, _) -> { throw new IOException(); }, service); } @@ -70,7 +70,7 @@ public void slowProducer() { return v; }) .doOnRequest(v -> log.offer("Transform requested: " + v)) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { log.offer("Tansform before emit: " + v); emitter.emit(v); log.offer("Tansform after emit: " + v); @@ -106,7 +106,7 @@ public void slowProducerService() { } return v; }) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); }, service) .test() diff --git a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual1TckTest.java b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual1TckTest.java index c07e22943d..547d26b724 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual1TckTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual1TckTest.java @@ -30,7 +30,7 @@ public Publisher createFlowPublisher(final long elements) { var half = elements >> 1; var rest = elements - half; return Flowable.rangeLong(0, rest) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); if (v < rest - 1 || half == rest) { emitter.emit(v); @@ -41,7 +41,7 @@ public Publisher createFlowPublisher(final long elements) { @Override public Publisher createFailedFlowPublisher() { return Flowable.just(1) - .virtualTransform((_, _) -> { + .virtualTransform((_, _, _) -> { throw new IOException(); }, Schedulers.virtual(), 1); } diff --git a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual2TckTest.java b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual2TckTest.java index d478809845..27b7f30fc4 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual2TckTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual2TckTest.java @@ -31,7 +31,7 @@ public Publisher createFlowPublisher(final long elements) { var half = elements >> 1; var rest = elements - half; return Flowable.rangeLong(0, rest) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); if (v < rest - 1 || half == rest) { emitter.emit(v); @@ -42,7 +42,7 @@ public Publisher createFlowPublisher(final long elements) { @Override public Publisher createFailedFlowPublisher() { return Flowable.just(1) - .virtualTransform((_, _) -> { + .virtualTransform((_, _, _) -> { throw new IOException(); }, Schedulers.virtual(), 2); } @@ -56,7 +56,7 @@ public void slowProducer() { Thread.sleep(10); return v; }) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); }, Schedulers.virtual(), 2) .test() diff --git a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual3TckTest.java b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual3TckTest.java index 31861a3d5b..9a896c71c0 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual3TckTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtual3TckTest.java @@ -30,7 +30,7 @@ public Publisher createFlowPublisher(final long elements) { var half = elements >> 1; var rest = elements - half; return Flowable.rangeLong(0, rest) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); if (v < rest - 1 || half == rest) { emitter.emit(v); @@ -41,7 +41,7 @@ public Publisher createFlowPublisher(final long elements) { @Override public Publisher createFailedFlowPublisher() { return Flowable.error(new IOException()) - .virtualTransform((_, _) -> { + .virtualTransform((_, _, _) -> { }, Schedulers.virtual(), Flowable.bufferSize()); } } diff --git a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtualTckTest.java b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtualTckTest.java index ee42e56d96..2e53bb46a1 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtualTckTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/virtual/tck/FlowableVirtualTransformVirtualTckTest.java @@ -32,7 +32,7 @@ public Publisher createFlowPublisher(final long elements) { var half = elements >> 1; var rest = elements - half; return Flowable.rangeLong(0, rest) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); if (v < rest - 1 || half == rest) { emitter.emit(v); @@ -43,7 +43,7 @@ public Publisher createFlowPublisher(final long elements) { @Override public Publisher createFailedFlowPublisher() { return Flowable.just(1) - .virtualTransform((_, _) -> { + .virtualTransform((_, _, _) -> { throw new IOException(); }, Schedulers.virtual(), Flowable.bufferSize()); } @@ -70,7 +70,7 @@ public void slowProducer() { return v; }) .doOnRequest(v -> log.offer("Transform requested: " + v)) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { log.offer("Tansform before emit: " + v); emitter.emit(v); log.offer("Tansform after emit: " + v); @@ -106,7 +106,7 @@ public void slowProducerService() { } return v; }) - .virtualTransform((v, emitter) -> { + .virtualTransform((v, emitter, _) -> { emitter.emit(v); }, Schedulers.virtual(), Flowable.bufferSize()) .test() diff --git a/src/test/java/io/reactivex/rxjava4/testsupport/TestHelper.java b/src/test/java/io/reactivex/rxjava4/testsupport/TestHelper.java index bdd931eda2..10546c9ad4 100644 --- a/src/test/java/io/reactivex/rxjava4/testsupport/TestHelper.java +++ b/src/test/java/io/reactivex/rxjava4/testsupport/TestHelper.java @@ -3930,7 +3930,127 @@ public void cancel() { * @throws Throwable propagate exceptions */ public static void withVirtual(Consumer call) throws Throwable { - try (var exec = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory())) { + try (var exec = new ExecutorIntercept(Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory()), false)) { + call.accept(exec); + } + } + + /** + * Execute a call within a virtual thread of the standard virtual thread executor. + * @param call the call to invoke + * @throws Throwable the exception propagated out + */ + public static void onVirtual(Consumer call) throws Throwable { + withVirtual(exec -> { + exec.submit(() -> { + try { + call.accept(exec); + } catch (Throwable ex) { + throw Exceptions.propagate(ex); + } + }).get(); + }); + } + + record ExecutorIntercept(ExecutorService service, boolean printStackTrace) implements ExecutorService { + + @Override + public void execute(Runnable command) { + service.execute(command); + } + + @Override + public void shutdown() { + if (printStackTrace) { + new CancellationException("ExecutorIntercept::shutdown").printStackTrace(); + } + service.shutdown(); + } + + @Override + public List shutdownNow() { + if (printStackTrace) { + new CancellationException("ExecutorIntercept::shutdownNow").printStackTrace(); + } + return service.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return service.isShutdown(); + } + + @Override + public boolean isTerminated() { + return service.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return service.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return service.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return service.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return service.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return service.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return service.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + return service.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return service.invokeAny(tasks, timeout, unit); + } + } + + /** + * Execute a test body with the help of a single thread executor service. + *

+ * Don't forget to {@link ExecutorService#submit(Callable)} your work! + * @param call the callback to give the VTE. + * @throws Throwable propagate exceptions + */ + public static void withSingleExecutor(Consumer call) throws Throwable { + try (var exec = Executors.newSingleThreadScheduledExecutor()) { + call.accept(exec); + } + } + + /** + * Execute a test body with the help of a cached executor service. + *

+ * Don't forget to {@link ExecutorService#submit(Callable)} your work! + * @param call the callback to give the VTE. + * @throws Throwable propagate exceptions + */ + public static void withCachedExecutor(Consumer call) throws Throwable { + try (var exec = new ExecutorIntercept(Executors.newCachedThreadPool(), false)) { call.accept(exec); } } diff --git a/src/test/java/io/reactivex/rxjava4/validators/JavadocWording.java b/src/test/java/io/reactivex/rxjava4/validators/JavadocWording.java index 2c511977b1..48ab92d2c7 100644 --- a/src/test/java/io/reactivex/rxjava4/validators/JavadocWording.java +++ b/src/test/java/io/reactivex/rxjava4/validators/JavadocWording.java @@ -313,7 +313,8 @@ public void flowableDocRefersToFlowableTypes() throws Exception { && !m.signature.contains("void subscribe") ) { CharSequence subSequence = m.javadoc.subSequence(idx - 6, idx + 11); - if (idx < 6 || !subSequence.equals("{@link Disposable")) { + if ((idx < 6 || !subSequence.equals("{@link Disposable")) + && !subSequence.toString().contains(", Disposable")) { e.append("java.lang.RuntimeException: Flowable doc mentions Disposable but not using Flowable\r\n at io.reactivex.rxjava4.core.") .append("Flowable.method(Flowable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); } @@ -456,7 +457,8 @@ public void parallelFlowableDocRefersToCorrectTypes() throws Exception { && !m.signature.contains("Disposable") ) { CharSequence subSequence = m.javadoc.subSequence(idx - 6, idx + 11); - if (idx < 6 || !subSequence.equals("{@link Disposable")) { + if ((idx < 6 || !subSequence.equals("{@link Disposable")) + && !subSequence.toString().contains(", Disposable")) { e.append("java.lang.RuntimeException: Flowable doc mentions Disposable but not using Flowable\r\n at io.reactivex.rxjava4.core.") .append("Flowable.method(Flowable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); } diff --git a/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java index 8d71448d97..e1c48761e2 100644 --- a/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java @@ -609,7 +609,7 @@ public void checkParallelFlowable() { defaultValues.put(ExecutorService.class, Executors.newVirtualThreadPerTaskExecutor()); - VirtualTransformer trs = (_, _) -> { }; + VirtualTransformer trs = (_, _, _) -> { }; defaultValues.put(VirtualTransformer.class, trs); VirtualGenerator vg = _ -> { };