diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 3ad8fcdfd1..46de7daaf0 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -687,10 +687,29 @@ class OperationTimedOut(DriverException): The last :class:`~.Host` this operation was attempted against. """ - def __init__(self, errors=None, last_host=None): + timeout = None + """ + The timeout value (in seconds) that was in effect when the operation + timed out, or ``None`` if not applicable. + """ + + in_flight = None + """ + The number of in-flight requests on the connection at the time of + the timeout (includes orphaned requests), or ``None`` if not applicable. + """ + + def __init__(self, errors=None, last_host=None, timeout=None, in_flight=None): self.errors = errors self.last_host = last_host + self.timeout = timeout + self.in_flight = in_flight message = "errors=%s, last_host=%s" % (self.errors, self.last_host) + if self.timeout is not None: + message += " (timeout=%ss" % self.timeout + if self.in_flight is not None: + message += ", in_flight=%d" % self.in_flight + message += ")" Exception.__init__(self, message) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 9eace8810d..646fd37e3a 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -46,7 +46,8 @@ from cassandra import (ConsistencyLevel, AuthenticationFailed, InvalidRequest, OperationTimedOut, UnsupportedOperation, SchemaTargetType, DriverException, ProtocolVersion, - UnresolvableContactPoints, DependencyException) + UnresolvableContactPoints, DependencyException, + WriteType, consistency_value_to_name) from cassandra.auth import _proxy_execute_key, PlainTextAuthProvider from cassandra.client_routes import ClientRoutesChangeType, ClientRoutesConfig, _ClientRoutesHandler from cassandra.connection import (ClientRoutesEndPointFactory, ConnectionException, ConnectionShutdown, @@ -191,6 +192,19 @@ def _connection_reduce_fn(val,import_fn): log = logging.getLogger(__name__) +_RETRY_DECISION_NAMES = { + RetryPolicy.RETRY: "RETRY", + RetryPolicy.RETHROW: "RETHROW", + RetryPolicy.IGNORE: "IGNORE", + RetryPolicy.RETRY_NEXT_HOST: "RETRY_NEXT_HOST", +} + + +def _retry_decision_name(retry): + """Return a human-readable name for a retry policy decision tuple.""" + retry_type = retry[0] if isinstance(retry, tuple) else retry + return _RETRY_DECISION_NAMES.get(retry_type, "UNKNOWN(%s)" % retry_type) + _GRAPH_PAGING_MIN_DSE_VERSION = Version('6.8.0') @@ -1683,7 +1697,8 @@ def add_execution_profile(self, name, profile, pool_wait_timeout=5): futures.update(session.update_created_pools()) _, not_done = wait_futures(futures, pool_wait_timeout) if not_done: - raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout.") + raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout." % pool_wait_timeout, + timeout=pool_wait_timeout) def connection_factory(self, endpoint, host_conn = None, *args, **kwargs): """ @@ -4505,6 +4520,8 @@ def _on_timeout(self, _attempts=0): ) return + conn_in_flight = None + conn_orphaned = None if self._connection is not None: try: self._connection._requests.pop(self._req_id) @@ -4515,9 +4532,20 @@ def _on_timeout(self, _attempts=0): except KeyError: key = "Connection defunct by heartbeat" errors = {key: "Client request timeout. See Session.execute[_async](timeout)"} - self._set_final_exception(OperationTimedOut(errors, self._current_host)) + log.debug("Client request timeout (host=%s, timeout=%ss, in_flight=%d, orphaned=%d): " + "connection defunct by heartbeat", + self._current_host, self.timeout, + self._connection.in_flight, + len(self._connection.orphaned_request_ids)) + self._set_final_exception(OperationTimedOut(errors, self._current_host, + timeout=self.timeout, + in_flight=self._connection.in_flight)) return + # Capture connection stats before pool.return_connection() can alter state + conn_in_flight = self._connection.in_flight + conn_orphaned = len(self._connection.orphaned_request_ids) + pool = self.session._pools.get(self._current_host) if pool and not pool.is_shutdown: # Do not return the stream ID to the pool yet. We cannot reuse it @@ -4542,7 +4570,11 @@ def _on_timeout(self, _attempts=0): host = str(connection.endpoint) if connection else 'unknown' errors = {host: "Request timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait."} - self._set_final_exception(OperationTimedOut(errors, self._current_host)) + log.debug("Client request timeout (host=%s, timeout=%ss, in_flight=%s, orphaned=%s)", + self._current_host, self.timeout, conn_in_flight, conn_orphaned) + self._set_final_exception(OperationTimedOut(errors, self._current_host, + timeout=self.timeout, + in_flight=conn_in_flight)) def _on_speculative_execute(self): self._timer = None @@ -4791,11 +4823,31 @@ def _set_result(self, host, connection, pool, response): self._metrics.on_read_timeout() retry = retry_policy.on_read_timeout( self.query, retry_num=self._query_retries, **response.info) + log.debug("Server read timeout (host=%s, consistency=%s, " + "received=%s, required=%s, data_retrieved=%s, " + "retry_num=%d, retry_decision=%s)", + host, + consistency_value_to_name(response.info.get('consistency')), + response.info.get('received_responses'), + response.info.get('required_responses'), + response.info.get('data_retrieved'), + self._query_retries, + _retry_decision_name(retry)) elif isinstance(response, WriteTimeoutErrorMessage): if self._metrics is not None: self._metrics.on_write_timeout() retry = retry_policy.on_write_timeout( self.query, retry_num=self._query_retries, **response.info) + log.debug("Server write timeout (host=%s, consistency=%s, " + "received=%s, required=%s, write_type=%s, " + "retry_num=%d, retry_decision=%s)", + host, + consistency_value_to_name(response.info.get('consistency')), + response.info.get('received_responses'), + response.info.get('required_responses'), + WriteType.value_to_name.get(response.info.get('write_type'), 'UNKNOWN'), + self._query_retries, + _retry_decision_name(retry)) elif isinstance(response, UnavailableErrorMessage): if self._metrics is not None: self._metrics.on_unavailable() diff --git a/cassandra/connection.py b/cassandra/connection.py index c045b36cb3..08501d0a2b 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -984,7 +984,8 @@ def factory(cls, endpoint, timeout, host_conn = None, *args, **kwargs): raise conn.last_error elif not conn.connected_event.is_set(): conn.close() - raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout) + raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout, + timeout=timeout) else: return conn @@ -1247,6 +1248,7 @@ def wait_for_responses(self, *msgs, **kwargs): msg += ": %s" % (self.last_error,) raise ConnectionShutdown(msg) timeout = kwargs.get('timeout') + original_timeout = timeout # preserve for exception reporting fail_on_error = kwargs.get('fail_on_error', True) waiter = ResponseWaiter(self, len(msgs), fail_on_error) @@ -1271,7 +1273,8 @@ def wait_for_responses(self, *msgs, **kwargs): if timeout is not None: timeout -= 0.01 if timeout <= 0.0: - raise OperationTimedOut() + raise OperationTimedOut(timeout=original_timeout, + in_flight=self.in_flight) time.sleep(0.01) try: @@ -1796,7 +1799,8 @@ def deliver(self, timeout=None): if self.error: raise self.error elif not self.event.is_set(): - raise OperationTimedOut() + raise OperationTimedOut(timeout=timeout, + in_flight=self.connection.in_flight) else: return self.responses @@ -1823,7 +1827,10 @@ def wait(self, timeout): if self._exception: raise self._exception else: - raise OperationTimedOut("Connection heartbeat timeout after %s seconds" % (timeout,), self.connection.endpoint) + raise OperationTimedOut("Connection heartbeat timeout after %s seconds" % (timeout,), + self.connection.endpoint, + timeout=timeout, + in_flight=self.connection.in_flight) def _options_callback(self, response): if isinstance(response, SupportedMessage): diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 872d133b28..a4f0ebc4d3 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -87,6 +87,64 @@ def test_exception_types(self): assert issubclass(UnsupportedOperation, DriverException) +class OperationTimedOutTest(unittest.TestCase): + + def test_message_without_timeout(self): + """Default message format when no timeout info is provided.""" + exc = OperationTimedOut(errors={'host1': 'some error'}, last_host='host1') + msg = str(exc) + assert "errors={'host1': 'some error'}" in msg + assert "last_host=host1" in msg + assert "timeout=" not in msg + assert "in_flight=" not in msg + + def test_message_with_timeout_and_in_flight(self): + """Message includes timeout and in_flight when both are provided.""" + exc = OperationTimedOut(errors={'host1': 'err'}, last_host='host1', + timeout=10.0, in_flight=42) + msg = str(exc) + assert "(timeout=10.0s, in_flight=42)" in msg + + def test_message_with_timeout_no_in_flight(self): + """Message includes timeout but not in_flight when only timeout is set.""" + exc = OperationTimedOut(timeout=5.0) + msg = str(exc) + assert "(timeout=5.0s)" in msg + assert "in_flight=" not in msg + + def test_message_no_args(self): + """No-argument form should not crash and should have clean message.""" + exc = OperationTimedOut() + msg = str(exc) + assert "errors=None, last_host=None" in msg + assert "timeout=" not in msg + + def test_attributes_accessible(self): + """New and existing attributes should be readable.""" + exc = OperationTimedOut(errors={'h': 'e'}, last_host='h', + timeout=10.0, in_flight=42) + assert exc.errors == {'h': 'e'} + assert exc.last_host == 'h' + assert exc.timeout == 10.0 + assert exc.in_flight == 42 + + def test_attributes_default_none(self): + """New attributes should default to None when not provided.""" + exc = OperationTimedOut() + assert exc.timeout is None + assert exc.in_flight is None + assert exc.errors is None + assert exc.last_host is None + + def test_backward_compat_positional(self): + """Existing two-positional-arg form should still work.""" + exc = OperationTimedOut({'h': 'err'}, 'host1') + assert exc.errors == {'h': 'err'} + assert exc.last_host == 'host1' + assert exc.timeout is None + assert exc.in_flight is None + + class ClusterTest(unittest.TestCase): def test_tuple_for_contact_points(self): diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index a67b7e4678..2fa7c71196 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -520,6 +520,8 @@ def send_msg(msg, req_id, msg_callback): assert isinstance(exc, OperationTimedOut) assert exc.errors == 'Connection heartbeat timeout after 0.05 seconds' assert exc.last_host == DefaultEndPoint('localhost') + assert exc.timeout == 0.05 + assert isinstance(exc.in_flight, int) holder.return_connection.assert_has_calls( [call(connection)] * get_holders.call_count) diff --git a/tests/unit/test_response_future.py b/tests/unit/test_response_future.py index 7168ad2940..dd7fa75045 100644 --- a/tests/unit/test_response_future.py +++ b/tests/unit/test_response_future.py @@ -142,6 +142,8 @@ def test_heartbeat_defunct_deadlock(self): connection = MagicMock(spec=Connection) connection._requests = {} + connection.in_flight = 5 + connection.orphaned_request_ids = set() pool = Mock() pool.is_shutdown = False @@ -162,8 +164,10 @@ def test_heartbeat_defunct_deadlock(self): # Simulate ResponseFuture timing out rf._on_timeout() - with pytest.raises(OperationTimedOut, match="Connection defunct by heartbeat"): + with pytest.raises(OperationTimedOut, match="Connection defunct by heartbeat") as exc_info: rf.result() + assert exc_info.value.timeout == 1 + assert exc_info.value.in_flight == 5 def test_read_timeout_error_message(self): session = self.make_session() @@ -653,7 +657,7 @@ def test_timeout_does_not_release_stream_id(self): pool = self.make_pool() session._pools.get.return_value = pool connection = Mock(spec=Connection, lock=RLock(), _requests={}, request_ids=deque(), - orphaned_request_ids=set(), orphaned_threshold=256) + orphaned_request_ids=set(), orphaned_threshold=256, in_flight=3) pool.borrow_connection.return_value = (connection, 1) rf = self.make_response_future(session) @@ -663,8 +667,10 @@ def test_timeout_does_not_release_stream_id(self): rf._on_timeout() pool.return_connection.assert_called_once_with(connection, stream_was_orphaned=True) - with pytest.raises(OperationTimedOut, match="Client request timeout"): + with pytest.raises(OperationTimedOut, match="Client request timeout") as exc_info: rf.result() + assert exc_info.value.timeout == 1 + assert exc_info.value.in_flight == 3 assert len(connection.request_ids) == 0, \ "Request IDs should be empty but it's not: {}".format(connection.request_ids)