Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2779,7 +2779,7 @@ def __init__(
self._last_error: Optional[Exception] = None
self._retrying = False
self._always_retryable = False
self._multiple_retries = _csot.get_timeout() is not None
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
self._client = mongo_client
self._retry_policy = mongo_client._retry_policy
self._func = func
Expand Down Expand Up @@ -2852,6 +2852,8 @@ async def run(self) -> T:
# ConnectionFailures do not supply a code property
exc_code = getattr(exc, "code", None)
overloaded = exc.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc.has_error_label("RetryableError") and overloaded
if not self._client.options.retry_reads or (
not always_retryable
Expand Down Expand Up @@ -2890,6 +2892,8 @@ async def run(self) -> T:
exc_to_check = exc.error
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded

# Always retry abortTransaction and commitTransaction up to once
Expand Down Expand Up @@ -2943,7 +2947,9 @@ async def run(self) -> T:

def _is_not_eligible_for_retry(self) -> bool:
"""Checks if the exchange is not eligible for retry"""
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
return not self._retryable or (
self._is_retrying() and self._attempt_number >= self._max_retries
)

def _is_retrying(self) -> bool:
"""Checks if the exchange is currently undergoing a retry"""
Expand Down
10 changes: 8 additions & 2 deletions pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2769,7 +2769,7 @@ def __init__(
self._last_error: Optional[Exception] = None
self._retrying = False
self._always_retryable = False
self._multiple_retries = _csot.get_timeout() is not None
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
self._client = mongo_client
self._retry_policy = mongo_client._retry_policy
self._func = func
Expand Down Expand Up @@ -2842,6 +2842,8 @@ def run(self) -> T:
# ConnectionFailures do not supply a code property
exc_code = getattr(exc, "code", None)
overloaded = exc.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc.has_error_label("RetryableError") and overloaded
if not self._client.options.retry_reads or (
not always_retryable
Expand Down Expand Up @@ -2880,6 +2882,8 @@ def run(self) -> T:
exc_to_check = exc.error
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded

# Always retry abortTransaction and commitTransaction up to once
Expand Down Expand Up @@ -2933,7 +2937,9 @@ def run(self) -> T:

def _is_not_eligible_for_retry(self) -> bool:
"""Checks if the exchange is not eligible for retry"""
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
return not self._retryable or (
self._is_retrying() and self._attempt_number >= self._max_retries
)

def _is_retrying(self) -> bool:
"""Checks if the exchange is currently undergoing a retry"""
Expand Down
130 changes: 129 additions & 1 deletion test/asynchronous/test_retryable_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import sys
import threading
from test.asynchronous.utils import async_set_fail_point
from unittest import mock

from pymongo.errors import OperationFailure
from pymongo import MongoClient
from pymongo.common import MAX_ADAPTIVE_RETRIES
from pymongo.errors import OperationFailure, PyMongoError

sys.path[0:0] = [""]

Expand All @@ -38,6 +41,7 @@
)

from pymongo.monitoring import (
CommandFailedEvent,
ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckOutFailedReason,
Expand Down Expand Up @@ -145,6 +149,19 @@ async def test_pool_paused_error_is_retryable(self):


class TestRetryableReads(AsyncIntegrationTest):
async def asyncSetUp(self) -> None:
await super().asyncSetUp()
self.setup_client = MongoClient(**async_client_context.client_options)
self.addCleanup(self.setup_client.close)

# TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
def configure_fail_point_sync(self, command_args, off=False) -> None:
cmd = {"configureFailPoint": "failCommand", **command_args}
if off:
cmd["mode"] = "off"
cmd.pop("data", None)
self.setup_client.admin.command(cmd)

@async_client_context.require_multiple_mongoses
@async_client_context.require_failCommand_fail_point
async def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self):
Expand Down Expand Up @@ -383,6 +400,117 @@ async def test_03_03_retryable_reads_caused_by_overload_errors_are_retried_on_th
# 6. Assert that both events occurred on the same server.
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id

@async_client_context.require_failCommand_fail_point
@async_client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
async def test_overload_then_nonoverload_retries_increased_reads(self) -> None:
# Create a client.
listener = OvertCommandListener()

# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 91,
},
}

# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
non_overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["find"],
"errorCode": 91,
"errorLabels": ["RetryableError"],
},
}

