Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion cassandra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,10 +687,29 @@ class OperationTimedOut(DriverException):
The last :class:`~.Host` this operation was attempted against.
"""

def __init__(self, errors=None, last_host=None):
timeout = None
"""
The timeout value (in seconds) that was in effect when the operation
timed out, or ``None`` if not applicable.
"""

in_flight = None
"""
The number of in-flight requests on the connection at the time of
the timeout (includes orphaned requests), or ``None`` if not applicable.
"""

def __init__(self, errors=None, last_host=None, timeout=None, in_flight=None):
    """
    Store the timeout details and build the exception message.

    The message always starts with ``errors=..., last_host=...``. When
    ``timeout`` and/or ``in_flight`` are provided they are appended in a
    parenthesised suffix, e.g. ``(timeout=10.0s, in_flight=42)``.

    :param errors: error details for the operation; callers pass either a
        dict keyed by host/endpoint or a plain message string.
    :param last_host: the last host this operation was attempted against.
    :param timeout: the timeout value (in seconds) that was in effect when
        the operation timed out, or ``None`` if not applicable.
    :param in_flight: the number of in-flight requests on the connection at
        the time of the timeout, or ``None`` if not applicable.
    """
    self.errors = errors
    self.last_host = last_host
    self.timeout = timeout
    self.in_flight = in_flight
    message = "errors=%s, last_host=%s" % (self.errors, self.last_host)

    # Collect each optional detail independently so that in_flight is still
    # reported when no timeout value is available.
    details = []
    if self.timeout is not None:
        # Wrap in a 1-tuple so an unexpected tuple value cannot break formatting.
        details.append("timeout=%ss" % (self.timeout,))
    if self.in_flight is not None:
        # %s instead of %d: building the exception must never raise a
        # TypeError of its own (and hide the timeout) on a non-int value.
        details.append("in_flight=%s" % (self.in_flight,))
    if details:
        message += " (%s)" % ", ".join(details)
    Exception.__init__(self, message)


Expand Down
60 changes: 56 additions & 4 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
from cassandra import (ConsistencyLevel, AuthenticationFailed, InvalidRequest,
OperationTimedOut, UnsupportedOperation,
SchemaTargetType, DriverException, ProtocolVersion,
UnresolvableContactPoints, DependencyException)
UnresolvableContactPoints, DependencyException,
WriteType, consistency_value_to_name)
from cassandra.auth import _proxy_execute_key, PlainTextAuthProvider
from cassandra.client_routes import ClientRoutesChangeType, ClientRoutesConfig, _ClientRoutesHandler
from cassandra.connection import (ClientRoutesEndPointFactory, ConnectionException, ConnectionShutdown,
Expand Down Expand Up @@ -191,6 +192,19 @@ def _connection_reduce_fn(val,import_fn):

log = logging.getLogger(__name__)

# Maps each RetryPolicy decision constant to the label printed for it in the
# server read/write timeout debug logs. Looked up by _retry_decision_name();
# decisions missing from this table are rendered there as "UNKNOWN(<value>)".
_RETRY_DECISION_NAMES = {
RetryPolicy.RETRY: "RETRY",
RetryPolicy.RETHROW: "RETHROW",
RetryPolicy.IGNORE: "IGNORE",
RetryPolicy.RETRY_NEXT_HOST: "RETRY_NEXT_HOST",
}


def _retry_decision_name(retry):
    """
    Translate a retry policy decision into a readable label for logging.

    ``retry`` is either a decision tuple, whose first element is the decision
    code, or the bare decision code itself. Codes that are not present in
    ``_RETRY_DECISION_NAMES`` are rendered as ``UNKNOWN(<code>)``.
    """
    if isinstance(retry, tuple):
        decision = retry[0]
    else:
        decision = retry
    fallback = "UNKNOWN(%s)" % decision
    return _RETRY_DECISION_NAMES.get(decision, fallback)


_GRAPH_PAGING_MIN_DSE_VERSION = Version('6.8.0')

Expand Down Expand Up @@ -1683,7 +1697,8 @@ def add_execution_profile(self, name, profile, pool_wait_timeout=5):
futures.update(session.update_created_pools())
_, not_done = wait_futures(futures, pool_wait_timeout)
if not_done:
raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout.")
raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout." % pool_wait_timeout,
timeout=pool_wait_timeout)

def connection_factory(self, endpoint, host_conn = None, *args, **kwargs):
"""
Expand Down Expand Up @@ -4505,6 +4520,8 @@ def _on_timeout(self, _attempts=0):
)
return

