diff --git a/driver-core/src/main/com/mongodb/MongoException.java b/driver-core/src/main/com/mongodb/MongoException.java index a668dd344b7..2c585c93cf4 100644 --- a/driver-core/src/main/com/mongodb/MongoException.java +++ b/driver-core/src/main/com/mongodb/MongoException.java @@ -39,6 +39,7 @@ public class MongoException extends RuntimeException { * * @see #hasErrorLabel(String) * @since 3.8 + * @mongodb.driver.manual core/transactions-in-applications/#std-label-transient-transaction-error */ public static final String TRANSIENT_TRANSACTION_ERROR_LABEL = "TransientTransactionError"; @@ -47,9 +48,32 @@ public class MongoException extends RuntimeException { * * @see #hasErrorLabel(String) * @since 3.8 + * @mongodb.driver.manual core/transactions-in-applications/#std-label-unknown-transaction-commit-result */ public static final String UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL = "UnknownTransactionCommitResult"; + /** + * Server is overloaded and shedding load. + * If you retry, use exponential backoff because the server has indicated overload. + * This label on its own does not mean that the operation can be safely retried. + * + * @see #hasErrorLabel(String) + * @since 5.7 + * @mongodb.server.release 8.3 + */ + // TODO-BACKPRESSURE Valentin Add a @mongodb.driver.manual link or something similar, see `content/atlas/source/overload-errors.txt` in https://github.com/10gen/docs-mongodb-internal/pull/17281 + public static final String SYSTEM_OVERLOADED_ERROR_LABEL = "SystemOverloadedError"; + + /** + * The operation was not executed and is safe to retry. + * + * @see #hasErrorLabel(String) + * @since 5.7 + * @mongodb.server.release 8.3 + */ + // TODO-BACKPRESSURE Valentin Add a @mongodb.driver.manual link or something similar, see `content/atlas/source/overload-errors.txt` in https://github.com/10gen/docs-mongodb-internal/pull/17281 + public static final String RETRYABLE_ERROR_LABEL = "RetryableError"; + private static final long serialVersionUID = -4415279469780082174L; private final int code; diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java index 574a85669d0..36f6688cb0e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java @@ -172,7 +172,8 @@ private InternalConnectionInitializationDescription createInitializationDescript private BsonDocument createHelloCommand(final Authenticator authenticator, final InternalConnection connection) { BsonDocument helloCommandDocument = new BsonDocument(getHandshakeCommandName(), new BsonInt32(1)) - .append("helloOk", BsonBoolean.TRUE); + .append("helloOk", BsonBoolean.TRUE) + .append("backpressure", BsonBoolean.TRUE); if (clientMetadataDocument != null) { helloCommandDocument.append("client", clientMetadataDocument); } diff --git a/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java b/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java index e18318f8cde..8f9a9f51e24 100644 --- a/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java +++ b/driver-core/src/main/com/mongodb/internal/event/CommandListenerMulticaster.java @@ -16,6 +16,7 @@ package com.mongodb.internal.event; +import com.mongodb.annotations.ThreadSafe; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandListener; import com.mongodb.event.CommandStartedEvent; @@ -29,7 +30,11 @@ import static com.mongodb.assertions.Assertions.isTrue; import static java.lang.String.format; - +/** + * This {@link CommandListener} is {@linkplain ThreadSafe thread-safe}, + * provided that the recipient listeners passed to {@link #CommandListenerMulticaster(List)} are. + */ +@ThreadSafe final class CommandListenerMulticaster implements CommandListener { private static final Logger LOGGER = Loggers.getLogger("protocol.event"); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java index 8332ad916fb..bc0d223b3db 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java @@ -51,7 +51,7 @@ import static java.util.Arrays.asList; @SuppressWarnings("overloads") -final class CommandOperationHelper { +public final class CommandOperationHelper { static WriteConcern validateAndGetEffectiveWriteConcern(final WriteConcern writeConcernSetting, final SessionContext sessionContext) throws MongoClientException { boolean activeTransaction = sessionContext.hasActiveTransaction(); @@ -223,8 +223,8 @@ static boolean isRetryWritesEnabled(@Nullable final BsonDocument command) { || command.getFirstKey().equals("commitTransaction") || command.getFirstKey().equals("abortTransaction"))); } - static final String RETRYABLE_WRITE_ERROR_LABEL = "RetryableWriteError"; - private static final String NO_WRITES_PERFORMED_ERROR_LABEL = "NoWritesPerformed"; + public static final String RETRYABLE_WRITE_ERROR_LABEL = "RetryableWriteError"; + public static final String NO_WRITES_PERFORMED_ERROR_LABEL = "NoWritesPerformed"; private static boolean decideRetryableAndAddRetryableWriteErrorLabel(final Throwable t, @Nullable final Integer maxWireVersion) { if (!(t instanceof MongoException)) { diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java index 9381ad842a1..97fb8c82a4f 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java @@ -17,6 +17,7 @@ package com.mongodb.internal.connection; import com.mongodb.MongoTimeoutException; +import com.mongodb.annotations.ThreadSafe; import com.mongodb.client.TestListener; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; @@ -56,6 +57,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +@ThreadSafe public class TestCommandListener implements CommandListener { private final List eventTypes; private final List ignoredCommandMonitoringEvents; diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy index 1d44f8dde46..a7bfaa36cce 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy @@ -201,6 +201,7 @@ class InternalStreamConnectionInitializerSpecification extends Specification { def initializer = new InternalStreamConnectionInitializer(SINGLE, null, clientMetadataDocument, [], null) def expectedHelloCommandDocument = new BsonDocument(LEGACY_HELLO, new BsonInt32(1)) .append('helloOk', BsonBoolean.TRUE) + .append('backpressure', BsonBoolean.TRUE) .append('\$db', new BsonString('admin')) if (clientMetadataDocument != null) { expectedHelloCommandDocument.append('client', clientMetadataDocument) @@ -233,6 +234,7 @@ class InternalStreamConnectionInitializerSpecification extends Specification { def initializer = new InternalStreamConnectionInitializer(SINGLE, null, null, compressors, null) def expectedHelloCommandDocument = new BsonDocument(LEGACY_HELLO, new BsonInt32(1)) .append('helloOk', BsonBoolean.TRUE) + .append('backpressure', BsonBoolean.TRUE) .append('\$db', new BsonString('admin')) def compressionArray = new BsonArray() @@ -403,7 +405,8 @@ class InternalStreamConnectionInitializerSpecification extends Specification { ((SpeculativeAuthenticator) authenticator).getSpeculativeAuthenticateResponse() == null ((SpeculativeAuthenticator) authenticator) .createSpeculativeAuthenticateCommand(internalConnection) == null - BsonDocument.parse("{$LEGACY_HELLO: 1, helloOk: true, '\$db': 'admin'}") == decodeCommand(internalConnection.getSent()[0]) + BsonDocument.parse("{$LEGACY_HELLO: 1, helloOk: true, backpressure: true, '\$db': 'admin'}") == + decodeCommand(internalConnection.getSent()[0]) where: async << [true, false] @@ -500,7 +503,7 @@ class InternalStreamConnectionInitializerSpecification extends Specification { def createHelloCommand(final String firstClientChallenge, final String mechanism, final boolean hasSaslSupportedMechs) { - String hello = "{$LEGACY_HELLO: 1, helloOk: true, " + + String hello = "{$LEGACY_HELLO: 1, helloOk: true, backpressure: true, " + (hasSaslSupportedMechs ? 'saslSupportedMechs: "database.user", ' : '') + (mechanism == 'MONGODB-X509' ? 'speculativeAuthenticate: { authenticate: 1, ' + diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java index 90529171219..80913aab843 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java @@ -19,6 +19,7 @@ import com.mongodb.ContextProvider; import com.mongodb.RequestContext; import com.mongodb.WriteConcern; +import com.mongodb.annotations.NotThreadSafe; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandListener; import com.mongodb.event.CommandStartedEvent; @@ -186,6 +187,7 @@ public void contextShouldBeAvailableInCommandEvents() { } } + @NotThreadSafe private static final class TestCommandListener implements CommandListener { private int numCommandStartedEventsWithExpectedContext; private int numCommandSucceededEventsWithExpectedContext; diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java index 22b7f7645e1..84dd0d733bf 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java @@ -22,32 +22,27 @@ import org.bson.Document; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - import static com.mongodb.client.model.Filters.eq; /** - * See - * Retryable Reads Tests. + * + * Prose Tests. */ final class RetryableReadsProseTest { /** - * See - * - * PoolClearedError Retryability Test. + * + * 1. PoolClearedError Retryability Test. */ @Test - void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { RetryableWritesProseTest.poolClearedExceptionMustBeRetryable( SyncMongoClient::new, mongoCollection -> mongoCollection.find(eq(0)).iterator().hasNext(), "find", false); } /** - * See - * - * Retryable Reads Are Retried on a Different mongos When One is Available. + * + * 2.1 Retryable Reads Are Retried on a Different mongos When One is Available. */ @Test void retriesOnDifferentMongosWhenAvailable() { @@ -61,9 +56,8 @@ void retriesOnDifferentMongosWhenAvailable() { } /** - * See - * - * Retryable Reads Are Retried on the Same mongos When No Others are Available. + * + * 2.2 Retryable Reads Are Retried on the Same mongos When No Others are Available. */ @Test void retriesOnSameMongosWhenAnotherNotAvailable() { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java index 51a37ad1e35..38ef09a4771 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java @@ -16,68 +16,87 @@ package com.mongodb.reactivestreams.client; -import com.mongodb.client.test.CollectionHelper; import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; import org.bson.Document; -import org.bson.codecs.DocumentCodec; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - /** - * See - * Retryable Write Prose Tests. + * + * Prose Tests. */ -public class RetryableWritesProseTest extends DatabaseTestCase { - private CollectionHelper collectionHelper; - - @BeforeEach - @Override - public void setUp() { - super.setUp(); - - collectionHelper = new CollectionHelper<>(new DocumentCodec(), collection.getNamespace()); - collectionHelper.create(); - } - +final class RetryableWritesProseTest { /** - * Prose test #2. + * + * 2. Test that drivers properly retry after encountering PoolClearedErrors. */ @Test - public void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { com.mongodb.client.RetryableWritesProseTest.poolClearedExceptionMustBeRetryable( SyncMongoClient::new, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } /** - * Prose test #3. + * + * 3. Test that drivers return the original error after encountering a WriteConcernError with a RetryableWriteError label. */ @Test - public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException { + void originalErrorMustBePropagatedIfNoWritesPerformed() throws Exception { com.mongodb.client.RetryableWritesProseTest.originalErrorMustBePropagatedIfNoWritesPerformed( SyncMongoClient::new); } /** - * Prose test #4. + * + * 4. Test that in a sharded cluster writes are retried on a different mongos when one is available. */ @Test - public void retriesOnDifferentMongosWhenAvailable() { + void retriesOnDifferentMongosWhenAvailable() { com.mongodb.client.RetryableWritesProseTest.retriesOnDifferentMongosWhenAvailable( SyncMongoClient::new, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } /** - * Prose test #5. + * + * 5. Test that in a sharded cluster writes are retried on the same mongos when no others are available. */ @Test - public void retriesOnSameMongosWhenAnotherNotAvailable() { + void retriesOnSameMongosWhenAnotherNotAvailable() { com.mongodb.client.RetryableWritesProseTest.retriesOnSameMongosWhenAnotherNotAvailable( SyncMongoClient::new, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 1: Test that drivers return the correct error when receiving only errors without NoWritesPerformed. + */ + @Test + @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055") + void errorPropagationAfterEncounteringMultipleErrorsCase1() throws Exception { + com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase1(SyncMongoClient::new); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 2: Test that drivers return the correct error when receiving only errors with NoWritesPerformed. + */ + @Test + void errorPropagationAfterEncounteringMultipleErrorsCase2() throws Exception { + com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase2(SyncMongoClient::new); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 3: Test that drivers return the correct error when receiving some errors with NoWritesPerformed and some without NoWritesPerformed. + */ + @Test + void errorPropagationAfterEncounteringMultipleErrorsCase3() throws Exception { + com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase3(SyncMongoClient::new); + } } diff --git a/driver-sync/src/examples/documentation/TransactionExample.java b/driver-sync/src/examples/documentation/TransactionExample.java index 4f73122ee35..dea86b9ad4b 100644 --- a/driver-sync/src/examples/documentation/TransactionExample.java +++ b/driver-sync/src/examples/documentation/TransactionExample.java @@ -77,7 +77,8 @@ private void runTransactionWithRetry(final Runnable transactional) { System.out.println("Transaction aborted. Caught exception during transaction."); if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) { - System.out.println("TransientTransactionError, aborting transaction and retrying ..."); + System.out.printf("%s, aborting transaction and retrying ...%n", + MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL); } else { throw e; } @@ -94,7 +95,8 @@ private void commitWithRetry(final ClientSession clientSession) { } catch (MongoException e) { // can retry commit if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) { - System.out.println("UnknownTransactionCommitResult, retrying commit operation ..."); + System.out.printf("%s, retrying commit operation ...%n", + MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL); } else { System.out.println("Exception during commit ..."); throw e; diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index 7828ecde684..3d2d58dc4c8 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -48,6 +48,7 @@ import com.mongodb.event.ConnectionCreatedEvent; import com.mongodb.event.ConnectionReadyEvent; +import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; import static com.mongodb.internal.connection.CommandHelper.HELLO; import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO; @@ -687,7 +688,7 @@ public void test10CustomTestWithTransactionUsesASingleTimeoutWithLock() { + " blockConnection: true," + " blockTimeMS: " + 25 + " errorCode: " + 24 - + " errorLabels: [\"TransientTransactionError\"]" + + " errorLabels: [\"" + TRANSIENT_TRANSACTION_ERROR_LABEL + "\"]" + " }" + "}"); diff --git a/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java b/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java index caf676a8ab7..c0247c9c7a2 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java @@ -19,6 +19,7 @@ import com.mongodb.ContextProvider; import com.mongodb.RequestContext; import com.mongodb.WriteConcern; +import com.mongodb.annotations.NotThreadSafe; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandListener; import com.mongodb.event.CommandStartedEvent; @@ -206,6 +207,7 @@ public void contextShouldBeAvailableInCommandEvents() { } } + @NotThreadSafe private static final class TestCommandListener implements CommandListener { private int numCommandStartedEventsWithExpectedContext; private int numCommandSucceededEventsWithExpectedContext; diff --git a/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java b/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java index 6f90b3f5f01..eccc892ce77 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java @@ -43,6 +43,8 @@ import static com.mongodb.ClusterFixture.serverVersionAtLeast; import static com.mongodb.client.Fixture.getDefaultDatabaseName; import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; +import static com.mongodb.internal.operation.CommandOperationHelper.NO_WRITES_PERFORMED_ERROR_LABEL; +import static com.mongodb.internal.operation.CommandOperationHelper.RETRYABLE_WRITE_ERROR_LABEL; import static java.util.Collections.singletonList; import static org.junit.Assert.assertThrows; import static org.junit.Assume.assumeTrue; @@ -69,7 +71,7 @@ public static void doesNotLeak(final Function .append("data", new BsonDocument() .append("writeConcernError", new BsonDocument() .append("code", new BsonInt32(91)) - .append("errorLabels", new BsonArray(Stream.of("RetryableWriteError") + .append("errorLabels", new BsonArray(Stream.of(RETRYABLE_WRITE_ERROR_LABEL) .map(BsonString::new).collect(Collectors.toList()))) .append("errmsg", new BsonString("")) ) @@ -81,7 +83,7 @@ public static void doesNotLeak(final Function .append("data", new BsonDocument() .append("failCommands", new BsonArray(singletonList(new BsonString("insert")))) .append("errorCode", new BsonInt32(10107)) - .append("errorLabels", new BsonArray(Stream.of("RetryableWriteError", "NoWritesPerformed") + .append("errorLabels", new BsonArray(Stream.of(RETRYABLE_WRITE_ERROR_LABEL, NO_WRITES_PERFORMED_ERROR_LABEL) .map(BsonString::new).collect(Collectors.toList())))); doesNotLeak(clientCreator, writeConcernErrorFpDoc, true, noWritesPerformedFpDoc); doesNotLeak(clientCreator, noWritesPerformedFpDoc, false, writeConcernErrorFpDoc); diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java index ccf18aad5b9..59b6a9aad19 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java @@ -19,31 +19,26 @@ import org.bson.Document; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - import static com.mongodb.client.model.Filters.eq; /** - * See - * Retryable Reads Tests. + * + * Prose Tests. */ final class RetryableReadsProseTest { /** - * See - * - * PoolClearedError Retryability Test. + * + * 1. PoolClearedError Retryability Test. */ @Test - void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { RetryableWritesProseTest.poolClearedExceptionMustBeRetryable(MongoClients::create, mongoCollection -> mongoCollection.find(eq(0)).iterator().hasNext(), "find", false); } /** - * See - * - * Retryable Reads Are Retried on a Different mongos When One is Available. + * + * 2.1 Retryable Reads Are Retried on a Different mongos When One is Available. */ @Test void retriesOnDifferentMongosWhenAvailable() { @@ -56,9 +51,8 @@ void retriesOnDifferentMongosWhenAvailable() { } /** - * See - * - * Retryable Reads Are Retried on the Same mongos When No Others are Available. + * + * 2.2 Retryable Reads Are Retried on the Same mongos When No Others are Available. */ @Test void retriesOnSameMongosWhenAnotherNotAvailable() { diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java index fae39864bb9..c49d1a8b4f1 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java @@ -19,15 +19,14 @@ import com.mongodb.ConnectionString; import com.mongodb.Function; import com.mongodb.MongoClientSettings; +import com.mongodb.MongoException; import com.mongodb.MongoServerException; import com.mongodb.MongoWriteConcernException; import com.mongodb.ServerAddress; -import com.mongodb.assertions.Assertions; import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.connection.ConnectionDescription; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; -import com.mongodb.event.CommandListener; import com.mongodb.event.CommandSucceededEvent; import com.mongodb.event.ConnectionCheckOutFailedEvent; import com.mongodb.event.ConnectionCheckedOutEvent; @@ -35,28 +34,23 @@ import com.mongodb.internal.connection.ServerAddressHelper; import com.mongodb.internal.connection.TestCommandListener; import com.mongodb.internal.connection.TestConnectionPoolListener; -import org.bson.BsonArray; -import org.bson.BsonBoolean; +import com.mongodb.internal.event.ConfigureFailPointCommandListener; +import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonInt32; -import org.bson.BsonString; import org.bson.Document; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; import static com.mongodb.ClusterFixture.getConnectionString; import static com.mongodb.ClusterFixture.getMultiMongosConnectionString; @@ -64,36 +58,36 @@ import static com.mongodb.ClusterFixture.isSharded; import static com.mongodb.ClusterFixture.isStandalone; import static com.mongodb.ClusterFixture.serverVersionAtLeast; +import static com.mongodb.MongoException.RETRYABLE_ERROR_LABEL; +import static com.mongodb.MongoException.SYSTEM_OVERLOADED_ERROR_LABEL; import static com.mongodb.client.Fixture.getDefaultDatabaseName; import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; import static com.mongodb.client.Fixture.getMultiMongosMongoClientSettingsBuilder; +import static com.mongodb.client.Fixture.getPrimary; +import static com.mongodb.internal.operation.CommandOperationHelper.NO_WRITES_PERFORMED_ERROR_LABEL; +import static com.mongodb.internal.operation.CommandOperationHelper.RETRYABLE_WRITE_ERROR_LABEL; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; /** - * See - * Retryable Write Prose Tests. + * + * Prose Tests. */ -public class RetryableWritesProseTest extends DatabaseTestCase { - - @BeforeEach - @Override - public void setUp() { - super.setUp(); - } - +public class RetryableWritesProseTest { /** - * Prose test #2. + * + * 2. Test that drivers properly retry after encountering PoolClearedErrors. */ @Test - public void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException { + void poolClearedExceptionMustBeRetryable() throws Exception { poolClearedExceptionMustBeRetryable(MongoClients::create, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } @@ -101,8 +95,7 @@ public void poolClearedExceptionMustBeRetryable() throws InterruptedException, E @SuppressWarnings("try") public static void poolClearedExceptionMustBeRetryable( final Function clientCreator, - final Function, R> operation, final String operationName, final boolean write) - throws InterruptedException, ExecutionException, TimeoutException { + final Function, R> operation, final String commandName, final boolean write) throws Exception { assumeTrue(serverVersionAtLeast(4, 3) && !(write && isStandalone())); TestConnectionPoolListener connectionPoolListener = new TestConnectionPoolListener(asList( "connectionCheckedOutEvent", @@ -118,7 +111,7 @@ public static void poolClearedExceptionMustBeRetryable( /* We fake server's state by configuring a fail point. This breaks the mechanism of the * streaming server monitoring protocol * (https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.md#streaming-protocol) - * that allows the server to determine whether or not it needs to send a new state to the client. + * that allows the server to determine whether it needs to send a new state to the client. * As a result, the client has to wait for at least its heartbeat delay until it hears back from a server * (while it waits for a response, calling `ServerMonitor.connect` has no effect). * Thus, we want to use small heartbeat delay to reduce delays in the test. */ @@ -127,24 +120,23 @@ public static void poolClearedExceptionMustBeRetryable( .retryWrites(true) .addCommandListener(commandListener) .build(); - BsonDocument configureFailPoint = new BsonDocument() - .append("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument() - .append("times", new BsonInt32(1))) - .append("data", new BsonDocument() - .append("failCommands", new BsonArray(singletonList(new BsonString(operationName)))) - .append("errorCode", new BsonInt32(91)) - .append("errorLabels", write - ? new BsonArray(singletonList(new BsonString("RetryableWriteError"))) - : new BsonArray()) - .append("blockConnection", BsonBoolean.valueOf(true)) - .append("blockTimeMS", new BsonInt32(1000))); + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['" + commandName + "'],\n" + + " errorCode: 91,\n" + + " blockConnection: true,\n" + + " blockTimeMS: 1000,\n" + + (write + ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']\n" : "") + + " }\n" + + "}\n"); int timeoutSeconds = 5; try (MongoClient client = clientCreator.apply(clientSettings); - FailPoint ignored = FailPoint.enable(configureFailPoint, Fixture.getPrimary())) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("poolClearedExceptionMustBeRetryable"); - collection.drop(); + FailPoint ignored = FailPoint.enable(configureFailPoint, getPrimary())) { + MongoCollection collection = dropAndGetCollection("poolClearedExceptionMustBeRetryable", client); ExecutorService ex = Executors.newFixedThreadPool(2); try { Future result1 = ex.submit(() -> operation.apply(collection)); @@ -158,83 +150,81 @@ public static void poolClearedExceptionMustBeRetryable( ex.shutdownNow(); } assertEquals(3, commandListener.getCommandStartedEvents().size()); - commandListener.getCommandStartedEvents().forEach(event -> assertEquals(operationName, event.getCommandName())); + commandListener.getCommandStartedEvents().forEach(event -> assertEquals(commandName, event.getCommandName())); } } /** - * Prose test #3. + * + * 3. Test that drivers return the original error after encountering a WriteConcernError with a RetryableWriteError label. */ @Test - public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException { + void originalErrorMustBePropagatedIfNoWritesPerformed() throws Exception { originalErrorMustBePropagatedIfNoWritesPerformed(MongoClients::create); } @SuppressWarnings("try") public static void originalErrorMustBePropagatedIfNoWritesPerformed( - final Function clientCreator) throws InterruptedException { + final Function clientCreator) throws Exception { assumeTrue(serverVersionAtLeast(6, 0) && isDiscoverableReplicaSet()); - ServerAddress primaryServerAddress = Fixture.getPrimary(); - CompletableFuture futureFailPointFromListener = new CompletableFuture<>(); - CommandListener commandListener = new CommandListener() { - private final AtomicBoolean configureFailPoint = new AtomicBoolean(true); - - @Override - public void commandSucceeded(final CommandSucceededEvent event) { - if (event.getCommandName().equals("insert") - && event.getResponse().getDocument("writeConcernError", new BsonDocument()) - .getInt32("code", new BsonInt32(-1)).intValue() == 91 - && configureFailPoint.compareAndSet(true, false)) { - Assertions.assertTrue(futureFailPointFromListener.complete(FailPoint.enable( - new BsonDocument() - .append("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument() - .append("times", new BsonInt32(1))) - .append("data", new BsonDocument() - .append("failCommands", new BsonArray(singletonList(new BsonString("insert")))) - .append("errorCode", new BsonInt32(10107)) - .append("errorLabels", new BsonArray(Stream.of("RetryableWriteError", "NoWritesPerformed") - .map(BsonString::new).collect(Collectors.toList())))), - primaryServerAddress - ))); + ServerAddress primaryServerAddress = getPrimary(); + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: { times: 1 },\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " writeConcernError: {" + + " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "'],\n" + + " code: 91,\n" + + " errmsg: ''\n" + + " }\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: { times: 1 },\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 10107,\n" + + " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + Predicate configureFailPointEventMatcher = event -> { + if (event instanceof CommandSucceededEvent) { + CommandSucceededEvent commandSucceededEvent = (CommandSucceededEvent) event; + if (commandSucceededEvent.getCommandName().equals("insert")) { + assertEquals(91, commandSucceededEvent.getResponse().getDocument("writeConcernError", new BsonDocument()) + .getInt32("code", new BsonInt32(-1)).intValue()); + return true; } + return false; } + return false; }; - BsonDocument failPointDocument = new BsonDocument() - .append("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument() - .append("times", new BsonInt32(1))) - .append("data", new BsonDocument() - .append("writeConcernError", new BsonDocument() - .append("code", new BsonInt32(91)) - .append("errorLabels", new BsonArray(Stream.of("RetryableWriteError") - .map(BsonString::new).collect(Collectors.toList()))) - .append("errmsg", new BsonString("")) - ) - .append("failCommands", new BsonArray(singletonList(new BsonString("insert"))))); - try (MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() - .retryWrites(true) - .addCommandListener(commandListener) - .applyToServerSettings(builder -> - // see `poolClearedExceptionMustBeRetryable` for the explanation - builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS)) - .build()); - FailPoint ignored = FailPoint.enable(failPointDocument, primaryServerAddress)) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("originalErrorMustBePropagatedIfNoWritesPerformed"); - collection.drop(); + try (ConfigureFailPointCommandListener commandListener = new ConfigureFailPointCommandListener( + configureFailPointFromListener, primaryServerAddress, configureFailPointEventMatcher); + MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() + .retryWrites(true) + .addCommandListener(commandListener) + .applyToServerSettings(builder -> + // see `poolClearedExceptionMustBeRetryable` for the explanation + builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS)) + .build()); + FailPoint ignored = FailPoint.enable(configureFailPoint, primaryServerAddress)) { + MongoCollection collection = dropAndGetCollection("originalErrorMustBePropagatedIfNoWritesPerformed", client); MongoWriteConcernException e = assertThrows(MongoWriteConcernException.class, () -> collection.insertOne(new Document())); assertEquals(91, e.getCode()); - } finally { - futureFailPointFromListener.thenAccept(FailPoint::close); } } /** - * Prose test #4. + * + * 4. Test that in a sharded cluster writes are retried on a different mongos when one is available. */ @Test - public void retriesOnDifferentMongosWhenAvailable() { + void retriesOnDifferentMongosWhenAvailable() { retriesOnDifferentMongosWhenAvailable(MongoClients::create, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } @@ -242,7 +232,7 @@ public void retriesOnDifferentMongosWhenAvailable() { @SuppressWarnings("try") public static void retriesOnDifferentMongosWhenAvailable( final Function clientCreator, - final Function, R> operation, final String operationName, final boolean write) { + final Function, R> operation, final String expectedCommandName, final boolean write) { if (write) { assumeTrue(serverVersionAtLeast(4, 4)); } @@ -251,20 +241,20 @@ public static void retriesOnDifferentMongosWhenAvailable( assumeTrue(connectionString != null); ServerAddress s0Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(0)); ServerAddress s1Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(1)); - BsonDocument failPointDocument = BsonDocument.parse( + BsonDocument configureFailPoint = BsonDocument.parse( "{\n" + " configureFailPoint: \"failCommand\",\n" + " mode: { times: 1 },\n" + " data: {\n" - + " failCommands: [\"" + operationName + "\"],\n" + + " failCommands: [\"" + expectedCommandName + "\"],\n" + + " errorCode: 6,\n" + (write - ? " errorLabels: [\"RetryableWriteError\"]," : "") - + " errorCode: 6\n" + ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']" : "") + " }\n" + "}\n"); TestCommandListener commandListener = new TestCommandListener(singletonList("commandFailedEvent"), emptyList()); - try (FailPoint s0FailPoint = FailPoint.enable(failPointDocument, s0Address); - FailPoint s1FailPoint = FailPoint.enable(failPointDocument, s1Address); + try (FailPoint s0FailPoint = FailPoint.enable(configureFailPoint, s0Address); + FailPoint s1FailPoint = FailPoint.enable(configureFailPoint, s1Address); MongoClient client = clientCreator.apply(getMultiMongosMongoClientSettingsBuilder() .retryReads(true) .retryWrites(true) @@ -272,16 +262,14 @@ public static void retriesOnDifferentMongosWhenAvailable( // explicitly specify only s0 and s1, in case `getMultiMongosMongoClientSettingsBuilder` has more .applyToClusterSettings(builder -> builder.hosts(asList(s0Address, s1Address))) .build())) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("retriesOnDifferentMongosWhenAvailable"); - collection.drop(); + MongoCollection collection = dropAndGetCollection("retriesOnDifferentMongosWhenAvailable", client); commandListener.reset(); assertThrows(MongoServerException.class, () -> operation.apply(collection)); List failedCommandEvents = commandListener.getEvents(); assertEquals(2, failedCommandEvents.size(), failedCommandEvents::toString); List unexpectedCommandNames = failedCommandEvents.stream() .map(CommandEvent::getCommandName) - .filter(commandName -> !commandName.equals(operationName)) + .filter(commandName -> !commandName.equals(expectedCommandName)) .collect(Collectors.toList()); assertTrue(unexpectedCommandNames.isEmpty(), unexpectedCommandNames::toString); Set failedServerAddresses = failedCommandEvents.stream() @@ -293,10 +281,11 @@ public static void retriesOnDifferentMongosWhenAvailable( } /** - * Prose test #5. + * + * 5. Test that in a sharded cluster writes are retried on the same mongos when no others are available. */ @Test - public void retriesOnSameMongosWhenAnotherNotAvailable() { + void retriesOnSameMongosWhenAnotherNotAvailable() { retriesOnSameMongosWhenAnotherNotAvailable(MongoClients::create, mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true); } @@ -304,27 +293,29 @@ public void retriesOnSameMongosWhenAnotherNotAvailable() { @SuppressWarnings("try") public static void retriesOnSameMongosWhenAnotherNotAvailable( final Function clientCreator, - final Function, R> operation, final String operationName, final boolean write) { + final Function, R> operation, final String expectedCommandName, final boolean write) { if (write) { assumeTrue(serverVersionAtLeast(4, 4)); } assumeTrue(isSharded()); ConnectionString connectionString = getConnectionString(); ServerAddress s0Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(0)); - BsonDocument failPointDocument = BsonDocument.parse( + BsonDocument configureFailPoint = BsonDocument.parse( "{\n" + " configureFailPoint: \"failCommand\",\n" + " mode: { times: 1 },\n" + " data: {\n" - + " failCommands: [\"" + operationName + "\"],\n" + + " failCommands: [\"" + expectedCommandName + "\"],\n" + + " errorCode: 6,\n" + (write - ? " errorLabels: [\"RetryableWriteError\"]," : "") - + " errorCode: 6\n" + ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']," : "") + + (write + ? " closeConnection: true\n" : "") + " }\n" + "}\n"); TestCommandListener commandListener = new TestCommandListener( asList("commandFailedEvent", "commandSucceededEvent"), emptyList()); - try (FailPoint s0FailPoint = FailPoint.enable(failPointDocument, s0Address); + try (FailPoint s0FailPoint = FailPoint.enable(configureFailPoint, s0Address); MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() .retryReads(true) .retryWrites(true) @@ -334,16 +325,14 @@ public static void retriesOnSameMongosWhenAnotherNotAvailable( .hosts(singletonList(s0Address)) .mode(ClusterConnectionMode.MULTIPLE)) .build())) { - MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) - .getCollection("retriesOnSameMongosWhenAnotherNotAvailable"); - collection.drop(); + MongoCollection collection = dropAndGetCollection("retriesOnSameMongosWhenAnotherNotAvailable", client); commandListener.reset(); operation.apply(collection); List commandEvents = commandListener.getEvents(); assertEquals(2, commandEvents.size(), commandEvents::toString); List unexpectedCommandNames = commandEvents.stream() .map(CommandEvent::getCommandName) - .filter(commandName -> !commandName.equals(operationName)) + .filter(commandName -> !commandName.equals(expectedCommandName)) .collect(Collectors.toList()); assertTrue(unexpectedCommandNames.isEmpty(), unexpectedCommandNames::toString); assertInstanceOf(CommandFailedEvent.class, commandEvents.get(0), commandEvents::toString); @@ -352,4 +341,175 @@ public static void retriesOnSameMongosWhenAnotherNotAvailable( assertEquals(s0Address, commandEvents.get(1).getConnectionDescription().getServerAddress(), commandEvents::toString); } } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 1: Test that drivers return the correct error when receiving only errors without NoWritesPerformed. + */ + @Test + @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055") + void errorPropagationAfterEncounteringMultipleErrorsCase1() throws Exception { + errorPropagationAfterEncounteringMultipleErrorsCase1(MongoClients::create); + } + + public static void errorPropagationAfterEncounteringMultipleErrorsCase1(final Function clientCreator) + throws Exception { + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "'],\n" + + " errorCode: 91\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: 'alwaysOn',\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 10107,\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + errorPropagationAfterEncounteringMultipleErrors( + clientCreator, + configureFailPoint, + configureFailPointFromListener, + 10107, + null); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 2: Test that drivers return the correct error when receiving only errors with NoWritesPerformed. + */ + @Test + void errorPropagationAfterEncounteringMultipleErrorsCase2() throws Exception { + errorPropagationAfterEncounteringMultipleErrorsCase2(MongoClients::create); + } + + public static void errorPropagationAfterEncounteringMultipleErrorsCase2(final Function clientCreator) + throws Exception { + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "'],\n" + + " errorCode: 91\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: 'alwaysOn',\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 10107,\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + errorPropagationAfterEncounteringMultipleErrors( + clientCreator, + configureFailPoint, + configureFailPointFromListener, + 91, + null); + } + + /** + * + * 6. Test error propagation after encountering multiple errors. + * Case 3: Test that drivers return the correct error when receiving some errors with NoWritesPerformed and some without NoWritesPerformed. + */ + @Test + void errorPropagationAfterEncounteringMultipleErrorsCase3() throws Exception { + errorPropagationAfterEncounteringMultipleErrorsCase3(MongoClients::create); + } + + public static void errorPropagationAfterEncounteringMultipleErrorsCase3(final Function clientCreator) + throws Exception { + BsonDocument configureFailPoint = BsonDocument.parse( + "{\n" + + " configureFailPoint: 'failCommand',\n" + + " mode: {'times': 1},\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "'],\n" + + " errorCode: 91\n" + + " }\n" + + "}\n"); + BsonDocument configureFailPointFromListener = BsonDocument.parse( + "{\n" + + " configureFailPoint: \"failCommand\",\n" + + " mode: 'alwaysOn',\n" + + " data: {\n" + + " failCommands: ['insert'],\n" + + " errorCode: 91,\n" + + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n" + + " }\n" + + "}\n"); + errorPropagationAfterEncounteringMultipleErrors( + clientCreator, + configureFailPoint, + configureFailPointFromListener, + 91, + NO_WRITES_PERFORMED_ERROR_LABEL); + } + + /** + * @param unexpectedErrorLabel {@code null} means there is no expectation. + */ + private static void errorPropagationAfterEncounteringMultipleErrors( + final Function clientCreator, + final BsonDocument configureFailPoint, + final BsonDocument configureFailPointFromListener, + final int expectedErrorCode, + @Nullable final String unexpectedErrorLabel) throws Exception { + assumeTrue(serverVersionAtLeast(6, 0)); + assumeTrue(isDiscoverableReplicaSet()); + ServerAddress primaryServerAddress = getPrimary(); + Predicate configureFailPointEventMatcher = event -> { + if (event instanceof CommandFailedEvent) { + CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event; + if (commandFailedEvent.getCommandName().equals("drop")) { + // this code may run against MongoDB 6, where dropping a nonexistent collection results in an error + return false; + } + MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable()); + assertEquals(91, cause.getCode()); + return true; + } + return false; + }; + try (ConfigureFailPointCommandListener commandListener = new ConfigureFailPointCommandListener( + configureFailPointFromListener, primaryServerAddress, configureFailPointEventMatcher); + MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() + .retryWrites(true) + .addCommandListener(commandListener) + .build()); + FailPoint ignored = FailPoint.enable(configureFailPoint, primaryServerAddress)) { + MongoCollection collection = dropAndGetCollection("errorPropagationAfterEncounteringMultipleErrors", client); + MongoException e = assertThrows(MongoException.class, () -> collection.insertOne(new Document())); + assertEquals(expectedErrorCode, e.getCode()); + if (unexpectedErrorLabel != null) { + assertFalse(e.hasErrorLabel(unexpectedErrorLabel)); + } + } + } + + private static MongoCollection dropAndGetCollection(final String name, final MongoClient client) { + MongoCollection result = client.getDatabase(getDefaultDatabaseName()).getCollection(name); + result.drop(); + return result; + } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java index 1afbf61565e..a840a83babb 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java @@ -37,7 +37,9 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assumptions.assumeTrue; -// See https://github.com/mongodb/specifications/blob/master/source/transactions-convenient-api/tests/README.md#prose-tests +/** + * Prose Tests. + */ public class WithTransactionProseTest extends DatabaseTestCase { private static final long START_TIME_MS = 1L; private static final long ERROR_GENERATING_INTERVAL = 121000L; @@ -52,11 +54,10 @@ public void setUp() { collection.insertOne(Document.parse("{ _id : 0 }")); } - // - // Test that the callback raises a custom exception or error that does not include either UnknownTransactionCommitResult or - // TransientTransactionError error labels. The callback will execute using withTransaction and assert that the callback's error - // bypasses any retry logic within withTransaction and is propagated to the caller of withTransaction. - // + /** + * + * Callback Raises a Custom Error. + */ @Test public void testCallbackRaisesCustomError() { final String exceptionMessage = "NotTransientOrUnknownError"; @@ -71,10 +72,10 @@ public void testCallbackRaisesCustomError() { } } - // - // Test that the callback that returns a custom value (e.g. boolean, string, object). Execute this callback using withTransaction - // and assert that the callback's return value is propagated to the caller of withTransaction. - // + /** + * + * Callback Returns a Value. + */ @Test public void testCallbackReturnsValue() { try (ClientSession session = client.startSession()) { @@ -87,10 +88,10 @@ public void testCallbackReturnsValue() { } } - // - // If the callback raises an error with the TransientTransactionError label and the retry timeout has been exceeded, withTransaction - // should propagate the error to its caller. - // + /** + * + * Retry Timeout is Enforced, first scenario on the list. + */ @Test public void testRetryTimeoutEnforcedTransientTransactionError() { final String errorMessage = "transient transaction error"; @@ -110,10 +111,10 @@ public void testRetryTimeoutEnforcedTransientTransactionError() { } } - // - // If committing raises an error with the UnknownTransactionCommitResult label, the error is not a write concern timeout, and the - // retry timeout has been exceeded, withTransaction should propagate the error to its caller. - // + /** + * + * Retry Timeout is Enforced, second scenario on the list. + */ @Test public void testRetryTimeoutEnforcedUnknownTransactionCommit() { MongoDatabase failPointAdminDb = client.getDatabase("admin"); @@ -137,11 +138,10 @@ public void testRetryTimeoutEnforcedUnknownTransactionCommit() { } } - // - // If committing raises an error with the TransientTransactionError label and the retry timeout has been exceeded, withTransaction - // should propagate the error to its caller. This case may occur if the commit was internally retried against a new primary after - // a failover and the second primary returned a NoSuchTransaction error response. - // + /** + * + * Retry Timeout is Enforced, third scenario on the list. + */ @Test public void testRetryTimeoutEnforcedTransientTransactionErrorOnCommit() { MongoDatabase failPointAdminDb = client.getDatabase("admin"); @@ -166,9 +166,9 @@ public void testRetryTimeoutEnforcedTransientTransactionErrorOnCommit() { } } - // - // Ensure cannot override timeout in transaction - // + /** + * Ensure cannot override timeout in transaction. + */ @Test public void testTimeoutMS() { try (ClientSession session = client.startSession(ClientSessionOptions.builder() @@ -182,9 +182,9 @@ public void testTimeoutMS() { } } - // - // Ensure legacy settings don't cause issues in sessions - // + /** + * Ensure legacy settings don't cause issues in sessions. + */ @Test public void testTimeoutMSAndLegacySettings() { try (ClientSession session = client.startSession(ClientSessionOptions.builder() diff --git a/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java new file mode 100644 index 00000000000..a31182a51c0 --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java @@ -0,0 +1,105 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.event; + +import com.mongodb.ServerAddress; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.client.FailPoint; +import com.mongodb.event.CommandEvent; +import com.mongodb.event.CommandFailedEvent; +import com.mongodb.event.CommandListener; +import com.mongodb.event.CommandStartedEvent; +import com.mongodb.event.CommandSucceededEvent; +import org.bson.BsonDocument; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; + +import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.assertTrue; +import static com.mongodb.assertions.Assertions.fail; + +@ThreadSafe +public final class ConfigureFailPointCommandListener implements CommandListener, AutoCloseable { + private final BsonDocument configureFailPoint; + private final ServerAddress serverAddress; + private final Predicate eventMatcher; + private final Object lock; + private final CompletableFuture failPointFuture; + + /** + * @param configureFailPoint See {@link FailPoint#enable(BsonDocument, ServerAddress)}. + * @param serverAddress See {@link FailPoint#enable(BsonDocument, ServerAddress)}. + * @param eventMatcher When an event is matched, an attempt to configure the fail point + * specified via {@code configureFailPoint} is made. + * The {@code eventMatcher} is guaranteed to be {@linkplain Predicate#test(Object) used} sequentially. + * The attempt is made at most once, + * and the {@code eventMatcher} {@linkplain Predicate#test(Object) test} that caused the attempt is the last one. + */ + public ConfigureFailPointCommandListener( + final BsonDocument configureFailPoint, + final ServerAddress serverAddress, + final Predicate eventMatcher) { + this.configureFailPoint = configureFailPoint; + this.serverAddress = serverAddress; + this.eventMatcher = eventMatcher; + lock = new Object(); + failPointFuture = new CompletableFuture<>(); + } + + @Override + public void commandStarted(final CommandStartedEvent event) { + onEvent(event); + } + + @Override + public void commandSucceeded(final CommandSucceededEvent event) { + onEvent(event); + } + + @Override + public void commandFailed(final CommandFailedEvent event) { + onEvent(event); + } + + private void onEvent(final CommandEvent event) { + synchronized (lock) { + if (!failPointFuture.isDone()) { + try { + if (eventMatcher.test(event)) { + assertTrue(failPointFuture.complete(FailPoint.enable(configureFailPoint, serverAddress))); + } + } catch (Throwable e) { + assertTrue(failPointFuture.completeExceptionally(e)); + } + } + } + } + + @Override + public void close() throws InterruptedException, ExecutionException { + synchronized (lock) { + if (failPointFuture.cancel(true)) { + fail("The listener was closed before (in the happens-before order) it attempted to configure the fail point"); + } else { + assertTrue(failPointFuture.isDone()); + assertNotNull(failPointFuture.get()).close(); + } + } + } +}