From f42e225b411fe6c4d8e766c230785f6eeef52257 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Thu, 26 Mar 2026 18:38:08 +0100 Subject: [PATCH 1/2] DRIVER-153: negotiate and implement SCYLLA_USE_METADATA_ID extension MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Scylla's SCYLLA_USE_METADATA_ID protocol extension (backport of CQL v5 prepared-statement metadata IDs to earlier protocol versions) lets the driver ask the server to omit full result metadata from EXECUTE responses. The server notifies the driver via the METADATA_CHANGED flag whenever the result schema changes, at which point the driver updates its cached metadata before deserialising the response. Changes: - protocol_features.py: parse SCYLLA_USE_METADATA_ID from SUPPORTED and include it in the STARTUP frame when negotiated - protocol.py: * fix _write_query_params to actually write _SKIP_METADATA_FLAG on the wire (it was stored on the message but never sent — dead code before) * recv_results_prepared: read result_metadata_id for Scylla extension (pre-v5) in addition to standard protocol v5+ * ExecuteMessage.send_body: send result_metadata_id for Scylla extension (pre-v5) when it is set - cluster.py: * ExecuteMessage is built with safe defaults (skip_meta=False, result_metadata_id=None); both are set in _query() after borrowing the connection, gated on connection.features.use_metadata_id and on the prepared statement actually having a result_metadata_id (so a statement prepared before the extension was available, or on a node that doesn't support it, never gets skip_meta=True with no id) * _set_result: update prepared_statement.result_metadata and result_metadata_id when the server signals METADATA_CHANGED in an EXECUTE response, keeping the driver's cached metadata in sync; uses getattr to safely handle FastResultMessage (Cython decoder) --- cassandra/cluster.py | 18 +++++- cassandra/protocol.py | 7 ++- cassandra/protocol_features.py | 16 ++++- 
tests/unit/test_protocol.py | 87 +++++++++++++++++++++++++++- tests/unit/test_protocol_features.py | 35 +++++++++++ 5 files changed, 156 insertions(+), 7 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 8da9df6a55..75aeaa9435 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2998,9 +2998,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload, message = ExecuteMessage( prepared_statement.query_id, query.values, cl, serial_cl, fetch_size, paging_state, timestamp, - skip_meta=bool(prepared_statement.result_metadata), - continuous_paging_options=continuous_paging_options, - result_metadata_id=prepared_statement.result_metadata_id) + continuous_paging_options=continuous_paging_options) elif isinstance(query, BatchStatement): if self._protocol_version < 2: raise UnsupportedOperation( @@ -4618,6 +4616,15 @@ def _query(self, host, message=None, cb=None): self._connection = connection result_meta = self.prepared_statement.result_metadata if self.prepared_statement else [] + if self.prepared_statement and isinstance(message, ExecuteMessage): + has_result_metadata_id = self.prepared_statement.result_metadata_id is not None + use_metadata_id = has_result_metadata_id and ( + ProtocolVersion.uses_prepared_metadata(connection.protocol_version) + or connection.features.use_metadata_id + ) + message.skip_meta = use_metadata_id + message.result_metadata_id = self.prepared_statement.result_metadata_id if use_metadata_id else None + if cb is None: cb = partial(self._set_result, host, connection, pool) @@ -4774,6 +4781,11 @@ def _set_result(self, host, connection, pool, response): self._paging_state = response.paging_state self._col_names = response.column_names self._col_types = response.column_types + new_result_metadata_id = getattr(response, 'result_metadata_id', None) + if self.prepared_statement and new_result_metadata_id is not None: + if response.column_metadata: + self.prepared_statement.result_metadata = 
response.column_metadata + self.prepared_statement.result_metadata_id = new_result_metadata_id if getattr(self.message, 'continuous_paging_options', None): self._handle_continuous_paging_first_response(connection, response) else: diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 4628c7ee0e..a0d08b82e3 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -573,6 +573,9 @@ def _write_query_params(self, f, protocol_version): if self.timestamp is not None: flags |= _PROTOCOL_TIMESTAMP_FLAG + if self.skip_meta: + flags |= _SKIP_METADATA_FLAG + if self.keyspace is not None: if ProtocolVersion.uses_keyspace_flag(protocol_version): flags |= _WITH_KEYSPACE_FLAG @@ -642,6 +645,8 @@ def send_body(self, f, protocol_version): write_string(f, self.query_id) if ProtocolVersion.uses_prepared_metadata(protocol_version): write_string(f, self.result_metadata_id) + elif self.result_metadata_id is not None: + write_string(f, self.result_metadata_id) self._write_query_params(f, protocol_version) @@ -745,7 +750,7 @@ def decode_row(row): def recv_results_prepared(self, f, protocol_version, protocol_features, user_type_map): self.query_id = read_binary_string(f) - if ProtocolVersion.uses_prepared_metadata(protocol_version): + if ProtocolVersion.uses_prepared_metadata(protocol_version) or protocol_features.use_metadata_id: self.result_metadata_id = read_binary_string(f) else: self.result_metadata_id = None diff --git a/cassandra/protocol_features.py b/cassandra/protocol_features.py index 877998be7d..5193d72a44 100644 --- a/cassandra/protocol_features.py +++ b/cassandra/protocol_features.py @@ -10,6 +10,7 @@ LWT_OPTIMIZATION_META_BIT_MASK = "LWT_OPTIMIZATION_META_BIT_MASK" RATE_LIMIT_ERROR_EXTENSION = "SCYLLA_RATE_LIMIT_ERROR" TABLETS_ROUTING_V1 = "TABLETS_ROUTING_V1" +USE_METADATA_ID = "SCYLLA_USE_METADATA_ID" class ProtocolFeatures(object): rate_limit_error = None @@ -17,13 +18,16 @@ class ProtocolFeatures(object): sharding_info = None tablets_routing_v1 = False 
lwt_info = None + use_metadata_id = False - def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, lwt_info=None): + def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, lwt_info=None, + use_metadata_id=False): self.rate_limit_error = rate_limit_error self.shard_id = shard_id self.sharding_info = sharding_info self.tablets_routing_v1 = tablets_routing_v1 self.lwt_info = lwt_info + self.use_metadata_id = use_metadata_id @staticmethod def parse_from_supported(supported): @@ -31,7 +35,9 @@ def parse_from_supported(supported): shard_id, sharding_info = ProtocolFeatures.parse_sharding_info(supported) tablets_routing_v1 = ProtocolFeatures.parse_tablets_info(supported) lwt_info = ProtocolFeatures.parse_lwt_info(supported) - return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, lwt_info) + use_metadata_id = ProtocolFeatures.parse_use_metadata_id(supported) + return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, lwt_info, + use_metadata_id) @staticmethod def maybe_parse_rate_limit_error(supported): @@ -57,6 +63,8 @@ def add_startup_options(self, options): options[TABLETS_ROUTING_V1] = "" if self.lwt_info is not None: options[LWT_ADD_METADATA_MARK] = str(self.lwt_info.lwt_meta_bit_mask) + if self.use_metadata_id: + options[USE_METADATA_ID] = "" @staticmethod def parse_sharding_info(options): @@ -81,6 +89,10 @@ def parse_sharding_info(options): def parse_tablets_info(options): return TABLETS_ROUTING_V1 in options + @staticmethod + def parse_use_metadata_id(options): + return USE_METADATA_ID in options + @staticmethod def parse_lwt_info(options): value_list = options.get(LWT_ADD_METADATA_MARK, [None]) diff --git a/tests/unit/test_protocol.py b/tests/unit/test_protocol.py index 9704811239..17b474604b 100644 --- a/tests/unit/test_protocol.py +++ b/tests/unit/test_protocol.py @@ -12,6 +12,8 @@ # See the License for the 
specific language governing permissions and # limitations under the License. +import io +import struct import unittest from unittest.mock import Mock @@ -21,8 +23,10 @@ PrepareMessage, QueryMessage, ExecuteMessage, UnsupportedOperation, _PAGING_OPTIONS_FLAG, _WITH_SERIAL_CONSISTENCY_FLAG, _PAGE_SIZE_FLAG, _WITH_PAGING_STATE_FLAG, - BatchMessage + _SKIP_METADATA_FLAG, + BatchMessage, ResultMessage ) +from cassandra.protocol_features import ProtocolFeatures from cassandra.query import BatchType from cassandra.marshal import uint32_unpack from cassandra.cluster import ContinuousPagingOptions @@ -68,6 +72,87 @@ def test_execute_message(self): (b'\x00\x04',), (b'\x00\x00\x00\x01',), (b'\x00\x00',)]) + def test_execute_message_skip_meta_flag(self): + """skip_meta=True must set _SKIP_METADATA_FLAG (0x02) in the flags byte.""" + message = ExecuteMessage('1', [], 4, skip_meta=True) + mock_io = Mock() + + message.send_body(mock_io, 4) + # flags byte should be VALUES_FLAG | SKIP_METADATA_FLAG = 0x01 | 0x02 = 0x03 + self._check_calls(mock_io, [(b'\x00\x01',), (b'1',), (b'\x00\x04',), (b'\x03',), (b'\x00\x00',)]) + + def test_execute_message_scylla_metadata_id_v4(self): + """result_metadata_id should be written on protocol v4 when set (Scylla extension).""" + message = ExecuteMessage('1', [], 4) + message.result_metadata_id = b'foo' + mock_io = Mock() + + message.send_body(mock_io, 4) + # metadata_id written before query params (same position as v5) + self._check_calls(mock_io, [(b'\x00\x01',), (b'1',), + (b'\x00\x03',), (b'foo',), + (b'\x00\x04',), (b'\x01',), (b'\x00\x00',)]) + + def test_recv_results_prepared_scylla_extension_reads_metadata_id(self): + """ + When use_metadata_id is True (Scylla extension), result_metadata_id must be + read from the PREPARE response even for protocol v4. 
+ """ + # Build a minimal valid PREPARE response binary (no bind/result columns): + # query_id: short(2) + b'ab' + # result_metadata_id: short(3) + b'xyz' <-- only present when extension active + # prepared flags: int(1) = global_tables_spec + # colcount: int(0) + # num_pk_indexes: int(0) + # ksname: short(2) + b'ks' + # cfname: short(2) + b'tb' + # result flags: int(4) = no_metadata + # result colcount: int(0) + buf = io.BytesIO( + struct.pack('>H', 2) + b'ab' # query_id + + struct.pack('>H', 3) + b'xyz' # result_metadata_id + + struct.pack('>i', 1) # prepared flags: global_tables_spec + + struct.pack('>i', 0) # colcount = 0 + + struct.pack('>i', 0) # num_pk_indexes = 0 + + struct.pack('>H', 2) + b'ks' # ksname + + struct.pack('>H', 2) + b'tb' # cfname + + struct.pack('>i', 4) # result flags: no_metadata + + struct.pack('>i', 0) # result colcount = 0 + ) + + features_with_extension = ProtocolFeatures(use_metadata_id=True) + msg = ResultMessage(kind=4) # RESULT_KIND_PREPARED = 4 + msg.recv_results_prepared(buf, protocol_version=4, + protocol_features=features_with_extension, + user_type_map={}) + assert msg.query_id == b'ab' + assert msg.result_metadata_id == b'xyz' + + def test_recv_results_prepared_no_extension_skips_metadata_id(self): + """ + Without use_metadata_id, result_metadata_id must NOT be read on protocol v4. + The buffer must NOT contain a metadata_id field. 
+ """ + buf = io.BytesIO( + struct.pack('>H', 2) + b'ab' # query_id + # no result_metadata_id + + struct.pack('>i', 1) # prepared flags: global_tables_spec + + struct.pack('>i', 0) # colcount = 0 + + struct.pack('>i', 0) # num_pk_indexes = 0 + + struct.pack('>H', 2) + b'ks' # ksname + + struct.pack('>H', 2) + b'tb' # cfname + + struct.pack('>i', 4) # result flags: no_metadata + + struct.pack('>i', 0) # result colcount = 0 + ) + + features_without_extension = ProtocolFeatures(use_metadata_id=False) + msg = ResultMessage(kind=4) + msg.recv_results_prepared(buf, protocol_version=4, + protocol_features=features_without_extension, + user_type_map={}) + assert msg.query_id == b'ab' + assert msg.result_metadata_id is None + def test_query_message(self): """ Test to check the appropriate calls are made diff --git a/tests/unit/test_protocol_features.py b/tests/unit/test_protocol_features.py index 895c384f7e..387583680b 100644 --- a/tests/unit/test_protocol_features.py +++ b/tests/unit/test_protocol_features.py @@ -22,3 +22,38 @@ class OptionsHolder(object): assert protocol_features.rate_limit_error == 123 assert protocol_features.shard_id == 0 assert protocol_features.sharding_info is None + + def test_use_metadata_id_parsing(self): + """ + Test that SCYLLA_USE_METADATA_ID is parsed from SUPPORTED options. + """ + options = {'SCYLLA_USE_METADATA_ID': ['']} + protocol_features = ProtocolFeatures.parse_from_supported(options) + assert protocol_features.use_metadata_id is True + + def test_use_metadata_id_missing(self): + """ + Test that use_metadata_id is False when SCYLLA_USE_METADATA_ID is absent. + """ + options = {'SCYLLA_RATE_LIMIT_ERROR': ['ERROR_CODE=1']} + protocol_features = ProtocolFeatures.parse_from_supported(options) + assert protocol_features.use_metadata_id is False + + def test_use_metadata_id_startup_options(self): + """ + Test that SCYLLA_USE_METADATA_ID is included in STARTUP options when negotiated. 
+ """ + options = {'SCYLLA_USE_METADATA_ID': ['']} + protocol_features = ProtocolFeatures.parse_from_supported(options) + startup = {} + protocol_features.add_startup_options(startup) + assert 'SCYLLA_USE_METADATA_ID' in startup + + def test_use_metadata_id_not_in_startup_when_not_negotiated(self): + """ + Test that SCYLLA_USE_METADATA_ID is NOT included in STARTUP when not negotiated. + """ + protocol_features = ProtocolFeatures.parse_from_supported({}) + startup = {} + protocol_features.add_startup_options(startup) + assert 'SCYLLA_USE_METADATA_ID' not in startup From 6eea397cf99cbaaf4195238f7b5d0e4861718528 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Fri, 10 Apr 2026 00:32:00 +0200 Subject: [PATCH 2/2] DRIVER-153: add tests, defensive warning and docs for SCYLLA_USE_METADATA_ID - Add unit tests for the _METADATA_ID_FLAG path in recv_results_metadata (ROWS result with METADATA_CHANGED signal) - Add unit tests for _set_result metadata cache update on METADATA_CHANGED: update both result_metadata and result_metadata_id, no-op when id absent, warning when id present but column_metadata empty - Add unit tests for _query per-connection feature gating: skip_meta and result_metadata_id are set only when the connection negotiated SCYLLA_USE_METADATA_ID (or protocol v5) and the prepared statement carries a result_metadata_id - Add defensive log.warning in _set_result when server sends a new result_metadata_id without column_metadata (protocol violation) - Add write-order comment explaining thread-safety rationale for the two assignments to prepared_statement.result_metadata / result_metadata_id - Add SCYLLA_USE_METADATA_ID section to docs/scylla-specific.rst --- cassandra/cluster.py | 13 ++ docs/scylla-specific.rst | 40 +++++ tests/unit/test_protocol.py | 41 ++++- tests/unit/test_response_future.py | 251 +++++++++++++++++++++++++++++ 4 files changed, 344 insertions(+), 1 deletion(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 
75aeaa9435..42f2ab95e8 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -4784,7 +4784,20 @@ def _set_result(self, host, connection, pool, response): new_result_metadata_id = getattr(response, 'result_metadata_id', None) if self.prepared_statement and new_result_metadata_id is not None: if response.column_metadata: + # Write result_metadata before result_metadata_id intentionally: + # a concurrent reader that still sees the old metadata_id will + # ask the server for full metadata and recover safely; a reader + # that sees the new metadata_id together with the new metadata + # is immediately correct. The opposite write order could expose + # a window where a reader uses a new metadata_id with stale metadata. self.prepared_statement.result_metadata = response.column_metadata + else: + log.warning( + "Server sent a new result_metadata_id but no column metadata " + "for prepared statement %r. The cached column metadata will not " + "be updated; only result_metadata_id is refreshed.", + getattr(self.prepared_statement, 'query_id', None) + ) self.prepared_statement.result_metadata_id = new_result_metadata_id if getattr(self.message, 'continuous_paging_options', None): self._handle_continuous_paging_first_response(connection, response) diff --git a/docs/scylla-specific.rst b/docs/scylla-specific.rst index e9fe695f8f..ed2f6f032b 100644 --- a/docs/scylla-specific.rst +++ b/docs/scylla-specific.rst @@ -156,3 +156,43 @@ https://github.com/scylladb/scylladb/blob/master/docs/dev/protocol-extensions.md Details on the sending tablet information to the drivers https://github.com/scylladb/scylladb/blob/master/docs/dev/protocol-extensions.md#sending-tablet-info-to-the-drivers + + +Prepared Statement Metadata Caching (``SCYLLA_USE_METADATA_ID``) +---------------------------------------------------------------- + +When executing prepared SELECT statements, the driver normally requests the server +to skip sending full result metadata with each response (``skip_meta`` 
optimization), +relying on the metadata cached from the initial ``PREPARE`` call. However, if the +table schema changes after a statement is prepared (e.g., a column is added, removed, +or its type is altered), this cached metadata becomes stale — leading to decoding +errors or incorrect data. + +ScyllaDB solves this by backporting the ``metadata_id`` mechanism from CQL native +protocol v5 as a v4 extension: ``SCYLLA_USE_METADATA_ID``. When this extension is +negotiated, the server includes a hash of the result metadata in the ``PREPARE`` +response. The driver sends this hash back with every ``EXECUTE`` request. If the +schema has changed, the server sets the ``METADATA_CHANGED`` flag and returns the +new metadata hash together with the updated column definitions. The driver +automatically updates its cache and uses the new metadata to decode the current +response — all transparently, with no application code change required. + +**Behaviour summary:** + +- Automatically negotiated at connection time when the ScyllaDB node supports it. +- ``skip_meta`` is enabled (metadata omitted from EXECUTE responses) only when it + is safe: the connection must have negotiated ``SCYLLA_USE_METADATA_ID`` (or use + CQL v5), *and* the prepared statement must carry a ``result_metadata_id`` obtained + from PREPARE. +- When a schema change is detected by the server, the driver refreshes both the + cached column metadata and the metadata hash for that prepared statement so that + all subsequent executions benefit immediately. +- Statements prepared before the extension was negotiated (e.g., during a rolling + upgrade) retain ``result_metadata_id=None`` and fall back to always requesting + full metadata, which is the safest option. + +**Current scope:** schema-change detection is implemented for SELECT statements. +UPDATE/INSERT coverage is planned in a separate effort. 
+ +For full protocol details see the ScyllaDB CQL extensions documentation: +https://opensource.docs.scylladb.com/stable/cql/cql-extensions.html diff --git a/tests/unit/test_protocol.py b/tests/unit/test_protocol.py index 17b474604b..66b72a61ef 100644 --- a/tests/unit/test_protocol.py +++ b/tests/unit/test_protocol.py @@ -24,7 +24,8 @@ _PAGING_OPTIONS_FLAG, _WITH_SERIAL_CONSISTENCY_FLAG, _PAGE_SIZE_FLAG, _WITH_PAGING_STATE_FLAG, _SKIP_METADATA_FLAG, - BatchMessage, ResultMessage + BatchMessage, ResultMessage, + RESULT_KIND_ROWS ) from cassandra.protocol_features import ProtocolFeatures from cassandra.query import BatchType @@ -153,6 +154,44 @@ def test_recv_results_prepared_no_extension_skips_metadata_id(self): assert msg.query_id == b'ab' assert msg.result_metadata_id is None + def test_recv_results_metadata_changed_flag(self): + """ + When _METADATA_ID_FLAG (0x0008) is set in a ROWS result, + recv_results_metadata must read and store the new result_metadata_id + sent by the server (METADATA_CHANGED signal), and still populate + column_metadata normally. + """ + # Wire layout for a ROWS result with METADATA_CHANGED: + # flags: int(0x0008) = _METADATA_ID_FLAG + # colcount: int(0) + # result_metadata_id: short(4) + b'new1' + # (no columns — colcount=0 — to keep the buffer minimal) + buf = io.BytesIO( + struct.pack('>i', 0x0008) # flags: METADATA_ID_FLAG + + struct.pack('>i', 0) # colcount = 0 + + struct.pack('>H', 4) + b'new1' # result_metadata_id = b'new1' + ) + msg = ResultMessage(kind=RESULT_KIND_ROWS) + msg.recv_results_metadata(buf, user_type_map={}) + assert msg.result_metadata_id == b'new1' + assert msg.column_metadata == [] + + def test_recv_results_metadata_no_metadata_flag_skips_metadata_id(self): + """ + When _NO_METADATA_FLAG (0x0004) is set, recv_results_metadata returns + early and must NOT read or set result_metadata_id, even if the caller + mistakenly sets _METADATA_ID_FLAG alongside it. 
+ """ + # flags = _NO_METADATA_FLAG (0x0004), colcount = 0 + buf = io.BytesIO( + struct.pack('>i', 0x0004) # flags: NO_METADATA + + struct.pack('>i', 0) # colcount = 0 + ) + msg = ResultMessage(kind=RESULT_KIND_ROWS) + msg.recv_results_metadata(buf, user_type_map={}) + assert not hasattr(msg, 'result_metadata_id') or msg.result_metadata_id is None + assert not hasattr(msg, 'column_metadata') or msg.column_metadata is None + def test_query_message(self): """ Test to check the appropriate calls are made diff --git a/tests/unit/test_response_future.py b/tests/unit/test_response_future.py index 7168ad2940..7baca083f0 100644 --- a/tests/unit/test_response_future.py +++ b/tests/unit/test_response_future.py @@ -23,6 +23,7 @@ from cassandra.connection import Connection, ConnectionException from cassandra.protocol import (ReadTimeoutErrorMessage, WriteTimeoutErrorMessage, UnavailableErrorMessage, ResultMessage, QueryMessage, + ExecuteMessage, OverloadedErrorMessage, IsBootstrappingErrorMessage, PreparedQueryNotFound, PrepareMessage, ServerError, RESULT_KIND_ROWS, RESULT_KIND_SET_KEYSPACE, @@ -723,3 +724,253 @@ def test_single_host_query_plan_exhausted_after_one_retry(self): # Instead, it should set a NoHostAvailable exception assert rf._final_exception is not None assert isinstance(rf._final_exception, NoHostAvailable) + + # ------------------------------------------------------------------------- + # Helpers for SCYLLA_USE_METADATA_ID tests + # ------------------------------------------------------------------------- + + def _make_rows_response(self, result_metadata_id=None, column_metadata=None): + """ + Return a real ResultMessage(kind=RESULT_KIND_ROWS) with all attributes + that _set_result accesses pre-set, so it passes isinstance checks and + doesn't trigger unexpected code paths. 
+ """ + response = ResultMessage(kind=RESULT_KIND_ROWS) + response.paging_state = None + response.column_names = ['col'] + response.parsed_rows = [] + response.column_types = [] + response.column_metadata = column_metadata + response.result_metadata_id = result_metadata_id + response.trace_id = None + response.warnings = None + response.custom_payload = None + return response + + def _make_execute_response_future(self, session, connection, prepared_statement): + """ + Return a ResponseFuture whose message is an ExecuteMessage and which + has a prepared_statement set so that _query()'s feature-gating logic + is exercised. + """ + execute_msg = ExecuteMessage(b'qid', [], ConsistencyLevel.ONE) + query = SimpleStatement("SELECT * FROM foo") + rf = ResponseFuture( + session, execute_msg, query, timeout=1, + prepared_statement=prepared_statement, + ) + pool = session._pools.get.return_value + pool.borrow_connection.return_value = (connection, 1) + return rf + + # ------------------------------------------------------------------------- + # _set_result: METADATA_CHANGED update path + # ------------------------------------------------------------------------- + + def test_set_result_updates_metadata_when_metadata_changed(self): + """ + When the EXECUTE response carries a new result_metadata_id (server + detected a schema change), _set_result must update both + prepared_statement.result_metadata and prepared_statement.result_metadata_id. 
+ """ + session = self.make_session() + pool = session._pools.get.return_value + connection = Mock(spec=Connection) + connection.protocol_version = 4 + connection.features = Mock() + connection.features.use_metadata_id = False + pool.borrow_connection.return_value = (connection, 1) + + old_meta = [('ks', 'tb', 'old_col', Mock())] + new_meta = [('ks', 'tb', 'new_col', Mock())] + ps = Mock() + ps.result_metadata = old_meta + ps.result_metadata_id = b'old_id' + + rf = self.make_response_future(session) + rf.prepared_statement = ps + rf.send_request() + + response = self._make_rows_response( + result_metadata_id=b'new_id', + column_metadata=new_meta, + ) + rf._set_result(None, None, None, response) + + assert ps.result_metadata is new_meta + assert ps.result_metadata_id == b'new_id' + + def test_set_result_does_not_update_metadata_when_metadata_id_absent(self): + """ + When the EXECUTE response has no result_metadata_id (normal skip-meta + path — server metadata unchanged), _set_result must leave the + prepared_statement's cached metadata untouched. 
+ """ + session = self.make_session() + pool = session._pools.get.return_value + connection = Mock(spec=Connection) + connection.protocol_version = 4 + connection.features = Mock() + connection.features.use_metadata_id = False + pool.borrow_connection.return_value = (connection, 1) + + old_meta = [('ks', 'tb', 'col', Mock())] + ps = Mock() + ps.result_metadata = old_meta + ps.result_metadata_id = b'old_id' + + rf = self.make_response_future(session) + rf.prepared_statement = ps + rf.send_request() + + # result_metadata_id is None → server sent full metadata, no hash update + response = self._make_rows_response( + result_metadata_id=None, + column_metadata=old_meta, + ) + rf._set_result(None, None, None, response) + + assert ps.result_metadata is old_meta + assert ps.result_metadata_id == b'old_id' + + def test_set_result_warns_when_metadata_id_but_no_column_metadata(self): + """ + If the server sends a new result_metadata_id but no column metadata + (protocol violation), _set_result must still update result_metadata_id + and emit a WARNING log so the inconsistency is visible in logs. 
+ """ + session = self.make_session() + pool = session._pools.get.return_value + connection = Mock(spec=Connection) + connection.protocol_version = 4 + connection.features = Mock() + connection.features.use_metadata_id = False + pool.borrow_connection.return_value = (connection, 1) + + ps = Mock() + ps.result_metadata = [('ks', 'tb', 'col', Mock())] + ps.result_metadata_id = b'old_id' + + rf = self.make_response_future(session) + rf.prepared_statement = ps + rf.send_request() + + # column_metadata is falsy (empty list) but result_metadata_id is set + response = self._make_rows_response( + result_metadata_id=b'new_id', + column_metadata=[], + ) + + with self.assertLogs('cassandra.cluster', level='WARNING') as log_ctx: + rf._set_result(None, None, None, response) + + assert any('result_metadata_id' in msg for msg in log_ctx.output) + # metadata_id is still updated even without column metadata + assert ps.result_metadata_id == b'new_id' + + # ------------------------------------------------------------------------- + # _query: per-connection feature gating for skip_meta / result_metadata_id + # ------------------------------------------------------------------------- + + def test_query_sets_skip_meta_with_scylla_extension(self): + """ + When the borrowed connection has negotiated SCYLLA_USE_METADATA_ID and + the prepared statement carries a result_metadata_id, _query() must set + skip_meta=True and attach the metadata_id to the ExecuteMessage. 
+ """ + session = self.make_basic_session() + session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1'] + session._pools.get.return_value = self.make_pool() + + connection = Mock(spec=Connection) + connection.protocol_version = 4 + connection.features = Mock() + connection.features.use_metadata_id = True + session._pools.get.return_value.borrow_connection.return_value = (connection, 1) + + ps = Mock() + ps.result_metadata = [] + ps.result_metadata_id = b'meta_hash' + + rf = self._make_execute_response_future(session, connection, ps) + rf.send_request() + + assert rf.message.skip_meta is True + assert rf.message.result_metadata_id == b'meta_hash' + + def test_query_no_skip_meta_without_extension(self): + """ + When the connection does NOT have SCYLLA_USE_METADATA_ID (and protocol + is v4), _query() must leave skip_meta=False and result_metadata_id=None. + """ + session = self.make_basic_session() + session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1'] + session._pools.get.return_value = self.make_pool() + + connection = Mock(spec=Connection) + connection.protocol_version = 4 + connection.features = Mock() + connection.features.use_metadata_id = False + session._pools.get.return_value.borrow_connection.return_value = (connection, 1) + + ps = Mock() + ps.result_metadata = [] + ps.result_metadata_id = b'meta_hash' + + rf = self._make_execute_response_future(session, connection, ps) + rf.send_request() + + assert rf.message.skip_meta is False + assert rf.message.result_metadata_id is None + + def test_query_no_skip_meta_when_prepared_statement_has_no_metadata_id(self): + """ + Even if the connection supports SCYLLA_USE_METADATA_ID, if the prepared + statement was created before the extension was active (result_metadata_id + is None), _query() must NOT set skip_meta — the driver has no hash to send. 
+ """ + session = self.make_basic_session() + session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1'] + session._pools.get.return_value = self.make_pool() + + connection = Mock(spec=Connection) + connection.protocol_version = 4 + connection.features = Mock() + connection.features.use_metadata_id = True # extension active on this connection + session._pools.get.return_value.borrow_connection.return_value = (connection, 1) + + ps = Mock() + ps.result_metadata = [] + ps.result_metadata_id = None # statement prepared without extension + + rf = self._make_execute_response_future(session, connection, ps) + rf.send_request() + + assert rf.message.skip_meta is False + assert rf.message.result_metadata_id is None + + def test_query_sets_skip_meta_for_protocol_v5(self): + """ + On a protocol-v5 connection (ProtocolVersion.uses_prepared_metadata), + _query() must set skip_meta=True and send result_metadata_id even if + the Scylla-specific use_metadata_id flag is False. + """ + session = self.make_basic_session() + session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1'] + session._pools.get.return_value = self.make_pool() + + connection = Mock(spec=Connection) + connection.protocol_version = 5 # CQL v5 — uses_prepared_metadata() is True + connection.features = Mock() + connection.features.use_metadata_id = False # Scylla extension not needed + session._pools.get.return_value.borrow_connection.return_value = (connection, 1) + + ps = Mock() + ps.result_metadata = [] + ps.result_metadata_id = b'v5_hash' + + rf = self._make_execute_response_future(session, connection, ps) + rf.send_request() + + assert rf.message.skip_meta is True + assert rf.message.result_metadata_id == b'v5_hash'