Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cassandra/ioutils.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
include 'cython_marshal.pyx'
from cassandra.buffer cimport Buffer, from_ptr_and_size

from libc.stdint cimport int32_t
from libc.stdint cimport int32_t, uint16_t
from cassandra.bytesio cimport BytesIOReader


Expand Down Expand Up @@ -45,3 +45,9 @@ cdef inline int32_t read_int(BytesIOReader reader) except ?0xDEAD:
buf.ptr = reader.read(4)
buf.size = 4
return unpack_num[int32_t](&buf)

cdef inline uint16_t read_short(BytesIOReader reader) except ?0xFFFE:
    """
    Consume two bytes from ``reader`` and decode them as a ``uint16_t``.

    The bytes are not copied: a ``Buffer`` view is pointed at the reader's
    memory and handed to ``unpack_num``.  ``0xFFFE`` is the ambiguous error
    sentinel, so an exception raised by ``reader.read`` is propagated while
    a genuine value of ``0xFFFE`` is still returned normally.
    """
    cdef Buffer two_bytes
    two_bytes.size = 2
    two_bytes.ptr = reader.read(2)
    return unpack_num[uint16_t](&two_bytes)
189 changes: 189 additions & 0 deletions cassandra/metadata_parser.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# Copyright ScyllaDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Cython-optimized metadata parsing for CQL protocol ResultMessage.

