From 1f3e38f5392f894facbdd574ef57632454b76fd9 Mon Sep 17 00:00:00 2001
From: Yaniv Michael Kaul
Date: Fri, 10 Apr 2026 11:12:38 +0300
Subject: [PATCH] perf: add Cython metadata parser using BytesIOReader

Add cassandra/metadata_parser.pyx with zero-copy metadata parsing that
uses BytesIOReader instead of Python BytesIO, eliminating per-read bytes
allocation in recv_results_metadata and read_type.

Modify row_parser.pyx to create a single BytesIOReader from the full
remaining buffer and reuse it for both metadata parsing and row parsing,
instead of using Python BytesIO for metadata then a separate
BytesIOReader for rows.

Benchmarks (taskset -c 0, Python 3.14, Cython):
  recv_results_metadata only (10 simple cols): 6082 -> 1265 ns (4.8x)
  recv_results_rows (10 cols, 0 rows): 11068 -> 5652 ns (1.96x)
  recv_results_rows (10 cols, 10 rows): 29590 -> 24759 ns (1.20x)
  recv_results_rows (10 cols, 100 rows): 181053 -> 173446 ns (1.04x)
  recv_results_rows (50 cols, 0 rows): 44904 -> 20569 ns (2.18x)
---
 cassandra/ioutils.pyx         |   9 +-
 cassandra/metadata_parser.pyx | 189 ++++++++++++++++++++++++++++++++++
 cassandra/protocol.py         |   7 +-
 cassandra/row_parser.pyx      |  26 ++++-
 4 files changed, 223 insertions(+), 8 deletions(-)
 create mode 100644 cassandra/metadata_parser.pyx

diff --git a/cassandra/ioutils.pyx b/cassandra/ioutils.pyx
index b0ab4f16cb..2700901747 100644
--- a/cassandra/ioutils.pyx
+++ b/cassandra/ioutils.pyx
@@ -15,7 +15,7 @@
 include 'cython_marshal.pyx'
 from cassandra.buffer cimport Buffer, from_ptr_and_size
 
-from libc.stdint cimport int32_t
+from libc.stdint cimport int32_t, uint16_t
 from cassandra.bytesio cimport BytesIOReader
 
 
@@ -45,3 +45,10 @@ cdef inline int32_t read_int(BytesIOReader reader) except ?0xDEAD:
     buf.ptr = reader.read(4)
     buf.size = 4
     return unpack_num[int32_t](&buf)
