Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3718,7 +3718,8 @@ def _try_connect(self, host):
connection.register_watchers({
"TOPOLOGY_CHANGE": partial(_watch_callback, self_weakref, '_handle_topology_change'),
"STATUS_CHANGE": partial(_watch_callback, self_weakref, '_handle_status_change'),
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change'),
"GRACEFUL_DISCONNECT": partial(_watch_callback, self_weakref, '_handle_graceful_disconnect')
}, register_timeout=self._timeout)

sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
Expand Down Expand Up @@ -3748,6 +3749,15 @@ def _try_connect(self, host):

return connection

def _handle_graceful_disconnect(self, event):
if self._connection is None:
return
host = self._cluster.metadata.get_host(self._connection.endpoint)
print("host found:", host)
if host:
print(self)
self._cluster.on_down(host, is_host_addition=False)

def reconnect(self):
if self._is_shutdown:
return
Expand Down
17 changes: 17 additions & 0 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,9 @@ class Connection(object):

CALLBACK_ERR_THREAD_THRESHOLD = 100

supports_graceful_disconnect = False
is_draining = False

in_buffer_size = 4096
out_buffer_size = 4096

Expand Down Expand Up @@ -1061,16 +1064,29 @@ def get_request_id(self):
return self.highest_request_id

def handle_pushed(self, response):
if response.event_type == 'GRACEFUL_DISCONNECT':
self._handle_graceful_disconnect()
log.debug("Message pushed from server: %r", response)
for cb in self._push_watchers.get(response.event_type, []):
try:
cb(response.event_args)
except Exception:
log.exception("Pushed event handler errored, ignoring:")

def _handle_graceful_disconnect(self):
log.info("Received GRACEFUL_DISCONNECT from %s. Draining connection...", self.endpoint)
self.is_draining = True
self._socket_writable = False

with self.lock:
if self.in_flight == 0:
self.close()

def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=None):
if self.is_defunct:
raise ConnectionShutdown("Connection to %s is defunct" % self.endpoint)
if self.is_draining:
raise ConnectionShutdown("Connection to %s is draining" % self.endpoint)
elif self.is_closed:
raise ConnectionShutdown("Connection to %s is closed" % self.endpoint)
elif not self._socket_writable:
Expand Down Expand Up @@ -1397,6 +1413,7 @@ def _handle_options_response(self, options_response):
locally_supported_compressions[compression_type]

self._send_startup_message(compression_type, no_compact=self.no_compact)
self.supports_graceful_disconnect = options_response.options.get('GRACEFUL_DISCONNECT') == ['true']

@defunct_on_error
def _send_startup_message(self, compression=None, no_compact=False):
Expand Down
15 changes: 15 additions & 0 deletions cassandra/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,13 @@ def return_connection(self, connection, stream_was_orphaned=False):
with self._stream_available_condition:
self._stream_available_condition.notify()

if connection.is_draining:
with connection.lock:
if connection.in_flight == 0:
log.debug("Graceful drain complete for %s. Closing connection.", self.host)
connection.close()
return

if connection.is_defunct or connection.is_closed:
if connection.signaled_error and not self.shutdown_on_error:
return
Expand Down Expand Up @@ -777,6 +784,14 @@ def return_connection(self, connection, stream_was_orphaned=False):
connection.in_flight -= 1
in_flight = connection.in_flight

if connection.is_draining and connection.in_flight == 0:
with self._lock:
if connection in self._connections:
self._connections.remove(connection)
self.open_count -= 1
connection.close()
return

if connection.is_defunct or connection.is_closed:
if not connection.signaled_error:
log.debug("Defunct or closed connection (%s) returned to pool, potentially "
Expand Down
10 changes: 9 additions & 1 deletion cassandra/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,8 @@ def send_body(self, f, protocol_version):
known_event_types = frozenset((
'TOPOLOGY_CHANGE',
'STATUS_CHANGE',
'SCHEMA_CHANGE'
'SCHEMA_CHANGE',
'GRACEFUL_DISCONNECT'
))


Expand Down Expand Up @@ -1056,6 +1057,13 @@ def recv_schema_change(cls, f, protocol_version):
event = {'target_type': SchemaTargetType.KEYSPACE, 'change_type': change_type, 'keyspace': keyspace}
return event

@classmethod
def recv_graceful_disconnect(cls, f, protocol_version):
"""
Graceful disconnect events contain no extra arguments.
"""
return {}


class ReviseRequestMessage(_MessageType):

Expand Down