Uses BytesIOReader for zero-copy reads, eliminating per-read bytes allocation
that dominates recv_results_metadata cost.
"""

include "ioutils.pyx"


# ---------- low-level readers on BytesIOReader ----------
# read_int(BytesIOReader) and read_short(BytesIOReader) are provided by ioutils.pyx

cdef inline str read_string_br(BytesIOReader reader):
    """
    Read a [string] from ``reader`` and return it as ``str``.

    Wire layout: a [short] byte count followed by that many bytes of UTF-8.
    The payload is decoded directly from the reader's memory.
    """
    cdef uint16_t nbytes = read_short(reader)
    cdef char *payload = reader.read(nbytes)
    return payload[:nbytes].decode('utf8')

cdef inline bytes read_binary_string_br(BytesIOReader reader):
    """
    Read a [short bytes] value from ``reader`` and return it as ``bytes``.

    Wire layout: a [short] byte count followed by that many raw bytes, which
    are copied out of the reader's memory into a new ``bytes`` object.
    """
    cdef uint16_t nbytes = read_short(reader)
    cdef char *payload = reader.read(nbytes)
    return payload[:nbytes]

cdef inline bytes read_binary_longstring_br(BytesIOReader reader):
    """
    Read a [bytes] value: an [int] n, followed by n raw bytes when n >= 0.

    Returns the payload as a new ``bytes`` object, or ``None`` when the
    length prefix is negative.  The CQL native protocol defines a negative
    length as a null value with no payload bytes following it, so in that
    case nothing further is consumed from ``reader``.

    Exceptions raised by ``read_int`` or ``reader.read`` propagate unchanged.
    """
    cdef int32_t size = read_int(reader)
    cdef char *ptr

    # A negative length must not be forwarded to reader.read() or used as a
    # slice bound: it denotes "null", not "this many bytes".
    if size < 0:
        return None

    ptr = reader.read(size)
    return ptr[:size]


# ---------- flag constants (mirrored from ResultMessage) ----------
# These MUST stay in sync with the class attributes in ResultMessage (protocol.py).
# They are duplicated here as compile-time DEF constants for Cython performance.

# Bit masks tested against the [int] flags word at the start of the result
# metadata. The notes below describe how the parser in this file reacts to each.

# One keyspace/table [string] pair is read once and shared by every column.
DEF _FLAGS_GLOBAL_TABLES_SPEC = 0x0001
# A [bytes] paging state follows the column count and is stored on the message.
DEF _HAS_MORE_PAGES_FLAG = 0x0002
# Parsing stops after the optional paging state; no column specs are read.
DEF _NO_METADATA_FLAG = 0x0004
# A [short bytes] result metadata id is read and stored on the message.
DEF _METADATA_ID_FLAG = 0x0008
# An [int] continuous-paging sequence number is read and stored on the message.
DEF _CONTINUOUS_PAGING_FLAG = 0x40000000
# Masked out of flags into continuous_paging_last when continuous paging is set.
DEF _CONTINUOUS_PAGING_LAST_FLAG = 0x80000000


# ---------- read_type using BytesIOReader ----------

cdef object _read_type_br(BytesIOReader reader, dict type_codes_map, object user_type_map,
                          object ListType, object SetType, object MapType,
                          object TupleType, object UserType, object CUSTOM_TYPE,
                          object lookup_casstype):
    """
    Decode one type option from ``reader`` and return the resulting type object.

    A [short] type code is read and looked up in ``type_codes_map``.  Simple
    types are returned as found; parameterised types recurse to read their
    sub-types:

    * list / set   -- one element type
    * map          -- key type, then value type
    * tuple        -- a [short] count, then that many member types
    * user type    -- keyspace, type name, a [short] field count, then
                      (field name, field type) pairs
    * custom type  -- a class name string resolved via ``lookup_casstype``

    The type objects and ``lookup_casstype`` are passed in as arguments so
    this module needs no module-level import from protocol.py.

    Raises ``NotSupportedError`` when the type code is not in
    ``type_codes_map``.
    """
    cdef uint16_t type_code = read_short(reader)
    cdef uint16_t member_count
    cdef int idx
    cdef list member_names
    cdef list member_types

    base_type = type_codes_map.get(type_code)
    if base_type is None:
        # Imported lazily: only needed on this error path.
        from cassandra.protocol import NotSupportedError
        raise NotSupportedError(
            "Unknown data type code 0x%04x. Have to skip entire result set." % (type_code,))

    if base_type is ListType or base_type is SetType:
        element_type = _read_type_br(reader, type_codes_map, user_type_map,
                                     ListType, SetType, MapType, TupleType, UserType,
                                     CUSTOM_TYPE, lookup_casstype)
        return base_type.apply_parameters((element_type,))

    if base_type is MapType:
        # Key type precedes value type on the wire.
        key_type = _read_type_br(reader, type_codes_map, user_type_map,
                                 ListType, SetType, MapType, TupleType, UserType,
                                 CUSTOM_TYPE, lookup_casstype)
        value_type = _read_type_br(reader, type_codes_map, user_type_map,
                                   ListType, SetType, MapType, TupleType, UserType,
                                   CUSTOM_TYPE, lookup_casstype)
        return base_type.apply_parameters((key_type, value_type))

    if base_type is TupleType:
        member_count = read_short(reader)
        member_types = []
        for idx in range(member_count):
            member_types.append(
                _read_type_br(reader, type_codes_map, user_type_map,
                              ListType, SetType, MapType, TupleType, UserType,
                              CUSTOM_TYPE, lookup_casstype))
        return base_type.apply_parameters(tuple(member_types))

    if base_type is UserType:
        keyspace = read_string_br(reader)
        type_name = read_string_br(reader)
        member_count = read_short(reader)
        member_names = []
        member_types = []
        for idx in range(member_count):
            # Each field is a name immediately followed by its type.
            member_names.append(read_string_br(reader))
            member_types.append(
                _read_type_br(reader, type_codes_map, user_type_map,
                              ListType, SetType, MapType, TupleType, UserType,
                              CUSTOM_TYPE, lookup_casstype))
        udt_class = base_type.make_udt_class(keyspace, type_name,
                                             tuple(member_names), tuple(member_types))
        # Attach the user-registered Python class for this UDT, if any.
        udt_class.mapped_class = user_type_map.get(keyspace, {}).get(type_name)
        return udt_class

    if base_type is CUSTOM_TYPE:
        class_name = read_string_br(reader)
        return lookup_casstype(class_name)

    return base_type


# ---------- public factory: creates closures that capture type objects ----------

def make_recv_results_metadata():
    """
    Build and return a ``recv_results_metadata(self, reader, user_type_map)``
    function that parses result metadata from a BytesIOReader.

    The type-code map, the collection/UDT type objects and
    ``lookup_casstype`` are imported once, when the factory runs, and are
    captured by the returned closure so each call reuses them.
    """
    # Imported here rather than at module level to avoid a circular import
    # with protocol.py.
    from cassandra.protocol import (
        ResultMessage, CUSTOM_TYPE,
    )
    from cassandra.cqltypes import (
        ListType, SetType, MapType, TupleType, UserType, lookup_casstype,
    )

    cdef dict type_codes_map = ResultMessage.type_codes

    def recv_results_metadata(self, BytesIOReader reader, user_type_map):
        """
        Parse the metadata section at the reader's current position and
        store the results on ``self``.

        Depending on the flags word, sets ``paging_state``,
        ``continuous_paging_seq`` / ``continuous_paging_last`` and
        ``result_metadata_id``.  Unless the no-metadata flag is set, also
        sets ``column_metadata`` to a list of
        ``(keyspace, table, column name, column type)`` tuples.
        """
        cdef int32_t flags = read_int(reader)
        cdef int32_t colcount = read_int(reader)
        cdef bint shared_table_spec
        cdef str keyspace = None
        cdef str table = None
        cdef str column
        cdef object cql_type
        cdef int idx
        cdef list columns

        if flags & _HAS_MORE_PAGES_FLAG:
            self.paging_state = read_binary_longstring_br(reader)

        # With no metadata there is nothing further to parse here.
        if flags & _NO_METADATA_FLAG:
            return

        if flags & _CONTINUOUS_PAGING_FLAG:
            self.continuous_paging_seq = read_int(reader)
            self.continuous_paging_last = flags & _CONTINUOUS_PAGING_LAST_FLAG

        if flags & _METADATA_ID_FLAG:
            self.result_metadata_id = read_binary_string_br(reader)

        shared_table_spec = (flags & _FLAGS_GLOBAL_TABLES_SPEC) != 0
        if shared_table_spec:
            # Keyspace and table appear once, ahead of all column specs.
            keyspace = read_string_br(reader)
            table = read_string_br(reader)

        columns = []
        for idx in range(colcount):
            if not shared_table_spec:
                # Each column spec carries its own keyspace and table.
                keyspace = read_string_br(reader)
                table = read_string_br(reader)
            column = read_string_br(reader)
            cql_type = _read_type_br(reader, type_codes_map, user_type_map,
                                     ListType, SetType, MapType, TupleType, UserType,
                                     CUSTOM_TYPE, lookup_casstype)
            columns.append((keyspace, table, column, cql_type))

        self.column_metadata = columns

    return recv_results_metadata
7 changes: 5 additions & 2 deletions cassandra/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,15 +1212,18 @@ def cython_protocol_handler(colparser):
The default is to use obj_parser.ListParser
"""
from cassandra.row_parser import make_recv_results_rows
from cassandra.metadata_parser import make_recv_results_metadata