conn_in_flight = None
conn_orphaned = None
if self._connection is not None:
try:
self._connection._requests.pop(self._req_id)
Expand All @@ -4515,9 +4532,20 @@ def _on_timeout(self, _attempts=0):
except KeyError:
key = "Connection defunct by heartbeat"
errors = {key: "Client request timeout. See Session.execute[_async](timeout)"}
self._set_final_exception(OperationTimedOut(errors, self._current_host))
log.debug("Client request timeout (host=%s, timeout=%ss, in_flight=%d, orphaned=%d): "
"connection defunct by heartbeat",
self._current_host, self.timeout,
self._connection.in_flight,
len(self._connection.orphaned_request_ids))
self._set_final_exception(OperationTimedOut(errors, self._current_host,
timeout=self.timeout,
in_flight=self._connection.in_flight))
return

# Capture connection stats before pool.return_connection() can alter state
conn_in_flight = self._connection.in_flight
conn_orphaned = len(self._connection.orphaned_request_ids)

pool = self.session._pools.get(self._current_host)
if pool and not pool.is_shutdown:
# Do not return the stream ID to the pool yet. We cannot reuse it
Expand All @@ -4542,7 +4570,11 @@ def _on_timeout(self, _attempts=0):
host = str(connection.endpoint) if connection else 'unknown'
errors = {host: "Request timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait."}

self._set_final_exception(OperationTimedOut(errors, self._current_host))
log.debug("Client request timeout (host=%s, timeout=%ss, in_flight=%s, orphaned=%s)",
self._current_host, self.timeout, conn_in_flight, conn_orphaned)
self._set_final_exception(OperationTimedOut(errors, self._current_host,
timeout=self.timeout,
in_flight=conn_in_flight))

def _on_speculative_execute(self):
self._timer = None
Expand Down Expand Up @@ -4791,11 +4823,31 @@ def _set_result(self, host, connection, pool, response):
self._metrics.on_read_timeout()
retry = retry_policy.on_read_timeout(
self.query, retry_num=self._query_retries, **response.info)
log.debug("Server read timeout (host=%s, consistency=%s, "
"received=%s, required=%s, data_retrieved=%s, "
"retry_num=%d, retry_decision=%s)",
host,
consistency_value_to_name(response.info.get('consistency')),
response.info.get('received_responses'),
response.info.get('required_responses'),
response.info.get('data_retrieved'),
self._query_retries,
_retry_decision_name(retry))
elif isinstance(response, WriteTimeoutErrorMessage):
if self._metrics is not None:
self._metrics.on_write_timeout()
retry = retry_policy.on_write_timeout(
self.query, retry_num=self._query_retries, **response.info)
log.debug("Server write timeout (host=%s, consistency=%s, "
"received=%s, required=%s, write_type=%s, "
"retry_num=%d, retry_decision=%s)",
host,
consistency_value_to_name(response.info.get('consistency')),
response.info.get('received_responses'),
response.info.get('required_responses'),
WriteType.value_to_name.get(response.info.get('write_type'), 'UNKNOWN'),
self._query_retries,
_retry_decision_name(retry))
elif isinstance(response, UnavailableErrorMessage):
if self._metrics is not None:
self._metrics.on_unavailable()
Expand Down
15 changes: 11 additions & 4 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,8 @@ def factory(cls, endpoint, timeout, host_conn = None, *args, **kwargs):
raise conn.last_error
elif not conn.connected_event.is_set():
conn.close()
raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout)
raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout,
timeout=timeout)
else:
return conn

Expand Down Expand Up @@ -1247,6 +1248,7 @@ def wait_for_responses(self, *msgs, **kwargs):
msg += ": %s" % (self.last_error,)
raise ConnectionShutdown(msg)
timeout = kwargs.get('timeout')
original_timeout = timeout # preserve for exception reporting
fail_on_error = kwargs.get('fail_on_error', True)
waiter = ResponseWaiter(self, len(msgs), fail_on_error)

Expand All @@ -1271,7 +1273,8 @@ def wait_for_responses(self, *msgs, **kwargs):
if timeout is not None:
timeout -= 0.01
if timeout <= 0.0:
raise OperationTimedOut()
raise OperationTimedOut(timeout=original_timeout,
in_flight=self.in_flight)
time.sleep(0.01)

try:
Expand Down Expand Up @@ -1796,7 +1799,8 @@ def deliver(self, timeout=None):
if self.error:
raise self.error
elif not self.event.is_set():
raise OperationTimedOut()
raise OperationTimedOut(timeout=timeout,
in_flight=self.connection.in_flight)
else:
return self.responses

Expand All @@ -1823,7 +1827,10 @@ def wait(self, timeout):
if self._exception:
raise self._exception
else:
raise OperationTimedOut("Connection heartbeat timeout after %s seconds" % (timeout,), self.connection.endpoint)
raise OperationTimedOut("Connection heartbeat timeout after %s seconds" % (timeout,),
self.connection.endpoint,
timeout=timeout,
in_flight=self.connection.in_flight)