+
+cdef inline uint16_t read_short(BytesIOReader reader) except ?0xFFFE:
+    """Read a [short]: 2 bytes from the reader, unpacked as a uint16_t."""
+    cdef Buffer buf
+    buf.ptr = reader.read(2)
+    buf.size = 2
+    return unpack_num[uint16_t](&buf)
diff --git
a/cassandra/metadata_parser.pyx b/cassandra/metadata_parser.pyx
new file mode 100644
index 0000000000..f996795ede
--- /dev/null
+++ b/cassandra/metadata_parser.pyx
@@ -0,0 +1,213 @@
+# Copyright ScyllaDB, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Cython-optimized metadata parsing for CQL protocol ResultMessage.
+
+Uses BytesIOReader for zero-copy reads, eliminating per-read bytes allocation
+that dominates recv_results_metadata cost.
+"""
+
+include "ioutils.pyx"
+
+
+# ---------- low-level readers on BytesIOReader ----------
+# read_int(BytesIOReader) and read_short(BytesIOReader) are provided by ioutils.pyx
+
+cdef inline str read_string_br(BytesIOReader reader):
+    """Read a [string]: a [short] n, followed by n bytes of UTF-8."""
+    cdef uint16_t size = read_short(reader)
+    cdef char *ptr = reader.read(size)
+    return ptr[:size].decode('utf8')
+
+cdef inline bytes read_binary_string_br(BytesIOReader reader):
+    """Read a [short bytes]: a [short] n, followed by n raw bytes."""
+    cdef uint16_t size = read_short(reader)
+    cdef char *ptr = reader.read(size)
+    return ptr[:size]
+
+cdef inline bytes read_binary_longstring_br(BytesIOReader reader):
+    """
+    Read a [bytes]: an [int] n, followed by n raw bytes when n >= 0.
+
+    Returns None when n is negative: the CQL native protocol defines that as
+    a null value with no payload bytes following the length.
+    """
+    cdef int32_t size = read_int(reader)
+    cdef char *ptr
+    if size < 0:
+        # A negative length must never reach reader.read() or the slice below.
+        return None
+    ptr = reader.read(size)
+    return ptr[:size]
+
+
+# ---------- flag constants (mirrored from ResultMessage) ----------
+# These MUST stay in sync with the class attributes in ResultMessage (protocol.py).
+# They are duplicated here as compile-time DEF constants for Cython performance.
+
+DEF _FLAGS_GLOBAL_TABLES_SPEC = 0x0001
+DEF _HAS_MORE_PAGES_FLAG = 0x0002
+DEF _NO_METADATA_FLAG = 0x0004
+DEF _METADATA_ID_FLAG = 0x0008
+DEF _CONTINUOUS_PAGING_FLAG = 0x40000000
+DEF _CONTINUOUS_PAGING_LAST_FLAG = 0x80000000
+
+
+# ---------- read_type using BytesIOReader ----------
+
+cdef object _read_type_br(BytesIOReader reader, dict type_codes_map, object user_type_map,
+                          object ListType, object SetType, object MapType,
+                          object TupleType, object UserType, object CUSTOM_TYPE,
+                          object lookup_casstype):
+    """
+    Cython version of ResultMessage.read_type() operating on BytesIOReader.
+
+    Reads a [short] type code, resolves it through type_codes_map and, for
+    list/set/map/tuple/UDT/custom codes, reads the nested type information
+    that follows (recursively) to build the parameterized type.
+
+    Parameters are passed in to avoid module-level imports from protocol.py
+    (which would create circular dependencies). They are captured once in the
+    closure created by make_recv_results_metadata().
+
+    Returns the resolved type class. Raises NotSupportedError when the type
+    code is not present in type_codes_map.
+    """
+    cdef uint16_t optid = read_short(reader)
+
+    typeclass = type_codes_map.get(optid)
+    if typeclass is None:
+        from cassandra.protocol import NotSupportedError
+        raise NotSupportedError(
+            "Unknown data type code 0x%04x. Have to skip entire result set."
+            % (optid,))
+
+    if typeclass is ListType or typeclass is SetType:
+        subtype = _read_type_br(reader, type_codes_map, user_type_map,
+                                ListType, SetType, MapType, TupleType, UserType,
+                                CUSTOM_TYPE, lookup_casstype)
+        typeclass = typeclass.apply_parameters((subtype,))
+    elif typeclass is MapType:
+        keysubtype = _read_type_br(reader, type_codes_map, user_type_map,
+                                   ListType, SetType, MapType, TupleType, UserType,
+                                   CUSTOM_TYPE, lookup_casstype)
+        valsubtype = _read_type_br(reader, type_codes_map, user_type_map,
+                                   ListType, SetType, MapType, TupleType, UserType,
+                                   CUSTOM_TYPE, lookup_casstype)
+        typeclass = typeclass.apply_parameters((keysubtype, valsubtype))
+    elif typeclass is TupleType:
+        num_items = read_short(reader)
+        types = tuple(_read_type_br(reader, type_codes_map, user_type_map,
+                                    ListType, SetType, MapType, TupleType, UserType,
+                                    CUSTOM_TYPE, lookup_casstype)
+                      for _ in range(num_items))
+        typeclass = typeclass.apply_parameters(types)
+    elif typeclass is UserType:
+        ks = read_string_br(reader)
+        udt_name = read_string_br(reader)
+        num_fields = read_short(reader)
+        names_and_types = tuple(
+            (read_string_br(reader),
+             _read_type_br(reader, type_codes_map, user_type_map,
+                           ListType, SetType, MapType, TupleType, UserType,
+                           CUSTOM_TYPE, lookup_casstype))
+            for _ in range(num_fields))
+        # zip(*()) yields nothing to unpack, so an empty UDT is special-cased.
+        names, types = zip(*names_and_types) if num_fields > 0 else ((), ())
+        specialized_type = typeclass.make_udt_class(ks, udt_name, names, types)
+        specialized_type.mapped_class = user_type_map.get(ks, {}).get(udt_name)
+        typeclass = specialized_type
+    elif typeclass is CUSTOM_TYPE:
+        classname = read_string_br(reader)
+        typeclass = lookup_casstype(classname)
+
+    return typeclass
+
+
+# ---------- public factory: creates closures that capture type objects ----------
+
+def make_recv_results_metadata():
+    """
+    Factory that returns a recv_results_metadata function suitable for use
+    as an unbound method replacement on FastResultMessage.
+
+    The closure captures the type-code map and type objects once, so they
+    don't have to be looked up on every call.
+    """
+    from cassandra.protocol import (
+        ResultMessage, CUSTOM_TYPE,
+    )
+    from cassandra.cqltypes import (
+        ListType, SetType, MapType, TupleType, UserType, lookup_casstype,
+    )
+
+    # Capture once
+    cdef dict type_codes_map = ResultMessage.type_codes
+
+    def read_type_br_closure(BytesIOReader reader, user_type_map):
+        return _read_type_br(reader, type_codes_map, user_type_map,
+                             ListType, SetType, MapType, TupleType, UserType,
+                             CUSTOM_TYPE, lookup_casstype)
+
+    def recv_results_metadata(self, BytesIOReader reader, user_type_map):
+        """
+        Cython-optimized recv_results_metadata operating on BytesIOReader.
+        Replaces ResultMessage.recv_results_metadata.
+
+        Sets self.paging_state when the has-more-pages flag is present. If the
+        no-metadata flag is set, nothing further is read. Otherwise sets the
+        continuous-paging fields and result_metadata_id when flagged, and
+        self.column_metadata to a list of (ksname, cfname, colname, coltype).
+        """
+        cdef int32_t flags = read_int(reader)
+        cdef int32_t colcount = read_int(reader)
+
+        if flags & _HAS_MORE_PAGES_FLAG:
+            self.paging_state = read_binary_longstring_br(reader)
+
+        if flags & _NO_METADATA_FLAG:
+            return
+
+        if flags & _CONTINUOUS_PAGING_FLAG:
+            self.continuous_paging_seq = read_int(reader)
+            self.continuous_paging_last = flags & _CONTINUOUS_PAGING_LAST_FLAG
+
+        if flags & _METADATA_ID_FLAG:
+            self.result_metadata_id = read_binary_string_br(reader)
+
+        cdef str ksname, cfname, colname
+        cdef object coltype
+        cdef int i
+        cdef list column_metadata = [None] * colcount
+
+        if flags & _FLAGS_GLOBAL_TABLES_SPEC:
+            # Keyspace and table names are sent once and shared by all columns.
+            ksname = read_string_br(reader)
+            cfname = read_string_br(reader)
+            for i in range(colcount):
+                colname = read_string_br(reader)
+                coltype = read_type_br_closure(reader, user_type_map)
+                column_metadata[i] = (ksname, cfname, colname, coltype)
+        else:
+            for i in range(colcount):
+                ksname = read_string_br(reader)
+                cfname = read_string_br(reader)
+                colname = read_string_br(reader)
+                coltype = read_type_br_closure(reader, user_type_map)
+                column_metadata[i] = (ksname, cfname, colname, coltype)
+
+        self.column_metadata = column_metadata
+
+    return recv_results_metadata
diff
--git a/cassandra/protocol.py b/cassandra/protocol.py
index 4628c7ee0e..c9dbbcbed0 100644
--- a/cassandra/protocol.py
+++ b/cassandra/protocol.py
@@ -1212,15 +1212,18 @@ def cython_protocol_handler(colparser):
     The default is to use obj_parser.ListParser
     """
     from cassandra.row_parser import make_recv_results_rows
