diff --git a/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java b/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java index e18318f8cde..8f9a9f51e24 100644 --- a/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java +++ b/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java @@ -16,6 +16,7 @@ package com.mongodb.internal.event; +import com.mongodb.annotations.ThreadSafe; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandListener; import com.mongodb.event.CommandStartedEvent; @@ -29,7 +30,11 @@ import static com.mongodb.assertions.Assertions.isTrue; import static java.lang.String.format; - +/** + * This {@link CommandListener} is {@linkplain ThreadSafe thread-safe}, + * provided that the recipient listeners passed to {@link #CommandListenerMulticaster(List)} are. + */ +@ThreadSafe final class CommandListenerMulticaster implements CommandListener { private static final Logger LOGGER = Loggers.getLogger("protocol.event"); diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java index 9381ad842a1..97fb8c82a4f 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java @@ -17,6 +17,7 @@ package com.mongodb.internal.connection; import com.mongodb.MongoTimeoutException; +import com.mongodb.annotations.ThreadSafe; import com.mongodb.client.TestListener; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; @@ -56,6 +57,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +@ThreadSafe public class TestCommandListener implements CommandListener { private final List eventTypes; private final List ignoredCommandMonitoringEvents; diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java index 90529171219..80913aab843 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java @@ -19,6 +19,7 @@ import com.mongodb.ContextProvider; import com.mongodb.RequestContext; import com.mongodb.WriteConcern; +import com.mongodb.annotations.NotThreadSafe; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandListener; import com.mongodb.event.CommandStartedEvent; @@ -186,6 +187,7 @@ public void contextShouldBeAvailableInCommandEvents() { } } + @NotThreadSafe private static final class TestCommandListener implements CommandListener { private int numCommandStartedEventsWithExpectedContext; private int numCommandSucceededEventsWithExpectedContext; diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java index 22b7f7645e1..84dd0d733bf 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java @@ -22,32 +22,27 @@ import org.bson.Document; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - import static com.mongodb.client.model.Filters.eq; /** - * See - * Retryable Reads Tests. + * + * Prose Tests. */ final class RetryableReadsProseTest { /** - * See - * - * PoolClearedError Retryability Test. + * + * 1. PoolClearedError Retryability Test. */ @Test - void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { RetryableWritesProseTest.poolClearedExceptionMustBeRetryable( SyncMongoClient::new, mongoCollection -> mongoCollection.find(eq(0)).iterator().hasNext(), "find", false); } /** - * See - * - * Retryable Reads Are Retried on a Different mongos When One is Available. + * + * 2.1 Retryable Reads Are Retried on a Different mongos When One is Available. */ @Test void retriesOnDifferentMongosWhenAvailable() { @@ -61,9 +56,8 @@ void retriesOnDifferentMongosWhenAvailable() { } /** - * See - * - * Retryable Reads Are Retried on the Same mongos When No Others are Available. + * + * 2.2 Retryable Reads Are Retried on the Same mongos When No Others are Available. */ @Test void retriesOnSameMongosWhenAnotherNotAvailable() { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java index 51a37ad1e35..38ef09a4771 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java @@ -16,68 +16,87 @@ package com.mongodb.reactivestreams.client; -import com.mongodb.client.test.CollectionHelper; import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; import org.bson.Document; -import org.bson.codecs.DocumentCodec; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - /** - * See - * Retryable Write Prose Tests. + * + * Prose Tests. */ -public class RetryableWritesProseTest extends DatabaseTestCase { - private CollectionHelper collectionHelper; - - @BeforeEach - @Override - public void setUp() { - super.setUp(); - - collectionHelper = new CollectionHelper<>(new DocumentCodec(), collection.getNamespace()); - collectionHelper.create(); - } - +final class RetryableWritesProseTest { /** - * Prose test #2. + * + * 2. Test that drivers properly retry after encountering PoolClearedErrors. */ @Test - public void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { com.mongodb.client.RetryableWritesProseTest.poolClearedExceptionMustBeRetryable( SyncMongoClient::new, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } /** - * Prose test #3. + * + * 3. Test that drivers return the original error after encountering a WriteConcernError with a RetryableWriteError label. */ @Test - public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException { + void originalErrorMustBePropagatedIfNoWritesPerformed() throws Exception { com.mongodb.client.RetryableWritesProseTest.originalErrorMustBePropagatedIfNoWritesPerformed( SyncMongoClient::new); } /** - * Prose test #4. + * + * 4. Test that in a sharded cluster writes are retried on a different mongos when one is available. */ @Test - public void retriesOnDifferentMongosWhenAvailable() { + void retriesOnDifferentMongosWhenAvailable() { com.mongodb.client.RetryableWritesProseTest.retriesOnDifferentMongosWhenAvailable( SyncMongoClient::new, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } /** - * Prose test #5. + * + * 5. Test that in a sharded cluster writes are retried on the same mongos when no others are available. */ @Test - public void retriesOnSameMongosWhenAnotherNotAvailable() { + void retriesOnSameMongosWhenAnotherNotAvailable() { com.mongodb.client.RetryableWritesProseTest.retriesOnSameMongosWhenAnotherNotAvailable( SyncMongoClient::new, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 1: Test that drivers return the correct error when receiving only errors without NoWritesPerformed. + */ + @Test + @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055") + void errorPropagationAfterEncounteringMultipleErrorsCase1() throws Exception { + com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase1(SyncMongoClient::new); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 2: Test that drivers return the correct error when receiving only errors with NoWritesPerformed. + */ + @Test + void errorPropagationAfterEncounteringMultipleErrorsCase2() throws Exception { + com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase2(SyncMongoClient::new); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 3: Test that drivers return the correct error when receiving some errors with NoWritesPerformed and some without NoWritesPerformed. + */ + @Test + void errorPropagationAfterEncounteringMultipleErrorsCase3() throws Exception { + com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase3(SyncMongoClient::new); + } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java b/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java index caf676a8ab7..c0247c9c7a2 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java @@ -19,6 +19,7 @@ import com.mongodb.ContextProvider; import com.mongodb.RequestContext; import com.mongodb.WriteConcern; +import com.mongodb.annotations.NotThreadSafe; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandListener; import com.mongodb.event.CommandStartedEvent; @@ -206,6 +207,7 @@ public void contextShouldBeAvailableInCommandEvents() { } } + @NotThreadSafe private static final class TestCommandListener implements CommandListener { private int numCommandStartedEventsWithExpectedContext; private int numCommandSucceededEventsWithExpectedContext; diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java index ccf18aad5b9..59b6a9aad19 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java @@ -19,31 +19,26 @@ import org.bson.Document; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - import static com.mongodb.client.model.Filters.eq; /** - * See - * Retryable Reads Tests. + * + * Prose Tests. */ final class RetryableReadsProseTest { /** - * See - * - * PoolClearedError Retryability Test. + * + * 1. PoolClearedError Retryability Test. */ @Test - void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { RetryableWritesProseTest.poolClearedExceptionMustBeRetryable(MongoClients::create, mongoCollection -> mongoCollection.find(eq(0)).iterator().hasNext(), "find", false); } /** - * See - * - * Retryable Reads Are Retried on a Different mongos When One is Available. + * + * 2.1 Retryable Reads Are Retried on a Different mongos When One is Available. */ @Test void retriesOnDifferentMongosWhenAvailable() { @@ -56,9 +51,8 @@ void retriesOnDifferentMongosWhenAvailable() { } /** - * See - * - * Retryable Reads Are Retried on the Same mongos When No Others are Available. + * + * 2.2 Retryable Reads Are Retried on the Same mongos When No Others are Available. */ @Test void retriesOnSameMongosWhenAnotherNotAvailable() { diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java index 3d5743fb459..c49d1a8b4f1 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java @@ -19,15 +19,14 @@ import com.mongodb.ConnectionString; import com.mongodb.Function; import com.mongodb.MongoClientSettings; +import com.mongodb.MongoException; import com.mongodb.MongoServerException; import com.mongodb.MongoWriteConcernException; import com.mongodb.ServerAddress; -import com.mongodb.assertions.Assertions; import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.connection.ConnectionDescription; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; -import com.mongodb.event.CommandListener; import com.mongodb.event.CommandSucceededEvent; import com.mongodb.event.ConnectionCheckOutFailedEvent; import com.mongodb.event.ConnectionCheckedOutEvent; @@ -35,28 +34,23 @@ import com.mongodb.internal.connection.ServerAddressHelper; import com.mongodb.internal.connection.TestCommandListener; import com.mongodb.internal.connection.TestConnectionPoolListener; -import org.bson.BsonArray; -import org.bson.BsonBoolean; +import com.mongodb.internal.event.ConfigureFailPointCommandListener; +import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonInt32; -import org.bson.BsonString; import org.bson.Document; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; import static com.mongodb.ClusterFixture.getConnectionString; import static com.mongodb.ClusterFixture.getMultiMongosConnectionString; @@ -64,9 +58,12 @@ import static com.mongodb.ClusterFixture.isSharded; import static com.mongodb.ClusterFixture.isStandalone; import static com.mongodb.ClusterFixture.serverVersionAtLeast; +import static com.mongodb.MongoException.RETRYABLE_ERROR_LABEL; +import static com.mongodb.MongoException.SYSTEM_OVERLOADED_ERROR_LABEL; import static com.mongodb.client.Fixture.getDefaultDatabaseName; import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; import static com.mongodb.client.Fixture.getMultiMongosMongoClientSettingsBuilder; +import static com.mongodb.client.Fixture.getPrimary; import static com.mongodb.internal.operation.CommandOperationHelper.NO_WRITES_PERFORMED_ERROR_LABEL; import static com.mongodb.internal.operation.CommandOperationHelper.RETRYABLE_WRITE_ERROR_LABEL; import static java.util.Arrays.asList; @@ -74,28 +71,23 @@ import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; /** - * See - * Retryable Write Prose Tests. + * + * Prose Tests. */ -public class RetryableWritesProseTest extends DatabaseTestCase { - - @BeforeEach - @Override - public void setUp() { - super.setUp(); - } - +public class RetryableWritesProseTest { /** - * Prose test #2. + * + * 2. Test that drivers properly retry after encountering PoolClearedErrors. */ @Test - public void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { poolClearedExceptionMustBeRetryable(MongoClients::create, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } @@ -103,8 +95,7 @@ public void poolClearedExceptionMustBeRetryable() throws InterruptedException, E @SuppressWarnings("try") public static void poolClearedExceptionMustBeRetryable( final Function clientCreator, - final Function, R> operation, final String operationName, final boolean write) - throws InterruptedException, ExecutionException, TimeoutException { + final Function, R> operation, final String commandName, final boolean write) throws Exception { assumeTrue(serverVersionAtLeast(4, 3) && !(write && isStandalone())); TestConnectionPoolListener connectionPoolListener = new TestConnectionPoolListener(asList( "connectionCheckedOutEvent", @@ -120,7 +111,7 @@ public static void poolClearedExceptionMustBeRetryable( /* We fake server's state by configuring a fail point. This breaks the mechanism of the * streaming server monitoring protocol * (https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.md#streaming-protocol) - * that allows the server to determine whether or not it needs to send a new state to the client. + * that allows the server to determine whether it needs to send a new state to the client. * As a result, the client has to wait for at least its heartbeat delay until it hears back from a server * (while it waits for a response, calling `ServerMonitor.connect` has no effect). * Thus, we want to use small heartbeat delay to reduce delays in the test. */ @@ -129,24 +120,23 @@ public static void poolClearedExceptionMustBeRetryable( .retryWrites(true) .addCommandListener(commandListener) .build(); - BsonDocument configureFailPoint = new BsonDocument() - .append("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument() - .append("times", new BsonInt32(1))) - .append("data", new BsonDocument() - .append("failCommands", new BsonArray(singletonList(new BsonString(operationName)))) - .append("errorCode", new BsonInt32(91)) - .append("errorLabels", write - ? new BsonArray(singletonList(new BsonString(RETRYABLE_WRITE_ERROR_LABEL))) - : new BsonArray()) - .append("blockConnection", BsonBoolean.valueOf(true)) - .append("blockTimeMS", new BsonInt32(1000))); + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['" + commandName + "'],\n" + + " errorCode: 91,\n" + + " blockConnection: true,\n" + + " blockTimeMS: 1000,\n" + + (write + ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']\n" : "") + + " }\n" + + "}\n"); int timeoutSeconds = 5; try (MongoClient client = clientCreator.apply(clientSettings); - FailPoint ignored = FailPoint.enable(configureFailPoint, Fixture.getPrimary())) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("poolClearedExceptionMustBeRetryable"); - collection.drop(); + FailPoint ignored = FailPoint.enable(configureFailPoint, getPrimary())) { + MongoCollection collection = dropAndGetCollection("poolClearedExceptionMustBeRetryable", client); ExecutorService ex = Executors.newFixedThreadPool(2); try { Future result1 = ex.submit(() -> operation.apply(collection)); @@ -160,83 +150,81 @@ public static void poolClearedExceptionMustBeRetryable( ex.shutdownNow(); } assertEquals(3, commandListener.getCommandStartedEvents().size()); - commandListener.getCommandStartedEvents().forEach(event -> assertEquals(operationName, event.getCommandName())); + commandListener.getCommandStartedEvents().forEach(event -> assertEquals(commandName, event.getCommandName())); } } /** - * Prose test #3. + * + * 3. Test that drivers return the original error after encountering a WriteConcernError with a RetryableWriteError label. */ @Test - public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException { + void originalErrorMustBePropagatedIfNoWritesPerformed() throws Exception { originalErrorMustBePropagatedIfNoWritesPerformed(MongoClients::create); } @SuppressWarnings("try") public static void originalErrorMustBePropagatedIfNoWritesPerformed( - final Function clientCreator) throws InterruptedException { + final Function clientCreator) throws Exception { assumeTrue(serverVersionAtLeast(6, 0) && isDiscoverableReplicaSet()); - ServerAddress primaryServerAddress = Fixture.getPrimary(); - CompletableFuture futureFailPointFromListener = new CompletableFuture<>(); - CommandListener commandListener = new CommandListener() { - private final AtomicBoolean configureFailPoint = new AtomicBoolean(true); - - @Override - public void commandSucceeded(final CommandSucceededEvent event) { - if (event.getCommandName().equals("insert") - && event.getResponse().getDocument("writeConcernError", new BsonDocument()) - .getInt32("code", new BsonInt32(-1)).intValue() == 91 - && configureFailPoint.compareAndSet(true, false)) { - Assertions.assertTrue(futureFailPointFromListener.complete(FailPoint.enable( - new BsonDocument() - .append("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument() - .append("times", new BsonInt32(1))) - .append("data", new BsonDocument() - .append("failCommands", new BsonArray(singletonList(new BsonString("insert")))) - .append("errorCode", new BsonInt32(10107)) - .append("errorLabels", new BsonArray(Stream.of(RETRYABLE_WRITE_ERROR_LABEL, NO_WRITES_PERFORMED_ERROR_LABEL) - .map(BsonString::new).collect(Collectors.toList())))), - primaryServerAddress - ))); + ServerAddress primaryServerAddress = getPrimary(); + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: { times: 1 },\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " writeConcernError: {" + + " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "'],\n" + + " code: 91,\n" + + " errmsg: ''\n" + + " }\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: { times: 1 },\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 10107,\n" + + " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + Predicate configureFailPointEventMatcher = event -> { + if (event instanceof CommandSucceededEvent) { + CommandSucceededEvent commandSucceededEvent = (CommandSucceededEvent) event; + if (commandSucceededEvent.getCommandName().equals("insert")) { + assertEquals(91, commandSucceededEvent.getResponse().getDocument("writeConcernError", new BsonDocument()) + .getInt32("code", new BsonInt32(-1)).intValue()); + return true; } + return false; } + return false; }; - BsonDocument failPointDocument = new BsonDocument() - .append("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument() - .append("times", new BsonInt32(1))) - .append("data", new BsonDocument() - .append("writeConcernError", new BsonDocument() - .append("code", new BsonInt32(91)) - .append("errorLabels", new BsonArray(Stream.of(RETRYABLE_WRITE_ERROR_LABEL) - .map(BsonString::new).collect(Collectors.toList()))) - .append("errmsg", new BsonString("")) - ) - .append("failCommands", new BsonArray(singletonList(new BsonString("insert"))))); - try (MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() - .retryWrites(true) - .addCommandListener(commandListener) - .applyToServerSettings(builder -> - // see `poolClearedExceptionMustBeRetryable` for the explanation - builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS)) - .build()); - FailPoint ignored = FailPoint.enable(failPointDocument, primaryServerAddress)) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("originalErrorMustBePropagatedIfNoWritesPerformed"); - collection.drop(); + try (ConfigureFailPointCommandListener commandListener = new ConfigureFailPointCommandListener( + configureFailPointFromListener, primaryServerAddress, configureFailPointEventMatcher); + MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() + .retryWrites(true) + .addCommandListener(commandListener) + .applyToServerSettings(builder -> + // see `poolClearedExceptionMustBeRetryable` for the explanation + builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS)) + .build()); + FailPoint ignored = FailPoint.enable(configureFailPoint, primaryServerAddress)) { + MongoCollection collection = dropAndGetCollection("originalErrorMustBePropagatedIfNoWritesPerformed", client); MongoWriteConcernException e = assertThrows(MongoWriteConcernException.class, () -> collection.insertOne(new Document())); assertEquals(91, e.getCode()); - } finally { - futureFailPointFromListener.thenAccept(FailPoint::close); } } /** - * Prose test #4. + * + * 4. Test that in a sharded cluster writes are retried on a different mongos when one is available. */ @Test - public void retriesOnDifferentMongosWhenAvailable() { + void retriesOnDifferentMongosWhenAvailable() { retriesOnDifferentMongosWhenAvailable(MongoClients::create, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } @@ -244,7 +232,7 @@ public void retriesOnDifferentMongosWhenAvailable() { @SuppressWarnings("try") public static void retriesOnDifferentMongosWhenAvailable( final Function clientCreator, - final Function, R> operation, final String operationName, final boolean write) { + final Function, R> operation, final String expectedCommandName, final boolean write) { if (write) { assumeTrue(serverVersionAtLeast(4, 4)); } @@ -253,20 +241,20 @@ public static void retriesOnDifferentMongosWhenAvailable( assumeTrue(connectionString != null); ServerAddress s0Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(0)); ServerAddress s1Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(1)); - BsonDocument failPointDocument = BsonDocument.parse( + BsonDocument configureFailPoint = BsonDocument.parse( "{\n" + " configureFailPoint: \"failCommand\",\n" + " mode: { times: 1 },\n" + " data: {\n" - + " failCommands: [\"" + operationName + "\"],\n" + + " failCommands: [\"" + expectedCommandName + "\"],\n" + + " errorCode: 6,\n" + (write - ? " errorLabels: [\"RetryableWriteError\"]," : "") - + " errorCode: 6\n" + ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']" : "") + " }\n" + "}\n"); TestCommandListener commandListener = new TestCommandListener(singletonList("commandFailedEvent"), emptyList()); - try (FailPoint s0FailPoint = FailPoint.enable(failPointDocument, s0Address); - FailPoint s1FailPoint = FailPoint.enable(failPointDocument, s1Address); + try (FailPoint s0FailPoint = FailPoint.enable(configureFailPoint, s0Address); + FailPoint s1FailPoint = FailPoint.enable(configureFailPoint, s1Address); MongoClient client = clientCreator.apply(getMultiMongosMongoClientSettingsBuilder() .retryReads(true) .retryWrites(true) @@ -274,16 +262,14 @@ public static void retriesOnDifferentMongosWhenAvailable( // explicitly specify only s0 and s1, in case `getMultiMongosMongoClientSettingsBuilder` has more .applyToClusterSettings(builder -> builder.hosts(asList(s0Address, s1Address))) .build())) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("retriesOnDifferentMongosWhenAvailable"); - collection.drop(); + MongoCollection collection = dropAndGetCollection("retriesOnDifferentMongosWhenAvailable", client); commandListener.reset(); assertThrows(MongoServerException.class, () -> operation.apply(collection)); List failedCommandEvents = commandListener.getEvents(); assertEquals(2, failedCommandEvents.size(), failedCommandEvents::toString); List unexpectedCommandNames = failedCommandEvents.stream() .map(CommandEvent::getCommandName) - .filter(commandName -> !commandName.equals(operationName)) + .filter(commandName -> !commandName.equals(expectedCommandName)) .collect(Collectors.toList()); assertTrue(unexpectedCommandNames.isEmpty(), unexpectedCommandNames::toString); Set failedServerAddresses = failedCommandEvents.stream() @@ -295,10 +281,11 @@ public static void retriesOnDifferentMongosWhenAvailable( } /** - * Prose test #5. + * + * 5. Test that in a sharded cluster writes are retried on the same mongos when no others are available. */ @Test - public void retriesOnSameMongosWhenAnotherNotAvailable() { + void retriesOnSameMongosWhenAnotherNotAvailable() { retriesOnSameMongosWhenAnotherNotAvailable(MongoClients::create, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } @@ -306,27 +293,29 @@ public void retriesOnSameMongosWhenAnotherNotAvailable() { @SuppressWarnings("try") public static void retriesOnSameMongosWhenAnotherNotAvailable( final Function clientCreator, - final Function, R> operation, final String operationName, final boolean write) { + final Function, R> operation, final String expectedCommandName, final boolean write) { if (write) { assumeTrue(serverVersionAtLeast(4, 4)); } assumeTrue(isSharded()); ConnectionString connectionString = getConnectionString(); ServerAddress s0Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(0)); - BsonDocument failPointDocument = BsonDocument.parse( + BsonDocument configureFailPoint = BsonDocument.parse( "{\n" + " configureFailPoint: \"failCommand\",\n" + " mode: { times: 1 },\n" + " data: {\n" - + " failCommands: [\"" + operationName + "\"],\n" + + " failCommands: [\"" + expectedCommandName + "\"],\n" + + " errorCode: 6,\n" + (write - ? " errorLabels: [\"RetryableWriteError\"]," : "") - + " errorCode: 6\n" + ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']," : "") + + (write + ? " closeConnection: true\n" : "") + " }\n" + "}\n"); TestCommandListener commandListener = new TestCommandListener( asList("commandFailedEvent", "commandSucceededEvent"), emptyList()); - try (FailPoint s0FailPoint = FailPoint.enable(failPointDocument, s0Address); + try (FailPoint s0FailPoint = FailPoint.enable(configureFailPoint, s0Address); MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() .retryReads(true) .retryWrites(true) @@ -336,16 +325,14 @@ public static void retriesOnSameMongosWhenAnotherNotAvailable( .hosts(singletonList(s0Address)) .mode(ClusterConnectionMode.MULTIPLE)) .build())) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("retriesOnSameMongosWhenAnotherNotAvailable"); - collection.drop(); + MongoCollection collection = dropAndGetCollection("retriesOnSameMongosWhenAnotherNotAvailable", client); commandListener.reset(); operation.apply(collection); List commandEvents = commandListener.getEvents(); assertEquals(2, commandEvents.size(), commandEvents::toString); List unexpectedCommandNames = commandEvents.stream() .map(CommandEvent::getCommandName) - .filter(commandName -> !commandName.equals(operationName)) + .filter(commandName -> !commandName.equals(expectedCommandName)) .collect(Collectors.toList()); assertTrue(unexpectedCommandNames.isEmpty(), unexpectedCommandNames::toString); assertInstanceOf(CommandFailedEvent.class, commandEvents.get(0), commandEvents::toString); @@ -354,4 +341,175 @@ public static void retriesOnSameMongosWhenAnotherNotAvailable( assertEquals(s0Address, commandEvents.get(1).getConnectionDescription().getServerAddress(), commandEvents::toString); } } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 1: Test that drivers return the correct error when receiving only errors without NoWritesPerformed. + */ + @Test + @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055") + void errorPropagationAfterEncounteringMultipleErrorsCase1() throws Exception { + errorPropagationAfterEncounteringMultipleErrorsCase1(MongoClients::create); + } + + public static void errorPropagationAfterEncounteringMultipleErrorsCase1(final Function clientCreator) + throws Exception { + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "'],\n" + + " errorCode: 91\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: 'alwaysOn',\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 10107,\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + errorPropagationAfterEncounteringMultipleErrors( + clientCreator, + configureFailPoint, + configureFailPointFromListener, + 10107, + null); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 2: Test that drivers return the correct error when receiving only errors with NoWritesPerformed. + */ + @Test + void errorPropagationAfterEncounteringMultipleErrorsCase2() throws Exception { + errorPropagationAfterEncounteringMultipleErrorsCase2(MongoClients::create); + } + + public static void errorPropagationAfterEncounteringMultipleErrorsCase2(final Function clientCreator) + throws Exception { + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "'],\n" + + " errorCode: 91\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: 'alwaysOn',\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 10107,\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + errorPropagationAfterEncounteringMultipleErrors( + clientCreator, + configureFailPoint, + configureFailPointFromListener, + 91, + null); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 3: Test that drivers return the correct error when receiving some errors with NoWritesPerformed and some without NoWritesPerformed. + */ + @Test + void errorPropagationAfterEncounteringMultipleErrorsCase3() throws Exception { + errorPropagationAfterEncounteringMultipleErrorsCase3(MongoClients::create); + } + + public static void errorPropagationAfterEncounteringMultipleErrorsCase3(final Function clientCreator) + throws Exception { + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "'],\n" + + " errorCode: 91\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: 'alwaysOn',\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 91,\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + errorPropagationAfterEncounteringMultipleErrors( + clientCreator, + configureFailPoint, + configureFailPointFromListener, + 91, + NO_WRITES_PERFORMED_ERROR_LABEL); + } + + /** + * @param unexpectedErrorLabel {@code null} means there is no expectation. + */ + private static void errorPropagationAfterEncounteringMultipleErrors( + final Function clientCreator, + final BsonDocument configureFailPoint, + final BsonDocument configureFailPointFromListener, + final int expectedErrorCode, + @Nullable final String unexpectedErrorLabel) throws Exception { + assumeTrue(serverVersionAtLeast(6, 0)); + assumeTrue(isDiscoverableReplicaSet()); + ServerAddress primaryServerAddress = getPrimary(); + Predicate configureFailPointEventMatcher = event -> { + if (event instanceof CommandFailedEvent) { + CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event; + if (commandFailedEvent.getCommandName().equals("drop")) { + // this code may run against MongoDB 6, where dropping a nonexistent collection results in an error + return false; + } + MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable()); + assertEquals(91, cause.getCode()); + return true; + } + return false; + }; + try (ConfigureFailPointCommandListener commandListener = new ConfigureFailPointCommandListener( + configureFailPointFromListener, primaryServerAddress, configureFailPointEventMatcher); + MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() + .retryWrites(true) + .addCommandListener(commandListener) + .build()); + FailPoint ignored = FailPoint.enable(configureFailPoint, primaryServerAddress)) { + MongoCollection collection = dropAndGetCollection("errorPropagationAfterEncounteringMultipleErrors", client); + MongoException e = assertThrows(MongoException.class, () -> collection.insertOne(new Document())); + assertEquals(expectedErrorCode, e.getCode()); + if (unexpectedErrorLabel != null) { + assertFalse(e.hasErrorLabel(unexpectedErrorLabel)); + } + } + } + + private static MongoCollection dropAndGetCollection(final String name, final MongoClient client) { + MongoCollection result = client.getDatabase(getDefaultDatabaseName()).getCollection(name); + result.drop(); + return result; + } } diff --git a/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java new file mode 100644 index 00000000000..a31182a51c0 --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java @@ -0,0 +1,105 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.event; + +import com.mongodb.ServerAddress; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.client.FailPoint; +import com.mongodb.event.CommandEvent; +import com.mongodb.event.CommandFailedEvent; +import com.mongodb.event.CommandListener; +import com.mongodb.event.CommandStartedEvent; +import com.mongodb.event.CommandSucceededEvent; +import org.bson.BsonDocument; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; + +import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.assertTrue; +import static com.mongodb.assertions.Assertions.fail; + +@ThreadSafe +public final class ConfigureFailPointCommandListener implements CommandListener, AutoCloseable { + private final BsonDocument configureFailPoint; + private final ServerAddress serverAddress; + private final Predicate eventMatcher; + private final Object lock; + private final CompletableFuture failPointFuture; + + /** + * @param configureFailPoint See {@link FailPoint#enable(BsonDocument, ServerAddress)}. + * @param serverAddress See {@link FailPoint#enable(BsonDocument, ServerAddress)}. + * @param eventMatcher When an event is matched, an attempt to configure the fail point + * specified via {@code configureFailPoint} is made. + * The {@code eventMatcher} is guaranteed to be {@linkplain Predicate#test(Object) used} sequentially. + * The attempt is made at most once, + * and the {@code eventMatcher} {@linkplain Predicate#test(Object) test} that caused the attempt is the last one. + */ + public ConfigureFailPointCommandListener( + final BsonDocument configureFailPoint, + final ServerAddress serverAddress, + final Predicate eventMatcher) { + this.configureFailPoint = configureFailPoint; + this.serverAddress = serverAddress; + this.eventMatcher = eventMatcher; + lock = new Object(); + failPointFuture = new CompletableFuture<>(); + } + + @Override + public void commandStarted(final CommandStartedEvent event) { + onEvent(event); + } + + @Override + public void commandSucceeded(final CommandSucceededEvent event) { + onEvent(event); + } + + @Override + public void commandFailed(final CommandFailedEvent event) { + onEvent(event); + } + + private void onEvent(final CommandEvent event) { + synchronized (lock) { + if (!failPointFuture.isDone()) { + try { + if (eventMatcher.test(event)) { + assertTrue(failPointFuture.complete(FailPoint.enable(configureFailPoint, serverAddress))); + } + } catch (Throwable e) { + assertTrue(failPointFuture.completeExceptionally(e)); + } + } + } + } + + @Override + public void close() throws InterruptedException, ExecutionException { + synchronized (lock) { + if (failPointFuture.cancel(true)) { + fail("The listener was closed before (in the happens-before order) it attempted to configure the fail point"); + } else { + assertTrue(failPointFuture.isDone()); + assertNotNull(failPointFuture.get()).close(); + } + } + } +}