@@ -36,6 +36,12 @@ tasks.withType<Test> {

jvmArgs.add("-Dio.netty.leakDetection.level=paranoid")

+// deliberately small stack size to trigger StackOverflowErrors in tests that
+// silently pass because of the default large stack size on x64 machines
+// see JAVA-6071 for details
+jvmArgs.add("-Xss512k")

// Pass any `org.mongodb.*` system settings
systemProperties =
System.getProperties()
@stIncMale (Member) commented on Feb 12, 2026:
This needs to be discussed in-person either during one of our catchup meetings, or at a dedicated meeting (the latter may be a 1on1 meeting).

  • I still don't understand the issue clearly:
    • [just to make sure I understand this part correctly] Is it true that the graph of callbacks and handlers is created successfully, and only when this c.complete(localByteBuf) is eventually called, do we construct the long stack of method calls?
      • The above means that the actual stack trace, which is not fully provided in the PR description as it is too large, has the aforementioned complete call in it? Answer: yes.
    • Why does the issue reproduce only on a standalone deployment?
      • Answer: Because for some reason the standalone deployment responds to the bulkWrite command with all the documents in the created cursor, but a replica set responds with only a subset, requiring the driver to execute getMores, which means the driver does not read enough data via each AsynchronousChannelStream.readAsync to cause the StackOverflowError. We have https://jira.mongodb.org/plugins/servlet/desk/portal/13/HELP-88629, but it looks like it should be completely reformulated with our current understanding.
    • Does TLS result in us reading the server response in smaller chunks (leading to a larger number of chunks) compared to no TLS?
      • Answer: we don't know, this wasn't investigated.
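The failure mode discussed above can be reproduced in isolation. This is a generic, self-contained sketch (none of these names are the driver's real API): when each "read completion" synchronously issues the next read, every chunk adds at least one stack frame, so a large response read in many chunks overflows the stack.

```java
import java.util.function.IntConsumer;

public class StackGrowthSketch {

    // Simulates a channel whose read always completes synchronously,
    // the way a read can complete immediately when data is already buffered.
    static void read(int remainingChunks, IntConsumer completionHandler) {
        completionHandler.accept(remainingChunks - 1);
    }

    // Each completed() issues the next read, nesting one more frame per chunk.
    static void completed(int remainingChunks) {
        if (remainingChunks > 0) {
            read(remainingChunks, StackGrowthSketch::completed);
        }
    }

    public static void main(String[] args) {
        try {
            // With enough chunks, the nested synchronous completions overflow the stack.
            completed(1_000_000);
            System.out.println("no overflow");
        } catch (StackOverflowError e) {
            System.out.println("StackOverflowError after deep synchronous completion chain");
        }
    }
}
```

This is also why a smaller `-Xss` surfaces the bug in tests: fewer chunks are needed before the overflow.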

Nonetheless:

  • The immediate reaction I have is that we should try to preserve the beginAsync API usage here. One of the reasons it was introduced is that implementing callbacks/handlers without it is too error-prone: we still throw exceptions/assertions in callback-based code, and unexpected exceptions are still thrown, yet we cannot practically always handle them properly.
    • That being said, if keeping the beginAsync API usage is impossible, the idea of the fix proposed seems right, though I haven't thoroughly reviewed the change to the BasicCompletionHandler.completed method.

Alternative ideas:

  • a) I have a hunch that passing localHandler instead of c.asHandler() here may solve the issue.
    • Even if it does, I don't think it's correct, because if we do that, the beginAsync API won't be able to know that we have "completed" localHandler in a situation when we do it, and localHandler throws an exception. This may lead to localHandler being "completed" more than once by the AsyncSupplier.finish method, which absolutely must not be done. This also leads to not executing any code that is "placed between" thenSupply and finish (currently, there is none).
      • I left this note only to warn us against trying this.
  • b) Instead of passing localHandler.asCallback() into finish here, pass a callback that at first extracts localHandler via getHandlerAndClear, and then completes it. This way we should be able to pass the current BasicCompletionHandler (this) into getChannel().read(...) here, similarly to how it is proposed in the current PR. But this approach may have the same problem as a).
  • c) Instead of creating a new BasicCompletionHandler here, we could re-set BasicCompletionHandler.byteBufReference and BaseCompletionHandler.handlerReference on the existing BasicCompletionHandler (this) to localByteBuf and c.asHandler() respectively. This way we will save on creating a new BasicCompletionHandler / a new stack frame per each chunk of data received.
  • d) Use the beginAsync API at a higher level than BasicCompletionHandler.completed. Maybe in AsynchronousChannelStream.readAsync? Will it give us the equivalent guarantees?
  • e) Somehow change the implementation of AsyncFunction/AsyncSupplier.finish such that executing all the callbacks in the onion of callbacks we currently build does not require growing the call stack by at least one frame per each callback. I do not know if this is doable, and I currently fail to imagine how it could even be achieved.

c) and d) seem to be the most promising to me, with d) being very nice and simple, provided that it preserves the existing guarantees, which is an open question.
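For what idea e) asks for, the standard technique is a trampoline: instead of each callback invoking the next one directly (one frame per step), steps are returned to a single driving loop that runs them iteratively. A minimal sketch under hypothetical names, not the driver's AsyncFunction API:

```java
public class TrampolineSketch {

    // A step either finishes (returns null) or returns the next step to run.
    interface Step {
        Step run();
    }

    // Runs a chain of steps iteratively: constant stack depth regardless of
    // how many steps the chain contains.
    static void trampoline(Step first) {
        Step current = first;
        while (current != null) {
            current = current.run();
        }
    }

    public static void main(String[] args) {
        int[] counter = new int[1];
        // A chain of 1,000,000 steps; direct recursion at this depth would
        // overflow the stack, but the loop keeps the depth constant.
        Step step = new Step() {
            @Override
            public Step run() {
                counter[0]++;
                return counter[0] < 1_000_000 ? this : null;
            }
        };
        trampoline(step);
        System.out.println(counter[0]);
    }
}
```

The open question for e) is whether the existing callback "onion" can be driven this way without changing the beginAsync API's observable guarantees.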

@stIncMale (Member) commented on Feb 14, 2026:

We discussed this with @katcharov and @strogiyotec via Zoom, and the idea we ended up with is none of the aforementioned. It is closest to a combination of c) and e), but is still quite different. What we should do:

  1. Get rid of BasicCompletionHandler and rewrite AsynchronousChannelStream.readAsync via the beginAsync API using AsyncRunnable.thenRunDoWhileLoop or a similar method.
  2. Change AsyncCallbackLoop and RetryingAsyncCallbackSupplier such that the amount of stack memory they consume does not depend on the number of loop iterations / retry attempts.

Currently:

  • @katcharov has an AI-assisted PoC that passes the below test written by @katcharov in both SameThreadAsyncFunctionsTest and SeparateThreadAsyncFunctionsTest.
  • I have a no-AI-involved PoC that passes the below test written by @katcharov in both SameThreadAsyncFunctionsTest and SeparateThreadAsyncFunctionsTest.
  • @vbabanin had ideas when we talked, maybe he'll also have something.

The aforementioned test is:

// AsyncFunctionsAbstractTest.java
@Test
void testThenRunDoWhileLoop() {
    assertBehavesSameVariations(8,
            () -> {
                int i = 0;
                do {
                    i++;
                    sync(i);
                } while (i < 3 && plainTest(i));
            },
            (callback) -> {
                final int[] i = new int[1];
                beginAsync().thenRunDoWhileLoop((c) -> {
                    i[0]++;
                    async(i[0], c);
                }, () -> i[0] < 3 && plainTest(i[0])).finish(callback);
            });
}
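One known way to make such a loop stack-safe is to detect completions that happen on the calling thread and unroll them into a plain `while` loop, recursing only on genuinely asynchronous completions (which arrive on a fresh stack). The sketch below is generic and hypothetical; it is not the driver's thenRunDoWhileLoop implementation, just the technique such a method could use:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class DoWhileLoopSketch {

    // Hypothetical shape of an async loop body: does its work, then signals
    // completion (possibly on another thread).
    interface AsyncBody {
        void run(Runnable onComplete);
    }

    // Runs `body`, then repeats while `condition` holds. Synchronous
    // completions are unrolled into the `while` loop rather than recursing,
    // so the stack depth stays constant no matter how many iterations
    // complete on the calling thread.
    static void doWhileLoop(AsyncBody body, BooleanSupplier condition) {
        while (true) {
            AtomicBoolean inBody = new AtomicBoolean(true);
            AtomicBoolean completedSynchronously = new AtomicBoolean(false);
            body.run(() -> {
                if (inBody.get()) {
                    // completed on the calling thread: let the loop continue
                    completedSynchronously.set(true);
                } else if (condition.getAsBoolean()) {
                    // completed asynchronously: restart the driver on a fresh stack
                    doWhileLoop(body, condition);
                }
            });
            inBody.set(false);
            if (!completedSynchronously.get() || !condition.getAsBoolean()) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        int[] i = new int[1];
        // One million synchronous completions; naive recursion would overflow.
        doWhileLoop(onComplete -> {
            i[0]++;
            onComplete.run();
        }, () -> i[0] < 1_000_000);
        System.out.println(i[0]);
    }
}
```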

P.S. I selfishly want the AI-assisted solution to turn out incorrect once we inspect the code, because I don't want to lose this battle to AI—it would be sorrowful. At least I didn't lose to the damned AI.
P.P.S. I inputted that (em dash) above via option + shift + - in macOS. It wasn't generated by AI.

A Member commented:

We should check the other implementations of com.mongodb.connection.AsyncCompletionHandler and java.nio.channels.CompletionHandler that we have, to see whether their completed/failed methods may throw exceptions (they must try their best not to, and the beginAsync API helps with that).

Examples:

  1. AsynchronousChannelStream.AsyncWritableByteChannelAdapter.WriteCompletionHandler seems fine.
  2. com.mongodb.internal.connection.FutureAsyncCompletionHandler is dead code. Let's remove it.
  3. etc.
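The property being audited above can be enforced mechanically with a wrapper that reroutes anything thrown by completed() into failed(). A hedged sketch with a simplified handler interface (the real com.mongodb.connection.AsyncCompletionHandler differs):

```java
public class NonThrowingHandlerSketch {

    // Simplified stand-in for a completion handler interface.
    interface Handler<T> {
        void completed(T result);
        void failed(Throwable t);
    }

    // Wraps a handler so an exception thrown by completed() is converted into
    // a failed() call instead of propagating into the I/O layer.
    static <T> Handler<T> nonThrowing(Handler<T> delegate) {
        return new Handler<T>() {
            @Override
            public void completed(T result) {
                try {
                    delegate.completed(result);
                } catch (Throwable t) {
                    delegate.failed(t);
                }
            }

            @Override
            public void failed(Throwable t) {
                delegate.failed(t);
            }
        };
    }

    public static void main(String[] args) {
        Handler<String> h = nonThrowing(new Handler<String>() {
            @Override
            public void completed(String result) {
                throw new IllegalStateException("boom");
            }

            @Override
            public void failed(Throwable t) {
                System.out.println("failed: " + t.getMessage());
            }
        });
        // The throwing completed() is rerouted to failed() rather than escaping.
        h.completed("ignored");
    }
}
```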

A Member commented:

@katcharov, it seems to me that AsyncSupplier.finish should be implemented simply as finish(null, callback). Not only does this seem trivially equivalent to the current code, but all the AsyncFunctionsAbstractTest tests also pass when I make such a change.
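The suggested simplification, sketched against hypothetical minimal versions of the interfaces (the real AsyncSupplier and SingleResultCallback have more to them): a supplier is a function that ignores its input, so its one-argument finish can delegate to the two-argument finish with a null value.

```java
public class FinishSketch {

    interface SingleResultCallback<T> {
        void onResult(T result, Throwable t);
    }

    interface AsyncFunction<T, R> {
        // Hypothetical: applies the function and completes the callback.
        void finish(T value, SingleResultCallback<R> callback);
    }

    // The proposed implementation: finish(callback) is just finish(null, callback).
    interface AsyncSupplier<R> extends AsyncFunction<Void, R> {
        default void finish(SingleResultCallback<R> callback) {
            finish(null, callback);
        }
    }

    public static void main(String[] args) {
        AsyncSupplier<Integer> supplier = (ignored, callback) -> callback.onResult(42, null);
        supplier.finish((r, t) -> System.out.println(r));
    }
}
```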

@@ -234,21 +234,26 @@ private BasicCompletionHandler(final ByteBuf dst, final OperationContext operati

 @Override
 public void completed(final Integer result, final Void attachment) {
-    AsyncCompletionHandler<ByteBuf> localHandler = getHandlerAndClear();
-    beginAsync().<ByteBuf>thenSupply((c) -> {
-        ByteBuf localByteBuf = byteBufReference.getAndSet(null);
-        if (result == -1) {
-            localByteBuf.release();
-            throw new MongoSocketReadException("Prematurely reached end of stream", serverAddress);
-        } else if (!localByteBuf.hasRemaining()) {
-            localByteBuf.flip();
-            c.complete(localByteBuf);
-        } else {
-            long readTimeoutMS = operationContext.getTimeoutContext().getReadTimeoutMS();
-            getChannel().read(localByteBuf.asNIO(), readTimeoutMS, MILLISECONDS, null,
-                    new BasicCompletionHandler(localByteBuf, operationContext, c.asHandler()));
-        }
-    }).finish(localHandler.asCallback());
+    if (result == -1) {
+        AsyncCompletionHandler<ByteBuf> localHandler = getHandlerAndClear();
+        ByteBuf localByteBuf = byteBufReference.getAndSet(null);
+        localByteBuf.release();
+        localHandler.failed(new MongoSocketReadException("Prematurely reached end of stream", serverAddress));
+    } else if (!byteBufReference.get().hasRemaining()) {
+        AsyncCompletionHandler<ByteBuf> localHandler = getHandlerAndClear();
+        ByteBuf localByteBuf = byteBufReference.getAndSet(null);
+        localByteBuf.flip();
+        localHandler.completed(localByteBuf);
+    } else {
+        // this try/catch simulates beginAsync
+        try {
+            long readTimeoutMS = operationContext.getTimeoutContext().getReadTimeoutMS();
+            // we pass `this` as the handler to avoid an excessive stack trace chain
+            getChannel().read(byteBufReference.get().asNIO(), readTimeoutMS, MILLISECONDS, null, this);
+        } catch (Exception e) {
+            failed(e, attachment);
+        }
+    }
 }

@Override
@@ -98,8 +98,8 @@ public <A> void read(ByteBuffer dst, A attach, CompletionHandler<Integer, ? supe
new ByteBufferSet(dst),
0,
TimeUnit.MILLISECONDS,
-c -> group.submit(() -> handler.completed((int) c, attach)),
-e -> group.submit(() -> handler.failed(e, attach)));
+c -> group.execute(() -> handler.completed((int) c, attach)),
+e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
@@ -119,8 +119,8 @@ public <A> void read(
new ByteBufferSet(dst),
timeout,
unit,
-c -> group.submit(() -> handler.completed((int) c, attach)),
-e -> group.submit(() -> handler.failed(e, attach)));
+c -> group.execute(() -> handler.completed((int) c, attach)),
+e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
@@ -145,8 +145,8 @@ public <A> void read(
bufferSet,
timeout,
unit,
-c -> group.submit(() -> handler.completed(c, attach)),
-e -> group.submit(() -> handler.failed(e, attach)));
+c -> group.execute(() -> handler.completed(c, attach)),
+e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
@@ -185,8 +185,8 @@ public <A> void write(ByteBuffer src, A attach, CompletionHandler<Integer, ? sup
new ByteBufferSet(src),
0,
TimeUnit.MILLISECONDS,
-c -> group.submit(() -> handler.completed((int) c, attach)),
-e -> group.submit(() -> handler.failed(e, attach)));
+c -> group.execute(() -> handler.completed((int) c, attach)),
+e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
@@ -205,8 +205,8 @@ public <A> void write(
new ByteBufferSet(src),
timeout,
unit,
-c -> group.submit(() -> handler.completed((int) c, attach)),
-e -> group.submit(() -> handler.failed(e, attach)));
+c -> group.execute(() -> handler.completed((int) c, attach)),
+e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
@@ -228,8 +228,8 @@ public <A> void write(
bufferSet,
timeout,
unit,
-c -> group.submit(() -> handler.completed(c, attach)),
-e -> group.submit(() -> handler.failed(e, attach)));
+c -> group.execute(() -> handler.completed(c, attach)),
+e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
@@ -251,11 +251,11 @@ public Future<Integer> write(ByteBuffer src) {
}

private <A> void completeWithZeroInt(A attach, CompletionHandler<Integer, ? super A> handler) {
-group.submit(() -> handler.completed(0, attach));
+group.execute(() -> handler.completed(0, attach));
}

private <A> void completeWithZeroLong(A attach, CompletionHandler<Long, ? super A> handler) {
-group.submit(() -> handler.completed(0L, attach));
+group.execute(() -> handler.completed(0L, attach));
}

/**
@@ -43,6 +43,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -65,7 +66,7 @@
* instance of this class is a singleton-like object that manages a thread pool that makes it
* possible to run a group of asynchronous channels.
*/
-public class AsynchronousTlsChannelGroup {
+public class AsynchronousTlsChannelGroup implements Executor {

private static final Logger LOGGER = Loggers.getLogger("connection.tls");

@@ -224,8 +225,15 @@ public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorServi
selectorThread.start();
}

-void submit(final Runnable r) {
-    executor.submit(r);
+@Override
+public void execute(final Runnable r) {
+    executor.execute(() -> {
+        try {
+            r.run();
+        } catch (Throwable e) {
+            LOGGER.error(null, e);
+        }
+    });
}

RegisteredSocket registerSocket(TlsChannel reader, SocketChannel socketChannel) {