Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 73 additions & 19 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import errno
import threading

from cassandra.connection import Connection, ConnectionShutdown
Expand Down Expand Up @@ -142,32 +143,58 @@ def close(self):

# close from the loop thread to avoid races when removing file
# descriptors
asyncio.run_coroutine_threadsafe(
future = asyncio.run_coroutine_threadsafe(
self._close(), loop=self._loop
)

# When called from outside the event loop thread, wait for _close()
# to complete so the socket is actually closed when close() returns.
# This prevents a race window where is_closed=True but the socket fd
# is still open, which causes EBADF in pending I/O operations.
if threading.current_thread() != self._loop_thread:
try:
future.result(timeout=5)
except Exception:
# Best-effort close: suppress errors during shutdown
log.debug("Error waiting for async close of %s",
self.endpoint, exc_info=True)

async def _close(self):
log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
if self._write_watcher:
self._write_watcher.cancel()
if self._read_watcher:
self._read_watcher.cancel()
if self._socket:
self._loop.remove_writer(self._socket.fileno())
self._loop.remove_reader(self._socket.fileno())
self._socket.close()

log.debug("Closed socket to %s" % (self.endpoint,))

if not self.is_defunct:
msg = "Connection to %s was closed" % self.endpoint
if self.last_error:
msg += ": %s" % (self.last_error,)
self.error_all_requests(ConnectionShutdown(msg))
# don't leave in-progress operations hanging
self.connected_event.set()
try:
if self._write_watcher:
self._write_watcher.cancel()
if self._read_watcher:
self._read_watcher.cancel()
if self._socket:
# remove_reader/remove_writer are not supported on Windows
# ProactorEventLoop — ignore failures so the socket still
# gets closed.
try:
self._loop.remove_writer(self._socket.fileno())
except (NotImplementedError, OSError):
pass # not supported on Windows ProactorEventLoop
try:
self._loop.remove_reader(self._socket.fileno())
except (NotImplementedError, OSError):
pass # not supported on Windows ProactorEventLoop
self._socket.close()

log.debug("Closed socket to %s" % (self.endpoint,))
finally:
if not self.is_defunct:
msg = "Connection to %s was closed" % self.endpoint
if self.last_error:
msg += ": %s" % (self.last_error,)
self.error_all_requests(ConnectionShutdown(msg))
# don't leave in-progress operations hanging
self.connected_event.set()

def push(self, data):
if self.is_closed or self.is_defunct:
raise ConnectionShutdown(
"Connection to %s is already closed" % (self.endpoint,))

buff_size = self.out_buffer_size
if len(data) > buff_size:
chunks = []
Expand Down Expand Up @@ -202,6 +229,28 @@ async def handle_write(self):
if next_msg:
await self._loop.sock_sendall(self._socket, next_msg)
except socket.error as err:
# Peer disconnected — do a clean close instead of defunct.
# Use ConnectionError (parent of BrokenPipeError,
# ConnectionResetError, ConnectionAbortedError) plus a
# winerror check for Windows IOCP which may raise plain
# OSError with winerror=10054 (WSAECONNRESET) or
# 10053 (WSAECONNABORTED).
# Also check errno for ENOTCONN/ESHUTDOWN/ECONNRESET
# which macOS raises as plain OSError (not ConnectionError).
_peer_errnos = (
errno.ENOTCONN, errno.ESHUTDOWN,
errno.ECONNRESET, errno.ECONNABORTED,
)
if (isinstance(err, ConnectionError)
or getattr(err, 'winerror', None) in (10053, 10054)
or getattr(err, 'errno', None) in _peer_errnos):
log.debug("Connection %s closed by peer during write: %s",
self, err)
self.close()
return
# Connection is already shutting down, just exit
if self.is_closed or self.is_defunct:
return
log.debug("Exception in send for %s: %s", self, err)
self.defunct(err)
return
Expand All @@ -223,6 +272,9 @@ async def handle_read(self):
await asyncio.sleep(0)
continue
except socket.error as err:
# Connection is already shutting down, just exit
if self.is_closed or self.is_defunct:
return
log.debug("Exception during socket recv for %s: %s",
self, err)
self.defunct(err)
Expand All @@ -234,5 +286,7 @@ async def handle_read(self):
self.process_io_buffer()
else:
log.debug("Connection %s closed by server", self)
self.last_error = ConnectionShutdown(
"Connection to %s was closed by server" % self.endpoint)
self.close()
return
Loading
Loading