diff --git a/benchmarks/vector_deserialize.py b/benchmarks/vector_deserialize.py new file mode 100644 index 0000000000..e1a17f07d4 --- /dev/null +++ b/benchmarks/vector_deserialize.py @@ -0,0 +1,316 @@ +#!/usr/bin/env python +# Copyright ScyllaDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Benchmark for VectorType deserialization performance. + +Tests different optimization strategies: +1. Current implementation (Python with struct.unpack/numpy) +2. Python struct.unpack only +3. Numpy frombuffer + tolist() +4. Cython DesVectorType deserializer + +Run with: python benchmarks/vector_deserialize.py +""" + +import sys +import time +import struct + +# Add parent directory to path +sys.path.insert(0, '.') + +from cassandra.cqltypes import FloatType, DoubleType, Int32Type, LongType +from cassandra.marshal import float_pack, double_pack, int32_pack, int64_pack + + +def create_test_data(vector_size, element_type): + """Create serialized test data for a vector.""" + if element_type == FloatType: + values = [float(i * 0.1) for i in range(vector_size)] + pack_fn = float_pack + elif element_type == DoubleType: + values = [float(i * 0.1) for i in range(vector_size)] + pack_fn = double_pack + elif element_type == Int32Type: + values = list(range(vector_size)) + pack_fn = int32_pack + elif element_type == LongType: + values = list(range(vector_size)) + pack_fn = int64_pack + else: + raise ValueError(f"Unsupported element type: {element_type}") + + # Serialize the vector + serialized = b''.join(pack_fn(v) for v in values) + + return serialized, values + + +def benchmark_current_implementation(vector_type, serialized_data, iterations=10000): + """Benchmark the current VectorType.deserialize implementation.""" + protocol_version = 4 + + start = time.perf_counter() + for _ in range(iterations): + result = vector_type.deserialize(serialized_data, protocol_version) + end = time.perf_counter() + + elapsed = end - start + per_op = (elapsed / iterations) * 1_000_000 # microseconds + + return elapsed, per_op, result + + +def benchmark_struct_optimization(vector_type, serialized_data, iterations=10000): + """Benchmark struct.unpack optimization.""" + vector_size = vector_type.vector_size + subtype = vector_type.subtype + + # Determine format string - subtype is a class, use identity or issubclass + if subtype is FloatType or (isinstance(subtype, type) and issubclass(subtype, FloatType)): + format_str = f'>{vector_size}f' + elif subtype is DoubleType or (isinstance(subtype, type) and issubclass(subtype, DoubleType)): + format_str = f'>{vector_size}d' + elif subtype is Int32Type or (isinstance(subtype, type) and issubclass(subtype, Int32Type)): + format_str = f'>{vector_size}i' + elif subtype is LongType or (isinstance(subtype, type) and issubclass(subtype, LongType)): + format_str = f'>{vector_size}q' + else: + return None, None, None + + start = time.perf_counter() + for _ in range(iterations): + result = list(struct.unpack(format_str, serialized_data)) + end = time.perf_counter() + + elapsed = end - 
start + per_op = (elapsed / iterations) * 1_000_000 # microseconds + + return elapsed, per_op, result + + +def benchmark_numpy_optimization(vector_type, serialized_data, iterations=10000): + """Benchmark numpy.frombuffer optimization.""" + try: + import numpy as np + except ImportError: + return None, None, None + + vector_size = vector_type.vector_size + subtype = vector_type.subtype + + # Determine dtype + if subtype is FloatType or (isinstance(subtype, type) and issubclass(subtype, FloatType)): + dtype = '>f4' + elif subtype is DoubleType or (isinstance(subtype, type) and issubclass(subtype, DoubleType)): + dtype = '>f8' + elif subtype is Int32Type or (isinstance(subtype, type) and issubclass(subtype, Int32Type)): + dtype = '>i4' + elif subtype is LongType or (isinstance(subtype, type) and issubclass(subtype, LongType)): + dtype = '>i8' + else: + return None, None, None + + start = time.perf_counter() + for _ in range(iterations): + arr = np.frombuffer(serialized_data, dtype=dtype, count=vector_size) + result = arr.tolist() + end = time.perf_counter() + + elapsed = end - start + per_op = (elapsed / iterations) * 1_000_000 # microseconds + + return elapsed, per_op, result + + +def benchmark_cython_deserializer(vector_type, serialized_data, iterations=10000): + """Benchmark Cython DesVectorType deserializer.""" + try: + from cassandra.deserializers import find_deserializer + except ImportError: + return None, None, None + + protocol_version = 4 + + # Get the Cython deserializer + deserializer = find_deserializer(vector_type) + + # Check if we got the Cython deserializer + if deserializer.__class__.__name__ != 'DesVectorType': + return None, None, None + + start = time.perf_counter() + for _ in range(iterations): + result = deserializer.deserialize_bytes(serialized_data, protocol_version) + end = time.perf_counter() + + elapsed = end - start + per_op = (elapsed / iterations) * 1_000_000 # microseconds + + return elapsed, per_op, result + + +def verify_results(expected, *results): + """Verify that all results match expected values.""" + for i, result in enumerate(results): + if result is None: + continue + if len(result) != len(expected): + print(f" ❌ Result {i} length mismatch: {len(result)} vs {len(expected)}") + return False + for j, (a, b) in enumerate(zip(result, expected)): + # Use relative tolerance for floating point comparison + if isinstance(a, float) and isinstance(b, float): + # Allow 0.01% relative error for floats + if abs(a - b) > max(abs(a), abs(b)) * 1e-4 + 1e-7: + print(f" ❌ Result {i} value mismatch at index {j}: {a} vs {b}") + return False + elif abs(a - b) > 1e-9: + print(f" ❌ Result {i} value mismatch at index {j}: {a} vs {b}") + return False + return True + + +def run_benchmark_suite(vector_size, element_type, type_name, iterations=10000): + """Run complete benchmark suite for a given vector configuration.""" + print(f"\n{'='*80}") + print(f"Benchmark: Vector<{type_name}, {vector_size}>") + print(f"{'='*80}") + print(f"Iterations: {iterations:,}") + + # Create test data + from cassandra.cqltypes import lookup_casstype + cass_typename = f'org.apache.cassandra.db.marshal.{element_type.__name__}' + vector_typename = f'org.apache.cassandra.db.marshal.VectorType({cass_typename}, {vector_size})' + vector_type = lookup_casstype(vector_typename) + + serialized_data, expected_values = create_test_data(vector_size, element_type) + data_size = len(serialized_data) + + print(f"Serialized size: {data_size:,} bytes") + print() + + # Run benchmarks + results = [] + + # 1. 
Current implementation (baseline) + print("1. Current implementation (baseline)...") + elapsed, per_op, result_current = benchmark_current_implementation( + vector_type, serialized_data, iterations) + results.append(result_current) + print(f" Total: {elapsed:.4f}s, Per-op: {per_op:.2f} μs") + baseline_time = per_op + + # 2. Struct optimization + print("2. Python struct.unpack optimization...") + elapsed, per_op, result_struct = benchmark_struct_optimization( + vector_type, serialized_data, iterations) + results.append(result_struct) + if per_op is not None: + speedup = baseline_time / per_op + print(f" Total: {elapsed:.4f}s, Per-op: {per_op:.2f} μs, Speedup: {speedup:.2f}x") + else: + print(" Not applicable for this type") + + # 3. Numpy with tolist() + print("3. Numpy frombuffer + tolist()...") + elapsed, per_op, result_numpy = benchmark_numpy_optimization( + vector_type, serialized_data, iterations) + results.append(result_numpy) + if per_op is not None: + speedup = baseline_time / per_op + print(f" Total: {elapsed:.4f}s, Per-op: {per_op:.2f} μs, Speedup: {speedup:.2f}x") + else: + print(" Numpy not available") + + # 4. Cython deserializer + print("4. Cython DesVectorType deserializer...") + elapsed, per_op, result_cython = benchmark_cython_deserializer( + vector_type, serialized_data, iterations) + if per_op is not None: + results.append(result_cython) + speedup = baseline_time / per_op + print(f" Total: {elapsed:.4f}s, Per-op: {per_op:.2f} μs, Speedup: {speedup:.2f}x") + else: + print(" Cython deserializers not available") + + # Verify results + print("\nVerifying results...") + if verify_results(expected_values, *results): + print(" ✓ All results match!") + else: + print(" ✗ Result mismatch detected!") + + return baseline_time + + +def main(): + """Run all benchmarks.""" + # Pin to single CPU core for consistent measurements + try: + import os + os.sched_setaffinity(0, {0}) # Pin to CPU core 0 + print("Pinned to CPU core 0 for consistent measurements") + except (AttributeError, OSError) as e: + print(f"Could not pin to single core: {e}") + print("Running without CPU affinity...") + + print("="*80) + print("VectorType Deserialization Performance Benchmark") + print("="*80) + + # Test configurations: (vector_size, element_type, type_name, iterations) + test_configs = [ + # Small vectors + (3, FloatType, "float", 50000), + (4, FloatType, "float", 50000), + + # Medium vectors (common in ML) + (128, FloatType, "float", 10000), + (384, FloatType, "float", 10000), + + # Large vectors (embeddings) + (768, FloatType, "float", 5000), + (1536, FloatType, "float", 2000), + + # Other types (smaller iteration counts) + (128, DoubleType, "double", 10000), + (768, DoubleType, "double", 5000), + (1536, DoubleType, "double", 2000), + (64, Int32Type, "int", 15000), + (128, Int32Type, "int", 10000), + ] + + summary = [] + + for vector_size, element_type, type_name, iterations in test_configs: + baseline = run_benchmark_suite(vector_size, element_type, type_name, iterations) + summary.append((f"Vector<{type_name}, {vector_size}>", baseline)) + + # Print summary + print("\n" + "="*80) + print("SUMMARY - Current Implementation Performance") + print("="*80) + for config, baseline_time in summary: + print(f"{config:30s}: {baseline_time:8.2f} μs") + + print("\n" + "="*80) + print("Benchmark complete!") + print("="*80) + + +if __name__ == '__main__': + main() diff --git a/cassandra/buffer.pxd b/cassandra/buffer.pxd index 0bbb1d5f57..829f278b69 100644 --- a/cassandra/buffer.pxd +++ b/cassandra/buffer.pxd @@ 
-41,18 +41,8 @@ cdef inline char *buf_read(Buffer *buf, Py_ssize_t size) except NULL: raise IndexError("Requested more than length of buffer") return buf.ptr -cdef inline int slice_buffer(Buffer *buf, Buffer *out, - Py_ssize_t start, Py_ssize_t size) except -1: - if size < 0: - raise ValueError("Length must be positive") +cdef inline void from_ptr_and_size(char *ptr, Py_ssize_t size, Buffer *buf): + buf.ptr = ptr + buf.size = size - if start + size > buf.size: - raise IndexError("Buffer slice out of bounds") - out.ptr = buf.ptr + start - out.size = size - return 0 - -cdef inline void from_ptr_and_size(char *ptr, Py_ssize_t size, Buffer *out): - out.ptr = ptr - out.size = size diff --git a/cassandra/cqltypes.py b/cassandra/cqltypes.py index e36c48563c..e52fb528f6 100644 --- a/cassandra/cqltypes.py +++ b/cassandra/cqltypes.py @@ -50,6 +50,10 @@ varint_pack, varint_unpack, point_be, point_le, vints_pack, vints_unpack, uvint_unpack, uvint_pack) from cassandra import util +from cassandra.cython_deps import HAVE_NUMPY + +if HAVE_NUMPY: + import numpy as np _little_endian_flag = 1 # we always serialize LE import ipaddress @@ -1452,25 +1456,69 @@ def deserialize(cls, byts, protocol_version): raise ValueError( "Expected vector of type {0} and dimension {1} to have serialized size {2}; observed serialized size of {3} instead"\ .format(cls.subtype.typename, cls.vector_size, expected_byte_size, len(byts))) - indexes = (serialized_size * x for x in range(0, cls.vector_size)) - return [cls.subtype.deserialize(byts[idx:idx + serialized_size], protocol_version) for idx in indexes] + # Optimization: bulk deserialization for common numeric types + # For small vectors: use struct.unpack (2.8-3.6x faster for 3-4 elements) + # For large vectors with numpy: use numpy.frombuffer (1.3-1.5x faster for 128+ elements) + # Threshold at 32 elements balances simplicity with performance + use_numpy = HAVE_NUMPY and cls.vector_size >= 32 + + if cls.subtype is FloatType or (isinstance(cls.subtype, type) and issubclass(cls.subtype, FloatType)): + if use_numpy: + return np.frombuffer(byts, dtype='>f4', count=cls.vector_size).tolist() + return list(struct.unpack(f'>{cls.vector_size}f', byts)) + elif cls.subtype is DoubleType or (isinstance(cls.subtype, type) and issubclass(cls.subtype, DoubleType)): + if use_numpy: + return np.frombuffer(byts, dtype='>f8', count=cls.vector_size).tolist() + return list(struct.unpack(f'>{cls.vector_size}d', byts)) + elif cls.subtype is Int32Type or (isinstance(cls.subtype, type) and issubclass(cls.subtype, Int32Type)): + if use_numpy: + return np.frombuffer(byts, dtype='>i4', count=cls.vector_size).tolist() + return list(struct.unpack(f'>{cls.vector_size}i', byts)) + elif cls.subtype is LongType or (isinstance(cls.subtype, type) and issubclass(cls.subtype, LongType)): + if use_numpy: + return np.frombuffer(byts, dtype='>i8', count=cls.vector_size).tolist() + return list(struct.unpack(f'>{cls.vector_size}q', byts)) + # Fallback: element-by-element deserialization for other fixed-size types + result = [None] * cls.vector_size + subtype_deserialize = cls.subtype.deserialize + offset = 0 + for i in range(cls.vector_size): + result[i] = subtype_deserialize(byts[offset:offset + serialized_size], protocol_version) + offset += serialized_size + return result + + # Variable-size subtype path + result = [None] * cls.vector_size idx = 0 - rv = [] - while (len(rv) < cls.vector_size): + byts_len = len(byts) + subtype_deserialize = cls.subtype.deserialize + + for i in range(cls.vector_size): + if idx >= 
byts_len:
+                raise ValueError("Error reading additional data during vector deserialization after successfully adding {} elements"\
+                    .format(i))
+
+            try:
                 size, bytes_read = uvint_unpack(byts[idx:])
