diff --git a/buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts b/buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts index 4708c742d4..a7a365b97a 100644 --- a/buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts +++ b/buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts @@ -36,6 +36,12 @@ tasks.withType { jvmArgs.add("-Dio.netty.leakDetection.level=paranoid") + // deliberately small stack size to trigger StackOverflowErrors in tests that + // silently pass because of the default large stack size on x64 machines + // see JAVA-6071 for details + jvmArgs.add("-Xss512k") + + // Pass any `org.mongodb.*` system settings systemProperties = System.getProperties() diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java index 89396dae5d..2ca8c61a88 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java @@ -234,21 +234,26 @@ private BasicCompletionHandler(final ByteBuf dst, final OperationContext operati @Override public void completed(final Integer result, final Void attachment) { - AsyncCompletionHandler localHandler = getHandlerAndClear(); - beginAsync().thenSupply((c) -> { + if (result == -1) { + AsyncCompletionHandler localHandler = getHandlerAndClear(); ByteBuf localByteBuf = byteBufReference.getAndSet(null); - if (result == -1) { - localByteBuf.release(); - throw new MongoSocketReadException("Prematurely reached end of stream", serverAddress); - } else if (!localByteBuf.hasRemaining()) { - localByteBuf.flip(); - c.complete(localByteBuf); - } else { + localByteBuf.release(); + localHandler.failed(new MongoSocketReadException("Prematurely reached end of stream", serverAddress)); + } else if (!byteBufReference.get().hasRemaining()) { + AsyncCompletionHandler localHandler = getHandlerAndClear(); + ByteBuf localByteBuf = byteBufReference.getAndSet(null); + localByteBuf.flip(); + localHandler.completed(localByteBuf); + } else { + // this try catch simulate beginAsync + try { long readTimeoutMS = operationContext.getTimeoutContext().getReadTimeoutMS(); - getChannel().read(localByteBuf.asNIO(), readTimeoutMS, MILLISECONDS, null, - new BasicCompletionHandler(localByteBuf, operationContext, c.asHandler())); + // we pass `this` as a handler to avoid excessive stacktrace chain + getChannel().read(byteBufReference.get().asNIO(), readTimeoutMS, MILLISECONDS, null, this); + } catch (Exception e) { + failed(e, attachment); } - }).finish(localHandler.asCallback()); + } } @Override diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannel.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannel.java index 04114318f9..c1e3f06733 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannel.java +++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannel.java @@ -98,8 +98,8 @@ public void read(ByteBuffer dst, A attach, CompletionHandler group.submit(() -> handler.completed((int) c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed((int) c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -119,8 +119,8 @@ public void read( new ByteBufferSet(dst), timeout, unit, - c -> group.submit(() -> handler.completed((int) c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed((int) c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -145,8 +145,8 @@ public void read( bufferSet, timeout, unit, - c -> group.submit(() -> handler.completed(c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed(c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -185,8 +185,8 @@ public void write(ByteBuffer src, A attach, CompletionHandler group.submit(() -> handler.completed((int) c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed((int) c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -205,8 +205,8 @@ public void write( new ByteBufferSet(src), timeout, unit, - c -> group.submit(() -> handler.completed((int) c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed((int) c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -228,8 +228,8 @@ public void write( bufferSet, timeout, unit, - c -> group.submit(() -> handler.completed(c, attach)), - e -> group.submit(() -> handler.failed(e, attach))); + c -> group.execute(() -> handler.completed(c, attach)), + e -> group.execute(() -> handler.failed(e, attach))); } @Override @@ -251,11 +251,11 @@ public Future write(ByteBuffer src) { } private void completeWithZeroInt(A attach, CompletionHandler handler) { - group.submit(() -> handler.completed(0, attach)); + group.execute(() -> handler.completed(0, attach)); } private void completeWithZeroLong(A attach, CompletionHandler handler) { - group.submit(() -> handler.completed(0L, attach)); + group.execute(() -> handler.completed(0L, attach)); } /** diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java index d9b1420a6e..75092d889d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java +++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java @@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -65,7 +66,7 @@ * instance of this class is a singleton-like object that manages a thread pool that makes it * possible to run a group of asynchronous channels. */ -public class AsynchronousTlsChannelGroup { +public class AsynchronousTlsChannelGroup implements Executor { private static final Logger LOGGER = Loggers.getLogger("connection.tls"); @@ -224,8 +225,15 @@ public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorServi selectorThread.start(); } - void submit(final Runnable r) { - executor.submit(r); + @Override + public void execute(final Runnable r) { + executor.execute(() -> { + try { + r.run(); + } catch (Throwable e) { + LOGGER.error(null, e); + } + }); } RegisteredSocket registerSocket(TlsChannel reader, SocketChannel socketChannel) {