From 192bb46002b4feda217220380a5f0cdd3425e05b Mon Sep 17 00:00:00 2001 From: omniCoder77 Date: Sun, 5 Apr 2026 15:32:53 +0530 Subject: [PATCH] feat(connection): add support for GRACEFUL_DISCONNECT events Implement GRACEFUL_DISCONNECT event parsing in protocol.py. Add connection draining logic to connection.py to prevent new requests while allowing in-flight ones to finish. Register GRACEFUL_DISCONNECT watchers in ControlConnection. Update HostConnection and HostConnectionPool to clean up draining connections once empty. Add a test script to verify advertisement and handling of the new event. --- cassandra/cluster.py | 12 +++++++++++- cassandra/connection.py | 17 +++++++++++++++++ cassandra/pool.py | 15 +++++++++++++++ cassandra/protocol.py | 10 +++++++++- 4 files changed, 52 insertions(+), 2 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 6b2ab4b288..2a7cbc1f58 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3718,7 +3718,8 @@ def _try_connect(self, host): connection.register_watchers({ "TOPOLOGY_CHANGE": partial(_watch_callback, self_weakref, '_handle_topology_change'), "STATUS_CHANGE": partial(_watch_callback, self_weakref, '_handle_status_change'), - "SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change') + "SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change'), + "GRACEFUL_DISCONNECT": partial(_watch_callback, self_weakref, '_handle_graceful_disconnect') }, register_timeout=self._timeout) sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection) @@ -3748,6 +3749,15 @@ def _try_connect(self, host): return connection + def _handle_graceful_disconnect(self, event): + if self._connection is None: + return + host = self._cluster.metadata.get_host(self._connection.endpoint) + print("host found:", host) + if host: + print(self) + self._cluster.on_down(host, is_host_addition=False) + def reconnect(self): if self._is_shutdown: return diff --git a/cassandra/connection.py b/cassandra/connection.py index 246cec79f9..8f790b5238 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -674,6 +674,9 @@ class Connection(object): CALLBACK_ERR_THREAD_THRESHOLD = 100 + supports_graceful_disconnect = False + is_draining = False + in_buffer_size = 4096 out_buffer_size = 4096 @@ -1061,6 +1064,8 @@ def get_request_id(self): return self.highest_request_id def handle_pushed(self, response): + if response.event_type == 'GRACEFUL_DISCONNECT': + self._handle_graceful_disconnect() log.debug("Message pushed from server: %r", response) for cb in self._push_watchers.get(response.event_type, []): try: @@ -1068,9 +1073,20 @@ def handle_pushed(self, response): except Exception: log.exception("Pushed event handler errored, ignoring:") + def _handle_graceful_disconnect(self): + log.info("Received GRACEFUL_DISCONNECT from %s. Draining connection...", self.endpoint) + self.is_draining = True + self._socket_writable = False + + with self.lock: + if self.in_flight == 0: + self.close() + def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=None): if self.is_defunct: raise ConnectionShutdown("Connection to %s is defunct" % self.endpoint) + if self.is_draining: + raise ConnectionShutdown("Connection to %s is draining" % self.endpoint) elif self.is_closed: raise ConnectionShutdown("Connection to %s is closed" % self.endpoint) elif not self._socket_writable: @@ -1397,6 +1413,7 @@ def _handle_options_response(self, options_response): locally_supported_compressions[compression_type] self._send_startup_message(compression_type, no_compact=self.no_compact) + self.supports_graceful_disconnect = options_response.options.get('GRACEFUL_DISCONNECT') == ['true'] @defunct_on_error def _send_startup_message(self, compression=None, no_compact=False): diff --git a/cassandra/pool.py b/cassandra/pool.py index 37fdaee96b..912f042845 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -459,6 +459,13 @@ def return_connection(self, connection, stream_was_orphaned=False): with self._stream_available_condition: self._stream_available_condition.notify() + if connection.is_draining: + with connection.lock: + if connection.in_flight == 0: + log.debug("Graceful drain complete for %s. Closing connection.", self.host) + connection.close() + return + if connection.is_defunct or connection.is_closed: if connection.signaled_error and not self.shutdown_on_error: return @@ -777,6 +784,14 @@ def return_connection(self, connection, stream_was_orphaned=False): connection.in_flight -= 1 in_flight = connection.in_flight + if connection.is_draining and connection.in_flight == 0: + with self._lock: + if connection in self._connections: + self._connections.remove(connection) + self.open_count -= 1 + connection.close() + return + if connection.is_defunct or connection.is_closed: if not connection.signaled_error: log.debug("Defunct or closed connection (%s) returned to pool, potentially " diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 69340a805d..8eb3a8af64 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -986,7 +986,8 @@ def send_body(self, f, protocol_version): known_event_types = frozenset(( 'TOPOLOGY_CHANGE', 'STATUS_CHANGE', - 'SCHEMA_CHANGE' + 'SCHEMA_CHANGE', + 'GRACEFUL_DISCONNECT' )) @@ -1056,6 +1057,13 @@ def recv_schema_change(cls, f, protocol_version): event = {'target_type': SchemaTargetType.KEYSPACE, 'change_type': change_type, 'keyspace': keyspace} return event + @classmethod + def recv_graceful_disconnect(cls, f, protocol_version): + """ + Graceful disconnect events contain no extra arguments. + """ + return {} + class ReviseRequestMessage(_MessageType):