-                idx += bytes_read
-                rv.append(cls.subtype.deserialize(byts[idx:idx + size], protocol_version))
-                idx += size
-            except:
+            except (IndexError, KeyError):
                 raise ValueError("Error reading additional data during vector deserialization after successfully adding {} elements"\
-                    .format(len(rv)))
+                    .format(i))
+
+            idx += bytes_read

-        # If we have any additional data in the serialized vector treat that as an error as well
-        if idx < len(byts):
+            if idx + size > byts_len:
+                raise ValueError("Error reading additional data during vector deserialization after successfully adding {} elements"\
+                    .format(i))
+
+            result[i] = subtype_deserialize(byts[idx:idx + size], protocol_version)
+            idx += size
+
+        # Check for additional data
+        if idx < byts_len:
             raise ValueError("Additional bytes remaining after vector deserialization completed")
-        return rv
+
+        return result

     @classmethod
     def serialize(cls, v, protocol_version):
diff --git a/cassandra/cython_marshal.pyx b/cassandra/cython_marshal.pyx
index 0a926b6eef..ac07b6378f 100644
--- a/cassandra/cython_marshal.pyx
+++ b/cassandra/cython_marshal.pyx
@@ -19,6 +19,19 @@ from libc.stdint cimport (int8_t, int16_t, int32_t, int64_t,
 from libc.string cimport memcpy
 from cassandra.buffer cimport Buffer, buf_read, to_bytes

+# Use ntohs/ntohl for efficient big-endian to native conversion (single bswap instruction on x86)
+# Platform-specific header: arpa/inet.h on POSIX, winsock2.h on Windows
+cdef extern from *:
+    """
+    #ifdef _WIN32
+    #include <winsock2.h>
+    #else
+    #include <arpa/inet.h>
+    #endif
+    """
+    uint16_t ntohs(uint16_t netshort) nogil
+    uint32_t ntohl(uint32_t netlong) nogil
+
 cdef bint is_little_endian
 from cassandra.util import is_little_endian

@@ -36,35 +49,40 @@ ctypedef fused num_t:

 cdef inline num_t unpack_num(Buffer *buf, num_t *dummy=NULL):
     # dummy pointer because cython wants the fused type as an arg
     """
-    Copy to aligned destination, conditionally swapping to native byte order
+    Copy to aligned destination, conditionally swapping to native byte order.
+    Uses ntohs/ntohl for 16/32-bit types (compiles to single bswap instruction).
     """
-    cdef Py_ssize_t start, end, i
+    cdef Py_ssize_t i
     cdef char *src = buf_read(buf, sizeof(num_t))
-    cdef num_t ret = 0
+    cdef num_t ret
     cdef char *out = <char*>&ret
+    cdef uint32_t temp32  # For float byte-swapping
+
+    # Copy to aligned location first
+    memcpy(&ret, src, sizeof(num_t))
+
+    if not is_little_endian:
+        return ret

-    if is_little_endian:
+    # Use optimized byte-swap intrinsics for 16-bit and 32-bit types
+    if num_t is int16_t or num_t is uint16_t:
+        return ntohs(ret)
+    elif num_t is int32_t or num_t is uint32_t:
+        return ntohl(ret)
+    elif num_t is float:
+        # For float, reinterpret bits as uint32, swap, then reinterpret back
+        temp32 = (<uint32_t*>&ret)[0]
+        temp32 = ntohl(temp32)
+        return (<float*>&temp32)[0]
+    else:
+        # 64-bit, double, or 8-bit: use byte-swap loop (8-bit loop is no-op)
         for i in range(sizeof(num_t)):
             out[sizeof(num_t) - i - 1] = src[i]
-    else:
-        memcpy(out, src, sizeof(num_t))
-
-    return ret
+        return ret


 cdef varint_unpack(Buffer *term):
     """Unpack a variable-sized integer"""
     return varint_unpack_py3(to_bytes(term))

-# TODO: Optimize these two functions
 cdef varint_unpack_py3(bytes term):
-    val = int(''.join(["%02x" % i for i in term]), 16)
-    if (term[0] & 128) != 0:
-        shift = len(term) * 8  # * Note below
-        val -= 1 << shift
-    return val
-
-# * Note *
-# '1 << (len(term) * 8)' Cython tries to do native
-# integer shifts, which overflows. We need this to
-# emulate Python shifting, which will expand the long
-# to accommodate
+    return int.from_bytes(term, byteorder='big', signed=True)
diff --git a/cassandra/deserializers.pyx b/cassandra/deserializers.pyx
index 97d249d02f..f9b2664857 100644
--- a/cassandra/deserializers.pyx
+++ b/cassandra/deserializers.pyx
@@ -13,10 +13,11 @@ # limitations under the License.

-from libc.stdint cimport int32_t, uint16_t
+from libc.stdint cimport int32_t, int64_t, int16_t, uint16_t, uint32_t
+from libc.string cimport memcpy

 include 'cython_marshal.pyx'
-from cassandra.buffer cimport Buffer, to_bytes, slice_buffer
+from cassandra.buffer cimport Buffer, to_bytes, from_ptr_and_size
 from cassandra.cython_utils cimport datetime_from_timestamp

 from cython.view cimport array as cython_array
@@ -29,6 +30,11 @@ from uuid import UUID
 from cassandra import cqltypes
 from cassandra import util

+# Import numpy availability flag and conditionally import numpy
+from cassandra.cython_deps import HAVE_NUMPY
+if HAVE_NUMPY:
+    import numpy as np
+

 cdef class Deserializer:
     """Cython-based deserializer class for a cqltype"""
@@ -58,10 +64,11 @@ cdef class DesBytesTypeByteArray(Deserializer):
 # TODO: Use libmpdec: http://www.bytereef.org/mpdecimal/index.html
 cdef class DesDecimalType(Deserializer):
     cdef deserialize(self, Buffer *buf, int protocol_version):
-        cdef Buffer varint_buf
-        slice_buffer(buf, &varint_buf, 4, buf.size - 4)
         cdef int32_t scale = unpack_num[int32_t](buf)
+
+        # Create a view of the remaining bytes (after the 4-byte scale)
+        cdef Buffer varint_buf
+        from_ptr_and_size(buf.ptr + 4, buf.size - 4, &varint_buf)
         unscaled = varint_unpack(&varint_buf)
         return Decimal('%de%d' % (unscaled, -scale))

@@ -181,8 +188,208 @@ cdef class DesVarcharType(DesUTF8Type):
     pass


+#--------------------------------------------------------------------------
+# Vector deserialization
+
+cdef inline bint _is_float_type(object subtype):
+    return subtype is cqltypes.FloatType or issubclass(subtype, cqltypes.FloatType)
+
+cdef inline bint _is_double_type(object subtype):
+    return subtype is cqltypes.DoubleType or issubclass(subtype, cqltypes.DoubleType)
+
+cdef inline bint 
_is_int32_type(object subtype): + return subtype is cqltypes.Int32Type or issubclass(subtype, cqltypes.Int32Type) + +cdef inline bint _is_int64_type(object subtype): + return subtype is cqltypes.LongType or issubclass(subtype, cqltypes.LongType) + +cdef inline list _deserialize_numpy_vector(Buffer *buf, int vector_size, str dtype): + """Unified numpy deserialization for large vectors""" + return np.frombuffer(buf.ptr[:buf.size], dtype=dtype, count=vector_size).tolist() + +cdef class DesVectorType(Deserializer): + """ + Optimized Cython deserializer for VectorType. + + For float and double vectors, uses direct memory access with C-level casting + for significantly better performance than Python-level deserialization. + """ + + cdef int vector_size + cdef object subtype + + def __init__(self, cqltype): + super().__init__(cqltype) + self.vector_size = cqltype.vector_size + self.subtype = cqltype.subtype + + def deserialize_bytes(self, bytes data, int protocol_version): + """Python-callable wrapper for deserialize that takes bytes.""" + cdef Buffer buf + buf.ptr = data + buf.size = len(data) + return self.deserialize(&buf, protocol_version) + + cdef deserialize(self, Buffer *buf, int protocol_version): + cdef int expected_size + cdef int elem_size + cdef bint use_numpy = HAVE_NUMPY and self.vector_size >= 32 + + # Determine element type, size, and dispatch appropriately + if _is_float_type(self.subtype): + elem_size = 4 + expected_size = self.vector_size * elem_size + if buf.size == expected_size: + if use_numpy: + return _deserialize_numpy_vector(buf, self.vector_size, '>f4') + return self._deserialize_float(buf) + raise ValueError( + f"Expected vector of type {self.subtype.typename} and dimension {self.vector_size} " + f"to have serialized size {expected_size}; observed serialized size of {buf.size} instead") + elif _is_double_type(self.subtype): + elem_size = 8 + expected_size = self.vector_size * elem_size + if buf.size == expected_size: + if use_numpy: + return _deserialize_numpy_vector(buf, self.vector_size, '>f8') + return self._deserialize_double(buf) + raise ValueError( + f"Expected vector of type {self.subtype.typename} and dimension {self.vector_size} " + f"to have serialized size {expected_size}; observed serialized size of {buf.size} instead") + elif _is_int32_type(self.subtype): + elem_size = 4 + expected_size = self.vector_size * elem_size + if buf.size == expected_size: + if use_numpy: + return _deserialize_numpy_vector(buf, self.vector_size, '>i4') + return self._deserialize_int32(buf) + raise ValueError( + f"Expected vector of type {self.subtype.typename} and dimension {self.vector_size} " + f"to have serialized size {expected_size}; observed serialized size of {buf.size} instead") + elif _is_int64_type(self.subtype): + elem_size = 8 + expected_size = self.vector_size * elem_size + if buf.size == expected_size: + if use_numpy: + return _deserialize_numpy_vector(buf, self.vector_size, '>i8') + return self._deserialize_int64(buf) + raise ValueError( + f"Expected vector of type {self.subtype.typename} and dimension {self.vector_size} " + f"to have serialized size {expected_size}; observed serialized size of {buf.size} instead") + else: + # Unsupported type, use generic deserialization + return self._deserialize_generic(buf, protocol_version) + + cdef inline list _deserialize_float(self, Buffer *buf): + """Deserialize float vector using direct C-level access with byte swapping""" + cdef Py_ssize_t i + cdef list result + cdef float temp + cdef uint32_t temp32 + + result = [None] * 
self.vector_size
+        for i in range(self.vector_size):
+            # Read 4 bytes and convert from big-endian
+            temp32 = ntohl((<uint32_t*>(buf.ptr + i * 4))[0])
+            temp = (<float*>&temp32)[0]
+            result[i] = temp
+
+        return result
+
+    cdef inline list _deserialize_double(self, Buffer *buf):
+        """Deserialize double vector using direct C-level access with byte swapping"""
+        cdef Py_ssize_t i
+        cdef list result
+        cdef double temp
+        cdef char *src_bytes
+        cdef char *out_bytes
+        cdef int j
+
+        result = [None] * self.vector_size
+        for i in range(self.vector_size):
+            src_bytes = buf.ptr + i * 8
+            out_bytes = <char*>&temp
+
+            # Swap bytes for big-endian to native conversion
+            if is_little_endian:
+                for j in range(8):
+                    out_bytes[7 - j] = src_bytes[j]
+            else:
+                memcpy(&temp, src_bytes, 8)
+
+            result[i] = temp
+
+        return result
+
+    cdef inline list _deserialize_int32(self, Buffer *buf):
+        """Deserialize int32 vector using direct C-level access with ntohl"""
+        cdef Py_ssize_t i
+        cdef list result
+        cdef int32_t temp
+
+        result = [None] * self.vector_size
+        for i in range(self.vector_size):
+            temp = ntohl((<uint32_t*>(buf.ptr + i * 4))[0])
+            result[i] = temp
+
+        return result
+
+    cdef inline list _deserialize_int64(self, Buffer *buf):
+        """Deserialize int64/long vector using direct C-level access with byte swapping"""
+        cdef Py_ssize_t i
+        cdef list result
+        cdef int64_t temp
+        cdef char *src_bytes
+        cdef char *out_bytes
+        cdef int j
+
+        result = [None] * self.vector_size
+        for i in range(self.vector_size):
+            src_bytes = buf.ptr + i * 8
+            out_bytes = <char*>&temp
+
+            # Swap bytes for big-endian to native conversion
+            if is_little_endian:
+                for j in range(8):
+                    out_bytes[7 - j] = src_bytes[j]
+            else:
+                memcpy(&temp, src_bytes, 8)
+
+            result[i] = temp
+
+        return result
+
+    cdef inline list _deserialize_generic(self, Buffer *buf, int protocol_version):
+        """Fallback: element-by-element deserialization for non-optimized types"""
+        cdef Py_ssize_t i
+        cdef Buffer elem_buf
+        cdef int offset = 0
+        serialized_size = self.subtype.serial_size()
+        cdef list result = [None] * self.vector_size
+
+        if serialized_size is None:
+            raise ValueError(
+                f"VectorType with variable-size subtype {self.subtype.typename} "
+                "is not supported in Cython deserializer")
+
+        # Validate total size before processing
+        cdef int expected_size = self.vector_size * serialized_size
+        if buf.size != expected_size:
+            raise ValueError(
+                f"Expected vector of type {self.subtype.typename} and dimension {self.vector_size} "
+                f"to have serialized size {expected_size}; observed serialized size of {buf.size} instead")
+
+        for i in range(self.vector_size):
+            from_ptr_and_size(buf.ptr + offset, serialized_size, &elem_buf)
+            result[i] = self.subtype.deserialize(to_bytes(&elem_buf), protocol_version)
+            offset += serialized_size
+
+        return result
+
+
 cdef class _DesParameterizedType(Deserializer):

     cdef object subtypes
     cdef Deserializer[::1] deserializers
     cdef Py_ssize_t subtypes_len

@@ -247,22 +454,39 @@ cdef inline int subelem(
     Read the next element from the buffer: first read the size (in bytes) of
     the element, then fill elem_buf with a newly sliced buffer of this size
     (and the right offset).
+
+    Protocol: n >= 0: n bytes follow
+              n == -1: NULL value
+              n == -2: not set value
+              n < -2: invalid
     """
     cdef int32_t elemlen

     _unpack_len(buf, offset[0], &elemlen)
     offset[0] += sizeof(int32_t)
-    slice_buffer(buf, elem_buf, offset[0], elemlen)
-    offset[0] += elemlen
-    return 0
+    # Happy path: non-negative length element that fits in buffer
+    if elemlen >= 0:
+        if offset[0] + elemlen <= buf.size:
+            from_ptr_and_size(buf.ptr + offset[0], elemlen, elem_buf)
+            offset[0] += elemlen
+            return 0
+        raise IndexError("Element length %d at offset %d exceeds buffer size %d" % (elemlen, offset[0], buf.size))
+    # NULL value (-1) or not set value (-2)
+    elif elemlen == -1 or elemlen == -2:
+        from_ptr_and_size(NULL, elemlen, elem_buf)
+        return 0
+    # Invalid value (n < -2)
+    else:
+        raise ValueError("Invalid element length %d at offset %d" % (elemlen, offset[0]))

-cdef int _unpack_len(Buffer *buf, int offset, int32_t *output) except -1:
-    cdef Buffer itemlen_buf
-    slice_buffer(buf, &itemlen_buf, offset, sizeof(int32_t))
-
-    output[0] = unpack_num[int32_t](&itemlen_buf)
+cdef inline int _unpack_len(Buffer *buf, int offset, int32_t *output) except -1:
+    """Read a big-endian int32 at the given offset using direct pointer access."""
+    if offset + sizeof(int32_t) > buf.size:
+        raise IndexError("Cannot read length field: offset %d + 4 exceeds buffer size %d" % (offset, buf.size))
+    cdef uint32_t *src = <uint32_t*>(buf.ptr + offset)
+    output[0] = ntohl(src[0])
     return 0

 #--------------------------------------------------------------------------
@@ -322,7 +546,6 @@ cdef class DesTupleType(_DesParameterizedType):
         cdef int32_t itemlen
         cdef tuple res = tuple_new(self.subtypes_len)
         cdef Buffer item_buf
-        cdef Buffer itemlen_buf
         cdef Deserializer deserializer

         # collections inside UDTs are always encoded with at least the
@@ -333,16 +556,24 @@ cdef class DesTupleType(_DesParameterizedType):
         values = []
         for i in range(self.subtypes_len):
             item = None
-            if p < buf.size:
-                slice_buffer(buf, &itemlen_buf, p, 4)
-                itemlen = unpack_num[int32_t](&itemlen_buf)
+            if p + 4 <= buf.size:
+                # Read itemlen directly using ntohl instead of slice_buffer
+                itemlen = ntohl((<uint32_t*>(buf.ptr + p))[0])
                 p += 4
-                if itemlen >= 0:
-                    slice_buffer(buf, &item_buf, p, itemlen)
+
+                if itemlen >= 0 and p + itemlen <= buf.size:
+                    from_ptr_and_size(buf.ptr + p, itemlen, &item_buf)
                     p += itemlen
                     deserializer = self.deserializers[i]
                     item = from_binary(deserializer, &item_buf, protocol_version)
+                elif itemlen < 0:
+                    # NULL value, item stays None
+                    pass
+                else:
+                    raise IndexError("Tuple item length %d at offset %d exceeds buffer size %d" % (itemlen, p, buf.size))
+            elif p < buf.size:
+                raise IndexError("Cannot read tuple item length at offset %d: only %d bytes remain" % (p, buf.size - p))

             tuple_set(res, i, item)

@@ -384,15 +615,23 @@ cdef class DesCompositeType(_DesParameterizedType):
                 break

             element_length = unpack_num[uint16_t](buf)
-            slice_buffer(buf, &elem_buf, 2, element_length)
-            deserializer = self.deserializers[i]
-            item = from_binary(deserializer, &elem_buf, protocol_version)
-            tuple_set(res, i, item)
+            # Validate that we have enough data for the element and EOC byte (happy path check)
+            if 2 + element_length + 1 <= buf.size:
+                from_ptr_and_size(buf.ptr + 2, element_length, &elem_buf)
+
+                deserializer = self.deserializers[i]
+                item = from_binary(deserializer, &elem_buf, protocol_version)
+                tuple_set(res, i, item)

-            # skip element length, element, and the EOC (one byte)
-            start = 2 + element_length + 1
-            slice_buffer(buf, buf, start, buf.size - start)
+                # skip element length, element, and the EOC (one byte)
+                # Advance buffer in-place with direct assignment
+                start = 2 + element_length + 1
+                buf.ptr = buf.ptr + start
+                buf.size = buf.size - start
+            else:
+                raise IndexError("Composite element length %d requires %d bytes but only %d remain" %
+                                 (element_length, 2 + element_length + 1, buf.size))

         return res

@@ -474,6 +713,8 @@ cpdef Deserializer find_deserializer(cqltype):
         cls = DesReversedType
     elif issubclass(cqltype, cqltypes.FrozenType):
         cls = DesFrozenType
+    elif issubclass(cqltype, cqltypes.VectorType):
+        cls = DesVectorType
     else:
         cls = GenericDeserializer

diff --git a/cassandra/ioutils.pyx b/cassandra/ioutils.pyx
index b0ab4f16cb..530377d528 100644
--- a/cassandra/ioutils.pyx
+++ b/cassandra/ioutils.pyx
@@ -15,9 +15,21 @@
 include 'cython_marshal.pyx'
 from cassandra.buffer cimport Buffer, from_ptr_and_size
-from libc.stdint cimport int32_t
+from libc.stdint cimport int32_t, uint32_t
 from cassandra.bytesio cimport BytesIOReader

+# Use ntohl for efficient big-endian to native conversion (single bswap instruction)
+# Platform-specific header: arpa/inet.h on POSIX, winsock2.h on Windows
+cdef extern from *:
+    """
+    #ifdef _WIN32
+    #include <winsock2.h>
+    #else
+    #include <arpa/inet.h>
+    #endif
+    """
+    uint32_t ntohl(uint32_t netlong) nogil
+

 cdef inline int get_buf(BytesIOReader reader, Buffer *buf_out) except -1:
     """
@@ -41,7 +53,6 @@ cdef inline int get_buf(BytesIOReader reader, Buffer *buf_out) except -1:
     return 0

 cdef inline int32_t read_int(BytesIOReader reader) except ?0xDEAD:
-    cdef Buffer buf
-    buf.ptr = reader.read(4)
-    buf.size = 4
-    return unpack_num[int32_t](&buf)
+    """Read a big-endian int32 directly from the reader."""
+    cdef uint32_t *src = <uint32_t*>reader.read(4)
+    return ntohl(src[0])
diff --git a/cassandra/marshal.py b/cassandra/marshal.py
index 413e1831d4..a7238ea4b7 100644
--- a/cassandra/marshal.py
+++ b/cassandra/marshal.py
@@ -40,11 +40,7 @@ def _make_packer(format_string):

 def varint_unpack(term):
-    val = int(''.join("%02x" % i for i in term), 16)
-    if (term[0] & 128) != 0:
-        len_term = len(term)  # pulling this out of the expression to avoid overflow in cython optimized code
-        val -= 1 << (len_term * 8)
-    return val
+    return int.from_bytes(term, byteorder='big', signed=True)


 def bit_length(n):
diff --git a/cassandra/numpy_parser.pyx b/cassandra/numpy_parser.pyx
index 0ad34f66e2..de2f24c310 100644
--- a/cassandra/numpy_parser.pyx
+++ b/cassandra/numpy_parser.pyx
@@ -112,7 +112,7 @@ def make_arrays(ParseDesc desc, array_size):
     (e.g. this can be fed into pandas.DataFrame)
     """
     array_descs = np.empty((desc.rowsize,), arrDescDtype)
-    arrays = []
+    arrays = [None] * desc.rowsize

     for i, coltype in enumerate(desc.coltypes):
         arr = make_array(coltype, array_size)
@@ -123,7 +123,7 @@ def make_arrays(ParseDesc desc, array_size):
             array_descs[i]['mask_ptr'] = arr.mask.ctypes.data
         except AttributeError:
             array_descs[i]['mask_ptr'] = 0
-        arrays.append(arr)
+        arrays[i] = arr

     return array_descs, arrays

@@ -131,7 +131,23 @@ def make_array(coltype, array_size):
     """
     Allocate a new NumPy array of the given column type and size.
+    For VectorType, creates a 2D array (array_size x vector_dimension).
""" + # Check if this is a VectorType + if hasattr(coltype, 'vector_size') and hasattr(coltype, 'subtype'): + # VectorType - create 2D array (rows x vector_dimension) + vector_size = coltype.vector_size + subtype = coltype.subtype + try: + dtype = _cqltype_to_numpy[subtype] + a = np.ma.empty((array_size, vector_size), dtype=dtype) + a.mask = np.zeros((array_size, vector_size), dtype=bool) + except KeyError: + # Unsupported vector subtype - fall back to object array + a = np.empty((array_size,), dtype=obj_dtype) + return a + + # Scalar types try: a = np.ma.empty((array_size,), dtype=_cqltype_to_numpy[coltype]) a.mask = np.zeros((array_size,), dtype=bool) @@ -174,6 +190,7 @@ cdef inline int unpack_row( def make_native_byteorder(arr): """ Make sure all values have a native endian in the NumPy arrays. + Handles both 1D (scalar types) and 2D (VectorType) arrays. """ if is_little_endian and not arr.dtype.kind == 'O': # We have arrays in big-endian order. First swap the bytes diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index b4eab35875..f6cf96ec0a 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -279,6 +279,17 @@ def xfail_scylla_version(filter: Callable[[Version], bool], reason: str, *args, greaterthanorequalcass3_11 = unittest.skipUnless(CASSANDRA_VERSION >= Version('3.11'), 'Cassandra version 3.11 or greater required') greaterthanorequalcass40 = unittest.skipUnless(CASSANDRA_VERSION >= Version('4.0'), 'Cassandra version 4.0 or greater required') greaterthanorequalcass50 = unittest.skipUnless(CASSANDRA_VERSION >= Version('5.0-beta'), 'Cassandra version 5.0 or greater required') +def _is_cass50_or_scylla_2025_4_plus(): + if CASSANDRA_VERSION >= Version('5.0-beta'): + return True + if SCYLLA_VERSION is None: + return False + return Version(get_scylla_version(SCYLLA_VERSION)) >= Version('2025.4') + +greaterthanorequalcass50_or_scylla_2025_4 = unittest.skipUnless( + _is_cass50_or_scylla_2025_4_plus(), + 'Cassandra >= 5.0 or Scylla >= 2025.4 required' +) lessthanorequalcass40 = unittest.skipUnless(CASSANDRA_VERSION <= Version('4.0'), 'Cassandra version less or equal to 4.0 required') lessthancass40 = unittest.skipUnless(CASSANDRA_VERSION < Version('4.0'), 'Cassandra version less than 4.0 required') lessthancass30 = unittest.skipUnless(CASSANDRA_VERSION < Version('3.0'), 'Cassandra version less then 3.0 required') diff --git a/tests/integration/standard/test_types.py b/tests/integration/standard/test_types.py index ad69fbada9..4a051fd12a 100644 --- a/tests/integration/standard/test_types.py +++ b/tests/integration/standard/test_types.py @@ -40,7 +40,8 @@ from tests.integration import use_singledc, execute_until_pass, notprotocolv1, \ BasicSharedKeyspaceUnitTestCase, greaterthancass21, lessthancass30, \ - greaterthanorequalcass3_10, TestCluster, requires_composite_type, greaterthanorequalcass50 + greaterthanorequalcass3_10, TestCluster, requires_composite_type, \ + greaterthanorequalcass50_or_scylla_2025_4 from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES, COLLECTION_TYPES, PRIMITIVE_DATATYPES_KEYS, \ get_sample, get_all_samples, get_collection_sample import pytest @@ -984,7 +985,7 @@ def run_inserts_at_version(self, proto_ver): finally: session.cluster.shutdown() -@greaterthanorequalcass50 +@greaterthanorequalcass50_or_scylla_2025_4 class TypeTestsVector(BasicSharedKeyspaceUnitTestCase): def _get_first_j(self, rs): diff --git a/tests/unit/test_numpy_parser.py b/tests/unit/test_numpy_parser.py new file 
mode 100644 index 0000000000..0c9cb2e0ff --- /dev/null +++ b/tests/unit/test_numpy_parser.py @@ -0,0 +1,305 @@ +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import struct +import unittest +from unittest.mock import Mock + +try: + import numpy as np + from cassandra.numpy_parser import NumpyParser + from cassandra.bytesio import BytesIOReader + from cassandra.parsing import ParseDesc + from cassandra.deserializers import obj_array + HAVE_NUMPY = True +except ImportError: + HAVE_NUMPY = False + +from cassandra import cqltypes + + +@unittest.skipUnless(HAVE_NUMPY, "NumPy not available") +class TestNumpyParserVectorType(unittest.TestCase): + """Tests for VectorType support in NumpyParser""" + + def _create_vector_type(self, subtype, vector_size): + """Helper to create a VectorType class""" + return type( + f'VectorType({vector_size})', + (cqltypes.VectorType,), + {'vector_size': vector_size, 'subtype': subtype} + ) + + def _serialize_vectors(self, vectors, format_char): + """Serialize a list of vectors using struct.pack""" + buffer = bytearray() + # Write row count + buffer.extend(struct.pack('>i', len(vectors))) + # Write each vector + for vector in vectors: + # Write byte size of vector (doesn't include size prefix in CQL) + byte_size = len(vector) * struct.calcsize(f'>{format_char}') + buffer.extend(struct.pack('>i', byte_size)) + # Write vector elements + buffer.extend(struct.pack(f'>{len(vector)}{format_char}', *vector)) + return bytes(buffer) + + def test_vector_float_2d_array(self): + """Test that VectorType creates and populates a 2D NumPy array""" + vector_size = 4 + vector_type = self._create_vector_type(cqltypes.FloatType, vector_size) + + # Create test data: 3 rows of 4-dimensional float vectors + vectors = [ + [1.0, 2.0, 3.0, 4.0], + [5.0, 6.0, 7.0, 8.0], + [9.0, 10.0, 11.0, 12.0], + ] + + # Serialize the data + serialized = self._serialize_vectors(vectors, 'f') + + # Parse with NumpyParser + parser = NumpyParser() + reader = BytesIOReader(serialized) + + desc = ParseDesc( + colnames=['vec'], + coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + # Verify result structure + self.assertIn('vec', result) + arr = result['vec'] + + # Verify it's a 2D array with correct shape + self.assertEqual(arr.ndim, 2) + self.assertEqual(arr.shape, (3, 4)) + + # Verify the data + expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" + vector_size = 3 + vector_type = self._create_vector_type(cqltypes.DoubleType, vector_size) + + # Create test data: 2 rows of 3-dimensional double vectors + vectors = [ + [1.5, 2.5, 3.5], + [4.5, 5.5, 6.5], + ] + + serialized = self._serialize_vectors(vectors, 'd') + + parser = NumpyParser() + reader = BytesIOReader(serialized) + + desc = ParseDesc( + colnames=['embedding'], + coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + 
protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + arr = result['embedding'] + self.assertEqual(arr.shape, (2, 3)) + + expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" + vector_size = 128 + vector_type = self._create_vector_type(cqltypes.Int32Type, vector_size) + + # Create test data: 2 rows of 128-dimensional int vectors + vectors = [ + list(range(0, 128)), + list(range(128, 256)), + ] + + serialized = self._serialize_vectors(vectors, 'i') + + parser = NumpyParser() + reader = BytesIOReader(serialized) + + desc = ParseDesc( + colnames=['features'], + coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + arr = result['features'] + self.assertEqual(arr.shape, (2, 128)) + + expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" + vector_size = 5 + vector_type = self._create_vector_type(cqltypes.LongType, vector_size) + + vectors = [ + [100, 200, 300, 400, 500], + [600, 700, 800, 900, 1000], + ] + + serialized = self._serialize_vectors(vectors, 'q') + + parser = NumpyParser() + reader = BytesIOReader(serialized) + + desc = ParseDesc( + colnames=['ids'], + coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + arr = result['ids'] + self.assertEqual(arr.shape, (2, 5)) + + expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" + vector_size = 8 + vector_type = self._create_vector_type(cqltypes.ShortType, vector_size) + + vectors = [ + [1, 2, 3, 4, 5, 6, 7, 8], + [9, 10, 11, 12, 13, 14, 15, 16], + ] + + serialized = self._serialize_vectors(vectors, 'h') + + parser = NumpyParser() + reader = BytesIOReader(serialized) + + desc = ParseDesc( + colnames=['small_vec'], + coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + arr = result['small_vec'] + self.assertEqual(arr.shape, (2, 8)) + + expected = np.array(vectors, dtype='i', 2)) # row count + + # Row 1: id=1, vec=[1.0, 2.0, 3.0] + buffer.extend(struct.pack('>i', 4)) # int32 size + buffer.extend(struct.pack('>i', 1)) # id value + buffer.extend(struct.pack('>i', 12)) # vector size (3 floats) + buffer.extend(struct.pack('>3f', 1.0, 2.0, 3.0)) + + # Row 2: id=2, vec=[4.0, 5.0, 6.0] + buffer.extend(struct.pack('>i', 4)) + buffer.extend(struct.pack('>i', 2)) + buffer.extend(struct.pack('>i', 12)) + buffer.extend(struct.pack('>3f', 4.0, 5.0, 6.0)) + + parser = NumpyParser() + reader = BytesIOReader(bytes(buffer)) + + desc = ParseDesc( + colnames=['id', 'vec'], + coltypes=[cqltypes.Int32Type, vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None, None]), + protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + # Verify id column (1D array) + self.assertEqual(result['id'].shape, (2,)) + np.testing.assert_array_equal(result['id'], np.array([1, 2], dtype='4f', 1.0, 2.0, 3.0, 4.0) + result_float = vt_float.deserialize(data_float, 5) + self.assertEqual(result_float, [1.0, 2.0, 3.0, 4.0]) + + # Test double vector + from cassandra.cqltypes import DoubleType + vt_double = VectorType.apply_parameters(['DoubleType', 3], {}) + des_double = find_deserializer(vt_double) + self.assertEqual(des_double.__class__.__name__, 
'DesVectorType') + + data_double = struct.pack('>3d', 1.5, 2.5, 3.5) + result_double = vt_double.deserialize(data_double, 5) + self.assertEqual(result_double, [1.5, 2.5, 3.5]) + + # Test int32 vector + vt_int32 = VectorType.apply_parameters(['Int32Type', 4], {}) + des_int32 = find_deserializer(vt_int32) + self.assertEqual(des_int32.__class__.__name__, 'DesVectorType') + + data_int32 = struct.pack('>4i', 1, 2, 3, 4) + result_int32 = vt_int32.deserialize(data_int32, 5) + self.assertEqual(result_int32, [1, 2, 3, 4]) + + # Test int64/long vector + vt_int64 = VectorType.apply_parameters(['LongType', 2], {}) + des_int64 = find_deserializer(vt_int64) + self.assertEqual(des_int64.__class__.__name__, 'DesVectorType') + + data_int64 = struct.pack('>2q', 100, 200) + result_int64 = vt_int64.deserialize(data_int64, 5) + self.assertEqual(result_int64, [100, 200]) + + # Test int16/short vector + from cassandra.cqltypes import ShortType + vt_int16 = VectorType.apply_parameters(['ShortType', 3], {}) + des_int16 = find_deserializer(vt_int16) + self.assertEqual(des_int16.__class__.__name__, 'DesVectorType') + + data_int16 = struct.pack('>3h', 10, 20, 30) + result_int16 = vt_int16.deserialize(data_int16, 5) + self.assertEqual(result_int16, [10, 20, 30]) + + # Test error handling: wrong buffer size + with self.assertRaises(ValueError) as cm: + vt_float.deserialize(struct.pack('>3f', 1.0, 2.0, 3.0), 5) # 3 floats instead of 4 + self.assertIn('Expected vector', str(cm.exception)) + self.assertIn('serialized size', str(cm.exception)) + ZERO = datetime.timedelta(0)