Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bson/src/test/unit/util/ThreadTestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static void executeAll(final Runnable... runnables) {
CountDownLatch latch = new CountDownLatch(runnables.length);
List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
for (final Runnable runnable : runnables) {
service.submit(() -> {
service.execute(() -> {
try {
runnable.run();
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void run() throws Exception {
CountDownLatch latch = new CountDownLatch(50);

for (int i = 0; i < 50; i++) {
gridFSService.submit(exportFile(latch, i));
gridFSService.execute(exportFile(latch, i));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we relying on the JVM own built‑in default handler to handle callback exceptions?

Copy link
Member

@stIncMale stIncMale Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We replace ExecutorService.submit with Executor.execute because only the latter allows an exception that originates from the executed task to be propagated as an uncaught exception (submit catches it and places in the returned Future, which we do not inspect). Once there is an uncaught exception, ThreadGroup.uncaughtException is called, which emits the information about the uncaught exception to the standard error stream.

The above happens only if the thread does not have an explicit, UncaughtExceptionHandler, and there is no default uncaught exception handler.

Thus with this change, uncaught exceptions won't stay unnoticed anymore:

  • at the very least the info about them will be emitted to the standard error stream;
  • an application may register the default uncaught exception handler to change that behavior.
    • And it absolutely should. The majority of applications should terminate when a VirtualMachineError happens, because such errors, being unexpected and asynchronous on top of that, may cause application invariants to be violated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our test code is "the application". So ideally, we should register a default uncaught exception handler that logs uncaught exceptions using our logging, and terminates the VM if it's a VirtualMachineError (it should also try and log such errors, but make sure the VM terminates even if logging fails).

I'll create a technical debt ticket for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

latch.await(1, TimeUnit.MINUTES);
Expand All @@ -107,7 +107,7 @@ private Runnable exportFile(final CountDownLatch latch, final int fileId) {
return () -> {
UnsafeByteArrayOutputStream outputStream = new UnsafeByteArrayOutputStream(5242880);
bucket.downloadToStream(GridFSMultiFileDownloadBenchmark.this.getFileName(fileId), outputStream);
fileService.submit(() -> {
fileService.execute(() -> {
try {
FileOutputStream fos = new FileOutputStream(new File(tempDirectory, String.format("%02d", fileId) + ".txt"));
fos.write(outputStream.getByteArray());
Expand All @@ -124,7 +124,7 @@ private void importFiles() throws Exception {
CountDownLatch latch = new CountDownLatch(50);

for (int i = 0; i < 50; i++) {
fileService.submit(importFile(latch, i));
fileService.execute(importFile(latch, i));
}

latch.await(1, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void run() throws Exception {
CountDownLatch latch = new CountDownLatch(50);

for (int i = 0; i < 50; i++) {
fileService.submit(importFile(latch, i));
fileService.execute(importFile(latch, i));
}

latch.await(1, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void run() throws Exception {
CountDownLatch latch = new CountDownLatch(100);

for (int i = 0; i < 100; i++) {
documentReadingService.submit(exportJsonFile(i, latch));
documentReadingService.execute(exportJsonFile(i, latch));
}

latch.await(1, TimeUnit.MINUTES);
Expand All @@ -125,7 +125,7 @@ private Runnable exportJsonFile(final int fileId, final CountDownLatch latch) {
List<RawBsonDocument> documents = collection.find(new BsonDocument("fileId", new BsonInt32(fileId)))
.batchSize(5000)
.into(new ArrayList<>(5000));
fileWritingService.submit(writeJsonFile(fileId, documents, latch));
fileWritingService.execute(writeJsonFile(fileId, documents, latch));
};
}

Expand Down Expand Up @@ -154,7 +154,7 @@ private void importJsonFiles() throws InterruptedException {

for (int i = 0; i < 100; i++) {
int fileId = i;
importService.submit(() -> {
importService.execute(() -> {
String resourcePath = "parallel/ldjson_multi/ldjson" + String.format("%03d", fileId) + ".txt";
try (BufferedReader reader = new BufferedReader(readFromRelativePath(resourcePath), 1024 * 64)) {
String json;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void run() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(500);

for (int i = 0; i < 100; i++) {
fileReadingService.submit(importJsonFile(latch, i));
fileReadingService.execute(importJsonFile(latch, i));
}

latch.await(1, TimeUnit.MINUTES);
Expand All @@ -104,7 +104,7 @@ private Runnable importJsonFile(final CountDownLatch latch, final int fileId) {
documents.add(document);
if (documents.size() == 1000) {
List<RawBsonDocument> documentsToInsert = documents;
documentWritingService.submit(() -> {
documentWritingService.execute(() -> {
collection.insertMany(documentsToInsert, new InsertManyOptions().ordered(false));
latch.countDown();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public List<MongocryptBecnhmarkResult> run() throws InterruptedException {
for (int i = 0; i < threadCount; i++) {
DecryptTask decryptTask = new DecryptTask(mongoCrypt, encrypted, NUM_SECS, doneSignal);
decryptTasks.add(decryptTask);
executorService.submit(decryptTask);
executorService.execute(decryptTask);
}

// Await completion of all tasks. Tasks are expected to complete shortly after NUM_SECS. Time out `await` if time exceeds 2 * NUM_SECS.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@ private boolean initUnlessClosed() {
boolean result = true;
if (state == State.NEW) {
worker = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
worker.submit(() -> runAndLogUncaught(this::workerRun));
worker.execute(() -> runAndLogUncaught(this::workerRun));
state = State.INITIALIZED;
} else if (state == State.CLOSED) {
result = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public <A> void read(ByteBuffer dst, A attach, CompletionHandler<Integer, ? supe
new ByteBufferSet(dst),
0,
TimeUnit.MILLISECONDS,
c -> group.submit(() -> handler.completed((int) c, attach)),
e -> group.submit(() -> handler.failed(e, attach)));
c -> group.execute(() -> handler.completed((int) c, attach)),
e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
Expand All @@ -119,8 +119,8 @@ public <A> void read(
new ByteBufferSet(dst),
timeout,
unit,
c -> group.submit(() -> handler.completed((int) c, attach)),
e -> group.submit(() -> handler.failed(e, attach)));
c -> group.execute(() -> handler.completed((int) c, attach)),
e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
Expand All @@ -145,8 +145,8 @@ public <A> void read(
bufferSet,
timeout,
unit,
c -> group.submit(() -> handler.completed(c, attach)),
e -> group.submit(() -> handler.failed(e, attach)));
c -> group.execute(() -> handler.completed(c, attach)),
e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
Expand Down Expand Up @@ -185,8 +185,8 @@ public <A> void write(ByteBuffer src, A attach, CompletionHandler<Integer, ? sup
new ByteBufferSet(src),
0,
TimeUnit.MILLISECONDS,
c -> group.submit(() -> handler.completed((int) c, attach)),
e -> group.submit(() -> handler.failed(e, attach)));
c -> group.execute(() -> handler.completed((int) c, attach)),
e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
Expand All @@ -205,8 +205,8 @@ public <A> void write(
new ByteBufferSet(src),
timeout,
unit,
c -> group.submit(() -> handler.completed((int) c, attach)),
e -> group.submit(() -> handler.failed(e, attach)));
c -> group.execute(() -> handler.completed((int) c, attach)),
e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
Expand All @@ -228,8 +228,8 @@ public <A> void write(
bufferSet,
timeout,
unit,
c -> group.submit(() -> handler.completed(c, attach)),
e -> group.submit(() -> handler.failed(e, attach)));
c -> group.execute(() -> handler.completed(c, attach)),
e -> group.execute(() -> handler.failed(e, attach)));
}

@Override
Expand All @@ -251,11 +251,11 @@ public Future<Integer> write(ByteBuffer src) {
}

private <A> void completeWithZeroInt(A attach, CompletionHandler<Integer, ? super A> handler) {
group.submit(() -> handler.completed(0, attach));
group.execute(() -> handler.completed(0, attach));
}

private <A> void completeWithZeroLong(A attach, CompletionHandler<Long, ? super A> handler) {
group.submit(() -> handler.completed(0L, attach));
group.execute(() -> handler.completed(0L, attach));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -65,7 +66,7 @@
* instance of this class is a singleton-like object that manages a thread pool that makes it
* possible to run a group of asynchronous channels.
*/
public class AsynchronousTlsChannelGroup {
public class AsynchronousTlsChannelGroup implements Executor {

private static final Logger LOGGER = Loggers.getLogger("connection.tls");

Expand Down Expand Up @@ -224,8 +225,16 @@ public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorServi
selectorThread.start();
}

void submit(final Runnable r) {
executor.submit(r);

@Override
public void execute(final Runnable r) {
executor.execute(() -> {
try {
r.run();
} catch (Throwable t) {
LOGGER.error(null, t);
}
});
}

RegisteredSocket registerSocket(TlsChannel reader, SocketChannel socketChannel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void shouldThrowOnTimeout() throws InterruptedException {

// when
TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings);
cachedExecutor.submit(connectionGetter);
cachedExecutor.execute(connectionGetter);

connectionGetter.getLatch().await();

Expand All @@ -152,7 +152,7 @@ public void shouldNotUseMaxAwaitTimeMSWhenTimeoutMsIsSet() throws InterruptedExc

// when
TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings);
cachedExecutor.submit(connectionGetter);
cachedExecutor.execute(connectionGetter);

sleep(70); // wait for more than maxWaitTimeMS but less than timeoutMs.
internalConnection.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ public void shouldIgnoreWaitQueueTimeoutMSWhenTimeoutMsIsSet() {
+ " }"
+ "}");

executor.submit(() -> collection.find().first());
executor.execute(() -> collection.find().first());
sleep(150);

//when && then
Expand Down Expand Up @@ -851,7 +851,7 @@ public void shouldThrowOperationTimeoutExceptionWhenConnectionIsNotAvailableAndT
+ " }"
+ "}");

executor.submit(() -> collection.withTimeout(0, TimeUnit.MILLISECONDS).find().first());
executor.execute(() -> collection.withTimeout(0, TimeUnit.MILLISECONDS).find().first());
sleep(100);

//when && then
Expand Down Expand Up @@ -886,7 +886,7 @@ public void shouldUseWaitQueueTimeoutMSWhenTimeoutIsNotSet() {
+ " }"
+ "}");

executor.submit(() -> collection.find().first());
executor.execute(() -> collection.find().first());
sleep(200);

//when & then
Expand Down