From e79eee172c473455853ec2d2d445bca4279a254b Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Thu, 26 Mar 2026 01:20:25 -0600 Subject: [PATCH 1/9] Refactor prose retryable writes/reads tests JAVA-6055 --- .../client/RetryableReadsProseTest.java | 24 +- .../client/RetryableWritesProseTest.java | 43 ++-- .../client/RetryableReadsProseTest.java | 24 +- .../client/RetryableWritesProseTest.java | 224 ++++++++---------- .../ConfigureFailPointCommandListener.java | 100 ++++++++ 5 files changed, 237 insertions(+), 178 deletions(-) create mode 100644 driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java index 22b7f7645e1..84dd0d733bf 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java @@ -22,32 +22,27 @@ import org.bson.Document; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - import static com.mongodb.client.model.Filters.eq; /** - * See - * Retryable Reads Tests. + * + * Prose Tests. */ final class RetryableReadsProseTest { /** - * See - * - * PoolClearedError Retryability Test. + * + * 1. PoolClearedError Retryability Test. */ @Test - void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { RetryableWritesProseTest.poolClearedExceptionMustBeRetryable( SyncMongoClient::new, mongoCollection -> mongoCollection.find(eq(0)).iterator().hasNext(), "find", false); } /** - * See - * - * Retryable Reads Are Retried on a Different mongos When One is Available. + * + * 2.1 Retryable Reads Are Retried on a Different mongos When One is Available. */ @Test void retriesOnDifferentMongosWhenAvailable() { @@ -61,9 +56,8 @@ void retriesOnDifferentMongosWhenAvailable() { } /** - * See - * - * Retryable Reads Are Retried on the Same mongos When No Others are Available. + * + * 2.2 Retryable Reads Are Retried on the Same mongos When No Others are Available. */ @Test void retriesOnSameMongosWhenAnotherNotAvailable() { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java index 51a37ad1e35..7f48d8c925c 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java @@ -16,66 +16,53 @@ package com.mongodb.reactivestreams.client; -import com.mongodb.client.test.CollectionHelper; import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; import org.bson.Document; -import org.bson.codecs.DocumentCodec; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - /** - * See - * Retryable Write Prose Tests. + * + * Prose Tests. */ -public class RetryableWritesProseTest extends DatabaseTestCase { - private CollectionHelper collectionHelper; - - @BeforeEach - @Override - public void setUp() { - super.setUp(); - - collectionHelper = new CollectionHelper<>(new DocumentCodec(), collection.getNamespace()); - collectionHelper.create(); - } - +final class RetryableWritesProseTest { /** - * Prose test #2. + * + * 2. Test that drivers properly retry after encountering PoolClearedErrors. */ @Test - public void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { com.mongodb.client.RetryableWritesProseTest.poolClearedExceptionMustBeRetryable( SyncMongoClient::new, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } /** - * Prose test #3. + * + * 3. Test that drivers return the original error after encountering a WriteConcernError with a RetryableWriteError label. */ @Test - public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException { + void originalErrorMustBePropagatedIfNoWritesPerformed() throws Exception { com.mongodb.client.RetryableWritesProseTest.originalErrorMustBePropagatedIfNoWritesPerformed( SyncMongoClient::new); } /** - * Prose test #4. + * + * 4. Test that in a sharded cluster writes are retried on a different mongos when one is available. */ @Test - public void retriesOnDifferentMongosWhenAvailable() { + void retriesOnDifferentMongosWhenAvailable() { com.mongodb.client.RetryableWritesProseTest.retriesOnDifferentMongosWhenAvailable( SyncMongoClient::new, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } /** - * Prose test #5. + * + * 5. Test that in a sharded cluster writes are retried on the same mongos when no others are available. */ @Test - public void retriesOnSameMongosWhenAnotherNotAvailable() { + void retriesOnSameMongosWhenAnotherNotAvailable() { com.mongodb.client.RetryableWritesProseTest.retriesOnSameMongosWhenAnotherNotAvailable( SyncMongoClient::new, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java index ccf18aad5b9..59b6a9aad19 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java @@ -19,31 +19,26 @@ import org.bson.Document; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - import static com.mongodb.client.model.Filters.eq; /** - * See - * Retryable Reads Tests. + * + * Prose Tests. */ final class RetryableReadsProseTest { /** - * See - * - * PoolClearedError Retryability Test. + * + * 1. PoolClearedError Retryability Test. */ @Test - void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { RetryableWritesProseTest.poolClearedExceptionMustBeRetryable(MongoClients::create, mongoCollection -> mongoCollection.find(eq(0)).iterator().hasNext(), "find", false); } /** - * See - * - * Retryable Reads Are Retried on a Different mongos When One is Available. + * + * 2.1 Retryable Reads Are Retried on a Different mongos When One is Available. */ @Test void retriesOnDifferentMongosWhenAvailable() { @@ -56,9 +51,8 @@ void retriesOnDifferentMongosWhenAvailable() { } /** - * See - * - * Retryable Reads Are Retried on the Same mongos When No Others are Available. + * + * 2.2 Retryable Reads Are Retried on the Same mongos When No Others are Available. */ @Test void retriesOnSameMongosWhenAnotherNotAvailable() { diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java index 3d5743fb459..478841418aa 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java @@ -22,12 +22,10 @@ import com.mongodb.MongoServerException; import com.mongodb.MongoWriteConcernException; import com.mongodb.ServerAddress; -import com.mongodb.assertions.Assertions; import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.connection.ConnectionDescription; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; -import com.mongodb.event.CommandListener; import com.mongodb.event.CommandSucceededEvent; import com.mongodb.event.ConnectionCheckOutFailedEvent; import com.mongodb.event.ConnectionCheckedOutEvent; @@ -35,28 +33,21 @@ import com.mongodb.internal.connection.ServerAddressHelper; import com.mongodb.internal.connection.TestCommandListener; import com.mongodb.internal.connection.TestConnectionPoolListener; -import org.bson.BsonArray; -import org.bson.BsonBoolean; +import com.mongodb.internal.event.ConfigureFailPointCommandListener; import org.bson.BsonDocument; import org.bson.BsonInt32; -import org.bson.BsonString; import org.bson.Document; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; import static com.mongodb.ClusterFixture.getConnectionString; import static com.mongodb.ClusterFixture.getMultiMongosConnectionString; @@ -67,6 +58,7 @@ import static com.mongodb.client.Fixture.getDefaultDatabaseName; import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; import static com.mongodb.client.Fixture.getMultiMongosMongoClientSettingsBuilder; +import static com.mongodb.client.Fixture.getPrimary; import static com.mongodb.internal.operation.CommandOperationHelper.NO_WRITES_PERFORMED_ERROR_LABEL; import static com.mongodb.internal.operation.CommandOperationHelper.RETRYABLE_WRITE_ERROR_LABEL; import static java.util.Arrays.asList; @@ -80,22 +72,16 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue; /** - * See - * Retryable Write Prose Tests. + * + * Prose Tests. */ -public class RetryableWritesProseTest extends DatabaseTestCase { - - @BeforeEach - @Override - public void setUp() { - super.setUp(); - } - +public class RetryableWritesProseTest { /** - * Prose test #2. + * + * 2. Test that drivers properly retry after encountering PoolClearedErrors. */ @Test - public void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { poolClearedExceptionMustBeRetryable(MongoClients::create, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } @@ -103,8 +89,7 @@ public void poolClearedExceptionMustBeRetryable() throws InterruptedException, E @SuppressWarnings("try") public static void poolClearedExceptionMustBeRetryable( final Function clientCreator, - final Function, R> operation, final String operationName, final boolean write) - throws InterruptedException, ExecutionException, TimeoutException { + final Function, R> operation, final String commandName, final boolean write) throws Exception { assumeTrue(serverVersionAtLeast(4, 3) && !(write && isStandalone())); TestConnectionPoolListener connectionPoolListener = new TestConnectionPoolListener(asList( "connectionCheckedOutEvent", @@ -120,7 +105,7 @@ public static void poolClearedExceptionMustBeRetryable( /* We fake server's state by configuring a fail point. This breaks the mechanism of the * streaming server monitoring protocol * (https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.md#streaming-protocol) - * that allows the server to determine whether or not it needs to send a new state to the client. + * that allows the server to determine whether it needs to send a new state to the client. * As a result, the client has to wait for at least its heartbeat delay until it hears back from a server * (while it waits for a response, calling `ServerMonitor.connect` has no effect). * Thus, we want to use small heartbeat delay to reduce delays in the test. */ @@ -129,24 +114,23 @@ public static void poolClearedExceptionMustBeRetryable( .retryWrites(true) .addCommandListener(commandListener) .build(); - BsonDocument configureFailPoint = new BsonDocument() - .append("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument() - .append("times", new BsonInt32(1))) - .append("data", new BsonDocument() - .append("failCommands", new BsonArray(singletonList(new BsonString(operationName)))) - .append("errorCode", new BsonInt32(91)) - .append("errorLabels", write - ? new BsonArray(singletonList(new BsonString(RETRYABLE_WRITE_ERROR_LABEL))) - : new BsonArray()) - .append("blockConnection", BsonBoolean.valueOf(true)) - .append("blockTimeMS", new BsonInt32(1000))); + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['" + commandName + "'],\n" + + " errorCode: 91,\n" + + " blockConnection: true,\n" + + " blockTimeMS: 1000,\n" + + (write + ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']\n" : "") + + " }\n" + + "}\n"); int timeoutSeconds = 5; try (MongoClient client = clientCreator.apply(clientSettings); - FailPoint ignored = FailPoint.enable(configureFailPoint, Fixture.getPrimary())) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("poolClearedExceptionMustBeRetryable"); - collection.drop(); + FailPoint ignored = FailPoint.enable(configureFailPoint, getPrimary())) { + MongoCollection collection = dropAndGetCollection("poolClearedExceptionMustBeRetryable", client); ExecutorService ex = Executors.newFixedThreadPool(2); try { Future result1 = ex.submit(() -> operation.apply(collection)); @@ -160,83 +144,78 @@ public static void poolClearedExceptionMustBeRetryable( ex.shutdownNow(); } assertEquals(3, commandListener.getCommandStartedEvents().size()); - commandListener.getCommandStartedEvents().forEach(event -> assertEquals(operationName, event.getCommandName())); + commandListener.getCommandStartedEvents().forEach(event -> assertEquals(commandName, event.getCommandName())); } } /** - * Prose test #3. + * + * 3. Test that drivers return the original error after encountering a WriteConcernError with a RetryableWriteError label. */ @Test - public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException { + void originalErrorMustBePropagatedIfNoWritesPerformed() throws Exception { originalErrorMustBePropagatedIfNoWritesPerformed(MongoClients::create); } @SuppressWarnings("try") public static void originalErrorMustBePropagatedIfNoWritesPerformed( - final Function clientCreator) throws InterruptedException { + final Function clientCreator) throws Exception { assumeTrue(serverVersionAtLeast(6, 0) && isDiscoverableReplicaSet()); - ServerAddress primaryServerAddress = Fixture.getPrimary(); - CompletableFuture futureFailPointFromListener = new CompletableFuture<>(); - CommandListener commandListener = new CommandListener() { - private final AtomicBoolean configureFailPoint = new AtomicBoolean(true); - - @Override - public void commandSucceeded(final CommandSucceededEvent event) { - if (event.getCommandName().equals("insert") - && event.getResponse().getDocument("writeConcernError", new BsonDocument()) - .getInt32("code", new BsonInt32(-1)).intValue() == 91 - && configureFailPoint.compareAndSet(true, false)) { - Assertions.assertTrue(futureFailPointFromListener.complete(FailPoint.enable( - new BsonDocument() - .append("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument() - .append("times", new BsonInt32(1))) - .append("data", new BsonDocument() - .append("failCommands", new BsonArray(singletonList(new BsonString("insert")))) - .append("errorCode", new BsonInt32(10107)) - .append("errorLabels", new BsonArray(Stream.of(RETRYABLE_WRITE_ERROR_LABEL, NO_WRITES_PERFORMED_ERROR_LABEL) - .map(BsonString::new).collect(Collectors.toList())))), - primaryServerAddress - ))); - } + ServerAddress primaryServerAddress = getPrimary(); + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: { times: 1 },\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " writeConcernError: {" + + " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "'],\n" + + " code: 91,\n" + + " errmsg: ''\n" + + " }\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: { times: 1 },\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 10107,\n" + + " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + Predicate configureFailPointEventMatcher = event -> { + if (event instanceof CommandSucceededEvent) { + CommandSucceededEvent commandSucceededEvent = (CommandSucceededEvent) event; + return commandSucceededEvent.getCommandName().equals("insert") + && commandSucceededEvent.getResponse().getDocument("writeConcernError", new BsonDocument()) + .getInt32("code", new BsonInt32(-1)).intValue() == 91; } + return false; }; - BsonDocument failPointDocument = new BsonDocument() - .append("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument() - .append("times", new BsonInt32(1))) - .append("data", new BsonDocument() - .append("writeConcernError", new BsonDocument() - .append("code", new BsonInt32(91)) - .append("errorLabels", new BsonArray(Stream.of(RETRYABLE_WRITE_ERROR_LABEL) - .map(BsonString::new).collect(Collectors.toList()))) - .append("errmsg", new BsonString("")) - ) - .append("failCommands", new BsonArray(singletonList(new BsonString("insert"))))); - try (MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() - .retryWrites(true) - .addCommandListener(commandListener) - .applyToServerSettings(builder -> - // see `poolClearedExceptionMustBeRetryable` for the explanation - builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS)) - .build()); - FailPoint ignored = FailPoint.enable(failPointDocument, primaryServerAddress)) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("originalErrorMustBePropagatedIfNoWritesPerformed"); - collection.drop(); + try (ConfigureFailPointCommandListener commandListener = new ConfigureFailPointCommandListener( + configureFailPointFromListener, primaryServerAddress, configureFailPointEventMatcher); + MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() + .retryWrites(true) + .addCommandListener(commandListener) + .applyToServerSettings(builder -> + // see `poolClearedExceptionMustBeRetryable` for the explanation + builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS)) + .build()); + FailPoint ignored = FailPoint.enable(configureFailPoint, primaryServerAddress)) { + MongoCollection collection = dropAndGetCollection("originalErrorMustBePropagatedIfNoWritesPerformed", client); MongoWriteConcernException e = assertThrows(MongoWriteConcernException.class, () -> collection.insertOne(new Document())); assertEquals(91, e.getCode()); - } finally { - futureFailPointFromListener.thenAccept(FailPoint::close); } } /** - * Prose test #4. + * + * 4. Test that in a sharded cluster writes are retried on a different mongos when one is available. */ @Test - public void retriesOnDifferentMongosWhenAvailable() { + void retriesOnDifferentMongosWhenAvailable() { retriesOnDifferentMongosWhenAvailable(MongoClients::create, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } @@ -244,7 +223,7 @@ public void retriesOnDifferentMongosWhenAvailable() { @SuppressWarnings("try") public static void retriesOnDifferentMongosWhenAvailable( final Function clientCreator, - final Function, R> operation, final String operationName, final boolean write) { + final Function, R> operation, final String expectedCommandName, final boolean write) { if (write) { assumeTrue(serverVersionAtLeast(4, 4)); } @@ -253,20 +232,20 @@ public static void retriesOnDifferentMongosWhenAvailable( assumeTrue(connectionString != null); ServerAddress s0Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(0)); ServerAddress s1Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(1)); - BsonDocument failPointDocument = BsonDocument.parse( + BsonDocument configureFailPoint = BsonDocument.parse( "{\n" + " configureFailPoint: \"failCommand\",\n" + " mode: { times: 1 },\n" + " data: {\n" - + " failCommands: [\"" + operationName + "\"],\n" + + " failCommands: [\"" + expectedCommandName + "\"],\n" + + " errorCode: 6,\n" + (write - ? " errorLabels: [\"RetryableWriteError\"]," : "") - + " errorCode: 6\n" + ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']" : "") + " }\n" + "}\n"); TestCommandListener commandListener = new TestCommandListener(singletonList("commandFailedEvent"), emptyList()); - try (FailPoint s0FailPoint = FailPoint.enable(failPointDocument, s0Address); - FailPoint s1FailPoint = FailPoint.enable(failPointDocument, s1Address); + try (FailPoint s0FailPoint = FailPoint.enable(configureFailPoint, s0Address); + FailPoint s1FailPoint = FailPoint.enable(configureFailPoint, s1Address); MongoClient client = clientCreator.apply(getMultiMongosMongoClientSettingsBuilder() .retryReads(true) .retryWrites(true) @@ -274,16 +253,14 @@ public static void retriesOnDifferentMongosWhenAvailable( // explicitly specify only s0 and s1, in case `getMultiMongosMongoClientSettingsBuilder` has more .applyToClusterSettings(builder -> builder.hosts(asList(s0Address, s1Address))) .build())) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("retriesOnDifferentMongosWhenAvailable"); - collection.drop(); + MongoCollection collection = dropAndGetCollection("retriesOnDifferentMongosWhenAvailable", client); commandListener.reset(); assertThrows(MongoServerException.class, () -> operation.apply(collection)); List failedCommandEvents = commandListener.getEvents(); assertEquals(2, failedCommandEvents.size(), failedCommandEvents::toString); List unexpectedCommandNames = failedCommandEvents.stream() .map(CommandEvent::getCommandName) - .filter(commandName -> !commandName.equals(operationName)) + .filter(commandName -> !commandName.equals(expectedCommandName)) .collect(Collectors.toList()); assertTrue(unexpectedCommandNames.isEmpty(), unexpectedCommandNames::toString); Set failedServerAddresses = failedCommandEvents.stream() @@ -295,10 +272,11 @@ public static void retriesOnDifferentMongosWhenAvailable( } /** - * Prose test #5. + * + * 5. Test that in a sharded cluster writes are retried on the same mongos when no others are available. */ @Test - public void retriesOnSameMongosWhenAnotherNotAvailable() { + void retriesOnSameMongosWhenAnotherNotAvailable() { retriesOnSameMongosWhenAnotherNotAvailable(MongoClients::create, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } @@ -306,27 +284,29 @@ public void retriesOnSameMongosWhenAnotherNotAvailable() { @SuppressWarnings("try") public static void retriesOnSameMongosWhenAnotherNotAvailable( final Function clientCreator, - final Function, R> operation, final String operationName, final boolean write) { + final Function, R> operation, final String expectedCommandName, final boolean write) { if (write) { assumeTrue(serverVersionAtLeast(4, 4)); } assumeTrue(isSharded()); ConnectionString connectionString = getConnectionString(); ServerAddress s0Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(0)); - BsonDocument failPointDocument = BsonDocument.parse( + BsonDocument configureFailPoint = BsonDocument.parse( "{\n" + " configureFailPoint: \"failCommand\",\n" + " mode: { times: 1 },\n" + " data: {\n" - + " failCommands: [\"" + operationName + "\"],\n" + + " failCommands: [\"" + expectedCommandName + "\"],\n" + + " errorCode: 6,\n" + (write - ? " errorLabels: [\"RetryableWriteError\"]," : "") - + " errorCode: 6\n" + ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']," : "") + + (write + ? " closeConnection: true\n" : "") + " }\n" + "}\n"); TestCommandListener commandListener = new TestCommandListener( asList("commandFailedEvent", "commandSucceededEvent"), emptyList()); - try (FailPoint s0FailPoint = FailPoint.enable(failPointDocument, s0Address); + try (FailPoint s0FailPoint = FailPoint.enable(configureFailPoint, s0Address); MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() .retryReads(true) .retryWrites(true) @@ -336,16 +316,14 @@ public static void retriesOnSameMongosWhenAnotherNotAvailable( .hosts(singletonList(s0Address)) .mode(ClusterConnectionMode.MULTIPLE)) .build())) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("retriesOnSameMongosWhenAnotherNotAvailable"); - collection.drop(); + MongoCollection collection = dropAndGetCollection("retriesOnSameMongosWhenAnotherNotAvailable", client); commandListener.reset(); operation.apply(collection); List commandEvents = commandListener.getEvents(); assertEquals(2, commandEvents.size(), commandEvents::toString); List unexpectedCommandNames = commandEvents.stream() .map(CommandEvent::getCommandName) - .filter(commandName -> !commandName.equals(operationName)) + .filter(commandName -> !commandName.equals(expectedCommandName)) .collect(Collectors.toList()); assertTrue(unexpectedCommandNames.isEmpty(), unexpectedCommandNames::toString); assertInstanceOf(CommandFailedEvent.class, commandEvents.get(0), commandEvents::toString); @@ -354,4 +332,10 @@ public static void retriesOnSameMongosWhenAnotherNotAvailable( assertEquals(s0Address, commandEvents.get(1).getConnectionDescription().getServerAddress(), commandEvents::toString); } } + + private static MongoCollection dropAndGetCollection(final String name, final MongoClient client) { + MongoCollection result = client.getDatabase(getDefaultDatabaseName()).getCollection(name); + result.drop(); + return result; + } } diff --git a/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java new file mode 100644 index 00000000000..dd99711cac3 --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java @@ -0,0 +1,100 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.event; + +import com.mongodb.ServerAddress; +import com.mongodb.client.FailPoint; +import com.mongodb.event.CommandEvent; +import com.mongodb.event.CommandFailedEvent; +import com.mongodb.event.CommandListener; +import com.mongodb.event.CommandStartedEvent; +import com.mongodb.event.CommandSucceededEvent; +import org.bson.BsonDocument; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; + +import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.assertTrue; +import static com.mongodb.assertions.Assertions.fail; + +public final class ConfigureFailPointCommandListener implements CommandListener, AutoCloseable { + private final BsonDocument configureFailPoint; + private final ServerAddress serverAddress; + private final Predicate eventMatcher; + private final Object lock; + private final CompletableFuture failPointFuture; + + /** + * @param configureFailPoint See {@link FailPoint#enable(BsonDocument, ServerAddress)}. + * @param serverAddress See {@link FailPoint#enable(BsonDocument, ServerAddress)}. + * @param eventMatcher When an event is matched, an attempt to configure the fail point + * specified via {@code configureFailPoint} is made. The attempt is made at most once regardless of how many events are matched. + */ + public ConfigureFailPointCommandListener( + final BsonDocument configureFailPoint, + final ServerAddress serverAddress, + final Predicate eventMatcher) { + this.configureFailPoint = configureFailPoint; + this.serverAddress = serverAddress; + this.eventMatcher = eventMatcher; + lock = new Object(); + failPointFuture = new CompletableFuture<>(); + } + + @Override + public void commandStarted(final CommandStartedEvent event) { + onEvent(event); + } + + @Override + public void commandSucceeded(final CommandSucceededEvent event) { + onEvent(event); + } + + @Override + public void commandFailed(final CommandFailedEvent event) { + onEvent(event); + } + + private void onEvent(final CommandEvent event) { + synchronized (lock) { + if (!failPointFuture.isDone()) { + try { + if (eventMatcher.test(event)) { + assertTrue(failPointFuture.complete(FailPoint.enable(configureFailPoint, serverAddress))); + } + } catch (Throwable e) { + assertTrue(failPointFuture.completeExceptionally(e)); + } + } + } + } + + @Override + public void close() throws InterruptedException, ExecutionException { + synchronized (lock) { + if (failPointFuture.cancel(true)) { + fail("The listener was closed before (in the happens-before order) it attempted to configure the fail point"); + } else { + assertTrue(failPointFuture.isDone()); + assertNotNull(failPointFuture.get()).close(); + } + } + } +} From 72d70e4188528f79d5bfe80d77ac627313867ba5 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Thu, 26 Mar 2026 17:51:18 -0600 Subject: [PATCH 2/9] Implement prose backpressure retryable writes tests The relevant spec changes: - https://github.com/mongodb/specifications/blame/ba14b6bdc1dc695aa9cc20ccf9378592da1b2329/source/retryable-writes/tests/README.md#L265-L418 - See also https://jira.mongodb.org/browse/DRIVERS-3432 for the required fixes for "Test 3 Case 3" JAVA-6055 --- .../client/RetryableWritesProseTest.java | 33 ++++ .../client/RetryableWritesProseTest.java | 180 ++++++++++++++++++ 2 files changed, 213 insertions(+) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java index 7f48d8c925c..7e79b2af59d 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java @@ -18,6 +18,7 @@ import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; import org.bson.Document; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -67,4 +68,36 @@ void retriesOnSameMongosWhenAnotherNotAvailable() { SyncMongoClient::new, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 1: Test that drivers return the correct error when receiving only errors without NoWritesPerformed. + */ + @Test + @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055") + void errorPropagationAfterEncounteringMultipleErrorsCase1() throws Exception { + com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase1(SyncMongoClient::new); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 2: Test that drivers return the correct error when receiving only errors with NoWritesPerformed. + */ + @Test + void errorPropagationAfterEncounteringMultipleErrorsCase2() throws Exception { + com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase2(SyncMongoClient::new); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 3: Test that drivers return the correct error when receiving some errors with NoWritesPerformed and some without NoWritesPerformed. + */ + @Test + @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055, fails on MongoDB 6.0") + void errorPropagationAfterEncounteringMultipleErrorsCase3() throws Exception { + com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase3(SyncMongoClient::new); + } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java index 478841418aa..0e4ce563178 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java @@ -19,6 +19,7 @@ import com.mongodb.ConnectionString; import com.mongodb.Function; import com.mongodb.MongoClientSettings; +import com.mongodb.MongoException; import com.mongodb.MongoServerException; import com.mongodb.MongoWriteConcernException; import com.mongodb.ServerAddress; @@ -34,9 +35,11 @@ import com.mongodb.internal.connection.TestCommandListener; import com.mongodb.internal.connection.TestConnectionPoolListener; import com.mongodb.internal.event.ConfigureFailPointCommandListener; +import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.Document; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.HashSet; @@ -55,6 +58,8 @@ import static com.mongodb.ClusterFixture.isSharded; import static com.mongodb.ClusterFixture.isStandalone; import static com.mongodb.ClusterFixture.serverVersionAtLeast; +import static com.mongodb.MongoException.RETRYABLE_ERROR_LABEL; +import static com.mongodb.MongoException.SYSTEM_OVERLOADED_ERROR_LABEL; import static com.mongodb.client.Fixture.getDefaultDatabaseName; import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; import static com.mongodb.client.Fixture.getMultiMongosMongoClientSettingsBuilder; @@ -66,6 +71,7 @@ import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -333,6 +339,180 @@ public static void retriesOnSameMongosWhenAnotherNotAvailable( } } + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 1: Test that drivers return the correct error when receiving only errors without NoWritesPerformed. + */ + @Test + @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055") + void errorPropagationAfterEncounteringMultipleErrorsCase1() throws Exception { + errorPropagationAfterEncounteringMultipleErrorsCase1(MongoClients::create); + } + + public static void errorPropagationAfterEncounteringMultipleErrorsCase1(final Function clientCreator) + throws Exception { + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "'],\n" + + " errorCode: 91\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: 'alwaysOn',\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 10107,\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + Predicate configureFailPointEventMatcher = event -> { + if (event instanceof CommandFailedEvent) { + CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event; + MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable()); + return cause.getCode() == 91; + } + return false; + }; + errorPropagationAfterEncounteringMultipleErrors( + clientCreator, + configureFailPoint, + configureFailPointFromListener, + configureFailPointEventMatcher, + 10107, + null); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 2: Test that drivers return the correct error when receiving only errors with NoWritesPerformed. + */ + @Test + void errorPropagationAfterEncounteringMultipleErrorsCase2() throws Exception { + errorPropagationAfterEncounteringMultipleErrorsCase2(MongoClients::create); + } + + public static void errorPropagationAfterEncounteringMultipleErrorsCase2(final Function clientCreator) + throws Exception { + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "'],\n" + + " errorCode: 91\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: 'alwaysOn',\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 10107,\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + Predicate configureFailPointEventMatcher = event -> { + if (event instanceof CommandFailedEvent) { + CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event; + MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable()); + return cause.getCode() == 91; + } + return false; + }; + errorPropagationAfterEncounteringMultipleErrors( + clientCreator, + configureFailPoint, + configureFailPointFromListener, + configureFailPointEventMatcher, + 91, + null); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 3: Test that drivers return the correct error when receiving some errors with NoWritesPerformed and some without NoWritesPerformed. + */ + @Test + @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055, fails on MongoDB 6.0") + void errorPropagationAfterEncounteringMultipleErrorsCase3() throws Exception { + errorPropagationAfterEncounteringMultipleErrorsCase3(MongoClients::create); + } + + public static void errorPropagationAfterEncounteringMultipleErrorsCase3(final Function clientCreator) + throws Exception { + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "'],\n" + + " errorCode: 91\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: 'alwaysOn',\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 91,\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + Predicate configureFailPointEventMatcher = event -> event instanceof CommandFailedEvent; + errorPropagationAfterEncounteringMultipleErrors( + clientCreator, + configureFailPoint, + configureFailPointFromListener, + configureFailPointEventMatcher, + 91, + NO_WRITES_PERFORMED_ERROR_LABEL); + } + + /** + * @param unexpectedErrorLabel {@code null} means there is no expectation. + */ + private static void errorPropagationAfterEncounteringMultipleErrors( + final Function clientCreator, + final BsonDocument configureFailPoint, + final BsonDocument configureFailPointFromListener, + final Predicate configureFailPointEventMatcher, + final int expectedErrorCode, + @Nullable final String unexpectedErrorLabel) throws Exception { + assumeTrue(serverVersionAtLeast(6, 0)); + assumeTrue(isDiscoverableReplicaSet()); + ServerAddress primaryServerAddress = getPrimary(); + try (ConfigureFailPointCommandListener commandListener = new ConfigureFailPointCommandListener( + configureFailPointFromListener, primaryServerAddress, configureFailPointEventMatcher); + MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() + .retryWrites(true) + .addCommandListener(commandListener) + .build()); + FailPoint ignored = FailPoint.enable(configureFailPoint, primaryServerAddress)) { + MongoCollection collection = dropAndGetCollection("errorPropagationAfterEncounteringMultipleErrors", client); + MongoException e = assertThrows(MongoException.class, () -> collection.insertOne(new Document())); + assertEquals(expectedErrorCode, e.getCode()); + if (unexpectedErrorLabel != null) { + assertFalse(e.hasErrorLabel(unexpectedErrorLabel)); + } + } + } + private static MongoCollection dropAndGetCollection(final String name, final MongoClient client) { MongoCollection result = client.getDatabase(getDefaultDatabaseName()).getCollection(name); result.drop(); From fcd0a1c420a93c86e085bf1c74906f8233dae791 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Tue, 31 Mar 2026 03:08:53 -0600 Subject: [PATCH 3/9] Replace comparisons with assertions in event matchers for `ConfigureFailPointCommandListener` --- .../mongodb/client/RetryableWritesProseTest.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java index 0e4ce563178..64ed44a735f 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java @@ -194,9 +194,12 @@ public static void originalErrorMustBePropagatedIfNoWritesPerformed( Predicate configureFailPointEventMatcher = event -> { if (event instanceof CommandSucceededEvent) { CommandSucceededEvent commandSucceededEvent = (CommandSucceededEvent) event; - return commandSucceededEvent.getCommandName().equals("insert") - && commandSucceededEvent.getResponse().getDocument("writeConcernError", new BsonDocument()) - .getInt32("code", new BsonInt32(-1)).intValue() == 91; + if (commandSucceededEvent.getCommandName().equals("insert")) { + assertEquals(91, commandSucceededEvent.getResponse().getDocument("writeConcernError", new BsonDocument()) + .getInt32("code", new BsonInt32(-1)).intValue()); + return true; + } + return false; } return false; }; @@ -376,7 +379,8 @@ public static void errorPropagationAfterEncounteringMultipleErrorsCase1(final Fu if (event instanceof CommandFailedEvent) { CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event; MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable()); - return cause.getCode() == 91; + assertEquals(91, cause.getCode()); + return true; } return false; }; @@ -427,7 +431,8 @@ public static void errorPropagationAfterEncounteringMultipleErrorsCase2(final Fu if (event instanceof CommandFailedEvent) { CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event; MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable()); - return cause.getCode() == 91; + assertEquals(91, cause.getCode()); + return true; } return false; }; From f9fd19e1473caae4614bae9374913c627ceb9d32 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Tue, 31 Mar 2026 03:49:36 -0600 Subject: [PATCH 4/9] Annotate named implementations of `CommandListener` with `@ThreadSafe`/`@NotThreadSafe`. --- .../mongodb/internal/event/CommandListenerMulticaster.java | 7 ++++++- .../mongodb/internal/connection/TestCommandListener.java | 2 ++ .../reactivestreams/client/ContextProviderTest.java | 2 ++ .../functional/com/mongodb/client/ContextProviderTest.java | 2 ++ .../internal/event/ConfigureFailPointCommandListener.java | 2 ++ 5 files changed, 14 insertions(+), 1 deletion(-) diff --git a/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java b/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java index e18318f8cde..8f9a9f51e24 100644 --- a/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java +++ b/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java @@ -16,6 +16,7 @@ package com.mongodb.internal.event; +import com.mongodb.annotations.ThreadSafe; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandListener; import com.mongodb.event.CommandStartedEvent; @@ -29,7 +30,11 @@ import static com.mongodb.assertions.Assertions.isTrue; import static java.lang.String.format; - +/** + * This {@link CommandListener} is {@linkplain ThreadSafe thread-safe}, + * provided that the recipient listeners passed to {@link #CommandListenerMulticaster(List)} are. + */ +@ThreadSafe final class CommandListenerMulticaster implements CommandListener { private static final Logger LOGGER = Loggers.getLogger("protocol.event"); diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java index 9381ad842a1..97fb8c82a4f 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java @@ -17,6 +17,7 @@ package com.mongodb.internal.connection; import com.mongodb.MongoTimeoutException; +import com.mongodb.annotations.ThreadSafe; import com.mongodb.client.TestListener; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; @@ -56,6 +57,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +@ThreadSafe public class TestCommandListener implements CommandListener { private final List eventTypes; private final List ignoredCommandMonitoringEvents; diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java index 90529171219..80913aab843 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java @@ -19,6 +19,7 @@ import com.mongodb.ContextProvider; import com.mongodb.RequestContext; import com.mongodb.WriteConcern; +import com.mongodb.annotations.NotThreadSafe; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandListener; import com.mongodb.event.CommandStartedEvent; @@ -186,6 +187,7 @@ public void contextShouldBeAvailableInCommandEvents() { } } + @NotThreadSafe private static final class TestCommandListener implements CommandListener { private int numCommandStartedEventsWithExpectedContext; private int numCommandSucceededEventsWithExpectedContext; diff --git a/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java b/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java index caf676a8ab7..c0247c9c7a2 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java @@ -19,6 +19,7 @@ import com.mongodb.ContextProvider; import com.mongodb.RequestContext; import com.mongodb.WriteConcern; +import com.mongodb.annotations.NotThreadSafe; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandListener; import com.mongodb.event.CommandStartedEvent; @@ -206,6 +207,7 @@ public void contextShouldBeAvailableInCommandEvents() { } } + @NotThreadSafe private static final class TestCommandListener implements CommandListener { private int numCommandStartedEventsWithExpectedContext; private int numCommandSucceededEventsWithExpectedContext; diff --git a/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java index dd99711cac3..105c196f52c 100644 --- a/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java +++ b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java @@ -17,6 +17,7 @@ package com.mongodb.internal.event; import com.mongodb.ServerAddress; +import com.mongodb.annotations.ThreadSafe; import com.mongodb.client.FailPoint; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; @@ -33,6 +34,7 @@ import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.fail; +@ThreadSafe public final class ConfigureFailPointCommandListener implements CommandListener, AutoCloseable { private final BsonDocument configureFailPoint; private final ServerAddress serverAddress; From 78915452fa6d9634996f4fe59175d01d8fcba2da Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Tue, 31 Mar 2026 04:02:01 -0600 Subject: [PATCH 5/9] Add assertions not required by the spec to `errorPropagationAfterEncounteringMultipleErrorsCase3` This made its `configureFailPointEventMatcher` the same as the one in cases 1 and 2, which lead to a nice simplification. --- .../client/RetryableWritesProseTest.java | 32 ++++++------------- 1 file changed, 9 insertions(+), 23 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java index 64ed44a735f..593644a9751 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java @@ -375,20 +375,10 @@ public static void errorPropagationAfterEncounteringMultipleErrorsCase1(final Fu + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "']\n" + " }\n" + "}\n"); - Predicate configureFailPointEventMatcher = event -> { - if (event instanceof CommandFailedEvent) { - CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event; - MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable()); - assertEquals(91, cause.getCode()); - return true; - } - return false; - }; errorPropagationAfterEncounteringMultipleErrors( clientCreator, configureFailPoint, configureFailPointFromListener, - configureFailPointEventMatcher, 10107, null); } @@ -427,20 +417,10 @@ public static void errorPropagationAfterEncounteringMultipleErrorsCase2(final Fu + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + " }\n" + "}\n"); - Predicate configureFailPointEventMatcher = event -> { - if (event instanceof CommandFailedEvent) { - CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event; - MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable()); - assertEquals(91, cause.getCode()); - return true; - } - return false; - }; errorPropagationAfterEncounteringMultipleErrors( clientCreator, configureFailPoint, configureFailPointFromListener, - configureFailPointEventMatcher, 91, null); } @@ -479,12 +459,10 @@ public static void errorPropagationAfterEncounteringMultipleErrorsCase3(final Fu + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + " }\n" + "}\n"); - Predicate configureFailPointEventMatcher = event -> event instanceof CommandFailedEvent; errorPropagationAfterEncounteringMultipleErrors( clientCreator, configureFailPoint, configureFailPointFromListener, - configureFailPointEventMatcher, 91, NO_WRITES_PERFORMED_ERROR_LABEL); } @@ -496,12 +474,20 @@ private static void errorPropagationAfterEncounteringMultipleErrors( final Function clientCreator, final BsonDocument configureFailPoint, final BsonDocument configureFailPointFromListener, - final Predicate configureFailPointEventMatcher, final int expectedErrorCode, @Nullable final String unexpectedErrorLabel) throws Exception { assumeTrue(serverVersionAtLeast(6, 0)); assumeTrue(isDiscoverableReplicaSet()); ServerAddress primaryServerAddress = getPrimary(); + Predicate configureFailPointEventMatcher = event -> { + if (event instanceof CommandFailedEvent) { + CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event; + MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable()); + assertEquals(91, cause.getCode()); + return true; + } + return false; + }; try (ConfigureFailPointCommandListener commandListener = new ConfigureFailPointCommandListener( configureFailPointFromListener, primaryServerAddress, configureFailPointEventMatcher); MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() From 391c317d27894b547de7959bd7ebc45f30b045f7 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Tue, 31 Mar 2026 04:18:47 -0600 Subject: [PATCH 6/9] Improve the documentation of `ConfigureFailPointCommandListener` The new wording provides guarantees that allow assertions in event matchers. --- .../internal/event/ConfigureFailPointCommandListener.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java index 105c196f52c..a31182a51c0 100644 --- a/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java +++ b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java @@ -46,7 +46,10 @@ public final class ConfigureFailPointCommandListener implements CommandListener, * @param configureFailPoint See {@link FailPoint#enable(BsonDocument, ServerAddress)}. * @param serverAddress See {@link FailPoint#enable(BsonDocument, ServerAddress)}. * @param eventMatcher When an event is matched, an attempt to configure the fail point - * specified via {@code configureFailPoint} is made. The attempt is made at most once regardless of how many events are matched. + * specified via {@code configureFailPoint} is made. + * The {@code eventMatcher} is guaranteed to be {@linkplain Predicate#test(Object) used} sequentially. + * The attempt is made at most once, + * and the {@code eventMatcher} {@linkplain Predicate#test(Object) test} that caused the attempt is the last one. */ public ConfigureFailPointCommandListener( final BsonDocument configureFailPoint, From 4b9a565902cfd4f6885592b190e9125a2dd1e24c Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Wed, 1 Apr 2026 13:31:57 -0600 Subject: [PATCH 7/9] Make the new tests work when run against MongoDB 6 --- .../com/mongodb/client/RetryableWritesProseTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java index 593644a9751..5258eacfbb8 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java @@ -482,6 +482,10 @@ private static void errorPropagationAfterEncounteringMultipleErrors( Predicate configureFailPointEventMatcher = event -> { if (event instanceof CommandFailedEvent) { CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event; + if (commandFailedEvent.getCommandName().equals("drop")) { + // this code may run against MongoDB 6, where dropping a nonexistent collection results in an error + return false; + } MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable()); assertEquals(91, cause.getCode()); return true; From 68fdf37638f3e31f927ab2b6bd81d6aad3722cf9 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Thu, 9 Apr 2026 12:59:18 -0600 Subject: [PATCH 8/9] Enable `errorPropagationAfterEncounteringMultipleErrorsCase3` --- .../functional/com/mongodb/client/RetryableWritesProseTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java index 5258eacfbb8..c49d1a8b4f1 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java @@ -431,7 +431,6 @@ public static void errorPropagationAfterEncounteringMultipleErrorsCase2(final Fu * Case 3: Test that drivers return the correct error when receiving some errors with NoWritesPerformed and some without NoWritesPerformed. */ @Test - @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055, fails on MongoDB 6.0") void errorPropagationAfterEncounteringMultipleErrorsCase3() throws Exception { errorPropagationAfterEncounteringMultipleErrorsCase3(MongoClients::create); } From 31562053d93f7cae3d4f5a12b1aec05a31e7969b Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Thu, 9 Apr 2026 13:06:38 -0600 Subject: [PATCH 9/9] Enable reactive `errorPropagationAfterEncounteringMultipleErrorsCase3` --- .../mongodb/reactivestreams/client/RetryableWritesProseTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java index 7e79b2af59d..38ef09a4771 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java @@ -96,7 +96,6 @@ void errorPropagationAfterEncounteringMultipleErrorsCase2() throws Exception { * Case 3: Test that drivers return the correct error when receiving some errors with NoWritesPerformed and some without NoWritesPerformed. */ @Test - @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055, fails on MongoDB 6.0") void errorPropagationAfterEncounteringMultipleErrorsCase3() throws Exception { com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase3(SyncMongoClient::new); }