Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,30 @@

import java.util.concurrent.CompletionStage;

import io.reactivex.rxjava4.disposables.Disposable;
import io.reactivex.rxjava4.annotations.NonNull;
import io.reactivex.rxjava4.disposables.*;

public record CompletionStageDisposable<T>(CompletionStage<T> stage, Disposable disposable) {
/**
* Consist of a terminal stage and a disposable to be able to cancel a sequence.
* @param <T> the return and element type of the various stages
* @param stage the embedded stage to work with
* @param disposable the way to cancel the stage concurrently
* @since 4.0.0
*/
public record CompletionStageDisposable<T>(@NonNull CompletionStage<T> stage, @NonNull Disposable disposable) {

/**
* Await the completion of the current stage.
*/
public void await() {
Streamer.await(stage);
}

/**
* Await the completion of the current stage.
* @param canceller the canceller link
*/
public void await(DisposableContainer canceller) {
Streamer.await(stage, canceller);
}
}
61 changes: 56 additions & 5 deletions src/main/java/io/reactivex/rxjava4/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.reactivex.rxjava4.internal.operators.mixed.*;
import io.reactivex.rxjava4.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.rxjava4.internal.operators.single.SingleToFlowable;
import io.reactivex.rxjava4.internal.operators.streamable.StreamableFromPublisher;
import io.reactivex.rxjava4.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.rxjava4.internal.subscribers.*;
import io.reactivex.rxjava4.internal.util.*;
Expand Down Expand Up @@ -16023,7 +16024,7 @@ public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
var npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
Expand Down Expand Up @@ -20928,7 +20929,7 @@ public final Stream<T> blockingStream(int prefetch) {
* {@code ExecutorService} uses virtual threads, such as the one returned by
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
* @param <R> the downstream element type
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
* is invoked for each upstream item
* @param executor the target {@code ExecutorService} to use for running the callback
* @return the new {@code Flowable} instance
Expand Down Expand Up @@ -20961,7 +20962,7 @@ public final Stream<T> blockingStream(int prefetch) {
* {@link Scheduler} uses virtual threads, such as the one returned by
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
* @param <R> the downstream element type
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
* is invoked for each upstream item
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code transformer} is {@code null}
Expand Down Expand Up @@ -20992,7 +20993,7 @@ public final Stream<T> blockingStream(int prefetch) {
* {@code Scheduler} uses virtual threads, such as the one returned by
* {@link Schedulers#virtual()}.
* @param <R> the downstream element type
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
* is invoked for each upstream item
* @param scheduler the target {@code Scheduler} to use for running the callback
* @param prefetch the number of items to fetch from the upstream.
Expand Down Expand Up @@ -21030,7 +21031,7 @@ public final Stream<T> blockingStream(int prefetch) {
* {@code ExecutorService} uses virtual threads, such as the one returned by
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
* @param <R> the downstream element type
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
* is invoked for each upstream item
* @param executor the target {@code ExecutorService} to use for running the callback
* @param prefetch the number of items to fetch from the upstream.
Expand All @@ -21051,4 +21052,54 @@ public final Stream<T> blockingStream(int prefetch) {
return new FlowableVirtualTransformExecutor<>(this, transformer, executor, null, prefetch);
}

/**
* Converts this {@code Flowable} into a {@link Streamable} instance,
* transparently relaying signals between the two async representations of a sequence.
* <p>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator requests from the upstream in a bounded manner and
* relays the values one-by-one to the {@link Streamer } view of the sequence.
* </dd>
* <dt><b>Scheduler:</b></dt>
* <dd>The operator by design runs on the default virtual executor of the system.</dd>
* </dl>
* <p>
* @return the new {@code Streamable} instance
* @since 4.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.VIRTUAL)
@NonNull
public final Streamable<T> toStreamable() {
return toStreamable(Executors.newVirtualThreadPerTaskExecutor());
}

/**
* Converts this {@code Flowable} into a {@link Streamable} instance,
* transparently relaying signals between the two async representations of a sequence.
* <p>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator requests from the upstream in a bounded manner and
* relays the values one-by-one to the {@link Streamer } view of the sequence.
* </dd>
* <dt><b>Scheduler:</b></dt>
* <dd>The operator runs on the provided {@link ExecutorService}.</dd>
* </dl>
* <p>
* @param executor where the coordination will happen
* @return the new {@code Streamable} instance
* @throws NullPointerException if {@code executor} is {@code null}
* @since 4.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Streamable<T> toStreamable(ExecutorService executor) {
Objects.requireNonNull(executor, "executor is null");
return new StreamableFromPublisher<>(this, executor);
}
}
55 changes: 54 additions & 1 deletion src/main/java/io/reactivex/rxjava4/core/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
package io.reactivex.rxjava4.core;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.disposables.Disposable;
Expand Down Expand Up @@ -381,6 +382,29 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
return (S) new SchedulerWhen(combine, this);
}

/**
* Turn this Scheduler into an ExecutorService implementation
* using its various *Direct() methods instead of workers.
* @return the ExecutorService view of this Scheduler
* @since 4.0.0
*/
public ExecutorService toExecutorService() {
return new SchedulerToExecutorService(this, null);
}

/**
* Turn this Scheduler into an ExecutorService implementation
* using its various *Direct() methods or worker methods,
* depending on the parameter.
* @param useWorker if true, one of the workers is used as an executorservice,
* if false, the whole scheduler and its *Direct methods are used.
* @return the ExecutorService view of this Scheduler
* @since 4.0.0
*/
public ExecutorService toExecutorService(boolean useWorker) {
return new SchedulerToExecutorService(this, new AtomicReference<>());
}

/**
* Represents an isolated, sequential worker of a parent Scheduler for executing {@code Runnable} tasks on
* an underlying task-execution scheme (such as custom Threads, event loop, {@link java.util.concurrent.Executor Executor} or Actor system).
Expand Down Expand Up @@ -572,6 +596,35 @@ public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}

/**
* A stateless Worker that reports itself as shutdown and doesn't do anything.
* @since 4.0.0
*/
@NonNull
public static final Worker SHUTDOWN = new ShutdownWorker();
}

/**
* Implementation of a stateless, shutdown worker. For cleanup and termination purposes.
* @since 4.0.0
*/
static final class ShutdownWorker extends Worker {

@Override
public void dispose() {
}

@Override
public boolean isDisposed() {
return true;
}

@Override
public @NonNull Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
return Disposable.disposed();
}

}

static final class PeriodicDirectTask
Expand Down
Loading