def _options_callback(self, response):
if isinstance(response, SupportedMessage):
Expand Down
58 changes: 58 additions & 0 deletions tests/unit/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,64 @@ def test_exception_types(self):
assert issubclass(UnsupportedOperation, DriverException)


class OperationTimedOutTest(unittest.TestCase):
    """Covers the message format and attributes of ``OperationTimedOut``."""

    def test_message_without_timeout(self):
        """Without timeout details the message carries only errors and last_host."""
        text = str(OperationTimedOut(errors={'host1': 'some error'}, last_host='host1'))
        for expected in ("errors={'host1': 'some error'}", "last_host=host1"):
            assert expected in text
        for unexpected in ("timeout=", "in_flight="):
            assert unexpected not in text

    def test_message_with_timeout_and_in_flight(self):
        """Both timeout and in_flight show up in the parenthesised suffix."""
        err = OperationTimedOut(errors={'host1': 'err'}, last_host='host1',
                                timeout=10.0, in_flight=42)
        assert "(timeout=10.0s, in_flight=42)" in str(err)

    def test_message_with_timeout_no_in_flight(self):
        """Only the timeout is reported when in_flight is not supplied."""
        text = str(OperationTimedOut(timeout=5.0))
        assert "(timeout=5.0s)" in text
        assert "in_flight=" not in text

    def test_message_no_args(self):
        """Constructing with no arguments works and yields the bare message."""
        text = str(OperationTimedOut())
        assert "errors=None, last_host=None" in text
        assert "timeout=" not in text

    def test_attributes_accessible(self):
        """Every constructor argument is readable back as an attribute."""
        err = OperationTimedOut(errors={'h': 'e'}, last_host='h',
                                timeout=10.0, in_flight=42)
        assert err.errors == {'h': 'e'}
        assert err.last_host == 'h'
        assert err.timeout == 10.0
        assert err.in_flight == 42

    def test_attributes_default_none(self):
        """All four attributes are None when nothing is passed."""
        err = OperationTimedOut()
        for attr in ("timeout", "in_flight", "errors", "last_host"):
            assert getattr(err, attr) is None

    def test_backward_compat_positional(self):
        """The pre-existing two-positional-argument call form keeps working."""
        err = OperationTimedOut({'h': 'err'}, 'host1')
        assert err.errors == {'h': 'err'}
        assert err.last_host == 'host1'
        assert err.timeout is None
        assert err.in_flight is None


class ClusterTest(unittest.TestCase):

def test_tuple_for_contact_points(self):
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ def send_msg(msg, req_id, msg_callback):
assert isinstance(exc, OperationTimedOut)
assert exc.errors == 'Connection heartbeat timeout after 0.05 seconds'
assert exc.last_host == DefaultEndPoint('localhost')
assert exc.timeout == 0.05
assert isinstance(exc.in_flight, int)
holder.return_connection.assert_has_calls(
[call(connection)] * get_holders.call_count)

Expand Down
12 changes: 9 additions & 3 deletions tests/unit/test_response_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ def test_heartbeat_defunct_deadlock(self):

connection = MagicMock(spec=Connection)
connection._requests = {}
connection.in_flight = 5
connection.orphaned_request_ids = set()

pool = Mock()
pool.is_shutdown = False
Expand All @@ -162,8 +164,10 @@ def test_heartbeat_defunct_deadlock(self):

# Simulate ResponseFuture timing out
rf._on_timeout()
with pytest.raises(OperationTimedOut, match="Connection defunct by heartbeat"):
with pytest.raises(OperationTimedOut, match="Connection defunct by heartbeat") as exc_info:
rf.result()
assert exc_info.value.timeout == 1
assert exc_info.value.in_flight == 5

def test_read_timeout_error_message(self):
session = self.make_session()
Expand Down Expand Up @@ -653,7 +657,7 @@ def test_timeout_does_not_release_stream_id(self):
pool = self.make_pool()
session._pools.get.return_value = pool
connection = Mock(spec=Connection, lock=RLock(), _requests={}, request_ids=deque(),
orphaned_request_ids=set(), orphaned_threshold=256)
orphaned_request_ids=set(), orphaned_threshold=256, in_flight=3)
pool.borrow_connection.return_value = (connection, 1)

rf = self.make_response_future(session)
Expand All @@ -663,8 +667,10 @@ def test_timeout_does_not_release_stream_id(self):

rf._on_timeout()
pool.return_connection.assert_called_once_with(connection, stream_was_orphaned=True)
with pytest.raises(OperationTimedOut, match="Client request timeout"):
with pytest.raises(OperationTimedOut, match="Client request timeout") as exc_info:
rf.result()
assert exc_info.value.timeout == 1
assert exc_info.value.in_flight == 3

assert len(connection.request_ids) == 0, \
"Request IDs should be empty but it's not: {}".format(connection.request_ids)
Expand Down
Loading