From 92632657aa6f84b7b298e1b2af75b675ca8aedff Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Fri, 13 Feb 2026 17:09:22 -0700 Subject: [PATCH 01/15] first attempt --- .../mongodb/internal/async/AsyncRunnable.java | 9 +- .../async/function/AsyncCallbackLoop.java | 10 +- .../async/function/CallbackChain.java | 56 +++++ .../com/mongodb/internal/async/VakoTest.java | 214 ++++++++++++++++++ 4 files changed, 287 insertions(+), 2 deletions(-) create mode 100644 driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java create mode 100644 driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index e404e2b8152..9d706d9f884 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -253,9 +253,16 @@ default AsyncRunnable thenRunRetryingWhile( * @see AsyncCallbackLoop */ default AsyncRunnable thenRunDoWhileLoop(final AsyncRunnable loopBodyRunnable, final BooleanSupplier whileCheck) { + return thenRunDoWhileLoop(true, loopBodyRunnable, whileCheck); + } + + default AsyncRunnable thenRunDoWhileLoop( + final boolean optimized, + final AsyncRunnable loopBodyRunnable, + final BooleanSupplier whileCheck) { return thenRun(finalCallback -> { LoopState loopState = new LoopState(); - new AsyncCallbackLoop(loopState, iterationCallback -> { + new AsyncCallbackLoop(optimized, loopState, iterationCallback -> { loopBodyRunnable.finish((result, t) -> { if (t != null) { diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index a347a2a7e47..f1531bc063b 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -39,6 +39,8 @@ */ @NotThreadSafe public final class AsyncCallbackLoop implements AsyncCallbackRunnable { + @Nullable + private final CallbackChain chain; private final LoopState state; private final AsyncCallbackRunnable body; @@ -47,6 +49,11 @@ public final class AsyncCallbackLoop implements AsyncCallbackRunnable { * @param body The body of the loop. */ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body) { + this(true, state, body); + } + + public AsyncCallbackLoop(final boolean optimized, final LoopState state, final AsyncCallbackRunnable body) { + this.chain = optimized ? new CallbackChain() : null; this.state = state; this.body = body; } @@ -54,6 +61,7 @@ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body @Override public void run(final SingleResultCallback callback) { body.run(new LoopingCallback(callback)); + CallbackChain.run(chain); } /** @@ -80,7 +88,7 @@ public void onResult(@Nullable final Void result, @Nullable final Throwable t) { return; } if (continueLooping) { - body.run(this); + CallbackChain.addOrRun(chain, () -> body.run(this)); } else { wrapped.onResult(result, null); } diff --git a/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java b/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java new file mode 100644 index 00000000000..7f93bee05a7 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java @@ -0,0 +1,56 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.async.function; + +import com.mongodb.lang.Nullable; + +import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.assertNull; + +public final class CallbackChain { + @Nullable + private Runnable next; + + public CallbackChain() { + } + + public static void addOrRun(@Nullable final CallbackChain chain, final Runnable next) { + if (chain != null) { + chain.add(next); + } else { + next.run(); + } + } + + public static void run(final @Nullable CallbackChain chain) { + if (chain != null) { + chain.run(); + } + } + + private void add(final Runnable next) { + assertNotNull(next); + assertNull(this.next); + this.next = next; + } + + private void run() { + for (Runnable localNext = next; localNext != null; localNext = next) { + next = null; + localNext.run(); + } + } +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java new file mode 100644 index 00000000000..40795bf93fc --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java @@ -0,0 +1,214 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.async; + +import com.mongodb.internal.async.function.AsyncCallbackLoop; +import com.mongodb.internal.async.function.LoopState; +import com.mongodb.lang.Nullable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; + +class VakoTest { + @ParameterizedTest + @CsvSource({"false, 20", "true, 20"}) + void asyncCallbackLoop(final boolean optimized, final int iterations) { + System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); + LoopState loopState = new LoopState(); + new AsyncCallbackLoop(optimized, loopState, c -> { + int iteration = loopState.iteration(); + System.err.printf("iteration=%d, callStackDepth=%d%n", iteration, Thread.currentThread().getStackTrace().length); + if (!loopState.breakAndCompleteIf(() -> iteration == (iterations - 1), c)) { + c.complete(c); + } + }).run((r, t) -> { + System.err.printf("test callback completed callStackDepth=%d, r=%s, t=%s%n", + Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); + }); + } + + @ParameterizedTest + @CsvSource({"false, 20", "true, 20"}) + void testA(final boolean optimized, final int counterValue) { + System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); + asyncMethod1A(optimized, new Counter(counterValue), (r, t) -> { + System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", + Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); + }); + } + + private static void asyncMethod1A(final boolean optimized, final Counter counter, final SingleResultCallback callback) { + beginAsync().thenRunDoWhileLoop(optimized, c -> { + asyncMethod2A(counter, c); + }, () -> !counter.done()).finish(callback); + } + + private static void asyncMethod2A(final Counter counter, final SingleResultCallback callback) { + counter.countDown(); + callback.complete(callback); + } + + @ParameterizedTest + @ValueSource(ints = {10}) + void testB(final int counterValue) { + AtomicInteger stackDepthUnoptimized = new AtomicInteger(); + AtomicInteger stackDepthOptimized = new AtomicInteger(); + asyncMethod1B(false, new Counter(counterValue), (r, t) -> { + stackDepthUnoptimized.set(Thread.currentThread().getStackTrace().length); + System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", + stackDepthUnoptimized, r, exceptionToString(t)); + }); + asyncMethod1B(true, new Counter(counterValue), (r, t) -> { + stackDepthOptimized.set(Thread.currentThread().getStackTrace().length); + System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", + stackDepthOptimized, r, exceptionToString(t)); + }); + System.err.printf("test completed baselineStackDepth=%d, stackDepthUnoptimized=%s, stackDepthOptimized=%s%n", + Thread.currentThread().getStackTrace().length, stackDepthOptimized, stackDepthUnoptimized); + } + + private static void asyncMethod1B(final boolean optimized, final Counter counter, final SingleResultCallback callback) { + asyncMethod2B(counter, new Callback(optimized, counter, callback)); + } + + private static void asyncMethod2B(final Counter counter, final SingleResultCallback callback) { + counter.countDown(); + callback.complete(callback); + } + + private static final class Callback implements SingleResultCallback { + private final boolean optimized; + private Counter counter; + private SingleResultCallback callback; + + Callback(final boolean optimized, final Counter counter, final SingleResultCallback callback) { + this.optimized = optimized; + this.counter = counter; + this.callback = callback; + } + + Counter takeCounter() { + Counter localCounter = com.mongodb.assertions.Assertions.assertNotNull(counter); + counter = null; + return localCounter; + } + + void setCounter(final Counter counter) { + com.mongodb.assertions.Assertions.assertNull(this.counter); + this.counter = counter; + } + + SingleResultCallback takeCallback() { + SingleResultCallback localCallback = com.mongodb.assertions.Assertions.assertNotNull(callback); + callback = null; + return localCallback; + } + + void setCallback(final SingleResultCallback callback) { + com.mongodb.assertions.Assertions.assertNull(this.callback); + this.callback = callback; + } + + @Override + public void onResult(final Void result, final Throwable t) { + SingleResultCallback localCallback = takeCallback(); + beginAsync().thenRun((c) -> { + System.err.printf("thenRun%n"); + Counter localCounter = takeCounter(); + if (t != null) { + System.err.printf("exception t=%s%n", exceptionToString(t)); + c.completeExceptionally(t); + } else if (localCounter.done()) { + c.complete(c); + } else { + asyncMethod2B(localCounter, new Callback(optimized, localCounter, localCallback)); + } + }).finish((r, t2) -> { + System.err.printf("finish r=%s, t=%s%n", r, exceptionToString(t2)); + localCallback.onResult(r, t); + }); + } + } + + private static final class Counter { + private int v; + + Counter(final int v) { + this.v = v; + } + + void countDown() { + com.mongodb.assertions.Assertions.assertTrue(v > 0); + v--; + System.err.printf("counted %d->%d callStackDepth=%d %n", v + 1, v, Thread.currentThread().getStackTrace().length); + } + + boolean done() { + if (v == 0) { + System.err.printf("counting done callStackDepth=%d %n", Thread.currentThread().getStackTrace().length); + return true; + } + return false; + } + } + + private static String exceptionToString(@Nullable final Throwable t) { + if (t == null) { + return Objects.toString(null); + } + try (StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw)) { + t.printStackTrace(pw); + pw.flush(); + return sw.toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} + +/* + + c3.complete{ + //c3... + c2.complete{ + //c2... + c1.complete{ + //c1... + } + } + + c3.complete{ + //c3... + chain.add(c2) + } + chain.run() -> c2.complete{ + //c2... + chain.add(c1) + } + chain.run() -> c1.complete{ + //c1... + } + + */ From 3b7c659d7898276f7cc26427e3852fb8dd3afdb9 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Sat, 14 Feb 2026 03:59:51 -0700 Subject: [PATCH 02/15] second attempt --- .../async/function/AsyncCallbackLoop.java | 3 +- .../async/function/CallbackChain.java | 42 ++-- .../async/AsyncFunctionsAbstractTest.java | 19 ++ .../com/mongodb/internal/async/VakoTest.java | 187 ++++++------------ 4 files changed, 107 insertions(+), 144 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index f1531bc063b..37410d71517 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -61,7 +61,6 @@ public AsyncCallbackLoop(final boolean optimized, final LoopState state, final A @Override public void run(final SingleResultCallback callback) { body.run(new LoopingCallback(callback)); - CallbackChain.run(chain); } /** @@ -88,7 +87,7 @@ public void onResult(@Nullable final Void result, @Nullable final Throwable t) { return; } if (continueLooping) { - CallbackChain.addOrRun(chain, () -> body.run(this)); + CallbackChain.execute(chain, () -> body.run(this)); } else { wrapped.onResult(result, null); } diff --git a/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java b/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java index 7f93bee05a7..f4bd2e31abc 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java @@ -17,40 +17,50 @@ import com.mongodb.lang.Nullable; +import java.util.concurrent.atomic.AtomicReference; + import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.assertions.Assertions.assertNull; -public final class CallbackChain { +final class CallbackChain { @Nullable private Runnable next; + private int runEnteredCounter; + private final AtomicReference thread; - public CallbackChain() { + CallbackChain() { + runEnteredCounter = 0; + thread = new AtomicReference<>(); } - public static void addOrRun(@Nullable final CallbackChain chain, final Runnable next) { + static void execute(@Nullable final CallbackChain chain, final Runnable next) { if (chain != null) { - chain.add(next); + chain.execute(next); } else { next.run(); } } - public static void run(final @Nullable CallbackChain chain) { - if (chain != null) { - chain.run(); - } - } - - private void add(final Runnable next) { + // VAKOTODO figure out thread safety + private void execute(final Runnable next) { assertNotNull(next); assertNull(this.next); this.next = next; - } - private void run() { - for (Runnable localNext = next; localNext != null; localNext = next) { - next = null; - localNext.run(); +// if (!thread.compareAndSet(null, Thread.currentThread())) { +// assertTrue(Thread.currentThread() == thread.get()); +// } + boolean recursive = ++runEnteredCounter > 1; + try { + if (recursive) { + return; + } + for (Runnable localNext = next; localNext != null; localNext = this.next) { + this.next = null; + localNext.run(); + } + } finally { + runEnteredCounter--; } } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java index 9a9b7552d3e..8b102a8102b 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java @@ -990,4 +990,23 @@ void testDerivation() { }).finish(callback); }); } + + @Test + void testThenRunDoWhileLoop() { + assertBehavesSameVariations(8, + () -> { + int i = 0; + do { + i++; + sync(i); + } while (i < 3 && plainTest(i)); + }, + (callback) -> { + final int[] i = new int[1]; + beginAsync().thenRunDoWhileLoop((c) -> { + i[0]++; + async(i[0], c); + }, () -> i[0] < 3 && plainTest(i[0])).finish(callback); + }); + } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java index 40795bf93fc..6c3ee272247 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java @@ -17,24 +17,27 @@ import com.mongodb.internal.async.function.AsyncCallbackLoop; import com.mongodb.internal.async.function.LoopState; +import com.mongodb.internal.time.StartTime; import com.mongodb.lang.Nullable; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; class VakoTest { @ParameterizedTest - @CsvSource({"false, 20", "true, 20"}) - void asyncCallbackLoop(final boolean optimized, final int iterations) { + @CsvSource({"false, 30", "true, 30"}) + void asyncCallbackLoop(final boolean optimized, final int iterations) throws Exception { System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); + CompletableFuture join = new CompletableFuture<>(); LoopState loopState = new LoopState(); new AsyncCallbackLoop(optimized, loopState, c -> { int iteration = loopState.iteration(); @@ -45,127 +48,83 @@ void asyncCallbackLoop(final boolean optimized, final int iterations) { }).run((r, t) -> { System.err.printf("test callback completed callStackDepth=%d, r=%s, t=%s%n", Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); + if (t != null) { + join.completeExceptionally(t); + } else { + join.complete(r); + } }); + join.get(); } @ParameterizedTest - @CsvSource({"false, 20", "true, 20"}) - void testA(final boolean optimized, final int counterValue) { + @CsvSource({ + "false, false, 30", "false, true, 30", + "true, false, 30", "true, true, 30"}) + void testThenRunDoWhileLoop(final boolean optimized, final boolean separateThread, final int counterInitialValue) throws Exception { + StartTime start = StartTime.now(); System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); - asyncMethod1A(optimized, new Counter(counterValue), (r, t) -> { + CompletableFuture join = new CompletableFuture<>(); + asyncMethod1(optimized, separateThread, new Counter(counterInitialValue), (r, t) -> { System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); + if (t != null) { + join.completeExceptionally(t); + } else { + join.complete(r); + } }); + System.err.printf("asyncMethod1 executed in %s%n", start.elapsed()); + join.get(); } - private static void asyncMethod1A(final boolean optimized, final Counter counter, final SingleResultCallback callback) { + private static void asyncMethod1(final boolean optimized, final boolean separateThread, + final Counter counter, final SingleResultCallback callback) { beginAsync().thenRunDoWhileLoop(optimized, c -> { - asyncMethod2A(counter, c); + asyncMethod2(separateThread, counter, c); }, () -> !counter.done()).finish(callback); } - private static void asyncMethod2A(final Counter counter, final SingleResultCallback callback) { - counter.countDown(); - callback.complete(callback); - } - - @ParameterizedTest - @ValueSource(ints = {10}) - void testB(final int counterValue) { - AtomicInteger stackDepthUnoptimized = new AtomicInteger(); - AtomicInteger stackDepthOptimized = new AtomicInteger(); - asyncMethod1B(false, new Counter(counterValue), (r, t) -> { - stackDepthUnoptimized.set(Thread.currentThread().getStackTrace().length); - System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", - stackDepthUnoptimized, r, exceptionToString(t)); - }); - asyncMethod1B(true, new Counter(counterValue), (r, t) -> { - stackDepthOptimized.set(Thread.currentThread().getStackTrace().length); - System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", - stackDepthOptimized, r, exceptionToString(t)); - }); - System.err.printf("test completed baselineStackDepth=%d, stackDepthUnoptimized=%s, stackDepthOptimized=%s%n", - Thread.currentThread().getStackTrace().length, stackDepthOptimized, stackDepthUnoptimized); - } - - private static void asyncMethod1B(final boolean optimized, final Counter counter, final SingleResultCallback callback) { - asyncMethod2B(counter, new Callback(optimized, counter, callback)); - } - - private static void asyncMethod2B(final Counter counter, final SingleResultCallback callback) { - counter.countDown(); - callback.complete(callback); - } - - private static final class Callback implements SingleResultCallback { - private final boolean optimized; - private Counter counter; - private SingleResultCallback callback; - - Callback(final boolean optimized, final Counter counter, final SingleResultCallback callback) { - this.optimized = optimized; - this.counter = counter; - this.callback = callback; - } - - Counter takeCounter() { - Counter localCounter = com.mongodb.assertions.Assertions.assertNotNull(counter); - counter = null; - return localCounter; - } - - void setCounter(final Counter counter) { - com.mongodb.assertions.Assertions.assertNull(this.counter); - this.counter = counter; - } - - SingleResultCallback takeCallback() { - SingleResultCallback localCallback = com.mongodb.assertions.Assertions.assertNotNull(callback); - callback = null; - return localCallback; - } - - void setCallback(final SingleResultCallback callback) { - com.mongodb.assertions.Assertions.assertNull(this.callback); - this.callback = callback; - } - - @Override - public void onResult(final Void result, final Throwable t) { - SingleResultCallback localCallback = takeCallback(); - beginAsync().thenRun((c) -> { - System.err.printf("thenRun%n"); - Counter localCounter = takeCounter(); - if (t != null) { - System.err.printf("exception t=%s%n", exceptionToString(t)); - c.completeExceptionally(t); - } else if (localCounter.done()) { - c.complete(c); - } else { - asyncMethod2B(localCounter, new Callback(optimized, localCounter, localCallback)); - } - }).finish((r, t2) -> { - System.err.printf("finish r=%s, t=%s%n", r, exceptionToString(t2)); - localCallback.onResult(r, t); - }); + private static void asyncMethod2(final boolean separateThread, final Counter counter, final SingleResultCallback callback) { + Runnable action = () -> { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(1) / counter.initial()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + counter.countDown(); + callback.complete(callback); + }; + if (separateThread) { + ForkJoinPool.commonPool().execute(action); + } else { + action.run(); } } private static final class Counter { - private int v; + private final int initial; + private int current; + + Counter(final int initial) { + this.initial = initial; + this.current = initial; + } - Counter(final int v) { - this.v = v; + int initial() { + return initial; } void countDown() { - com.mongodb.assertions.Assertions.assertTrue(v > 0); - v--; - System.err.printf("counted %d->%d callStackDepth=%d %n", v + 1, v, Thread.currentThread().getStackTrace().length); + com.mongodb.assertions.Assertions.assertTrue(current > 0); + int previous = current; + int decremented = --current; + System.err.printf("counted %d->%d callStackDepth=%d %n", previous, decremented, Thread.currentThread().getStackTrace().length); } boolean done() { - if (v == 0) { + if (current == 0) { System.err.printf("counting done callStackDepth=%d %n", Thread.currentThread().getStackTrace().length); return true; } @@ -179,7 +138,8 @@ private static String exceptionToString(@Nullable final Throwable t) { } try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) { - t.printStackTrace(pw); +// t.printStackTrace(pw); + pw.println(t); pw.flush(); return sw.toString(); } catch (IOException e) { @@ -187,28 +147,3 @@ private static String exceptionToString(@Nullable final Throwable t) { } } } - -/* - - c3.complete{ - //c3... - c2.complete{ - //c2... - c1.complete{ - //c1... - } - } - - c3.complete{ - //c3... - chain.add(c2) - } - chain.run() -> c2.complete{ - //c2... - chain.add(c1) - } - chain.run() -> c1.complete{ - //c1... - } - - */ From 271c0bba7af7e30d548f31b3065eef58c9b3db11 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Sat, 14 Feb 2026 13:29:16 -0700 Subject: [PATCH 03/15] third attempt --- .../async/function/AsyncCallbackLoop.java | 43 +++++++++++---- .../async/function/CallbackChain.java | 55 +++++++++---------- 2 files changed, 58 insertions(+), 40 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index 37410d71517..5c8587c37e6 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -17,6 +17,7 @@ import com.mongodb.annotations.NotThreadSafe; import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.async.function.CallbackChain.Element; import com.mongodb.lang.Nullable; import java.util.function.Supplier; @@ -39,10 +40,8 @@ */ @NotThreadSafe public final class AsyncCallbackLoop implements AsyncCallbackRunnable { - @Nullable - private final CallbackChain chain; private final LoopState state; - private final AsyncCallbackRunnable body; + private final Body body; /** * @param state The {@link LoopState} to be deemed as initial for the purpose of the new {@link AsyncCallbackLoop}. @@ -53,9 +52,8 @@ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body } public AsyncCallbackLoop(final boolean optimized, final LoopState state, final AsyncCallbackRunnable body) { - this.chain = optimized ? new CallbackChain() : null; this.state = state; - this.body = body; + this.body = new Body(optimized, body); } @Override @@ -63,33 +61,58 @@ public void run(final SingleResultCallback callback) { body.run(new LoopingCallback(callback)); } + private static final class Body { + private final AsyncCallbackRunnable wrapped; + @Nullable + private final CallbackChain chain; + + private Body(final boolean optimized, final AsyncCallbackRunnable body) { + this.wrapped = body; + this.chain = optimized ? new CallbackChain() : null; + } + + @Nullable + Element run(final LoopingCallback loopingCallback) { + Element[] mutableElement = new Element[1]; + wrapped.run((r, t) -> { + Element nextCallbackToComplete = loopingCallback.onResult(r, t); + if (!CallbackChain.execute(chain, nextCallbackToComplete)) { + mutableElement[0] = nextCallbackToComplete; + } + }); + return mutableElement[0]; + } + } + /** * This callback is allowed to be completed more than once. */ @NotThreadSafe - private class LoopingCallback implements SingleResultCallback { + private class LoopingCallback { private final SingleResultCallback wrapped; LoopingCallback(final SingleResultCallback callback) { wrapped = callback; } - @Override - public void onResult(@Nullable final Void result, @Nullable final Throwable t) { + @Nullable + public Element onResult(@Nullable final Void result, @Nullable final Throwable t) { if (t != null) { wrapped.onResult(null, t); + return null; } else { boolean continueLooping; try { continueLooping = state.advance(); } catch (Throwable e) { wrapped.onResult(null, e); - return; + return null; } if (continueLooping) { - CallbackChain.execute(chain, () -> body.run(this)); + return () -> body.run(this); } else { wrapped.onResult(result, null); + return null; } } } diff --git a/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java b/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java index f4bd2e31abc..679a17dac6b 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java @@ -15,52 +15,47 @@ */ package com.mongodb.internal.async.function; +import com.mongodb.annotations.NotThreadSafe; import com.mongodb.lang.Nullable; -import java.util.concurrent.atomic.AtomicReference; - -import static com.mongodb.assertions.Assertions.assertNotNull; -import static com.mongodb.assertions.Assertions.assertNull; - +@NotThreadSafe final class CallbackChain { - @Nullable - private Runnable next; - private int runEnteredCounter; - private final AtomicReference thread; + private int enteringCounter; CallbackChain() { - runEnteredCounter = 0; - thread = new AtomicReference<>(); + enteringCounter = 0; } - static void execute(@Nullable final CallbackChain chain, final Runnable next) { + static boolean execute(@Nullable final CallbackChain chain, @Nullable final Element element) { + if (element == null) { + return false; + } if (chain != null) { - chain.execute(next); + return chain.execute(element); } else { - next.run(); + element.execute(); + return true; } } - // VAKOTODO figure out thread safety - private void execute(final Runnable next) { - assertNotNull(next); - assertNull(this.next); - this.next = next; - -// if (!thread.compareAndSet(null, Thread.currentThread())) { -// assertTrue(Thread.currentThread() == thread.get()); -// } - boolean recursive = ++runEnteredCounter > 1; + private boolean execute(final Element element) { + boolean reentered = ++enteringCounter > 1; try { - if (recursive) { - return; + if (reentered) { + return false; } - for (Runnable localNext = next; localNext != null; localNext = this.next) { - this.next = null; - localNext.run(); + Element next = element.execute(); + while (next != null) { + next = next.execute(); } } finally { - runEnteredCounter--; + enteringCounter--; } + return true; + } + + interface Element { + @Nullable + Element execute(); } } From 62ca456bcf34a7d6ba4f7d496a795fa67c3cedac Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Sat, 14 Feb 2026 15:40:03 -0700 Subject: [PATCH 04/15] fourth attempt --- .../async/function/AsyncCallbackLoop.java | 32 +++++++--- .../async/function/CallbackChain.java | 61 ------------------- .../com/mongodb/internal/async/VakoTest.java | 12 +++- 3 files changed, 34 insertions(+), 71 deletions(-) delete mode 100644 driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index 5c8587c37e6..2ddfa388258 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -17,7 +17,6 @@ import com.mongodb.annotations.NotThreadSafe; import com.mongodb.internal.async.SingleResultCallback; -import com.mongodb.internal.async.function.CallbackChain.Element; import com.mongodb.lang.Nullable; import java.util.function.Supplier; @@ -63,21 +62,35 @@ public void run(final SingleResultCallback callback) { private static final class Body { private final AsyncCallbackRunnable wrapped; - @Nullable - private final CallbackChain chain; + private final boolean optimized; + private boolean executingChain; private Body(final boolean optimized, final AsyncCallbackRunnable body) { this.wrapped = body; - this.chain = optimized ? new CallbackChain() : null; + this.optimized = optimized; + executingChain = false; } @Nullable Element run(final LoopingCallback loopingCallback) { Element[] mutableElement = new Element[1]; wrapped.run((r, t) -> { - Element nextCallbackToComplete = loopingCallback.onResult(r, t); - if (!CallbackChain.execute(chain, nextCallbackToComplete)) { - mutableElement[0] = nextCallbackToComplete; + Element element = loopingCallback.onResult(r, t); + if (!optimized && element != null) { + element.execute(); + return; + } + if (element == null || executingChain) { + mutableElement[0] = element; + } else { + executingChain = true; + try { + do { + element = element.execute(); + } while (element != null); + } finally { + executingChain = false; + } } }); return mutableElement[0]; @@ -117,4 +130,9 @@ public Element onResult(@Nullable final Void result, @Nullable final Throwable t } } } + + interface Element { + @Nullable + Element execute(); + } } diff --git a/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java b/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java deleted file mode 100644 index 679a17dac6b..00000000000 --- a/driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.mongodb.internal.async.function; - -import com.mongodb.annotations.NotThreadSafe; -import com.mongodb.lang.Nullable; - -@NotThreadSafe -final class CallbackChain { - private int enteringCounter; - - CallbackChain() { - enteringCounter = 0; - } - - static boolean execute(@Nullable final CallbackChain chain, @Nullable final Element element) { - if (element == null) { - return false; - } - if (chain != null) { - return chain.execute(element); - } else { - element.execute(); - return true; - } - } - - private boolean execute(final Element element) { - boolean reentered = ++enteringCounter > 1; - try { - if (reentered) { - return false; - } - Element next = element.execute(); - while (next != null) { - next = next.execute(); - } - } finally { - enteringCounter--; - } - return true; - } - - interface Element { - @Nullable - Element execute(); - } -} diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java index 6c3ee272247..14ae034ab13 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java @@ -34,7 +34,10 @@ class VakoTest { @ParameterizedTest - @CsvSource({"false, 30", "true, 30"}) + @CsvSource({ + "false, 30", + "true, 30" + }) void asyncCallbackLoop(final boolean optimized, final int iterations) throws Exception { System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); CompletableFuture join = new CompletableFuture<>(); @@ -59,8 +62,11 @@ void asyncCallbackLoop(final boolean optimized, final int iterations) throws Exc @ParameterizedTest @CsvSource({ - "false, false, 30", "false, true, 30", - "true, false, 30", "true, true, 30"}) + "false, false, 30", + "false, true, 30", + "true, false, 30", + "true, true, 30" + }) void testThenRunDoWhileLoop(final boolean optimized, final boolean separateThread, final int counterInitialValue) throws Exception { StartTime start = StartTime.now(); System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); From 55bb4a826b7a64c599ae4b3a2577a639c42281da Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Sat, 14 Feb 2026 18:57:06 -0700 Subject: [PATCH 05/15] fifth attempt --- .../async/function/AsyncCallbackLoop.java | 79 +++++++++++-------- .../async/AsyncFunctionsAbstractTest.java | 38 ++++----- 2 files changed, 63 insertions(+), 54 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index 2ddfa388258..9422ae3a8f4 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -21,6 +21,8 @@ import java.util.function.Supplier; +import static com.mongodb.assertions.Assertions.assertTrue; + /** * A decorator that implements automatic repeating of an {@link AsyncCallbackRunnable}. * {@link AsyncCallbackLoop} may execute the original asynchronous function multiple times sequentially, @@ -39,7 +41,6 @@ */ @NotThreadSafe public final class AsyncCallbackLoop implements AsyncCallbackRunnable { - private final LoopState state; private final Body body; /** @@ -51,88 +52,96 @@ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body } public AsyncCallbackLoop(final boolean optimized, final LoopState state, final AsyncCallbackRunnable body) { - this.state = state; - this.body = new Body(optimized, body); + this.body = new Body(optimized, state, body); } @Override public void run(final SingleResultCallback callback) { - body.run(new LoopingCallback(callback)); + body.loop(new ReusableLoopCallback(callback)); } private static final class Body { private final AsyncCallbackRunnable wrapped; private final boolean optimized; - private boolean executingChain; + private final LoopState state; + private boolean reenteredLoopMethod; - private Body(final boolean optimized, final AsyncCallbackRunnable body) { + private Body(final boolean optimized, final LoopState state, final AsyncCallbackRunnable body) { this.wrapped = body; this.optimized = optimized; - executingChain = false; + this.state = state; + reenteredLoopMethod = false; } - @Nullable - Element run(final LoopingCallback loopingCallback) { - Element[] mutableElement = new Element[1]; + /** + * @return {@code true} to indicate that the looping completed; + * {@code false} to indicate that the looping is still executing, potentially asynchronously. + */ + boolean loop(final ReusableLoopCallback callback) { + boolean[] done = new boolean[] {true}; wrapped.run((r, t) -> { - Element element = loopingCallback.onResult(r, t); - if (!optimized && element != null) { - element.execute(); + boolean localDone = callback.onResult(state, r, t); + if (localDone) { + done[0] = localDone; return; } - if (element == null || executingChain) { - mutableElement[0] = element; - } else { - executingChain = true; + if (!optimized) { + localDone = loop(callback); + done[0] = localDone; + return; + } + if (!reenteredLoopMethod) { + reenteredLoopMethod = true; try { do { - element = element.execute(); - } while (element != null); + localDone = loop(callback); + } while (!localDone); + done[0] = assertTrue(localDone); } finally { - executingChain = false; + reenteredLoopMethod = false; } + } else { + done[0] = localDone; } }); - return mutableElement[0]; + return done[0]; } } /** - * This callback is allowed to be completed more than once. + * This callback is allowed to be {@linkplain #onResult(LoopState, Void, Throwable) completed} more than once. */ @NotThreadSafe - private class LoopingCallback { + private static final class ReusableLoopCallback { private final SingleResultCallback wrapped; - LoopingCallback(final SingleResultCallback callback) { + ReusableLoopCallback(final SingleResultCallback callback) { wrapped = callback; } - @Nullable - public Element onResult(@Nullable final Void result, @Nullable final Throwable t) { + /** + * @return {@code true} iff the {@linkplain ReusableLoopCallback#ReusableLoopCallback(SingleResultCallback) wrapped} + * {@link SingleResultCallback} is {@linkplain SingleResultCallback#onResult(Object, Throwable) completed}. + */ + public boolean onResult(final LoopState state, @Nullable final Void result, @Nullable final Throwable t) { if (t != null) { wrapped.onResult(null, t); - return null; + return true; } else { boolean continueLooping; try { continueLooping = state.advance(); } catch (Throwable e) { wrapped.onResult(null, e); - return null; + return true; } if (continueLooping) { - return () -> body.run(this); + return false; } else { wrapped.onResult(result, null); - return null; + return true; } } } } - - interface Element { - @Nullable - Element execute(); - } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java index 8b102a8102b..40b78ff18ae 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java @@ -768,6 +768,25 @@ void testDoWhileLoop() { }); } + @Test + void testDoWhileLoop2() { + assertBehavesSameVariations(8, + () -> { + int i = 0; + do { + i++; + sync(i); + } while (i < 3 && plainTest(i)); + }, + (callback) -> { + final int[] i = new int[1]; + beginAsync().thenRunDoWhileLoop((c) -> { + i[0]++; + async(i[0], c); + }, () -> i[0] < 3 && plainTest(i[0])).finish(callback); + }); + } + @Test void testFinallyWithPlainInsideTry() { // (in try: normal flow + exception + exception) * (in finally: normal + exception) = 6 @@ -990,23 +1009,4 @@ void testDerivation() { }).finish(callback); }); } - - @Test - void testThenRunDoWhileLoop() { - assertBehavesSameVariations(8, - () -> { - int i = 0; - do { - i++; - sync(i); - } while (i < 3 && plainTest(i)); - }, - (callback) -> { - final int[] i = new int[1]; - beginAsync().thenRunDoWhileLoop((c) -> { - i[0]++; - async(i[0], c); - }, () -> i[0] < 3 && plainTest(i[0])).finish(callback); - }); - } } From cf6be9d6007b015272fddca9c12f9537beb11e98 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Sun, 15 Feb 2026 08:46:05 -0700 Subject: [PATCH 06/15] do test improvements --- .../async/function/AsyncCallbackLoop.java | 2 +- .../com/mongodb/internal/async/VakoTest.java | 75 +++++++++++++------ 2 files changed, 55 insertions(+), 22 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index 9422ae3a8f4..2ea8dee5743 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -78,7 +78,7 @@ private Body(final boolean optimized, final LoopState state, final AsyncCallback * {@code false} to indicate that the looping is still executing, potentially asynchronously. */ boolean loop(final ReusableLoopCallback callback) { - boolean[] done = new boolean[] {true}; + boolean[] done = {true}; wrapped.run((r, t) -> { boolean localDone = callback.onResult(state, r, t); if (localDone) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java index 14ae034ab13..1a70041db7c 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; @@ -35,8 +36,8 @@ class VakoTest { @ParameterizedTest @CsvSource({ - "false, 30", - "true, 30" + "false, 20", + "true, 20" }) void asyncCallbackLoop(final boolean optimized, final int iterations) throws Exception { System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); @@ -60,19 +61,24 @@ void asyncCallbackLoop(final boolean optimized, final int iterations) throws Exc join.get(); } - @ParameterizedTest + @ParameterizedTest() @CsvSource({ - "false, false, 30", - "false, true, 30", - "true, false, 30", - "true, true, 30" + "false, false, 0, 20", + "false, true, 4, 20", + "true, false, 0, 20", + "true, true, 4, 20" }) - void testThenRunDoWhileLoop(final boolean optimized, final boolean separateThread, final int counterInitialValue) throws Exception { + void testThenRunDoWhileLoop( + final boolean optimized, + final boolean separateThread, + final int sleepSeconds, + final int counterInitialValue) throws Exception { + Duration totalSleepDuration = Duration.ofSeconds(sleepSeconds); StartTime start = StartTime.now(); System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); CompletableFuture join = new CompletableFuture<>(); - asyncMethod1(optimized, separateThread, new Counter(counterInitialValue), (r, t) -> { - System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", + asyncMethod1(optimized, separateThread, totalSleepDuration, new Counter(counterInitialValue), (r, t) -> { + System.err.printf("TEST callback completed callStackDepth=%s, r=%s, t=%s%n", Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); if (t != null) { join.completeExceptionally(t); @@ -80,27 +86,37 @@ void testThenRunDoWhileLoop(final boolean optimized, final boolean separateThrea join.complete(r); } }); - System.err.printf("asyncMethod1 executed in %s%n", start.elapsed()); + System.err.printf("asyncMethod1 returned in %s%n", start.elapsed()); join.get(); + sleep(Duration.ofSeconds(1)); + System.err.printf("%nDONE%n"); } - private static void asyncMethod1(final boolean optimized, final boolean separateThread, - final Counter counter, final SingleResultCallback callback) { + private static void asyncMethod1( + final boolean optimized, + final boolean separateThread, + final Duration totalSleepDuration, + final Counter counter, + final SingleResultCallback callback) { beginAsync().thenRunDoWhileLoop(optimized, c -> { - asyncMethod2(separateThread, counter, c); +// sleep(totalSleepDuration.dividedBy(counter.initial())); + StartTime start = StartTime.now(); + asyncMethod2(separateThread, totalSleepDuration, counter, c); + System.err.printf("asyncMethod2 returned in %s%n", start.elapsed()); }, () -> !counter.done()).finish(callback); } - private static void asyncMethod2(final boolean separateThread, final Counter counter, final SingleResultCallback callback) { + private static void asyncMethod2( + final boolean separateThread, + final Duration totalSleepDuration, + final Counter counter, + final SingleResultCallback callback) { Runnable action = () -> { - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(1) / counter.initial()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } + sleep(totalSleepDuration.dividedBy(counter.initial())); counter.countDown(); + StartTime start = StartTime.now(); callback.complete(callback); + System.err.printf("asyncMethod2 callback.complete returned in %s%n", start.elapsed()); }; if (separateThread) { ForkJoinPool.commonPool().execute(action); @@ -152,4 +168,21 @@ private static String exceptionToString(@Nullable final Throwable t) { throw new RuntimeException(e); } } + + private static void sleep(Duration duration) { + if (duration.isZero()) { + return; + } + try { + long durationNsPart = duration.getNano(); + long durationMsPartFromNsPart = TimeUnit.MILLISECONDS.convert(duration.getNano(), TimeUnit.NANOSECONDS); + long sleepMs = TimeUnit.MILLISECONDS.convert(duration.getSeconds(), TimeUnit.SECONDS) + durationMsPartFromNsPart; + int sleepNs = Math.toIntExact(durationNsPart - TimeUnit.NANOSECONDS.convert(durationMsPartFromNsPart, TimeUnit.MILLISECONDS)); + System.err.printf("sleeping for %d ms %d ns%n", sleepMs, sleepNs); + Thread.sleep(sleepMs, sleepNs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } } From 2de9acd97444eafc8aacb440abbce4e17867fd48 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Sun, 15 Feb 2026 16:54:49 -0700 Subject: [PATCH 07/15] get rid of the "optimized" parameter that existed for demonstration purposes --- .../mongodb/internal/async/AsyncRunnable.java | 9 +-- .../async/function/AsyncCallbackLoop.java | 15 +---- .../com/mongodb/internal/async/VakoTest.java | 67 ++++++++++--------- 3 files changed, 37 insertions(+), 54 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index 9d706d9f884..e404e2b8152 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -253,16 +253,9 @@ default AsyncRunnable thenRunRetryingWhile( * @see AsyncCallbackLoop */ default AsyncRunnable thenRunDoWhileLoop(final AsyncRunnable loopBodyRunnable, final BooleanSupplier whileCheck) { - return thenRunDoWhileLoop(true, loopBodyRunnable, whileCheck); - } - - default AsyncRunnable thenRunDoWhileLoop( - final boolean optimized, - final AsyncRunnable loopBodyRunnable, - final BooleanSupplier whileCheck) { return thenRun(finalCallback -> { LoopState loopState = new LoopState(); - new AsyncCallbackLoop(optimized, loopState, iterationCallback -> { + new AsyncCallbackLoop(loopState, iterationCallback -> { loopBodyRunnable.finish((result, t) -> { if (t != null) { diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index 2ea8dee5743..548f3ab2a85 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -48,11 +48,7 @@ public final class AsyncCallbackLoop implements AsyncCallbackRunnable { * @param body The body of the loop. */ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body) { - this(true, state, body); - } - - public AsyncCallbackLoop(final boolean optimized, final LoopState state, final AsyncCallbackRunnable body) { - this.body = new Body(optimized, state, body); + this.body = new Body(state, body); } @Override @@ -62,13 +58,11 @@ public void run(final SingleResultCallback callback) { private static final class Body { private final AsyncCallbackRunnable wrapped; - private final boolean optimized; private final LoopState state; private boolean reenteredLoopMethod; - private Body(final boolean optimized, final LoopState state, final AsyncCallbackRunnable body) { + private Body(final LoopState state, final AsyncCallbackRunnable body) { this.wrapped = body; - this.optimized = optimized; this.state = state; reenteredLoopMethod = false; } @@ -85,11 +79,6 @@ boolean loop(final ReusableLoopCallback callback) { done[0] = localDone; return; } - if (!optimized) { - localDone = loop(callback); - done[0] = localDone; - return; - } if (!reenteredLoopMethod) { reenteredLoopMethod = true; try { diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java index 1a70041db7c..e9f836fc9f8 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java @@ -36,14 +36,13 @@ class VakoTest { @ParameterizedTest @CsvSource({ - "false, 20", - "true, 20" + "10" }) - void asyncCallbackLoop(final boolean optimized, final int iterations) throws Exception { + void asyncCallbackLoop(final int iterations) throws Exception { System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); CompletableFuture join = new CompletableFuture<>(); LoopState loopState = new LoopState(); - new AsyncCallbackLoop(optimized, loopState, c -> { + new AsyncCallbackLoop(loopState, c -> { int iteration = loopState.iteration(); System.err.printf("iteration=%d, callStackDepth=%d%n", iteration, Thread.currentThread().getStackTrace().length); if (!loopState.breakAndCompleteIf(() -> iteration == (iterations - 1), c)) { @@ -59,66 +58,68 @@ void asyncCallbackLoop(final boolean optimized, final int iterations) throws Exc } }); join.get(); + System.err.printf("%n%nDONE%n%n"); } @ParameterizedTest() @CsvSource({ - "false, false, 0, 20", - "false, true, 4, 20", - "true, false, 0, 20", - "true, true, 4, 20" + "0, false, 0, 10", + "0, true, 4, 10", + "4, true, 0, 10" }) void testThenRunDoWhileLoop( - final boolean optimized, - final boolean separateThread, - final int sleepSeconds, + final int blockInAsyncMethodTotalSeconds, + final boolean asyncExecution, + final int delayAsyncExecutionTotalSeconds, final int counterInitialValue) throws Exception { - Duration totalSleepDuration = Duration.ofSeconds(sleepSeconds); + Duration blockInAsyncMethodTotalDuration = Duration.ofSeconds(blockInAsyncMethodTotalSeconds); + com.mongodb.assertions.Assertions.assertTrue(asyncExecution || delayAsyncExecutionTotalSeconds == 0); + Duration delayAsyncExecutionTotalDuration = Duration.ofSeconds(delayAsyncExecutionTotalSeconds); StartTime start = StartTime.now(); System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); CompletableFuture join = new CompletableFuture<>(); - asyncMethod1(optimized, separateThread, totalSleepDuration, new Counter(counterInitialValue), (r, t) -> { - System.err.printf("TEST callback completed callStackDepth=%s, r=%s, t=%s%n", - Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); - if (t != null) { - join.completeExceptionally(t); - } else { - join.complete(r); - } + asyncMethod1(blockInAsyncMethodTotalDuration, asyncExecution, delayAsyncExecutionTotalDuration, new Counter(counterInitialValue), + (r, t) -> { + System.err.printf("TEST callback completed callStackDepth=%s, r=%s, t=%s%n", + Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); + if (t != null) { + join.completeExceptionally(t); + } else { + join.complete(r); + } }); System.err.printf("asyncMethod1 returned in %s%n", start.elapsed()); join.get(); - sleep(Duration.ofSeconds(1)); - System.err.printf("%nDONE%n"); + System.err.printf("%n%nDONE%n%n"); } private static void asyncMethod1( - final boolean optimized, - final boolean separateThread, - final Duration totalSleepDuration, + final Duration blockInAsyncMethodTotalDuration, + final boolean asyncExecution, + final Duration delayAsyncExecutionTotalDuration, final Counter counter, final SingleResultCallback callback) { - beginAsync().thenRunDoWhileLoop(optimized, c -> { -// sleep(totalSleepDuration.dividedBy(counter.initial())); + beginAsync().thenRunDoWhileLoop(c -> { + sleep(blockInAsyncMethodTotalDuration.dividedBy(counter.initial())); StartTime start = StartTime.now(); - asyncMethod2(separateThread, totalSleepDuration, counter, c); + asyncMethod2(asyncExecution, delayAsyncExecutionTotalDuration, counter, c); System.err.printf("asyncMethod2 returned in %s%n", start.elapsed()); }, () -> !counter.done()).finish(callback); } private static void asyncMethod2( - final boolean separateThread, - final Duration totalSleepDuration, + final boolean asyncExecution, + final Duration delayAsyncExecutionTotalDuration, final Counter counter, final SingleResultCallback callback) { Runnable action = () -> { - sleep(totalSleepDuration.dividedBy(counter.initial())); + sleep(delayAsyncExecutionTotalDuration.dividedBy(counter.initial())); counter.countDown(); StartTime start = StartTime.now(); callback.complete(callback); System.err.printf("asyncMethod2 callback.complete returned in %s%n", start.elapsed()); }; - if (separateThread) { + if (asyncExecution) { ForkJoinPool.commonPool().execute(action); } else { action.run(); @@ -169,7 +170,7 @@ private static String exceptionToString(@Nullable final Throwable t) { } } - private static void sleep(Duration duration) { + private static void sleep(final Duration duration) { if (duration.isZero()) { return; } From 0d614675321de2c263ddba1138cea548bb37b27d Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Sun, 15 Feb 2026 19:47:46 -0700 Subject: [PATCH 08/15] start using `ThreadLocal` --- .../async/function/AsyncCallbackLoop.java | 66 ++++++++++++------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index 548f3ab2a85..bd5c936f96c 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -53,47 +53,67 @@ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body @Override public void run(final SingleResultCallback callback) { - body.loop(new ReusableLoopCallback(callback)); + body.initiateIteration(false, new ReusableLoopCallback(callback)); } private static final class Body { private final AsyncCallbackRunnable wrapped; private final LoopState state; - private boolean reenteredLoopMethod; + private final ThreadLocal iterationIsExecutingSynchronously; + private final ThreadLocal status; + + private enum Status { + ITERATION_INITIATED, + LAST_ITERATION_COMPLETED, + ANOTHER_ITERATION_NEEDED + } private Body(final LoopState state, final AsyncCallbackRunnable body) { this.wrapped = body; this.state = state; - reenteredLoopMethod = false; + iterationIsExecutingSynchronously = ThreadLocal.withInitial(() -> false); + status = ThreadLocal.withInitial(() -> Status.ITERATION_INITIATED); } /** - * @return {@code true} to indicate that the looping completed; - * {@code false} to indicate that the looping is still executing, potentially asynchronously. + * Invoking this method initiates a new iteration of the loop. An iteration may be executed either + * synchronously or asynchronously with the execution of this method: + * + *
    + *
  • synchronous execution: iteration completes before (in the happens-before order) the method completes;
  • + *
  • asynchronous execution: the aforementioned relation does not exist.
  • + *
+ * + * @return {@code true} iff it is known that another iteration must be initiated. + * Such information is available to this method only if the iteration it initiated has completed synchronously. */ - boolean loop(final ReusableLoopCallback callback) { - boolean[] done = {true}; + Status initiateIteration(final boolean trampolining, final ReusableLoopCallback callback) { + iterationIsExecutingSynchronously.set(true); wrapped.run((r, t) -> { - boolean localDone = callback.onResult(state, r, t); - if (localDone) { - done[0] = localDone; + boolean localIterationIsExecutingSynchronously = iterationIsExecutingSynchronously.get(); + if (callback.onResult(state, r, t)) { + status.set(Status.LAST_ITERATION_COMPLETED); return; } - if (!reenteredLoopMethod) { - reenteredLoopMethod = true; - try { - do { - localDone = loop(callback); - } while (!localDone); - done[0] = assertTrue(localDone); - } finally { - reenteredLoopMethod = false; - } - } else { - done[0] = localDone; + if (trampolining && localIterationIsExecutingSynchronously) { + // bounce + status.set(Status.ANOTHER_ITERATION_NEEDED); + return; } + Status localStatus; + do { + localStatus = initiateIteration(true, callback); + } while (localStatus.equals(Status.ANOTHER_ITERATION_NEEDED)); + status.set(localStatus); + + // VAKOTODO remove thread-locals if executed asynchronously }); - return done[0]; + try { + return status.get(); + } finally { + status.remove(); + iterationIsExecutingSynchronously.remove(); + } } } From 1284d59b129b84f2a70327fe723257d4bb53ab24 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Mon, 16 Feb 2026 03:26:07 -0700 Subject: [PATCH 09/15] improve the `ThreadLocal` approach --- .../async/function/AsyncCallbackLoop.java | 111 ++++++++++----- .../com/mongodb/internal/async/VakoTest.java | 132 ++++++++++++------ 2 files changed, 162 insertions(+), 81 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index bd5c936f96c..477cb4d67b5 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -21,8 +21,6 @@ import java.util.function.Supplier; -import static com.mongodb.assertions.Assertions.assertTrue; - /** * A decorator that implements automatic repeating of an {@link AsyncCallbackRunnable}. * {@link AsyncCallbackLoop} may execute the original asynchronous function multiple times sequentially, @@ -53,66 +51,103 @@ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body @Override public void run(final SingleResultCallback callback) { - body.initiateIteration(false, new ReusableLoopCallback(callback)); + body.run(false, new ReusableLoopCallback(callback)); } private static final class Body { - private final AsyncCallbackRunnable wrapped; + private final AsyncCallbackRunnable body; private final LoopState state; - private final ThreadLocal iterationIsExecutingSynchronously; - private final ThreadLocal status; + private final ThreadLocal sameThreadDetector; - private enum Status { - ITERATION_INITIATED, - LAST_ITERATION_COMPLETED, - ANOTHER_ITERATION_NEEDED + private enum SameThreadDetectionStatus { + NEGATIVE, + PROBING, + POSITIVE } private Body(final LoopState state, final AsyncCallbackRunnable body) { - this.wrapped = body; + this.body = body; this.state = state; - iterationIsExecutingSynchronously = ThreadLocal.withInitial(() -> false); - status = ThreadLocal.withInitial(() -> Status.ITERATION_INITIATED); + sameThreadDetector = ThreadLocal.withInitial(() -> SameThreadDetectionStatus.NEGATIVE); } /** - * Invoking this method initiates a new iteration of the loop. An iteration may be executed either - * synchronously or asynchronously with the execution of this method: - * + * Initiates a new iteration of the loop by invoking + * {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run}. + * The initiated iteration may be executed either synchronously or asynchronously with the method that initiated it: *
    - *
  • synchronous execution: iteration completes before (in the happens-before order) the method completes;
  • - *
  • asynchronous execution: the aforementioned relation does not exist.
  • + *
  • synchronous execution—completion of the initiated iteration happens-before the method completion;
  • + *
  • asynchronous execution—the aforementioned relation does not exist.
  • *
* + *

If another iteration is needed, it is initiated from the callback passed to + * {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run} + * by invoking {@link #run(boolean, ReusableLoopCallback)}. + * Completing the initiated iteration is {@linkplain SingleResultCallback#onResult(Object, Throwable) invoking} the callback. + * Thus, it is guaranteed that all iterations are executed sequentially with each other + * (that is, completion of one iteration happens-before initiation of the next one) + * regardless of them being executed synchronously or asynchronously with the method that initiated them. + * + *

Initiating any but the {@linkplain LoopState#isFirstIteration() first} iteration is done using trampolining, + * which allows us to do it iteratively rather than recursively, if iterations are executed synchronously, + * and ensures stack usage does not increase with the number of iterations. + * * @return {@code true} iff it is known that another iteration must be initiated. - * Such information is available to this method only if the iteration it initiated has completed synchronously. + * This information is used only for trampolining, and is available only if the iteration executed synchronously. + * + *

It is impossible to detect whether an iteration is executed synchronously. + * It is, however, possible to detect whether an iteration is executed in the same thread as the method that initiated it, + * and we use it as proxy indicator of synchronous execution. Unfortunately, this means we do not support and behave incorrectly + * if an iteration is executed synchronously but in a thread different from the one in which the method that + * initiated the iteration was invoked. + * + *

The above limitation should not be a problem in practice: + *

    + *
  • the only way to execute an iteration synchronously but in a different thread is to block the thread that + * initiated the iteration by waiting for completion of the iteration by that other thread;
  • + *
  • blocking a thread is forbidden in asynchronous code, and we do not do it;
  • + *
  • therefore, we would not have an iteration that is executed synchronously but in a different thread.
  • + *
*/ - Status initiateIteration(final boolean trampolining, final ReusableLoopCallback callback) { - iterationIsExecutingSynchronously.set(true); - wrapped.run((r, t) -> { - boolean localIterationIsExecutingSynchronously = iterationIsExecutingSynchronously.get(); + boolean run(final boolean trampolining, final ReusableLoopCallback callback) { + // The `trampoliningResult` variable must be used only if the initiated iteration is executed synchronously with + // the current method, which must be detected separately. + // + // It may be tempting to detect whether the iteration was executed synchronously by reading from the variable + // and observing a write that is part of the callback execution. However, if the iteration is executed asynchronously with + // the current method, then the aforementioned conflicting write and read actions are not ordered by + // the happens-before relation, the execution contains a data race and the read is allowed to observe the write. + // If such observation happens when the iteration is executed asynchronously, then we have a false positive. + // Furthermore, depending on the nature of the value read, it may not be trustworthy. + boolean[] trampoliningResult = {false}; + sameThreadDetector.set(SameThreadDetectionStatus.PROBING); + body.run((r, t) -> { if (callback.onResult(state, r, t)) { - status.set(Status.LAST_ITERATION_COMPLETED); + // If we are trampolining, then here we bounce up, trampolining completes and so is the whole loop; + // otherwise, the whole loop simply completes. return; } - if (trampolining && localIterationIsExecutingSynchronously) { - // bounce - status.set(Status.ANOTHER_ITERATION_NEEDED); - return; + if (trampolining) { + boolean sameThread = sameThreadDetector.get().equals(SameThreadDetectionStatus.PROBING); + if (sameThread) { + // Bounce up if we are trampolining and the iteration was executed synchronously; + // otherwise proceed to begin trampolining. + sameThreadDetector.set(SameThreadDetectionStatus.POSITIVE); + trampoliningResult[0] = true; + return; + } else { + sameThreadDetector.remove(); + } } - Status localStatus; - do { - localStatus = initiateIteration(true, callback); - } while (localStatus.equals(Status.ANOTHER_ITERATION_NEEDED)); - status.set(localStatus); - - // VAKOTODO remove thread-locals if executed asynchronously + boolean anotherIterationNeeded; + do { // trampolining + anotherIterationNeeded = run(true, callback); + } while (anotherIterationNeeded); }); try { - return status.get(); + return sameThreadDetector.get().equals(SameThreadDetectionStatus.POSITIVE) && trampoliningResult[0]; } finally { - status.remove(); - iterationIsExecutingSynchronously.remove(); + sameThreadDetector.remove(); } } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java index e9f836fc9f8..f9b904e005d 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java @@ -19,6 +19,8 @@ import com.mongodb.internal.async.function.LoopState; import com.mongodb.internal.time.StartTime; import com.mongodb.lang.Nullable; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -28,18 +30,32 @@ import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; class VakoTest { + private static ScheduledExecutorService executor; + + @BeforeAll + static void beforeAll() { + executor = Executors.newScheduledThreadPool(2); + } + + @AfterAll + static void afterAll() throws InterruptedException { + executor.shutdownNow(); + com.mongodb.assertions.Assertions.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); + } + @ParameterizedTest @CsvSource({ "10" }) void asyncCallbackLoop(final int iterations) throws Exception { - System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); + System.err.printf("baselineStackDepth=%d%n%n", Thread.currentThread().getStackTrace().length); CompletableFuture join = new CompletableFuture<>(); LoopState loopState = new LoopState(); new AsyncCallbackLoop(loopState, c -> { @@ -51,78 +67,92 @@ void asyncCallbackLoop(final int iterations) throws Exception { }).run((r, t) -> { System.err.printf("test callback completed callStackDepth=%d, r=%s, t=%s%n", Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); - if (t != null) { - join.completeExceptionally(t); - } else { - join.complete(r); - } + complete(join, r, t); }); join.get(); System.err.printf("%n%nDONE%n%n"); } + private enum IterationExecutionType { + SYNC_SAME_THREAD, + SYNC_DIFFERENT_THREAD, + ASYNC, + } + @ParameterizedTest() @CsvSource({ - "0, false, 0, 10", - "0, true, 4, 10", - "4, true, 0, 10" + "10, 0, SYNC_SAME_THREAD, 0", +// "10, 0, SYNC_DIFFERENT_THREAD, 0", + "10, 0, ASYNC, 4", + "10, 4, ASYNC, 0" }) void testThenRunDoWhileLoop( - final int blockInAsyncMethodTotalSeconds, - final boolean asyncExecution, - final int delayAsyncExecutionTotalSeconds, - final int counterInitialValue) throws Exception { - Duration blockInAsyncMethodTotalDuration = Duration.ofSeconds(blockInAsyncMethodTotalSeconds); - com.mongodb.assertions.Assertions.assertTrue(asyncExecution || delayAsyncExecutionTotalSeconds == 0); + final int counterInitialValue, + final int blockSyncPartOfIterationTotalSeconds, + final IterationExecutionType executionType, + final int delayAsyncExecutionTotalSeconds) throws Exception { + System.err.printf("baselineStackDepth=%d%n%n", Thread.currentThread().getStackTrace().length); + Duration blockSyncPartOfIterationTotalDuration = Duration.ofSeconds(blockSyncPartOfIterationTotalSeconds); + com.mongodb.assertions.Assertions.assertTrue( + executionType.equals(IterationExecutionType.ASYNC) || delayAsyncExecutionTotalSeconds == 0); Duration delayAsyncExecutionTotalDuration = Duration.ofSeconds(delayAsyncExecutionTotalSeconds); StartTime start = StartTime.now(); - System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length); CompletableFuture join = new CompletableFuture<>(); - asyncMethod1(blockInAsyncMethodTotalDuration, asyncExecution, delayAsyncExecutionTotalDuration, new Counter(counterInitialValue), + asyncLoop(new Counter(counterInitialValue), blockSyncPartOfIterationTotalDuration, executionType, delayAsyncExecutionTotalDuration, (r, t) -> { - System.err.printf("TEST callback completed callStackDepth=%s, r=%s, t=%s%n", + System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); - if (t != null) { - join.completeExceptionally(t); - } else { - join.complete(r); - } + complete(join, r, t); }); - System.err.printf("asyncMethod1 returned in %s%n", start.elapsed()); + System.err.printf("\tasyncLoop returned in %s%n", start.elapsed()); join.get(); System.err.printf("%n%nDONE%n%n"); } - private static void asyncMethod1( - final Duration blockInAsyncMethodTotalDuration, - final boolean asyncExecution, - final Duration delayAsyncExecutionTotalDuration, + private static void asyncLoop( final Counter counter, + final Duration blockSyncPartOfIterationTotalDuration, + final IterationExecutionType executionType, + final Duration delayAsyncExecutionTotalDuration, final SingleResultCallback callback) { beginAsync().thenRunDoWhileLoop(c -> { - sleep(blockInAsyncMethodTotalDuration.dividedBy(counter.initial())); + sleep(blockSyncPartOfIterationTotalDuration.dividedBy(counter.initial())); StartTime start = StartTime.now(); - asyncMethod2(asyncExecution, delayAsyncExecutionTotalDuration, counter, c); - System.err.printf("asyncMethod2 returned in %s%n", start.elapsed()); + asyncPartOfIteration(counter, executionType, delayAsyncExecutionTotalDuration, c); + System.err.printf("\tasyncPartOfIteration returned in %s%n", start.elapsed()); }, () -> !counter.done()).finish(callback); } - private static void asyncMethod2( - final boolean asyncExecution, - final Duration delayAsyncExecutionTotalDuration, + private static void asyncPartOfIteration( final Counter counter, + final IterationExecutionType executionType, + final Duration delayAsyncExecutionTotalDuration, final SingleResultCallback callback) { - Runnable action = () -> { - sleep(delayAsyncExecutionTotalDuration.dividedBy(counter.initial())); + Runnable asyncPartOfIteration = () -> { counter.countDown(); StartTime start = StartTime.now(); callback.complete(callback); - System.err.printf("asyncMethod2 callback.complete returned in %s%n", start.elapsed()); + System.err.printf("\tasyncPartOfIteration callback.complete returned in %s%n", start.elapsed()); }; - if (asyncExecution) { - ForkJoinPool.commonPool().execute(action); - } else { - action.run(); + switch (executionType) { + case SYNC_SAME_THREAD: { + asyncPartOfIteration.run(); + break; + } + case SYNC_DIFFERENT_THREAD: { + Thread guaranteedDifferentThread = new Thread(asyncPartOfIteration); + guaranteedDifferentThread.start(); + join(guaranteedDifferentThread); + break; + } + case ASYNC: { + executor.schedule(asyncPartOfIteration, + delayAsyncExecutionTotalDuration.dividedBy(counter.initial()).toNanos(), TimeUnit.NANOSECONDS); + break; + } + default: { + com.mongodb.assertions.Assertions.fail(executionType.toString()); + } } } @@ -170,6 +200,23 @@ private static String exceptionToString(@Nullable final Throwable t) { } } + private static void complete(final CompletableFuture future, @Nullable final T result, @Nullable final Throwable t) { + if (t != null) { + future.completeExceptionally(t); + } else { + future.complete(result); + } + } + + private static void join(final Thread thread) { + try { + thread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + private static void sleep(final Duration duration) { if (duration.isZero()) { return; @@ -179,7 +226,6 @@ private static void sleep(final Duration duration) { long durationMsPartFromNsPart = TimeUnit.MILLISECONDS.convert(duration.getNano(), TimeUnit.NANOSECONDS); long sleepMs = TimeUnit.MILLISECONDS.convert(duration.getSeconds(), TimeUnit.SECONDS) + durationMsPartFromNsPart; int sleepNs = Math.toIntExact(durationNsPart - TimeUnit.NANOSECONDS.convert(durationMsPartFromNsPart, TimeUnit.MILLISECONDS)); - System.err.printf("sleeping for %d ms %d ns%n", sleepMs, sleepNs); Thread.sleep(sleepMs, sleepNs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); From d41861211b8b2d6b3fa12a9ad5337eb365ccc2f3 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Mon, 16 Feb 2026 05:19:43 -0700 Subject: [PATCH 10/15] refactor the `ThreadLocal` approach --- .../async/function/AsyncCallbackLoop.java | 44 +++++++---------- .../com/mongodb/internal/async/VakoTest.java | 47 ++++++++++++++----- 2 files changed, 53 insertions(+), 38 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index 477cb4d67b5..dc864dd8e26 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -51,7 +51,7 @@ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body @Override public void run(final SingleResultCallback callback) { - body.run(false, new ReusableLoopCallback(callback)); + body.run(false, callback); } private static final class Body { @@ -82,7 +82,7 @@ private Body(final LoopState state, final AsyncCallbackRunnable body) { * *

If another iteration is needed, it is initiated from the callback passed to * {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run} - * by invoking {@link #run(boolean, ReusableLoopCallback)}. + * by invoking {@link #run(boolean, SingleResultCallback)}. * Completing the initiated iteration is {@linkplain SingleResultCallback#onResult(Object, Throwable) invoking} the callback. * Thus, it is guaranteed that all iterations are executed sequentially with each other * (that is, completion of one iteration happens-before initiation of the next one) @@ -109,7 +109,7 @@ private Body(final LoopState state, final AsyncCallbackRunnable body) { *

  • therefore, we would not have an iteration that is executed synchronously but in a different thread.
  • * */ - boolean run(final boolean trampolining, final ReusableLoopCallback callback) { + boolean run(final boolean trampolining, final SingleResultCallback afterLoopCallback) { // The `trampoliningResult` variable must be used only if the initiated iteration is executed synchronously with // the current method, which must be detected separately. // @@ -122,7 +122,7 @@ boolean run(final boolean trampolining, final ReusableLoopCallback callback) { boolean[] trampoliningResult = {false}; sameThreadDetector.set(SameThreadDetectionStatus.PROBING); body.run((r, t) -> { - if (callback.onResult(state, r, t)) { + if (completeIfNeeded(afterLoopCallback, r, t)) { // If we are trampolining, then here we bounce up, trampolining completes and so is the whole loop; // otherwise, the whole loop simply completes. return; @@ -139,9 +139,10 @@ boolean run(final boolean trampolining, final ReusableLoopCallback callback) { sameThreadDetector.remove(); } } + // trampolining boolean anotherIterationNeeded; - do { // trampolining - anotherIterationNeeded = run(true, callback); + do { + anotherIterationNeeded = run(true, afterLoopCallback); } while (anotherIterationNeeded); }); try { @@ -150,39 +151,28 @@ boolean run(final boolean trampolining, final ReusableLoopCallback callback) { sameThreadDetector.remove(); } } - } - - /** - * This callback is allowed to be {@linkplain #onResult(LoopState, Void, Throwable) completed} more than once. - */ - @NotThreadSafe - private static final class ReusableLoopCallback { - private final SingleResultCallback wrapped; - - ReusableLoopCallback(final SingleResultCallback callback) { - wrapped = callback; - } /** - * @return {@code true} iff the {@linkplain ReusableLoopCallback#ReusableLoopCallback(SingleResultCallback) wrapped} - * {@link SingleResultCallback} is {@linkplain SingleResultCallback#onResult(Object, Throwable) completed}. + * @return {@code true} iff the {@code afterLoopCallback} was + * {@linkplain SingleResultCallback#onResult(Object, Throwable) completed}. */ - public boolean onResult(final LoopState state, @Nullable final Void result, @Nullable final Throwable t) { + private boolean completeIfNeeded(final SingleResultCallback afterLoopCallback, + @Nullable final Void result, @Nullable final Throwable t) { if (t != null) { - wrapped.onResult(null, t); + afterLoopCallback.onResult(null, t); return true; } else { - boolean continueLooping; + boolean anotherIterationNeeded; try { - continueLooping = state.advance(); + anotherIterationNeeded = state.advance(); } catch (Throwable e) { - wrapped.onResult(null, e); + afterLoopCallback.onResult(null, e); return true; } - if (continueLooping) { + if (anotherIterationNeeded) { return false; } else { - wrapped.onResult(result, null); + afterLoopCallback.onResult(result, null); return true; } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java index f9b904e005d..98e17aba73b 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; @@ -77,20 +78,23 @@ private enum IterationExecutionType { SYNC_SAME_THREAD, SYNC_DIFFERENT_THREAD, ASYNC, + MIXED_SYNC_SAME_AND_ASYNC } @ParameterizedTest() @CsvSource({ - "10, 0, SYNC_SAME_THREAD, 0", -// "10, 0, SYNC_DIFFERENT_THREAD, 0", - "10, 0, ASYNC, 4", - "10, 4, ASYNC, 0" + "10, 0, SYNC_SAME_THREAD, 0, true", +// "10, 0, SYNC_DIFFERENT_THREAD, 0, true", + "10, 0, ASYNC, 4, true", + "10, 4, ASYNC, 0, true", + "1_000_000, 0, MIXED_SYNC_SAME_AND_ASYNC, 0, false", }) void testThenRunDoWhileLoop( final int counterInitialValue, final int blockSyncPartOfIterationTotalSeconds, final IterationExecutionType executionType, - final int delayAsyncExecutionTotalSeconds) throws Exception { + final int delayAsyncExecutionTotalSeconds, + final boolean verbose) throws Exception { System.err.printf("baselineStackDepth=%d%n%n", Thread.currentThread().getStackTrace().length); Duration blockSyncPartOfIterationTotalDuration = Duration.ofSeconds(blockSyncPartOfIterationTotalSeconds); com.mongodb.assertions.Assertions.assertTrue( @@ -98,7 +102,8 @@ void testThenRunDoWhileLoop( Duration delayAsyncExecutionTotalDuration = Duration.ofSeconds(delayAsyncExecutionTotalSeconds); StartTime start = StartTime.now(); CompletableFuture join = new CompletableFuture<>(); - asyncLoop(new Counter(counterInitialValue), blockSyncPartOfIterationTotalDuration, executionType, delayAsyncExecutionTotalDuration, + asyncLoop(new Counter(counterInitialValue, verbose), + blockSyncPartOfIterationTotalDuration, executionType, delayAsyncExecutionTotalDuration, verbose, (r, t) -> { System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); @@ -114,12 +119,15 @@ private static void asyncLoop( final Duration blockSyncPartOfIterationTotalDuration, final IterationExecutionType executionType, final Duration delayAsyncExecutionTotalDuration, + final boolean verbose, final SingleResultCallback callback) { beginAsync().thenRunDoWhileLoop(c -> { sleep(blockSyncPartOfIterationTotalDuration.dividedBy(counter.initial())); StartTime start = StartTime.now(); - asyncPartOfIteration(counter, executionType, delayAsyncExecutionTotalDuration, c); - System.err.printf("\tasyncPartOfIteration returned in %s%n", start.elapsed()); + asyncPartOfIteration(counter, executionType, delayAsyncExecutionTotalDuration, verbose, c); + if (verbose) { + System.err.printf("\tasyncPartOfIteration returned in %s%n", start.elapsed()); + } }, () -> !counter.done()).finish(callback); } @@ -127,12 +135,15 @@ private static void asyncPartOfIteration( final Counter counter, final IterationExecutionType executionType, final Duration delayAsyncExecutionTotalDuration, + final boolean verbose, final SingleResultCallback callback) { Runnable asyncPartOfIteration = () -> { counter.countDown(); StartTime start = StartTime.now(); callback.complete(callback); - System.err.printf("\tasyncPartOfIteration callback.complete returned in %s%n", start.elapsed()); + if (verbose) { + System.err.printf("\tasyncPartOfIteration callback.complete returned in %s%n", start.elapsed()); + } }; switch (executionType) { case SYNC_SAME_THREAD: { @@ -150,6 +161,15 @@ private static void asyncPartOfIteration( delayAsyncExecutionTotalDuration.dividedBy(counter.initial()).toNanos(), TimeUnit.NANOSECONDS); break; } + case MIXED_SYNC_SAME_AND_ASYNC: { + if (ThreadLocalRandom.current().nextBoolean()) { + asyncPartOfIteration.run(); + } else { + executor.schedule(asyncPartOfIteration, + delayAsyncExecutionTotalDuration.dividedBy(counter.initial()).toNanos(), TimeUnit.NANOSECONDS); + } + break; + } default: { com.mongodb.assertions.Assertions.fail(executionType.toString()); } @@ -159,10 +179,12 @@ private static void asyncPartOfIteration( private static final class Counter { private final int initial; private int current; + private final boolean verbose; - Counter(final int initial) { + Counter(final int initial, final boolean verbose) { this.initial = initial; this.current = initial; + this.verbose = verbose; } int initial() { @@ -173,7 +195,10 @@ void countDown() { com.mongodb.assertions.Assertions.assertTrue(current > 0); int previous = current; int decremented = --current; - System.err.printf("counted %d->%d callStackDepth=%d %n", previous, decremented, Thread.currentThread().getStackTrace().length); + if (verbose || decremented % 100_000 == 0) { + System.err.printf("counted %d->%d callStackDepth=%d %n", + previous, decremented, Thread.currentThread().getStackTrace().length); + } } boolean done() { From e70e375832acee7bf2b8dd20532957237f6e1261 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Mon, 16 Feb 2026 12:18:33 -0700 Subject: [PATCH 11/15] further refactor and improve the code --- .../async/function/AsyncCallbackLoop.java | 227 +++++++++--------- .../com/mongodb/internal/async/VakoTest.java | 15 +- 2 files changed, 118 insertions(+), 124 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index dc864dd8e26..9be6991e4b9 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -39,143 +39,136 @@ */ @NotThreadSafe public final class AsyncCallbackLoop implements AsyncCallbackRunnable { - private final Body body; + private final LoopState state; + private final AsyncCallbackRunnable body; + private final ThreadLocal sameThreadDetector; /** * @param state The {@link LoopState} to be deemed as initial for the purpose of the new {@link AsyncCallbackLoop}. * @param body The body of the loop. */ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body) { - this.body = new Body(state, body); + this.body = body; + this.state = state; + sameThreadDetector = ThreadLocal.withInitial(() -> SameThreadDetectionStatus.NEGATIVE); } @Override public void run(final SingleResultCallback callback) { - body.run(false, callback); + run(false, callback); } - private static final class Body { - private final AsyncCallbackRunnable body; - private final LoopState state; - private final ThreadLocal sameThreadDetector; - - private enum SameThreadDetectionStatus { - NEGATIVE, - PROBING, - POSITIVE - } - - private Body(final LoopState state, final AsyncCallbackRunnable body) { - this.body = body; - this.state = state; - sameThreadDetector = ThreadLocal.withInitial(() -> SameThreadDetectionStatus.NEGATIVE); - } - - /** - * Initiates a new iteration of the loop by invoking - * {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run}. - * The initiated iteration may be executed either synchronously or asynchronously with the method that initiated it: - *
      - *
    • synchronous execution—completion of the initiated iteration happens-before the method completion;
    • - *
    • asynchronous execution—the aforementioned relation does not exist.
    • - *
    - * - *

    If another iteration is needed, it is initiated from the callback passed to - * {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run} - * by invoking {@link #run(boolean, SingleResultCallback)}. - * Completing the initiated iteration is {@linkplain SingleResultCallback#onResult(Object, Throwable) invoking} the callback. - * Thus, it is guaranteed that all iterations are executed sequentially with each other - * (that is, completion of one iteration happens-before initiation of the next one) - * regardless of them being executed synchronously or asynchronously with the method that initiated them. - * - *

    Initiating any but the {@linkplain LoopState#isFirstIteration() first} iteration is done using trampolining, - * which allows us to do it iteratively rather than recursively, if iterations are executed synchronously, - * and ensures stack usage does not increase with the number of iterations. - * - * @return {@code true} iff it is known that another iteration must be initiated. - * This information is used only for trampolining, and is available only if the iteration executed synchronously. - * - *

    It is impossible to detect whether an iteration is executed synchronously. - * It is, however, possible to detect whether an iteration is executed in the same thread as the method that initiated it, - * and we use it as proxy indicator of synchronous execution. Unfortunately, this means we do not support and behave incorrectly - * if an iteration is executed synchronously but in a thread different from the one in which the method that - * initiated the iteration was invoked. - * - *

    The above limitation should not be a problem in practice: - *

      - *
    • the only way to execute an iteration synchronously but in a different thread is to block the thread that - * initiated the iteration by waiting for completion of the iteration by that other thread;
    • - *
    • blocking a thread is forbidden in asynchronous code, and we do not do it;
    • - *
    • therefore, we would not have an iteration that is executed synchronously but in a different thread.
    • - *
    - */ - boolean run(final boolean trampolining, final SingleResultCallback afterLoopCallback) { - // The `trampoliningResult` variable must be used only if the initiated iteration is executed synchronously with - // the current method, which must be detected separately. - // - // It may be tempting to detect whether the iteration was executed synchronously by reading from the variable - // and observing a write that is part of the callback execution. However, if the iteration is executed asynchronously with - // the current method, then the aforementioned conflicting write and read actions are not ordered by - // the happens-before relation, the execution contains a data race and the read is allowed to observe the write. - // If such observation happens when the iteration is executed asynchronously, then we have a false positive. - // Furthermore, depending on the nature of the value read, it may not be trustworthy. - boolean[] trampoliningResult = {false}; - sameThreadDetector.set(SameThreadDetectionStatus.PROBING); - body.run((r, t) -> { - if (completeIfNeeded(afterLoopCallback, r, t)) { - // If we are trampolining, then here we bounce up, trampolining completes and so is the whole loop; - // otherwise, the whole loop simply completes. + /** + * Initiates a new iteration of the loop by invoking + * {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run}. + * The initiated iteration may be executed either synchronously or asynchronously with the method that initiated it: + *
      + *
    • synchronous execution—completion of the initiated iteration happens-before the method completion;
    • + *
    • asynchronous execution—the aforementioned relation does not exist.
    • + *
    + * + *

    If another iteration is needed, it is initiated from the callback passed to + * {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run} + * by invoking {@link #run(boolean, SingleResultCallback)}. + * Completing the initiated iteration is {@linkplain SingleResultCallback#onResult(Object, Throwable) invoking} the callback. + * Thus, it is guaranteed that all iterations are executed sequentially with each other + * (that is, completion of one iteration happens-before initiation of the next one) + * regardless of them being executed synchronously or asynchronously with the method that initiated them. + * + *

    Initiating any but the {@linkplain LoopState#isFirstIteration() first} iteration is done using trampolining, + * which allows us to do it iteratively rather than recursively, if iterations are executed synchronously, + * and ensures stack usage does not increase with the number of iterations. + * + * @return {@code true} iff it is known that another iteration must be initiated. + * This information is used only for trampolining, and is available only if the iteration executed synchronously. + * + *

    It is impossible to detect whether an iteration is executed synchronously. + * It is, however, possible to detect whether an iteration is executed in the same thread as the method that initiated it, + * and we use this as a proxy indicator of synchronous execution. Unfortunately, this means we do not support / behave incorrectly + * if an iteration is executed synchronously but in a thread different from the one in which the method that + * initiated the iteration was invoked. + * + *

    The above limitation should not be a problem in practice: + *

      + *
    • the only way to execute an iteration synchronously but in a different thread is to block the thread that + * initiated the iteration by waiting for completion of the iteration by that other thread;
    • + *
    • blocking a thread is forbidden in asynchronous code, and we do not do it;
    • + *
    • therefore, we would not have an iteration that is executed synchronously but in a different thread.
    • + *
    + */ + boolean run(final boolean trampolining, final SingleResultCallback afterLoopCallback) { + // The `trampoliningResult` variable must be used only if the initiated iteration is executed synchronously with + // the current method, which must be detected separately. + // + // It may be tempting to detect whether the iteration was executed synchronously by reading from the variable + // and observing a write that is part of the callback execution. However, if the iteration is executed asynchronously with + // the current method, then the aforementioned conflicting write and read actions are not ordered by + // the happens-before relation, the execution contains a data race and the read is allowed to observe the write. + // If such observation happens when the iteration is executed asynchronously, then we have a false positive. + // Furthermore, depending on the nature of the value read, it may not be trustworthy. + boolean[] trampoliningResult = {false}; + sameThreadDetector.set(SameThreadDetectionStatus.PROBING); + body.run((r, t) -> { + if (completeIfNeeded(afterLoopCallback, r, t)) { + // Bounce if we are trampolining and the iteration was executed synchronously, + // trampolining completes and so is the whole loop; + // otherwise, the whole loop simply completes. + return; + } + if (trampolining) { + boolean sameThread = sameThreadDetector.get().equals(SameThreadDetectionStatus.PROBING); + if (sameThread) { + // Bounce if we are trampolining and the iteration was executed synchronously; + // otherwise proceed to initiate trampolining. + sameThreadDetector.set(SameThreadDetectionStatus.POSITIVE); + trampoliningResult[0] = true; return; + } else { + sameThreadDetector.remove(); } - if (trampolining) { - boolean sameThread = sameThreadDetector.get().equals(SameThreadDetectionStatus.PROBING); - if (sameThread) { - // Bounce up if we are trampolining and the iteration was executed synchronously; - // otherwise proceed to begin trampolining. - sameThreadDetector.set(SameThreadDetectionStatus.POSITIVE); - trampoliningResult[0] = true; - return; - } else { - sameThreadDetector.remove(); - } - } - // trampolining - boolean anotherIterationNeeded; - do { - anotherIterationNeeded = run(true, afterLoopCallback); - } while (anotherIterationNeeded); - }); - try { - return sameThreadDetector.get().equals(SameThreadDetectionStatus.POSITIVE) && trampoliningResult[0]; - } finally { - sameThreadDetector.remove(); } + // initiate trampolining + boolean anotherIterationNeeded; + do { + anotherIterationNeeded = run(true, afterLoopCallback); + } while (anotherIterationNeeded); + }); + try { + return sameThreadDetector.get().equals(SameThreadDetectionStatus.POSITIVE) && trampoliningResult[0]; + } finally { + sameThreadDetector.remove(); } + } - /** - * @return {@code true} iff the {@code afterLoopCallback} was - * {@linkplain SingleResultCallback#onResult(Object, Throwable) completed}. - */ - private boolean completeIfNeeded(final SingleResultCallback afterLoopCallback, - @Nullable final Void result, @Nullable final Throwable t) { - if (t != null) { - afterLoopCallback.onResult(null, t); + /** + * @return {@code true} iff the {@code afterLoopCallback} was + * {@linkplain SingleResultCallback#onResult(Object, Throwable) completed}. + */ + private boolean completeIfNeeded(final SingleResultCallback afterLoopCallback, + @Nullable final Void result, @Nullable final Throwable t) { + if (t != null) { + afterLoopCallback.onResult(null, t); + return true; + } else { + boolean anotherIterationNeeded; + try { + anotherIterationNeeded = state.advance(); + } catch (Throwable e) { + afterLoopCallback.onResult(null, e); return true; + } + if (anotherIterationNeeded) { + return false; } else { - boolean anotherIterationNeeded; - try { - anotherIterationNeeded = state.advance(); - } catch (Throwable e) { - afterLoopCallback.onResult(null, e); - return true; - } - if (anotherIterationNeeded) { - return false; - } else { - afterLoopCallback.onResult(result, null); - return true; - } + afterLoopCallback.onResult(result, null); + return true; } } } + + private enum SameThreadDetectionStatus { + NEGATIVE, + PROBING, + POSITIVE + } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java index 98e17aba73b..7dea7175b5f 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java @@ -83,11 +83,12 @@ private enum IterationExecutionType { @ParameterizedTest() @CsvSource({ - "10, 0, SYNC_SAME_THREAD, 0, true", -// "10, 0, SYNC_DIFFERENT_THREAD, 0, true", - "10, 0, ASYNC, 4, true", - "10, 4, ASYNC, 0, true", + "1_000_000, 0, SYNC_SAME_THREAD, 0, false", +// "1_000_000, 0, SYNC_DIFFERENT_THREAD, 0, false", + "1_000_000, 0, ASYNC, 0, false", "1_000_000, 0, MIXED_SYNC_SAME_AND_ASYNC, 0, false", + "4, 0, ASYNC, 4, true", + "4, 4, ASYNC, 0, true", }) void testThenRunDoWhileLoop( final int counterInitialValue, @@ -109,7 +110,7 @@ void testThenRunDoWhileLoop( Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); complete(join, r, t); }); - System.err.printf("\tasyncLoop returned in %s%n", start.elapsed()); + System.err.printf("\tasyncLoop method completed in %s%n", start.elapsed()); join.get(); System.err.printf("%n%nDONE%n%n"); } @@ -126,7 +127,7 @@ private static void asyncLoop( StartTime start = StartTime.now(); asyncPartOfIteration(counter, executionType, delayAsyncExecutionTotalDuration, verbose, c); if (verbose) { - System.err.printf("\tasyncPartOfIteration returned in %s%n", start.elapsed()); + System.err.printf("\tasyncPartOfIteration method completed in %s%n", start.elapsed()); } }, () -> !counter.done()).finish(callback); } @@ -142,7 +143,7 @@ private static void asyncPartOfIteration( StartTime start = StartTime.now(); callback.complete(callback); if (verbose) { - System.err.printf("\tasyncPartOfIteration callback.complete returned in %s%n", start.elapsed()); + System.err.printf("\tasyncPartOfIteration callback.complete method completed in %s%n", start.elapsed()); } }; switch (executionType) { From 92548c16457976e190664563480316095d7aa6a8 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Mon, 16 Feb 2026 21:34:55 -0700 Subject: [PATCH 12/15] improve docs --- .../async/function/AsyncCallbackLoop.java | 26 +++++++++++++++++-- .../com/mongodb/internal/async/VakoTest.java | 22 +++++++++------- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index 9be6991e4b9..447869e5941 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -63,8 +63,30 @@ public void run(final SingleResultCallback callback) { * {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run}. * The initiated iteration may be executed either synchronously or asynchronously with the method that initiated it: *
      - *
    • synchronous execution—completion of the initiated iteration happens-before the method completion;
    • - *
    • asynchronous execution—the aforementioned relation does not exist.
    • + *
    • synchronous execution—completion of the initiated iteration is guaranteed to happen-before the method completion; + *
        + *
      • Note that the formulations + *
          + *
        1. "completion of the initiated iteration is guaranteed to happen-before the method completion"
        2. + *
        3. "completion of the initiated iteration happens-before the method completion"
        4. + *
        + * are different: the former is about the program while the latter is about the execution, and follows from the former. + * For us the former is useful. + *
      • + *
      + *
    • + *
    • asynchronous execution—the aforementioned guarantee does not exist. + *
        + *
      • Note that the formulations + *
          + *
        1. "the aforementioned guarantee does not exist"
        2. + *
        3. "the aforementioned relation does not exist"
        4. + *
        + * are different: the former is about the program while the latter is about the execution, and follows from the former. + * For us the former is useful. + *
      • + *
      + *
    • *
    * *

    If another iteration is needed, it is initiated from the callback passed to diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java index 7dea7175b5f..5c061f542af 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java @@ -78,7 +78,7 @@ private enum IterationExecutionType { SYNC_SAME_THREAD, SYNC_DIFFERENT_THREAD, ASYNC, - MIXED_SYNC_SAME_AND_ASYNC + MIXED_SYNC_SAME_THREAD_AND_ASYNC } @ParameterizedTest() @@ -86,7 +86,7 @@ private enum IterationExecutionType { "1_000_000, 0, SYNC_SAME_THREAD, 0, false", // "1_000_000, 0, SYNC_DIFFERENT_THREAD, 0, false", "1_000_000, 0, ASYNC, 0, false", - "1_000_000, 0, MIXED_SYNC_SAME_AND_ASYNC, 0, false", + "1_000_000, 0, MIXED_SYNC_SAME_THREAD_AND_ASYNC, 0, false", "4, 0, ASYNC, 4, true", "4, 4, ASYNC, 0, true", }) @@ -105,10 +105,10 @@ void testThenRunDoWhileLoop( CompletableFuture join = new CompletableFuture<>(); asyncLoop(new Counter(counterInitialValue, verbose), blockSyncPartOfIterationTotalDuration, executionType, delayAsyncExecutionTotalDuration, verbose, - (r, t) -> { - System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", - Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); - complete(join, r, t); + (r, t) -> { + System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", + Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); + complete(join, r, t); }); System.err.printf("\tasyncLoop method completed in %s%n", start.elapsed()); join.get(); @@ -162,7 +162,7 @@ private static void asyncPartOfIteration( delayAsyncExecutionTotalDuration.dividedBy(counter.initial()).toNanos(), TimeUnit.NANOSECONDS); break; } - case MIXED_SYNC_SAME_AND_ASYNC: { + case MIXED_SYNC_SAME_THREAD_AND_ASYNC: { if (ThreadLocalRandom.current().nextBoolean()) { asyncPartOfIteration.run(); } else { @@ -180,11 +180,13 @@ private static void asyncPartOfIteration( private static final class Counter { private final int initial; private int current; + private boolean doneReturnedTrue; private final boolean verbose; Counter(final int initial, final boolean verbose) { this.initial = initial; this.current = initial; + this.doneReturnedTrue = false; this.verbose = verbose; } @@ -197,14 +199,16 @@ void countDown() { int previous = current; int decremented = --current; if (verbose || decremented % 100_000 == 0) { - System.err.printf("counted %d->%d callStackDepth=%d %n", - previous, decremented, Thread.currentThread().getStackTrace().length); + System.err.printf("counted %d->%d tid=%d callStackDepth=%d %n", + previous, decremented, Thread.currentThread().getId(), Thread.currentThread().getStackTrace().length); } } boolean done() { if (current == 0) { + com.mongodb.assertions.Assertions.assertFalse(doneReturnedTrue); System.err.printf("counting done callStackDepth=%d %n", Thread.currentThread().getStackTrace().length); + doneReturnedTrue = true; return true; } return false; From 4ab2ad06596e735242ba513a2f73fbd46ae13470 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Tue, 17 Feb 2026 09:05:17 -0700 Subject: [PATCH 13/15] improve docs --- .../internal/async/function/AsyncCallbackLoop.java | 8 ++++++-- .../test/unit/com/mongodb/internal/async/VakoTest.java | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java index 447869e5941..4ca496794af 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java @@ -128,13 +128,17 @@ boolean run(final boolean trampolining, final SingleResultCallback afterLo // the happens-before relation, the execution contains a data race and the read is allowed to observe the write. // If such observation happens when the iteration is executed asynchronously, then we have a false positive. // Furthermore, depending on the nature of the value read, it may not be trustworthy. + // + // Making `trampoliningResult` a `volatile`, or even making it an `AtomicReference`/`AtomicInteger` and calling `compareAndSet` + // does not resolve the issue: it gets rid of the data race, but still leave us with a race condition + // that allows for false positives. boolean[] trampoliningResult = {false}; sameThreadDetector.set(SameThreadDetectionStatus.PROBING); body.run((r, t) -> { if (completeIfNeeded(afterLoopCallback, r, t)) { // Bounce if we are trampolining and the iteration was executed synchronously, - // trampolining completes and so is the whole loop; - // otherwise, the whole loop simply completes. + // trampolining completes and so is the loop; + // otherwise, the loop simply completes. return; } if (trampolining) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java index 5c061f542af..b2f8977096b 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java @@ -42,7 +42,7 @@ class VakoTest { @BeforeAll static void beforeAll() { - executor = Executors.newScheduledThreadPool(2); + executor = Executors.newScheduledThreadPool(1); } @AfterAll From 0020c7eb50cc540620589f36a8afe5fb6547e666 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Wed, 18 Feb 2026 03:27:45 -0700 Subject: [PATCH 14/15] improve the test --- .../mongodb/internal/async/AsyncLoopTest.java | 394 ++++++++++++++++++ .../com/mongodb/internal/async/VakoTest.java | 265 ------------ 2 files changed, 394 insertions(+), 265 deletions(-) create mode 100644 driver-core/src/test/unit/com/mongodb/internal/async/AsyncLoopTest.java delete mode 100644 driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncLoopTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncLoopTest.java new file mode 100644 index 00000000000..f8ea79e0e09 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncLoopTest.java @@ -0,0 +1,394 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.async; + +import com.mongodb.internal.async.function.AsyncCallbackLoop; +import com.mongodb.internal.async.function.LoopState; +import com.mongodb.internal.time.StartTime; +import com.mongodb.lang.Nullable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class AsyncLoopTest { + private static final int MAX_STACK_DEPTH = 500; + + @ParameterizedTest + @CsvSource({ + "10" + }) + void testDemo(final int iterations) throws Exception { + System.err.printf("baselineStackDepth=%d%n%n", Thread.currentThread().getStackTrace().length); + CompletableFuture join = new CompletableFuture<>(); + LoopState loopState = new LoopState(); + new AsyncCallbackLoop(loopState, c -> { + int iteration = loopState.iteration(); + System.err.printf("iteration=%d, callStackDepth=%d%n", iteration, Thread.currentThread().getStackTrace().length); + if (!loopState.breakAndCompleteIf(() -> iteration == (iterations - 1), c)) { + c.complete(c); + } + }).run((r, t) -> { + System.err.printf("test callback completed callStackDepth=%d, r=%s, t=%s%n", + Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); + complete(join, r, t); + }); + join.get(); + System.err.printf("%nDONE%n%n"); + } + + private enum IterationExecutionType { + SYNC_SAME_THREAD, + SYNC_DIFFERENT_THREAD, + ASYNC, + MIXED_SYNC_SAME_THREAD_AND_ASYNC + } + + private enum Verbocity { + VERBOSE, + COMPACT; + + /** + * Every {@value}s message is printed. + */ + private static final int COMPACTNESS = 50_000; + } + + private enum ThreadManagement { + NEW_THREAD_PER_TASK, + REUSE_THREADS + } + + @ParameterizedTest() + @CsvSource({ + "250_000, 0, SYNC_SAME_THREAD, 0, COMPACT, 0, REUSE_THREADS", + "250_000, 0, ASYNC, 0, COMPACT, 0, NEW_THREAD_PER_TASK", + "250_000, 0, ASYNC, 0, COMPACT, 1, REUSE_THREADS", + "250_000, 0, ASYNC, 0, COMPACT, 2, REUSE_THREADS", + "250_000, 0, MIXED_SYNC_SAME_THREAD_AND_ASYNC, 0, COMPACT, 0, NEW_THREAD_PER_TASK", + "250_000, 0, MIXED_SYNC_SAME_THREAD_AND_ASYNC, 0, COMPACT, 1, REUSE_THREADS", + "4, 0, ASYNC, 4, VERBOSE, 1, REUSE_THREADS", + "4, 4, ASYNC, 0, VERBOSE, 1, REUSE_THREADS", + "250_000, 0, SYNC_DIFFERENT_THREAD, 0, COMPACT, 0, NEW_THREAD_PER_TASK", + "250_000, 0, SYNC_DIFFERENT_THREAD, 0, COMPACT, 1, REUSE_THREADS", + }) + void thenRunDoWhileLoopTest( + final int counterInitialValue, + final int blockSyncPartOfIterationTotalSeconds, + final IterationExecutionType executionType, + final int delayAsyncExecutionTotalSeconds, + final Verbocity verbocity, + final int executorSize, + final ThreadManagement threadManagement) throws Exception { + Duration blockSyncPartOfIterationTotalDuration = Duration.ofSeconds(blockSyncPartOfIterationTotalSeconds); + if (executionType.equals(IterationExecutionType.SYNC_DIFFERENT_THREAD)) { + com.mongodb.assertions.Assertions.assertTrue( + (executorSize > 0 && threadManagement.equals(ThreadManagement.REUSE_THREADS)) + || (executorSize == 0 && threadManagement.equals(ThreadManagement.NEW_THREAD_PER_TASK))); + } + if (executionType.equals(IterationExecutionType.SYNC_SAME_THREAD)) { + com.mongodb.assertions.Assertions.assertTrue(executorSize == 0); + com.mongodb.assertions.Assertions.assertTrue(threadManagement.equals(ThreadManagement.REUSE_THREADS)); + } + if (!executionType.equals(IterationExecutionType.ASYNC)) { + com.mongodb.assertions.Assertions.assertTrue(delayAsyncExecutionTotalSeconds == 0); + } + if (threadManagement.equals(ThreadManagement.NEW_THREAD_PER_TASK)) { + com.mongodb.assertions.Assertions.assertTrue(executorSize == 0); + } + Duration delayAsyncExecutionTotalDuration = Duration.ofSeconds(delayAsyncExecutionTotalSeconds); + ScheduledExecutor executor = executorSize == 0 ? null : new ScheduledExecutor(executorSize, threadManagement); + try { + System.err.printf("baselineStackDepth=%d%n%n", Thread.currentThread().getStackTrace().length); + StartTime start = StartTime.now(); + CompletableFuture join = new CompletableFuture<>(); + asyncLoop(new Counter(counterInitialValue, verbocity), + blockSyncPartOfIterationTotalDuration, executionType, delayAsyncExecutionTotalDuration, verbocity, executor, + (r, t) -> { + int stackDepth = Thread.currentThread().getStackTrace().length; + System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", + stackDepth, r, exceptionToString(t)); + assertTrue(stackDepth <= MAX_STACK_DEPTH); + complete(join, r, t); + }); + System.err.printf("\tasyncLoop method completed in %s%n", start.elapsed()); + join.get(); + System.err.printf("%nDONE%n%n"); + } finally { + if (executor != null) { + executor.shutdownNow(); + com.mongodb.assertions.Assertions.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); + } + } + } + + private static void asyncLoop( + final Counter counter, + final Duration blockSyncPartOfIterationTotalDuration, + final IterationExecutionType executionType, + final Duration delayAsyncExecutionTotalDuration, + final Verbocity verbocity, + @Nullable + final ScheduledExecutor executor, + final SingleResultCallback callback) { + beginAsync().thenRunDoWhileLoop(c -> { + sleep(blockSyncPartOfIterationTotalDuration.dividedBy(counter.initial())); + StartTime start = StartTime.now(); + asyncPartOfIteration(counter, executionType, delayAsyncExecutionTotalDuration, verbocity, executor, c); + if (verbocity.equals(Verbocity.VERBOSE)) { + System.err.printf("\tasyncPartOfIteration method completed in %s%n", start.elapsed()); + } + }, () -> !counter.done()).finish(callback); + } + + private static void asyncPartOfIteration( + final Counter counter, + final IterationExecutionType executionType, + final Duration delayAsyncExecutionTotalDuration, + final Verbocity verbocity, + @Nullable + final ScheduledExecutor executor, + final SingleResultCallback callback) { + Runnable asyncPartOfIteration = () -> { + counter.countDown(); + StartTime start = StartTime.now(); + callback.complete(callback); + if (verbocity.equals(Verbocity.VERBOSE)) { + System.err.printf("\tasyncPartOfIteration callback.complete method completed in %s%n", start.elapsed()); + } + }; + switch (executionType) { + case SYNC_SAME_THREAD: { + asyncPartOfIteration.run(); + break; + } + case SYNC_DIFFERENT_THREAD: { + if (executor == null) { + Thread thread = new Thread(asyncPartOfIteration); + thread.start(); + join(thread); + } else { + join(executor.submit(asyncPartOfIteration)); + } + break; + } + case ASYNC: { + if (executor == null) { + Thread thread = new Thread(() -> { + sleep(delayAsyncExecutionTotalDuration.dividedBy(counter.initial())); + asyncPartOfIteration.run(); + }); + thread.start(); + } else { + com.mongodb.assertions.Assertions.assertNotNull(executor).schedule(asyncPartOfIteration, + delayAsyncExecutionTotalDuration.dividedBy(counter.initial()).toNanos(), TimeUnit.NANOSECONDS); + } + break; + } + case MIXED_SYNC_SAME_THREAD_AND_ASYNC: { + if (ThreadLocalRandom.current().nextBoolean()) { + asyncPartOfIteration.run(); + } else { + if (executor == null) { + Thread thread = new Thread(() -> { + sleep(delayAsyncExecutionTotalDuration.dividedBy(counter.initial())); + asyncPartOfIteration.run(); + }); + thread.start(); + } else { + com.mongodb.assertions.Assertions.assertNotNull(executor).schedule(asyncPartOfIteration, + delayAsyncExecutionTotalDuration.dividedBy(counter.initial()).toNanos(), TimeUnit.NANOSECONDS); + } + } + break; + } + default: { + com.mongodb.assertions.Assertions.fail(executionType.toString()); + } + } + } + + private static final class Counter { + private final int initial; + private int current; + private boolean doneReturnedTrue; + private final Verbocity verbocity; + + Counter(final int initial, final Verbocity verbocity) { + this.initial = initial; + this.current = initial; + this.doneReturnedTrue = false; + this.verbocity = verbocity; + } + + int initial() { + return initial; + } + + void countDown() { + com.mongodb.assertions.Assertions.assertTrue(current > 0); + int previous = current; + int decremented = --current; + if (verbocity.equals(Verbocity.VERBOSE) || decremented % Verbocity.COMPACTNESS == 0) { + int stackDepth = Thread.currentThread().getStackTrace().length; + assertTrue(stackDepth <= MAX_STACK_DEPTH); + System.err.printf("counted %d->%d tid=%d callStackDepth=%d %n", + previous, decremented, Thread.currentThread().getId(), stackDepth); + } + } + + boolean done() { + if (current == 0) { + com.mongodb.assertions.Assertions.assertFalse(doneReturnedTrue); + int stackDepth = Thread.currentThread().getStackTrace().length; + assertTrue(stackDepth <= MAX_STACK_DEPTH); + System.err.printf("counting done callStackDepth=%d %n", stackDepth); + doneReturnedTrue = true; + return true; + } + return false; + } + } + + private static String exceptionToString(@Nullable final Throwable t) { + if (t == null) { + return Objects.toString(null); + } + try (StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw)) { +// t.printStackTrace(pw); + pw.println(t); + pw.flush(); + return sw.toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static void complete(final CompletableFuture future, @Nullable final T result, @Nullable final Throwable t) { + if (t != null) { + future.completeExceptionally(t); + } else { + future.complete(result); + } + } + + private static void join(final Thread thread) { + try { + thread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + private static void join(final Future future) { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + private static void sleep(final Duration duration) { + if (duration.isZero()) { + return; + } + try { + long durationNsPart = duration.getNano(); + long durationMsPartFromNsPart = TimeUnit.MILLISECONDS.convert(duration.getNano(), TimeUnit.NANOSECONDS); + long sleepMs = TimeUnit.MILLISECONDS.convert(duration.getSeconds(), TimeUnit.SECONDS) + durationMsPartFromNsPart; + int sleepNs = Math.toIntExact(durationNsPart - TimeUnit.NANOSECONDS.convert(durationMsPartFromNsPart, TimeUnit.MILLISECONDS)); + Thread.sleep(sleepMs, sleepNs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + /** + * This {@link ScheduledThreadPoolExecutor} propagates exceptions that caused termination of a task execution, + * causing the thread that executed the task to be terminated. + */ + private static final class ScheduledExecutor extends ScheduledThreadPoolExecutor { + ScheduledExecutor(final int size, final ThreadManagement threadManagement) { + super(size, r -> { + Thread thread = new Thread(() -> { + r.run(); + if (threadManagement.equals(ThreadManagement.NEW_THREAD_PER_TASK)) { + terminateCurrentThread(); + } + }); + thread.setUncaughtExceptionHandler((t, e) -> { + if (e instanceof ThreadTerminationException) { + return; + } + t.getThreadGroup().uncaughtException(t, e); + }); + return thread; + }); + } + + private static void terminateCurrentThread() { + throw ThreadTerminationException.INSTANCE; + } + + @Override + protected void afterExecute(final Runnable r, final Throwable t) { + if (t instanceof ThreadTerminationException) { + throw (ThreadTerminationException) t; + } else if (r instanceof Future) { + Future future = (Future) r; + if (future.isDone()) { + try { + future.get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof ThreadTerminationException) { + throw (ThreadTerminationException) cause; + } + } catch (Throwable e) { + // do nothing, we are not swallowing `e`, btw + } + } + } + } + + private static final class ThreadTerminationException extends RuntimeException { + static final ThreadTerminationException INSTANCE = new ThreadTerminationException(); + + private ThreadTerminationException() { + super(null, null, false, false); + } + } + } +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java deleted file mode 100644 index b2f8977096b..00000000000 --- a/driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.mongodb.internal.async; - -import com.mongodb.internal.async.function.AsyncCallbackLoop; -import com.mongodb.internal.async.function.LoopState; -import com.mongodb.internal.time.StartTime; -import com.mongodb.lang.Nullable; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.time.Duration; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; - -import static com.mongodb.internal.async.AsyncRunnable.beginAsync; - -class VakoTest { - private static ScheduledExecutorService executor; - - @BeforeAll - static void beforeAll() { - executor = Executors.newScheduledThreadPool(1); - } - - @AfterAll - static void afterAll() throws InterruptedException { - executor.shutdownNow(); - com.mongodb.assertions.Assertions.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); - } - - @ParameterizedTest - @CsvSource({ - "10" - }) - void asyncCallbackLoop(final int iterations) throws Exception { - System.err.printf("baselineStackDepth=%d%n%n", Thread.currentThread().getStackTrace().length); - CompletableFuture join = new CompletableFuture<>(); - LoopState loopState = new LoopState(); - new AsyncCallbackLoop(loopState, c -> { - int iteration = loopState.iteration(); - System.err.printf("iteration=%d, callStackDepth=%d%n", iteration, Thread.currentThread().getStackTrace().length); - if (!loopState.breakAndCompleteIf(() -> iteration == (iterations - 1), c)) { - c.complete(c); - } - }).run((r, t) -> { - System.err.printf("test callback completed callStackDepth=%d, r=%s, t=%s%n", - Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); - complete(join, r, t); - }); - join.get(); - System.err.printf("%n%nDONE%n%n"); - } - - private enum IterationExecutionType { - SYNC_SAME_THREAD, - SYNC_DIFFERENT_THREAD, - ASYNC, - MIXED_SYNC_SAME_THREAD_AND_ASYNC - } - - @ParameterizedTest() - @CsvSource({ - "1_000_000, 0, SYNC_SAME_THREAD, 0, false", -// "1_000_000, 0, SYNC_DIFFERENT_THREAD, 0, false", - "1_000_000, 0, ASYNC, 0, false", - "1_000_000, 0, MIXED_SYNC_SAME_THREAD_AND_ASYNC, 0, false", - "4, 0, ASYNC, 4, true", - "4, 4, ASYNC, 0, true", - }) - void testThenRunDoWhileLoop( - final int counterInitialValue, - final int blockSyncPartOfIterationTotalSeconds, - final IterationExecutionType executionType, - final int delayAsyncExecutionTotalSeconds, - final boolean verbose) throws Exception { - System.err.printf("baselineStackDepth=%d%n%n", Thread.currentThread().getStackTrace().length); - Duration blockSyncPartOfIterationTotalDuration = Duration.ofSeconds(blockSyncPartOfIterationTotalSeconds); - com.mongodb.assertions.Assertions.assertTrue( - executionType.equals(IterationExecutionType.ASYNC) || delayAsyncExecutionTotalSeconds == 0); - Duration delayAsyncExecutionTotalDuration = Duration.ofSeconds(delayAsyncExecutionTotalSeconds); - StartTime start = StartTime.now(); - CompletableFuture join = new CompletableFuture<>(); - asyncLoop(new Counter(counterInitialValue, verbose), - blockSyncPartOfIterationTotalDuration, executionType, delayAsyncExecutionTotalDuration, verbose, - (r, t) -> { - System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n", - Thread.currentThread().getStackTrace().length, r, exceptionToString(t)); - complete(join, r, t); - }); - System.err.printf("\tasyncLoop method completed in %s%n", start.elapsed()); - join.get(); - System.err.printf("%n%nDONE%n%n"); - } - - private static void asyncLoop( - final Counter counter, - final Duration blockSyncPartOfIterationTotalDuration, - final IterationExecutionType executionType, - final Duration delayAsyncExecutionTotalDuration, - final boolean verbose, - final SingleResultCallback callback) { - beginAsync().thenRunDoWhileLoop(c -> { - sleep(blockSyncPartOfIterationTotalDuration.dividedBy(counter.initial())); - StartTime start = StartTime.now(); - asyncPartOfIteration(counter, executionType, delayAsyncExecutionTotalDuration, verbose, c); - if (verbose) { - System.err.printf("\tasyncPartOfIteration method completed in %s%n", start.elapsed()); - } - }, () -> !counter.done()).finish(callback); - } - - private static void asyncPartOfIteration( - final Counter counter, - final IterationExecutionType executionType, - final Duration delayAsyncExecutionTotalDuration, - final boolean verbose, - final SingleResultCallback callback) { - Runnable asyncPartOfIteration = () -> { - counter.countDown(); - StartTime start = StartTime.now(); - callback.complete(callback); - if (verbose) { - System.err.printf("\tasyncPartOfIteration callback.complete method completed in %s%n", start.elapsed()); - } - }; - switch (executionType) { - case SYNC_SAME_THREAD: { - asyncPartOfIteration.run(); - break; - } - case SYNC_DIFFERENT_THREAD: { - Thread guaranteedDifferentThread = new Thread(asyncPartOfIteration); - guaranteedDifferentThread.start(); - join(guaranteedDifferentThread); - break; - } - case ASYNC: { - executor.schedule(asyncPartOfIteration, - delayAsyncExecutionTotalDuration.dividedBy(counter.initial()).toNanos(), TimeUnit.NANOSECONDS); - break; - } - case MIXED_SYNC_SAME_THREAD_AND_ASYNC: { - if (ThreadLocalRandom.current().nextBoolean()) { - asyncPartOfIteration.run(); - } else { - executor.schedule(asyncPartOfIteration, - delayAsyncExecutionTotalDuration.dividedBy(counter.initial()).toNanos(), TimeUnit.NANOSECONDS); - } - break; - } - default: { - com.mongodb.assertions.Assertions.fail(executionType.toString()); - } - } - } - - private static final class Counter { - private final int initial; - private int current; - private boolean doneReturnedTrue; - private final boolean verbose; - - Counter(final int initial, final boolean verbose) { - this.initial = initial; - this.current = initial; - this.doneReturnedTrue = false; - this.verbose = verbose; - } - - int initial() { - return initial; - } - - void countDown() { - com.mongodb.assertions.Assertions.assertTrue(current > 0); - int previous = current; - int decremented = --current; - if (verbose || decremented % 100_000 == 0) { - System.err.printf("counted %d->%d tid=%d callStackDepth=%d %n", - previous, decremented, Thread.currentThread().getId(), Thread.currentThread().getStackTrace().length); - } - } - - boolean done() { - if (current == 0) { - com.mongodb.assertions.Assertions.assertFalse(doneReturnedTrue); - System.err.printf("counting done callStackDepth=%d %n", Thread.currentThread().getStackTrace().length); - doneReturnedTrue = true; - return true; - } - return false; - } - } - - private static String exceptionToString(@Nullable final Throwable t) { - if (t == null) { - return Objects.toString(null); - } - try (StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw)) { -// t.printStackTrace(pw); - pw.println(t); - pw.flush(); - return sw.toString(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private static void complete(final CompletableFuture future, @Nullable final T result, @Nullable final Throwable t) { - if (t != null) { - future.completeExceptionally(t); - } else { - future.complete(result); - } - } - - private static void join(final Thread thread) { - try { - thread.join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - - private static void sleep(final Duration duration) { - if (duration.isZero()) { - return; - } - try { - long durationNsPart = duration.getNano(); - long durationMsPartFromNsPart = TimeUnit.MILLISECONDS.convert(duration.getNano(), TimeUnit.NANOSECONDS); - long sleepMs = TimeUnit.MILLISECONDS.convert(duration.getSeconds(), TimeUnit.SECONDS) + durationMsPartFromNsPart; - int sleepNs = Math.toIntExact(durationNsPart - TimeUnit.NANOSECONDS.convert(durationMsPartFromNsPart, TimeUnit.MILLISECONDS)); - Thread.sleep(sleepMs, sleepNs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } -} From 1f5da7f0b8629899022e15285d89a4a41adbedd5 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Wed, 18 Feb 2026 05:59:20 -0700 Subject: [PATCH 15/15] adopt tests from - https://github.com/mongodb/mongo-java-driver/pull/1890 - https://github.com/katcharov/mongo-java-driver/commits/async-loop/ --- .../mongodb/internal/async/AsyncRunnable.java | 10 ++ .../async/AsyncFunctionsAbstractTest.java | 149 +++++++++++++++++- 2 files changed, 157 insertions(+), 2 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index e404e2b8152..ecc83e7e005 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -16,6 +16,7 @@ package com.mongodb.internal.async; +import com.mongodb.assertions.Assertions; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.async.function.AsyncCallbackLoop; import com.mongodb.internal.async.function.LoopState; @@ -206,6 +207,15 @@ default AsyncRunnable thenRunIf(final Supplier condition, final AsyncRu }; } + /** + * @param condition The condition to check before each iteration + * @param body The body to run on each iteration + * @return the composition of this runnable and the loop, a runnable + */ + default AsyncRunnable loopWhile(final BooleanSupplier condition, final AsyncRunnable body) { + throw Assertions.fail("Not implemented"); + } + /** * @param supplier The supplier to supply using after this runnable * @return the composition of this runnable and the supplier, a supplier diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java index 40b78ff18ae..f5669d2a5b8 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java @@ -18,14 +18,17 @@ import com.mongodb.MongoException; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.TimeoutSettings; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; +import static org.junit.jupiter.api.Assertions.assertTrue; abstract class AsyncFunctionsAbstractTest extends AsyncFunctionsTestBase { private static final TimeoutContext TIMEOUT_CONTEXT = new TimeoutContext(new TimeoutSettings(0, 0, 0, 0L, 0)); @@ -724,7 +727,85 @@ void testTryCatchTestAndRethrow() { } @Test + @Disabled("Tests AsyncRunnable.loopWhile, but we agreed to improve and test AsyncRunnable.thenRunDoWhileLoop") + void testWhile() { + assertBehavesSameVariations(10, // TODO check expected variations + () -> { + int i = 0; + while (i < 3 && plainTest(i)) { + i++; + sync(i); + } + }, + (callback) -> { + final int[] i = new int[1]; + beginAsync().loopWhile(() -> i[0] < 3 && plainTest(i[0]), (c2) -> { + i[0]++; + async(i[0], c2); + }).finish(callback); + }); + } + + @Test + @Disabled("Tests AsyncRunnable.loopWhile, but we agreed to improve and test AsyncRunnable.thenRunDoWhileLoop") + void testWhile2() { + assertBehavesSameVariations(14, // TODO check expected variations + () -> { + int i = 0; + while (i < 3 && plainTest(i)) { + i++; + sync(i); + } + sync(i + 100); + }, + (callback) -> { + final int[] i = new int[1]; + beginAsync().thenRun(c -> { + beginAsync().loopWhile(() -> i[0] < 3 && plainTest(i[0]), (c2) -> { + i[0]++; + async(i[0], c2); + }).finish(c); + }).thenRun(c -> { + async(i[0] + 100, c); + }).finish(callback); + }); + } + + @Test + @Disabled("Tests AsyncRunnable.loopWhile, but we agreed to improve and test AsyncRunnable.thenRunDoWhileLoop") void testRetryLoop() { + assertBehavesSameVariations(InvocationTracker.DEPTH_LIMIT * 2 + 1, + () -> { + while (true) { + try { + sync(plainTest(0) ? 1 : 2); + } catch (RuntimeException e) { + if (e.getMessage().equals("exception-1")) { + continue; + } + throw e; + } + break; + } + }, + (callback) -> { + final boolean[] shouldContinue = new boolean[]{true}; + beginAsync().loopWhile(() -> shouldContinue[0], (c) -> { + beginAsync().thenRun(c2 -> { + async(plainTest(0) ? 1 : 2, c2); + }).thenRun(c2 -> { + shouldContinue[0] = false; + c2.complete(c2); + }).onErrorIf(e -> e.getMessage().equals("exception-1"), (e, c2) -> { + c2.complete(c2); + }).finish(c); + }).finish(callback); + }); + } + + @Test + void testThenRunRetryingWhile() { + for (int i = 0; i < 1000; i++) { assertBehavesSameVariations(InvocationTracker.DEPTH_LIMIT * 2 + 1, () -> { while (true) { @@ -746,10 +827,11 @@ void testRetryLoop() { e -> e.getMessage().equals("exception-1") ).finish(callback); }); - } + }} @Test void testDoWhileLoop() { + for (int i = 0; i < 1000; i++) { assertBehavesSameVariations(67, () -> { do { @@ -766,7 +848,7 @@ void testDoWhileLoop() { () -> plainTest(2) ).finish(finalCallback); }); - } + }} @Test void testDoWhileLoop2() { @@ -1009,4 +1091,67 @@ void testDerivation() { }).finish(callback); }); } + + @Test + @Disabled("Tests AsyncRunnable.thenRun/finish, but we agreed to improve and test AsyncRunnable.thenRunDoWhileLoop") + void testStackDepthBounded() { + AtomicInteger maxDepth = new AtomicInteger(0); + AtomicInteger minDepth = new AtomicInteger(Integer.MAX_VALUE); + AtomicInteger maxMongoDepth = new AtomicInteger(0); + AtomicInteger minMongoDepth = new AtomicInteger(Integer.MAX_VALUE); + AtomicInteger stepCount = new AtomicInteger(0); + // Capture one sample of mongodb package frames for printing + String[][] sampleMongoFrames = {null}; + + AsyncRunnable chain = beginAsync(); + for (int i = 0; i < 1000; i++) { + chain = chain.thenRun(c -> { + stepCount.incrementAndGet(); + StackTraceElement[] stack = Thread.currentThread().getStackTrace(); + int depth = stack.length; + maxDepth.updateAndGet(current -> Math.max(current, depth)); + minDepth.updateAndGet(current -> Math.min(current, depth)); + int mongoFrames = 0; + for (StackTraceElement frame : stack) { + if (frame.getClassName().startsWith("com.mongodb")) { + mongoFrames++; + } + } + int mf = mongoFrames; + maxMongoDepth.updateAndGet(current -> Math.max(current, mf)); + minMongoDepth.updateAndGet(current -> Math.min(current, mf)); + // Capture first sample + if (sampleMongoFrames[0] == null) { + String[] frames = new String[mf]; + int idx = 0; + for (StackTraceElement frame : stack) { + if (frame.getClassName().startsWith("com.mongodb")) { + frames[idx++] = frame.getClassName() + "." + frame.getMethodName() + + "(" + frame.getFileName() + ":" + frame.getLineNumber() + ")"; + } + } + sampleMongoFrames[0] = frames; + } + c.complete(c); + }); + } + + chain.finish((v, e) -> { + assertTrue(stepCount.get() == 1000, "Expected 1000 steps, got " + stepCount.get()); + int depth = maxDepth.get(); + int mongoDepth = maxMongoDepth.get(); + String summary = "Stack depth: min=" + minDepth.get() + ", max=" + depth + + " | MongoDB frames: min=" + minMongoDepth.get() + ", max=" + mongoDepth; + System.out.printf(summary + "%n"); + if (sampleMongoFrames[0] != null) { + System.out.printf("MongoDB stack frames (sample):%n"); + for (int i = 0; i < sampleMongoFrames[0].length; i++) { + System.out.printf(" " + (i + 1) + ". " + sampleMongoFrames[0][i] + "%n"); + } + } + assertTrue(depth < 200, + "Stack depth too deep (min=" + minDepth.get() + ", max=" + depth + + "). Trampoline may not be working correctly."); + }); + } }