Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.mongodb.internal.async;

import com.mongodb.assertions.Assertions;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.function.AsyncCallbackLoop;
import com.mongodb.internal.async.function.LoopState;
Expand Down Expand Up @@ -206,6 +207,15 @@ default AsyncRunnable thenRunIf(final Supplier<Boolean> condition, final AsyncRu
};
}

/**
* @param condition The condition to check before each iteration
* @param body The body to run on each iteration
* @return the composition of this runnable and the loop, a runnable
*/
default AsyncRunnable loopWhile(final BooleanSupplier condition, final AsyncRunnable body) {
throw Assertions.fail("Not implemented");
}

/**
* @param supplier The supplier to supply using after this runnable
* @return the composition of this runnable and the supplier, a supplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,50 +41,160 @@
public final class AsyncCallbackLoop implements AsyncCallbackRunnable {
private final LoopState state;
private final AsyncCallbackRunnable body;
private final ThreadLocal<SameThreadDetectionStatus> sameThreadDetector;

/**
* @param state The {@link LoopState} to be deemed as initial for the purpose of the new {@link AsyncCallbackLoop}.
* @param body The body of the loop.
*/
public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body) {
this.state = state;
this.body = body;
this.state = state;
sameThreadDetector = ThreadLocal.withInitial(() -> SameThreadDetectionStatus.NEGATIVE);
}

@Override
public void run(final SingleResultCallback<Void> callback) {
body.run(new LoopingCallback(callback));
run(false, callback);
}

