diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 03e2d6073a..412a13ec70 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2779,7 +2779,7 @@ def __init__( self._last_error: Optional[Exception] = None self._retrying = False self._always_retryable = False - self._multiple_retries = _csot.get_timeout() is not None + self._max_retries = float("inf") if _csot.get_timeout() is not None else 1 self._client = mongo_client self._retry_policy = mongo_client._retry_policy self._func = func @@ -2852,6 +2852,8 @@ async def run(self) -> T: # ConnectionFailures do not supply a code property exc_code = getattr(exc, "code", None) overloaded = exc.has_error_label("SystemOverloadedError") + if overloaded: + self._max_retries = self._client.options.max_adaptive_retries always_retryable = exc.has_error_label("RetryableError") and overloaded if not self._client.options.retry_reads or ( not always_retryable @@ -2890,6 +2892,8 @@ async def run(self) -> T: exc_to_check = exc.error retryable_write_label = exc_to_check.has_error_label("RetryableWriteError") overloaded = exc_to_check.has_error_label("SystemOverloadedError") + if overloaded: + self._max_retries = self._client.options.max_adaptive_retries always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded # Always retry abortTransaction and commitTransaction up to once @@ -2943,7 +2947,9 @@ async def run(self) -> T: def _is_not_eligible_for_retry(self) -> bool: """Checks if the exchange is not eligible for retry""" - return not self._retryable or (self._is_retrying() and not self._multiple_retries) + return not self._retryable or ( + self._is_retrying() and self._attempt_number >= self._max_retries + ) def _is_retrying(self) -> bool: """Checks if the exchange is currently undergoing a retry""" diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index c049dcaeae..2bd6f31b72 100644 --- 
a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2769,7 +2769,7 @@ def __init__( self._last_error: Optional[Exception] = None self._retrying = False self._always_retryable = False - self._multiple_retries = _csot.get_timeout() is not None + self._max_retries = float("inf") if _csot.get_timeout() is not None else 1 self._client = mongo_client self._retry_policy = mongo_client._retry_policy self._func = func @@ -2842,6 +2842,8 @@ def run(self) -> T: # ConnectionFailures do not supply a code property exc_code = getattr(exc, "code", None) overloaded = exc.has_error_label("SystemOverloadedError") + if overloaded: + self._max_retries = self._client.options.max_adaptive_retries always_retryable = exc.has_error_label("RetryableError") and overloaded if not self._client.options.retry_reads or ( not always_retryable @@ -2880,6 +2882,8 @@ def run(self) -> T: exc_to_check = exc.error retryable_write_label = exc_to_check.has_error_label("RetryableWriteError") overloaded = exc_to_check.has_error_label("SystemOverloadedError") + if overloaded: + self._max_retries = self._client.options.max_adaptive_retries always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded # Always retry abortTransaction and commitTransaction up to once @@ -2933,7 +2937,9 @@ def run(self) -> T: def _is_not_eligible_for_retry(self) -> bool: """Checks if the exchange is not eligible for retry""" - return not self._retryable or (self._is_retrying() and not self._multiple_retries) + return not self._retryable or ( + self._is_retrying() and self._attempt_number >= self._max_retries + ) def _is_retrying(self) -> bool: """Checks if the exchange is currently undergoing a retry""" diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py index 259cd9cff5..c7369db90f 100644 --- a/test/asynchronous/test_retryable_reads.py +++ b/test/asynchronous/test_retryable_reads.py @@ -20,8 +20,11 @@ import sys import threading from 
test.asynchronous.utils import async_set_fail_point +from unittest import mock -from pymongo.errors import OperationFailure +from pymongo import MongoClient +from pymongo.common import MAX_ADAPTIVE_RETRIES +from pymongo.errors import OperationFailure, PyMongoError sys.path[0:0] = [""] @@ -38,6 +41,7 @@ ) from pymongo.monitoring import ( + CommandFailedEvent, ConnectionCheckedOutEvent, ConnectionCheckOutFailedEvent, ConnectionCheckOutFailedReason, @@ -145,6 +149,19 @@ async def test_pool_paused_error_is_retryable(self): class TestRetryableReads(AsyncIntegrationTest): + async def asyncSetUp(self) -> None: + await super().asyncSetUp() + self.setup_client = MongoClient(**async_client_context.client_options) + self.addCleanup(self.setup_client.close) + + # TODO: After PYTHON-4595 we can use async event handlers and remove this workaround. + def configure_fail_point_sync(self, command_args, off=False) -> None: + cmd = {"configureFailPoint": "failCommand", **command_args} + if off: + cmd["mode"] = "off" + cmd.pop("data", None) + self.setup_client.admin.command(cmd) + @async_client_context.require_multiple_mongoses @async_client_context.require_failCommand_fail_point async def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self): @@ -383,6 +400,117 @@ async def test_03_03_retryable_reads_caused_by_overload_errors_are_retried_on_th # 6. Assert that both events occurred on the same server. assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id + @async_client_context.require_failCommand_fail_point + @async_client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator] + async def test_overload_then_nonoverload_retries_increased_reads(self) -> None: + # Create a client. + listener = OvertCommandListener() + + # Configure the client to listen to CommandFailedEvents. 
In the attached listener, configure a fail point with error + # code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels. + overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["find"], + "errorLabels": ["RetryableError", "SystemOverloadedError"], + "errorCode": 91, + }, + } + + # Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label. + non_overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["find"], + "errorCode": 91, + "errorLabels": ["RetryableError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the fail point command only if the failed event is for the 91 error configured in step 2. + if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(non_overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + listener.failed_events.append(event) + + listener.failed = failed + + client = await self.async_rs_client(event_listeners=[listener]) + await client.test.test.insert_one({}) + + self.configure_fail_point_sync(overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + + with self.assertRaises(PyMongoError): + await client.test.test.find_one() + + started_finds = [e for e in listener.started_events if e.command_name == "find"] + self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1) + + @async_client_context.require_failCommand_fail_point + @async_client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator] + async def test_backoff_is_not_applied_for_non_overload_errors(self): + if _IS_SYNC: + mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff" + else: + mock_target = "pymongo.asynchronous.helpers._RetryPolicy.backoff" + + # Create a client. 
+ listener = OvertCommandListener() + + # Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error + # code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels. + overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["find"], + "errorLabels": ["RetryableError", "SystemOverloadedError"], + "errorCode": 91, + }, + } + + # Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label. + non_overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["find"], + "errorCode": 91, + "errorLabels": ["RetryableError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the fail point command only if the failed event is for the 91 error configured in step 2. + if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(non_overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + listener.failed_events.append(event) + + listener.failed = failed + + client = await self.async_rs_client(event_listeners=[listener]) + await client.test.test.insert_one({}) + + self.configure_fail_point_sync(overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + + # Perform a findOne operation with coll. Expect the operation to fail. + with mock.patch(mock_target, return_value=0) as mock_backoff: + with self.assertRaises(PyMongoError): + await client.test.test.find_one() + + # Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors. 
+ self.assertEqual(mock_backoff.call_count, 1) + if __name__ == "__main__": unittest.main() diff --git a/test/asynchronous/test_retryable_writes.py b/test/asynchronous/test_retryable_writes.py index 6e2072a2ad..a63aa6783e 100644 --- a/test/asynchronous/test_retryable_writes.py +++ b/test/asynchronous/test_retryable_writes.py @@ -21,6 +21,9 @@ import sys import threading from test.asynchronous.utils import async_set_fail_point, flaky +from unittest import mock + +from pymongo.common import MAX_ADAPTIVE_RETRIES sys.path[0:0] = [""] @@ -784,6 +787,111 @@ def failed(event: CommandFailedEvent) -> None: # Assert that the error does not contain the error label `NoWritesPerformed`. assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"] + async def test_overload_then_nonoverload_retries_increased_writes(self) -> None: + # Create a client with retryWrites=true. + listener = OvertCommandListener() + + # Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error + # code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels. + overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "errorLabels": ["RetryableError", "SystemOverloadedError"], + "errorCode": 91, + }, + } + + # Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels. + non_overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["insert"], + "errorCode": 91, + "errorLabels": ["RetryableError", "RetryableWriteError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the fail point command only if the failed event is for the 91 error configured in step 2. 
+ if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(non_overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + listener.failed_events.append(event) + + listener.failed = failed + + client = await self.async_rs_client(retryWrites=True, event_listeners=[listener]) + + self.configure_fail_point_sync(overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + + with self.assertRaises(PyMongoError): + await client.test.test.insert_one({"x": 1}) + + started_inserts = [e for e in listener.started_events if e.command_name == "insert"] + self.assertEqual(len(started_inserts), MAX_ADAPTIVE_RETRIES + 1) + + async def test_backoff_is_not_applied_for_non_overload_errors(self): + if _IS_SYNC: + mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff" + else: + mock_target = "pymongo.asynchronous.helpers._RetryPolicy.backoff" + + # Create a client. + listener = OvertCommandListener() + + # Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error + # code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels. + overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "errorLabels": ["RetryableError", "SystemOverloadedError"], + "errorCode": 91, + }, + } + + # Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels. + non_overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["insert"], + "errorCode": 91, + "errorLabels": ["RetryableError", "RetryableWriteError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the fail point command only if the failed event is for the 91 error configured in step 2.
+ if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(non_overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + listener.failed_events.append(event) + + listener.failed = failed + + client = await self.async_rs_client(event_listeners=[listener]) + + self.configure_fail_point_sync(overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + + # Perform an insertOne operation with coll. Expect the operation to fail. + with mock.patch(mock_target, return_value=0) as mock_backoff: + with self.assertRaises(PyMongoError): + await client.test.test.insert_one({}) + + # Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors. + self.assertEqual(mock_backoff.call_count, 1) + if __name__ == "__main__": unittest.main() diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index 9e6aac821c..7513194793 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -20,8 +20,11 @@ import sys import threading from test.utils import set_fail_point +from unittest import mock -from pymongo.errors import OperationFailure +from pymongo import MongoClient +from pymongo.common import MAX_ADAPTIVE_RETRIES +from pymongo.errors import OperationFailure, PyMongoError sys.path[0:0] = [""] @@ -38,6 +41,7 @@ ) from pymongo.monitoring import ( + CommandFailedEvent, ConnectionCheckedOutEvent, ConnectionCheckOutFailedEvent, ConnectionCheckOutFailedReason, @@ -145,6 +149,19 @@ def test_pool_paused_error_is_retryable(self): class TestRetryableReads(IntegrationTest): + def setUp(self) -> None: + super().setUp() + self.setup_client = MongoClient(**client_context.client_options) + self.addCleanup(self.setup_client.close) + + # TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
+ def configure_fail_point_sync(self, command_args, off=False) -> None: + cmd = {"configureFailPoint": "failCommand", **command_args} + if off: + cmd["mode"] = "off" + cmd.pop("data", None) + self.setup_client.admin.command(cmd) + @client_context.require_multiple_mongoses @client_context.require_failCommand_fail_point def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self): @@ -381,6 +398,117 @@ def test_03_03_retryable_reads_caused_by_overload_errors_are_retried_on_the_same # 6. Assert that both events occurred on the same server. assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id + @client_context.require_failCommand_fail_point + @client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator] + def test_overload_then_nonoverload_retries_increased_reads(self) -> None: + # Create a client. + listener = OvertCommandListener() + + # Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error + # code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels. + overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["find"], + "errorLabels": ["RetryableError", "SystemOverloadedError"], + "errorCode": 91, + }, + } + + # Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label. + non_overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["find"], + "errorCode": 91, + "errorLabels": ["RetryableError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the fail point command only if the failed event is for the 91 error configured in step 2. 
+ if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(non_overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + listener.failed_events.append(event) + + listener.failed = failed + + client = self.rs_client(event_listeners=[listener]) + client.test.test.insert_one({}) + + self.configure_fail_point_sync(overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + + with self.assertRaises(PyMongoError): + client.test.test.find_one() + + started_finds = [e for e in listener.started_events if e.command_name == "find"] + self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1) + + @client_context.require_failCommand_fail_point + @client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator] + def test_backoff_is_not_applied_for_non_overload_errors(self): + if _IS_SYNC: + mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff" + else: + mock_target = "pymongo.helpers._RetryPolicy.backoff" + + # Create a client. + listener = OvertCommandListener() + + # Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error + # code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels. + overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["find"], + "errorLabels": ["RetryableError", "SystemOverloadedError"], + "errorCode": 91, + }, + } + + # Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label. + non_overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["find"], + "errorCode": 91, + "errorLabels": ["RetryableError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the fail point command only if the failed event is for the 91 error configured in step 2. 
+ if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(non_overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + listener.failed_events.append(event) + + listener.failed = failed + + client = self.rs_client(event_listeners=[listener]) + client.test.test.insert_one({}) + + self.configure_fail_point_sync(overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + + # Perform a findOne operation with coll. Expect the operation to fail. + with mock.patch(mock_target, return_value=0) as mock_backoff: + with self.assertRaises(PyMongoError): + client.test.test.find_one() + + # Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors. + self.assertEqual(mock_backoff.call_count, 1) + if __name__ == "__main__": unittest.main() diff --git a/test/test_retryable_writes.py b/test/test_retryable_writes.py index 5509083162..46ca3a43b8 100644 --- a/test/test_retryable_writes.py +++ b/test/test_retryable_writes.py @@ -21,6 +21,9 @@ import sys import threading from test.utils import flaky, set_fail_point +from unittest import mock + +from pymongo.common import MAX_ADAPTIVE_RETRIES sys.path[0:0] = [""] @@ -780,6 +783,111 @@ def failed(event: CommandFailedEvent) -> None: # Assert that the error does not contain the error label `NoWritesPerformed`. assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"] + def test_overload_then_nonoverload_retries_increased_writes(self) -> None: + # Create a client with retryWrites=true. + listener = OvertCommandListener() + + # Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error + # code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels. 
+ overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "errorLabels": ["RetryableError", "SystemOverloadedError"], + "errorCode": 91, + }, + } + + # Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels. + non_overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["insert"], + "errorCode": 91, + "errorLabels": ["RetryableError", "RetryableWriteError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the fail point command only if the failed event is for the 91 error configured in step 2. + if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(non_overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + listener.failed_events.append(event) + + listener.failed = failed + + client = self.rs_client(retryWrites=True, event_listeners=[listener]) + + self.configure_fail_point_sync(overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + + with self.assertRaises(PyMongoError): + client.test.test.insert_one({"x": 1}) + + started_inserts = [e for e in listener.started_events if e.command_name == "insert"] + self.assertEqual(len(started_inserts), MAX_ADAPTIVE_RETRIES + 1) + + def test_backoff_is_not_applied_for_non_overload_errors(self): + if _IS_SYNC: + mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff" + else: + mock_target = "pymongo.helpers._RetryPolicy.backoff" + + # Create a client. + listener = OvertCommandListener() + + # Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error + # code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels. 
+ overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "errorLabels": ["RetryableError", "SystemOverloadedError"], + "errorCode": 91, + }, + } + + # Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels. + non_overload_fail_point = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["insert"], + "errorCode": 91, + "errorLabels": ["RetryableError", "RetryableWriteError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the fail point command only if the failed event is for the 91 error configured in step 2. + if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(non_overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + listener.failed_events.append(event) + + listener.failed = failed + + client = self.rs_client(event_listeners=[listener]) + + self.configure_fail_point_sync(overload_fail_point) + self.addCleanup(self.configure_fail_point_sync, {}, off=True) + + # Perform an insertOne operation with coll. Expect the operation to fail. + with mock.patch(mock_target, return_value=0) as mock_backoff: + with self.assertRaises(PyMongoError): + client.test.test.insert_one({}) + + # Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors. + self.assertEqual(mock_backoff.call_count, 1) + if __name__ == "__main__": unittest.main()