+    from cassandra.metadata_parser import make_recv_results_metadata
+
+    recv_results_metadata_br = make_recv_results_metadata()
 
     class FastResultMessage(ResultMessage):
         """
         Cython version of Result Message that has a faster implementation of
-        recv_results_row.
+        recv_results_rows using BytesIOReader for zero-copy metadata + row parsing.
         """
         # type_codes = ResultMessage.type_codes.copy()
         code_to_type = dict((v, k) for k, v in ResultMessage.type_codes.items())
-        recv_results_rows = make_recv_results_rows(colparser)
+        recv_results_rows = make_recv_results_rows(colparser, recv_results_metadata_br)
 
     class CythonProtocolHandler(_ProtocolHandler):
         """
diff --git a/cassandra/row_parser.pyx b/cassandra/row_parser.pyx
index 88277a4593..1ec0ad8d3f 100644
--- a/cassandra/row_parser.pyx
+++ b/cassandra/row_parser.pyx
@@ -19,13 +19,27 @@ from cassandra.deserializers import make_deserializers
 
 include "ioutils.pyx"
 
-def make_recv_results_rows(ColumnParser colparser):
+def make_recv_results_rows(ColumnParser colparser, recv_results_metadata_br):
+    """
+    Create a recv_results_rows closure that uses:
+      - recv_results_metadata_br: Cython metadata parser operating on BytesIOReader
+      - colparser: Cython column parser for row data
+
+    A single BytesIOReader is created from the full remaining buffer and used
+    for both metadata parsing and row parsing, eliminating the per-read bytes
+    allocation overhead of Python BytesIO.
+    """
     def recv_results_rows(self, f, int protocol_version, user_type_map, result_metadata, column_encryption_policy):
         """
         Parse protocol data given as a BytesIO f into a set of columns (e.g. list of tuples)
         This is used as the recv_results_rows method of (Fast)ResultMessage
         """
-        self.recv_results_metadata(f, user_type_map)
+        # Create ONE BytesIOReader for the entire remaining buffer, shared by metadata
+        # and row parsing. Typed explicitly so reader.pos below needs no type inference.
+        cdef BytesIOReader reader = BytesIOReader(f.read())
+
+        # Use Cython-optimized metadata parsing on BytesIOReader
+        recv_results_metadata_br(self, reader, user_type_map)
 
         column_metadata = self.column_metadata or result_metadata
 
@@ -35,14 +49,16 @@ def make_recv_results_rows(ColumnParser colparser):
         desc = ParseDesc(self.column_names, self.column_types, column_encryption_policy,
                          [ColDesc(md[0], md[1], md[2]) for md in column_metadata],
                          make_deserializers(self.column_types), protocol_version)
-        reader = BytesIOReader(f.read())
+        # The reader's position is now right after the metadata;
+        # row data follows immediately — no need to create a second reader.
+        # Save position so we can rewind to the start of row data on error.
+        cdef Py_ssize_t rows_start_pos = reader.pos
         try:
             self.parsed_rows = colparser.parse_rows(reader, desc)
         except Exception as e:
             # Use explicitly the TupleRowParser to display better error messages for column decoding failures
             rowparser = TupleRowParser()
-            reader.buf_ptr = reader.buf
-            reader.pos = 0
+            reader.pos = rows_start_pos
             rowcount = read_int(reader)
             for i in range(rowcount):
                 rowparser.unpack_row(reader, desc)