/**
* This callback is allowed to be completed more than once.
* Initiates a new iteration of the loop by invoking
* {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run}.
* The initiated iteration may be executed either synchronously or asynchronously with the method that initiated it:
* <ul>
* <li>synchronous execution—completion of the initiated iteration is guaranteed to happen-before the method completion;
* <ul>
* <li>Note that the formulations
* <ol>
* <li>"completion of the initiated iteration is guaranteed to happen-before the method completion"</li>
* <li>"completion of the initiated iteration happens-before the method completion"</li>
* </ol>
* are different: the former is about the program while the latter is about the execution, and follows from the former.
* For us the former is useful.
* </li>
* </ul>
* </li>
* <li>asynchronous execution—the aforementioned guarantee does not exist.
* <ul>
* <li>Note that the formulations
* <ol>
* <li>"the aforementioned guarantee does not exist"</li>
* <li>"the aforementioned relation does not exist"</li>
* </ol>
* are different: the former is about the program while the latter is about the execution, and follows from the former.
* For us the former is useful.
* </li>
* </ul>
* </li>
* </ul>
*
* <p>If another iteration is needed, it is initiated from the callback passed to
* {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run}
* by invoking {@link #run(boolean, SingleResultCallback)}.
* Completing the initiated iteration is {@linkplain SingleResultCallback#onResult(Object, Throwable) invoking} the callback.
* Thus, it is guaranteed that all iterations are executed sequentially with each other
* (that is, completion of one iteration happens-before initiation of the next one)
* regardless of them being executed synchronously or asynchronously with the method that initiated them.
*
* <p>Initiating any but the {@linkplain LoopState#isFirstIteration() first} iteration is done using trampolining,
* which allows us to do it iteratively rather than recursively, if iterations are executed synchronously,
* and ensures stack usage does not increase with the number of iterations.
*
* @return {@code true} iff it is known that another iteration must be initiated.
* This information is used only for trampolining, and is available only if the iteration executed synchronously.
*
* <p>It is impossible to detect whether an iteration is executed synchronously.
* It is, however, possible to detect whether an iteration is executed in the same thread as the method that initiated it,
* and we use this as a proxy indicator of synchronous execution. Unfortunately, this means we do not support / behave incorrectly
* if an iteration is executed synchronously but in a thread different from the one in which the method that
* initiated the iteration was invoked.
*
* <p>The above limitation should not be a problem in practice:
* <ul>
* <li>the only way to execute an iteration synchronously but in a different thread is to block the thread that
* initiated the iteration by waiting for completion of the iteration by that other thread;</li>
* <li>blocking a thread is forbidden in asynchronous code, and we do not do it;</li>
* <li>therefore, we would not have an iteration that is executed synchronously but in a different thread.</li>
* </ul>
*/
@NotThreadSafe
private class LoopingCallback implements SingleResultCallback<Void> {
private final SingleResultCallback<Void> wrapped;

LoopingCallback(final SingleResultCallback<Void> callback) {
wrapped = callback;
}

@Override
public void onResult(@Nullable final Void result, @Nullable final Throwable t) {
if (t != null) {
wrapped.onResult(null, t);
} else {
boolean continueLooping;
try {
continueLooping = state.advance();
} catch (Throwable e) {
wrapped.onResult(null, e);
boolean run(final boolean trampolining, final SingleResultCallback<Void> afterLoopCallback) {
// The `trampoliningResult` variable must be used only if the initiated iteration is executed synchronously with
// the current method, which must be detected separately.
//
// It may be tempting to detect whether the iteration was executed synchronously by reading from the variable
// and observing a write that is part of the callback execution. However, if the iteration is executed asynchronously with
// the current method, then the aforementioned conflicting write and read actions are not ordered by
// the happens-before relation, the execution contains a data race and the read is allowed to observe the write.
// If such observation happens when the iteration is executed asynchronously, then we have a false positive.
// Furthermore, depending on the nature of the value read, it may not be trustworthy.
//
// Making `trampoliningResult` a `volatile`, or even making it an `AtomicReference`/`AtomicInteger` and calling `compareAndSet`
// does not resolve the issue: it gets rid of the data race, but still leave us with a race condition
// that allows for false positives.
boolean[] trampoliningResult = {false};
sameThreadDetector.set(SameThreadDetectionStatus.PROBING);
body.run((r, t) -> {
if (completeIfNeeded(afterLoopCallback, r, t)) {
// Bounce if we are trampolining and the iteration was executed synchronously,
// trampolining completes and so is the loop;
// otherwise, the loop simply completes.
return;
}
if (trampolining) {
boolean sameThread = sameThreadDetector.get().equals(SameThreadDetectionStatus.PROBING);
if (sameThread) {
// Bounce if we are trampolining and the iteration was executed synchronously;
// otherwise proceed to initiate trampolining.
sameThreadDetector.set(SameThreadDetectionStatus.POSITIVE);
trampoliningResult[0] = true;
return;
}
if (continueLooping) {
body.run(this);
} else {
wrapped.onResult(result, null);
sameThreadDetector.remove();
}
}
// initiate trampolining
boolean anotherIterationNeeded;
do {
anotherIterationNeeded = run(true, afterLoopCallback);
} while (anotherIterationNeeded);
});
try {
return sameThreadDetector.get().equals(SameThreadDetectionStatus.POSITIVE) && trampoliningResult[0];
} finally {
sameThreadDetector.remove();
}
}

/**
* @return {@code true} iff the {@code afterLoopCallback} was
* {@linkplain SingleResultCallback#onResult(Object, Throwable) completed}.
*/
private boolean completeIfNeeded(final SingleResultCallback<Void> afterLoopCallback,
@Nullable final Void result, @Nullable final Throwable t) {
if (t != null) {
afterLoopCallback.onResult(null, t);
return true;
} else {
boolean anotherIterationNeeded;
try {
anotherIterationNeeded = state.advance();
} catch (Throwable e) {
afterLoopCallback.onResult(null, e);
return true;
}
if (anotherIterationNeeded) {
return false;
} else {
afterLoopCallback.onResult(result, null);
return true;
}
}
}

private enum SameThreadDetectionStatus {
NEGATIVE,
PROBING,
POSITIVE
}
}
Loading