def failed(event: CommandFailedEvent) -> None:
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
if listener.failed_events:
return
assert event.failure["code"] == 91
self.configure_fail_point_sync(non_overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
listener.failed_events.append(event)

listener.failed = failed

client = await self.async_rs_client(event_listeners=[listener])
await client.test.test.insert_one({})

self.configure_fail_point_sync(overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)

with self.assertRaises(PyMongoError):
await client.test.test.find_one()

started_finds = [e for e in listener.started_events if e.command_name == "find"]
self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1)

@async_client_context.require_failCommand_fail_point
@async_client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
async def test_backoff_is_not_applied_for_non_overload_errors(self):
if _IS_SYNC:
mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff"
else:
mock_target = "pymongo.asynchronous.helpers._RetryPolicy.backoff"

# Create a client.
listener = OvertCommandListener()

# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 91,
},
}

# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
non_overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["find"],
"errorCode": 91,
"errorLabels": ["RetryableError"],
},
}

def failed(event: CommandFailedEvent) -> None:
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
if listener.failed_events:
return
assert event.failure["code"] == 91
self.configure_fail_point_sync(non_overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
listener.failed_events.append(event)

listener.failed = failed

client = await self.async_rs_client(event_listeners=[listener])
await client.test.test.insert_one({})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT should we make test.test part of every test's setup in the future so we don't have to keep remembering to do it here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by test.test?


self.configure_fail_point_sync(overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)

# Perform a findOne operation with coll. Expect the operation to fail.
with mock.patch(mock_target, return_value=0) as mock_backoff:
with self.assertRaises(PyMongoError):
await client.test.test.find_one()

# Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors.
self.assertEqual(mock_backoff.call_count, 1)


if __name__ == "__main__":
unittest.main()
108 changes: 108 additions & 0 deletions test/asynchronous/test_retryable_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import sys
import threading
from test.asynchronous.utils import async_set_fail_point, flaky
from unittest import mock

from pymongo.common import MAX_ADAPTIVE_RETRIES

sys.path[0:0] = [""]

Expand Down Expand Up @@ -784,6 +787,111 @@ def failed(event: CommandFailedEvent) -> None:
# Assert that the error does not contain the error label `NoWritesPerformed`.
assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"]

async def test_overload_then_nonoverload_retries_increased_writes(self) -> None:
# Create a client with retryWrites=true.
listener = OvertCommandListener()

# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["insert"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 91,
},
}

# Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels.
non_overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["insert"],
"errorCode": 91,
"errorLabels": ["RetryableError", "RetryableWriteError"],
},
}

def failed(event: CommandFailedEvent) -> None:
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
if listener.failed_events:
return
assert event.failure["code"] == 91
self.configure_fail_point_sync(non_overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
listener.failed_events.append(event)

listener.failed = failed

client = await self.async_rs_client(retryWrites=True, event_listeners=[listener])

self.configure_fail_point_sync(overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
Comment on lines +830 to +831
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we still initialize the db even though a write operation is performed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to. We only do a separate insert_one on tests that make find_one calls because a find_one against an empty collection returns None.


with self.assertRaises(PyMongoError):
await client.test.test.insert_one({"x": 1})

started_inserts = [e for e in listener.started_events if e.command_name == "insert"]
self.assertEqual(len(started_inserts), MAX_ADAPTIVE_RETRIES + 1)

async def test_backoff_is_not_applied_for_non_overload_errors(self):
if _IS_SYNC:
mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff"
else:
mock_target = "pymongo.asynchronous.helpers._RetryPolicy.backoff"

# Create a client.
listener = OvertCommandListener()

# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["insert"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 91,
},
}

# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
non_overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["insert"],
"errorCode": 91,
"errorLabels": ["RetryableError", "RetryableWriteError"],
},
}

def failed(event: CommandFailedEvent) -> None:
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
if listener.failed_events:
return
assert event.failure["code"] == 91
self.configure_fail_point_sync(non_overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
listener.failed_events.append(event)

listener.failed = failed

client = await self.async_rs_client(event_listeners=[listener])

self.configure_fail_point_sync(overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)

# Perform a findOne operation with coll. Expect the operation to fail.
with mock.patch(mock_target, return_value=0) as mock_backoff:
with self.assertRaises(PyMongoError):
await client.test.test.insert_one({})

# Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors.
self.assertEqual(mock_backoff.call_count, 1)


if __name__ == "__main__":
unittest.main()
Loading
Loading