recv_results_metadata_br = make_recv_results_metadata()

class FastResultMessage(ResultMessage):
"""
Cython version of Result Message that has a faster implementation of
recv_results_row.
recv_results_rows using BytesIOReader for zero-copy metadata + row parsing.
"""
# type_codes = ResultMessage.type_codes.copy()
code_to_type = dict((v, k) for k, v in ResultMessage.type_codes.items())
recv_results_rows = make_recv_results_rows(colparser)
recv_results_rows = make_recv_results_rows(colparser, recv_results_metadata_br)

class CythonProtocolHandler(_ProtocolHandler):
"""
Expand Down
26 changes: 21 additions & 5 deletions cassandra/row_parser.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,27 @@ from cassandra.deserializers import make_deserializers

include "ioutils.pyx"

def make_recv_results_rows(ColumnParser colparser):
def make_recv_results_rows(ColumnParser colparser, recv_results_metadata_br):
"""
Create a recv_results_rows closure that uses:
- recv_results_metadata_br: Cython metadata parser operating on BytesIOReader
- colparser: Cython column parser for row data

A single BytesIOReader is created from the full remaining buffer and used
for both metadata parsing and row parsing, eliminating the per-read bytes
allocation overhead of Python BytesIO.
"""
def recv_results_rows(self, f, int protocol_version, user_type_map, result_metadata, column_encryption_policy):
"""
Parse protocol data given as a BytesIO f into a set of columns (e.g. list of tuples)
This is used as the recv_results_rows method of (Fast)ResultMessage
"""
self.recv_results_metadata(f, user_type_map)
# Create ONE BytesIOReader for the entire remaining buffer.
# This is used for both metadata parsing and row parsing.
reader = BytesIOReader(f.read())

# Use Cython-optimized metadata parsing on BytesIOReader
recv_results_metadata_br(self, reader, user_type_map)

column_metadata = self.column_metadata or result_metadata

Expand All @@ -35,14 +49,16 @@ def make_recv_results_rows(ColumnParser colparser):
desc = ParseDesc(self.column_names, self.column_types, column_encryption_policy,
[ColDesc(md[0], md[1], md[2]) for md in column_metadata],
make_deserializers(self.column_types), protocol_version)
reader = BytesIOReader(f.read())
# The reader's position is now right after the metadata;
# row data follows immediately — no need to create a second reader.
# Save position so we can rewind to the start of row data on error.
cdef Py_ssize_t rows_start_pos = reader.pos
try:
self.parsed_rows = colparser.parse_rows(reader, desc)
except Exception as e:
# Use explicitly the TupleRowParser to display better error messages for column decoding failures
rowparser = TupleRowParser()
reader.buf_ptr = reader.buf
reader.pos = 0
reader.pos = rows_start_pos
rowcount = read_int(reader)
for i in range(rowcount):
rowparser.unpack_row(reader, desc)
Expand Down
Loading