diff --git a/benchmarks/bench_lz4.py b/benchmarks/bench_lz4.py new file mode 100644 index 0000000000..d16df7d2ba --- /dev/null +++ b/benchmarks/bench_lz4.py @@ -0,0 +1,153 @@ +# Copyright 2026 ScyllaDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Microbenchmark comparing the Python lz4 wrappers (connection.py) against +the Cython direct-C-linkage wrappers (cython_lz4.pyx) for the CQL binary +protocol v4 LZ4 compression path. + +Usage (pin to one core for stable numbers): + + taskset -c 0 python benchmarks/bench_lz4.py + +Payload sizes tested: 1 KB, 8 KB, 64 KB. 
+""" + +import os +import struct +import timeit + +# --------------------------------------------------------------------------- +# Python wrappers (duplicated from connection.py to avoid import side-effects) +# --------------------------------------------------------------------------- +try: + import lz4.block as lz4_block + HAS_PYTHON_LZ4 = True +except ImportError: + HAS_PYTHON_LZ4 = False + print("WARNING: lz4 Python package not available, skipping Python benchmarks") + +int32_pack = struct.Struct('>i').pack + + +def py_lz4_compress(byts): + return int32_pack(len(byts)) + lz4_block.compress(byts)[4:] + + +def py_lz4_decompress(byts): + return lz4_block.decompress(byts[3::-1] + byts[4:]) + + +# --------------------------------------------------------------------------- +# Cython wrappers +# --------------------------------------------------------------------------- +try: + from cassandra.cython_lz4 import lz4_compress as cy_lz4_compress + from cassandra.cython_lz4 import lz4_decompress as cy_lz4_decompress + HAS_CYTHON = True +except ImportError: + HAS_CYTHON = False + print("WARNING: cassandra.cython_lz4 not available, skipping Cython benchmarks") + + +# --------------------------------------------------------------------------- +# Benchmark helpers +# --------------------------------------------------------------------------- +SIZES = { + "1KB": 1024, + "8KB": 8 * 1024, + "64KB": 64 * 1024, +} + +# Number of inner-loop iterations per timeit.repeat() call. +INNER = 10_000 +# Number of repetitions (we report the minimum). +REPEAT = 5 + + +def make_payload(size): + """Generate a pseudo-realistic compressible payload.""" + # Mix of repetitive and random-ish bytes to simulate CQL result rows. 
+    chunk = (b"row_value_" + os.urandom(6)) * (size // 16 + 1)
+    return chunk[:size]
+
+
+def bench(label, func, arg, inner=INNER, repeat=REPEAT):
+    """Return the best per-call time in nanoseconds."""
+    times = timeit.repeat(lambda: func(arg), number=inner, repeat=repeat)
+    best = min(times) / inner
+    ns = best * 1e9
+    return ns
+
+
+def main():
+    if not HAS_PYTHON_LZ4 and not HAS_CYTHON:
+        print("ERROR: Neither lz4 Python package nor cassandra.cython_lz4 available.")
+        return
+
+    headers = f"{'Payload':<8} {'Operation':<12} "
+    if HAS_PYTHON_LZ4:
+        headers += f"{'Python (ns)':>12} "
+    if HAS_CYTHON:
+        headers += f"{'Cython (ns)':>12} "
+    if HAS_PYTHON_LZ4 and HAS_CYTHON:
+        headers += f"{'Speedup':>8}"
+    print(headers)
+    print("-" * len(headers))
+
+    for size_label, size in SIZES.items():
+        payload = make_payload(size)
+
+        # -- compress --
+        py_compressed = None
+        cy_compressed = None
+
+        row = f"{size_label:<8} {'compress':<12} "
+        if HAS_PYTHON_LZ4:
+            py_ns = bench("py_compress", py_lz4_compress, payload)
+            py_compressed = py_lz4_compress(payload)
+            row += f"{py_ns:>12.1f} "
+        if HAS_CYTHON:
+            cy_ns = bench("cy_compress", cy_lz4_compress, payload)
+            cy_compressed = cy_lz4_compress(payload)
+            row += f"{cy_ns:>12.1f} "
+        if HAS_PYTHON_LZ4 and HAS_CYTHON:
+            speedup = py_ns / cy_ns if cy_ns > 0 else float('inf')
+            row += f"{speedup:>7.2f}x"
+        print(row)
+
+        # Verify cross-compatibility: Cython can decompress Python's output
+        if HAS_PYTHON_LZ4 and HAS_CYTHON:
+            assert cy_lz4_decompress(py_compressed) == payload, "cross-compat failed (py->cy)"
+            assert py_lz4_decompress(cy_compressed) == payload, "cross-compat failed (cy->py)"
+
+        # -- decompress --
+        row = f"{size_label:<8} {'decompress':<12} "
+        if HAS_PYTHON_LZ4:
+            py_ns = bench("py_decompress", py_lz4_decompress, py_compressed)
+            row += f"{py_ns:>12.1f} "
+        if HAS_CYTHON:
+            # Use Cython-compressed data for the Cython decompress benchmark
+            cy_ns = bench("cy_decompress", cy_lz4_decompress, cy_compressed)
+            row += f"{cy_ns:>12.1f} "
+        if HAS_PYTHON_LZ4 and HAS_CYTHON:
+            speedup = py_ns / cy_ns if cy_ns > 0 else float('inf')
+            row += f"{speedup:>7.2f}x"
+        print(row)
+
+        print()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/cassandra/connection.py b/cassandra/connection.py
index c045b36cb3..999c2dd29c 100644
--- a/cassandra/connection.py
+++ b/cassandra/connection.py
@@ -66,8 +66,7 @@
 try:
     import lz4
 except ImportError:
-    log.debug("lz4 package could not be imported. LZ4 Compression will not be available")
-    pass
+    lz4 = None
 else:
     # The compress and decompress functions we need were moved from the lz4 to
     # the lz4.block namespace, so we try both here.
@@ -102,6 +101,19 @@ def lz4_decompress(byts):
 locally_supported_compressions['lz4'] = (lz4_compress, lz4_decompress)
 segment_codec_lz4 = SegmentCodec(lz4_compress, lz4_decompress)
+# Prefer the Cython wrappers that call liblz4 directly (no Python object
+# allocation overhead for the byte-order conversion). This also enables
+# LZ4 support when the Cython extension is available but the Python lz4
+# package is not installed.
+try:
+    from cassandra.cython_lz4 import lz4_compress, lz4_decompress
+    locally_supported_compressions['lz4'] = (lz4_compress, lz4_decompress)
+    segment_codec_lz4 = SegmentCodec(lz4_compress, lz4_decompress)
+except ImportError:
+    if lz4 is None:
+        log.debug("Neither the lz4 package nor the cython_lz4 extension could "
+                  "be imported. LZ4 Compression will not be available")
+
 try:
     import snappy
 except ImportError:
diff --git a/cassandra/cython_lz4.pyx b/cassandra/cython_lz4.pyx
new file mode 100644
index 0000000000..eeefad7305
--- /dev/null
+++ b/cassandra/cython_lz4.pyx
@@ -0,0 +1,212 @@
+# Copyright 2026 ScyllaDB, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Cython-optimized LZ4 compression/decompression wrappers that call the LZ4 C +library directly, bypassing the Python lz4 module's overhead. + +These functions produce output that is wire-compatible with the CQL binary +protocol v4 LZ4 compression format: + + [4 bytes big-endian uncompressed length] [LZ4 compressed data] + +The Cassandra/ScyllaDB protocol requires big-endian byte order for the +uncompressed length prefix, while the Python lz4 library uses little-endian. +The pure-Python wrappers in connection.py work around this mismatch by +byte-swapping and slicing Python bytes objects, which allocates intermediate +objects on every call. By calling LZ4_compress_default() and +LZ4_decompress_safe() through Cython's C interface we avoid all intermediate +Python object allocations and perform the byte-order conversion with simple +C pointer operations. +""" + +from cpython.bytes cimport (PyBytes_AS_STRING, PyBytes_GET_SIZE, + PyBytes_FromStringAndSize) +from libc.stdint cimport uint32_t, INT32_MAX +from libc.stdlib cimport malloc, free +from libc.string cimport memcpy + +cdef extern from "alloca.h": + void *alloca(size_t size) noexcept nogil + +# Use htonl/ntohl for big-endian ↔ native conversion (single bswap +# instruction on x86). Same cross-platform pattern used in +# cython_marshal.pyx (see PR #732). 
+cdef extern from *:
+    """
+    #ifdef _WIN32
+    #include <winsock2.h>
+    #else
+    #include <arpa/inet.h>
+    #endif
+    """
+    uint32_t htonl(uint32_t hostlong) nogil
+    uint32_t ntohl(uint32_t netlong) nogil
+
+# CQL native protocol v4 frames have a 32-bit body length, so the
+# theoretical maximum is ~2 GiB. We use 256 MiB as a practical upper
+# bound (matching the server's default frame size limit) to avoid
+# accidentally allocating multi-GiB buffers on corrupt headers.
+DEF MAX_DECOMPRESSED_LENGTH = 268435456  # 256 MiB
+
+# LZ4_MAX_INPUT_SIZE from lz4.h — the LZ4 C API uses C int (32-bit
+# signed) for sizes, so we must reject Python bytes objects that
+# exceed this before casting Py_ssize_t down to int.
+DEF LZ4_MAX_INPUT_SIZE = 0x7E000000  # 2 113 929 216 bytes
+
+# Maximum LZ4_compressBound value for which we use alloca (stack
+# allocation) instead of malloc. 128 KiB is well within the default
+# 8 MiB thread stack size and covers CQL frames up to ~127 KiB
+# uncompressed — the vast majority of real traffic. Larger frames
+# fall back to heap allocation.
+DEF STACK_ALLOC_THRESHOLD = 131072  # 128 KiB
+
+
+cdef extern from "lz4.h":
+    int LZ4_compress_default(const char *src, char *dst,
+                             int srcSize, int dstCapacity) nogil
+    int LZ4_decompress_safe(const char *src, char *dst,
+                            int compressedSize, int dstCapacity) nogil
+    int LZ4_compressBound(int inputSize) nogil
+
+
+cdef inline void _write_be32(char *dst, uint32_t value) noexcept nogil:
+    """Write a 32-bit unsigned integer in big-endian byte order."""
+    cdef uint32_t tmp = htonl(value)
+    memcpy(dst, &tmp, 4)
+
+
+cdef inline uint32_t _read_be32(const char *src) noexcept nogil:
+    """Read a 32-bit unsigned integer in big-endian byte order."""
+    cdef uint32_t tmp
+    memcpy(&tmp, src, 4)
+    return ntohl(tmp)
+
+
+def lz4_compress(bytes data not None):
+    """Compress *data* using LZ4 for the CQL binary protocol.
+
+    Returns a bytes object containing a 4-byte big-endian uncompressed-length
+    header followed by the raw LZ4-compressed payload.
+
+    Raises ``OverflowError`` if *data* exceeds ``LZ4_MAX_INPUT_SIZE`` (~1.9 GiB)
+    and ``RuntimeError`` if LZ4_compress_default() itself fails.
+    """
+    cdef Py_ssize_t src_len = PyBytes_GET_SIZE(data)
+
+    if src_len > LZ4_MAX_INPUT_SIZE:
+        raise OverflowError(
+            "Input size %d exceeds LZ4_MAX_INPUT_SIZE (%d)" %
+            (src_len, LZ4_MAX_INPUT_SIZE))
+
+    cdef const char *src = PyBytes_AS_STRING(data)
+    cdef int src_size = src_len
+
+    cdef int bound = LZ4_compressBound(src_size)
+    if bound <= 0:
+        raise RuntimeError(
+            "LZ4_compressBound() returned non-positive value for input "
+            "size %d; input may exceed LZ4_MAX_INPUT_SIZE" % src_size)
+
+    # Compress into a temporary buffer to learn the exact output size,
+    # then copy into an exact-size Python bytes object. For typical CQL
+    # frames (bound <= 128 KiB) we use alloca to avoid heap malloc/free
+    # overhead entirely — stack allocation is just a pointer decrement.
+    # Rare oversized frames fall back to heap allocation.
+    cdef char *tmp
+    cdef bint heap_allocated = bound > STACK_ALLOC_THRESHOLD
+    if heap_allocated:
+        tmp = <char *> malloc(bound)
+        if tmp == NULL:
+            raise MemoryError(
+                "Failed to allocate %d-byte LZ4 compression buffer" % bound)
+    else:
+        tmp = <char *> alloca(bound)
+
+    cdef int compressed_size
+    with nogil:
+        compressed_size = LZ4_compress_default(src, tmp, src_size, bound)
+    if compressed_size <= 0:
+        if heap_allocated:
+            free(tmp)
+        raise RuntimeError(
+            "LZ4_compress_default() failed for input size %d" % src_size)
+
+    # Build the final bytes: [4-byte BE header][compressed data].
+    cdef Py_ssize_t final_size = 4 + compressed_size
+    cdef bytes result = PyBytes_FromStringAndSize(NULL, final_size)
+    cdef char *out_ptr = PyBytes_AS_STRING(result)
+    _write_be32(out_ptr, src_size)
+    memcpy(out_ptr + 4, tmp, compressed_size)
+    if heap_allocated:
+        free(tmp)
+
+    return result
+
+
+def lz4_decompress(bytes data not None):
+    """Decompress a CQL-protocol LZ4 frame.
+ + Expects *data* to start with a 4-byte big-endian uncompressed-length header + followed by raw LZ4-compressed payload (the format produced by + :func:`lz4_compress` and by the Cassandra/ScyllaDB server). + + Raises ``ValueError`` if *data* is too short or the declared size is + non-positive. Raises ``RuntimeError`` if decompression fails (malformed + payload). + """ + cdef const char *src = PyBytes_AS_STRING(data) + cdef Py_ssize_t src_len = PyBytes_GET_SIZE(data) + + if src_len < 4: + raise ValueError( + "LZ4-compressed frame too short: need at least 4 bytes for the " + "length header, got %d" % src_len) + + cdef uint32_t uncompressed_size = _read_be32(src) + + if uncompressed_size == 0: + return b"" + + if uncompressed_size > MAX_DECOMPRESSED_LENGTH: + raise ValueError( + "Declared uncompressed size %d exceeds safety limit of %d bytes; " + "frame header may be corrupt" % (uncompressed_size, + MAX_DECOMPRESSED_LENGTH)) + + cdef Py_ssize_t compressed_len = src_len - 4 + if compressed_len > INT32_MAX: + raise ValueError( + "Compressed payload size %d exceeds maximum supported size" % + compressed_len) + cdef int compressed_size = compressed_len + cdef bytes out = PyBytes_FromStringAndSize(NULL, uncompressed_size) + cdef char *out_ptr = PyBytes_AS_STRING(out) + + cdef int result + with nogil: + result = LZ4_decompress_safe(src + 4, out_ptr, + compressed_size, + uncompressed_size) + if result < 0: + raise RuntimeError( + "LZ4_decompress_safe() failed with error code %d; " + "compressed payload may be malformed" % result) + + if result != uncompressed_size: + raise RuntimeError( + "LZ4_decompress_safe() produced %d bytes but header declared %d" % + (result, uncompressed_size)) + + return out diff --git a/setup.py b/setup.py index 52e04a63e5..17ea7a9da4 100644 --- a/setup.py +++ b/setup.py @@ -340,9 +340,22 @@ def _setup_extensions(self): self.extensions.extend(cythonize( NoPatchExtension("*", ["cassandra/*.pyx"], extra_compile_args=compile_args), + 
exclude=["cassandra/cython_lz4.pyx"], nthreads=build_concurrency, compiler_directives={'language_level': 3}, )) + + # cython_lz4 needs to link against liblz4, so it gets its + # own Extension entry rather than riding the .pyx glob above. + self.extensions.extend(cythonize( + Extension('cassandra.cython_lz4', + ['cassandra/cython_lz4.pyx'], + libraries=['lz4'], + extra_compile_args=compile_args), + nthreads=build_concurrency, + compiler_directives={'language_level': 3}, + exclude_failures=not CASS_DRIVER_BUILD_EXTENSIONS_ARE_MUST, + )) except Exception: sys.stderr.write("Failed to cythonize one or more modules. These will not be compiled as extensions (optional).\n") if CASS_DRIVER_BUILD_EXTENSIONS_ARE_MUST: diff --git a/tests/unit/cython/test_cython_lz4.py b/tests/unit/cython/test_cython_lz4.py new file mode 100644 index 0000000000..d5d139d09a --- /dev/null +++ b/tests/unit/cython/test_cython_lz4.py @@ -0,0 +1,193 @@ +# Copyright 2026 ScyllaDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unit tests for the Cython LZ4 direct-C-linkage wrappers. 
+ +Tests verify: + - Round-trip correctness at various payload sizes + - Wire-format compatibility with the Python lz4 wrappers in connection.py + - Edge cases (empty input, minimal input, header-only frames) + - Error handling (truncated frames, corrupt payloads) +""" + +import os +import struct +import unittest + +try: + from cassandra.cython_lz4 import lz4_compress, lz4_decompress + HAS_CYTHON_LZ4 = True +except ImportError: + HAS_CYTHON_LZ4 = False + +try: + import lz4.block as lz4_block + HAS_PYTHON_LZ4 = True +except ImportError: + HAS_PYTHON_LZ4 = False + +int32_pack = struct.Struct('>i').pack + + +def _py_lz4_compress(byts): + """Python LZ4 compress wrapper (same logic as connection.py).""" + return int32_pack(len(byts)) + lz4_block.compress(byts)[4:] + + +def _py_lz4_decompress(byts): + """Python LZ4 decompress wrapper (same logic as connection.py).""" + return lz4_block.decompress(byts[3::-1] + byts[4:]) + + +@unittest.skipUnless(HAS_CYTHON_LZ4, "cassandra.cython_lz4 extension not available") +class CythonLZ4Test(unittest.TestCase): + """Tests for cassandra.cython_lz4.lz4_compress / lz4_decompress.""" + + def test_round_trip_small(self): + """Round-trip a small payload.""" + data = b"Hello, CQL!" 
* 10 + self.assertEqual(lz4_decompress(lz4_compress(data)), data) + + def test_round_trip_1kb(self): + data = os.urandom(512) + b"\x00" * 512 # 1 KB, partially compressible + self.assertEqual(lz4_decompress(lz4_compress(data)), data) + + def test_round_trip_8kb(self): + data = (b"row_data_" + os.urandom(7)) * 512 # ~8 KB + data = data[:8192] + self.assertEqual(lz4_decompress(lz4_compress(data)), data) + + def test_round_trip_64kb(self): + data = os.urandom(65536) + self.assertEqual(lz4_decompress(lz4_compress(data)), data) + + def test_round_trip_empty(self): + """Empty input should round-trip to empty bytes.""" + compressed = lz4_compress(b"") + self.assertEqual(lz4_decompress(compressed), b"") + + def test_round_trip_single_byte(self): + data = b"\x42" + self.assertEqual(lz4_decompress(lz4_compress(data)), data) + + def test_compress_header_format(self): + """Verify the 4-byte big-endian uncompressed length header.""" + data = b"x" * 300 + compressed = lz4_compress(data) + # First 4 bytes should be big-endian length of original data + header = struct.unpack('>I', compressed[:4])[0] + self.assertEqual(header, 300) + + def test_decompress_too_short(self): + """Frames shorter than 4 bytes should raise ValueError.""" + with self.assertRaises(ValueError): + lz4_decompress(b"") + with self.assertRaises(ValueError): + lz4_decompress(b"\x00") + with self.assertRaises(ValueError): + lz4_decompress(b"\x00\x00\x00") + + def test_decompress_zero_length_header(self): + """A 4-byte zero header (plus any trailing bytes) means empty output.""" + self.assertEqual(lz4_decompress(b"\x00\x00\x00\x00"), b"") + self.assertEqual(lz4_decompress(b"\x00\x00\x00\x00\xff"), b"") + + def test_decompress_corrupt_payload(self): + """Corrupted compressed data should raise RuntimeError.""" + # Valid header claiming 1000 bytes, but garbage payload + bad_frame = struct.pack('>I', 1000) + b"\xff" * 20 + with self.assertRaises(RuntimeError): + lz4_decompress(bad_frame) + + def 
test_decompress_oversized_header(self): + """Header claiming > 256 MiB should raise ValueError.""" + # 0x10000001 = 256 MiB + 1 + huge_header = struct.pack('>I', 0x10000001) + b"\x00" * 10 + with self.assertRaises(ValueError): + lz4_decompress(huge_header) + + def test_round_trip_all_zeros(self): + """All-zero payloads compress extremely well; verify correctness.""" + data = b"\x00" * 10000 + self.assertEqual(lz4_decompress(lz4_compress(data)), data) + + def test_round_trip_all_ones(self): + data = b"\xff" * 10000 + self.assertEqual(lz4_decompress(lz4_compress(data)), data) + + def test_compress_rejects_none(self): + """None input should raise TypeError (enforced by Cython bytes type).""" + with self.assertRaises(TypeError): + lz4_compress(None) + + def test_decompress_rejects_none(self): + with self.assertRaises(TypeError): + lz4_decompress(None) + + def test_compress_rejects_non_bytes(self): + """bytearray and other non-bytes types should raise TypeError.""" + with self.assertRaises(TypeError): + lz4_compress(bytearray(b"hello")) + with self.assertRaises(TypeError): + lz4_compress(memoryview(b"hello")) + with self.assertRaises(TypeError): + lz4_compress("hello") + + def test_decompress_rejects_non_bytes(self): + with self.assertRaises(TypeError): + lz4_decompress(bytearray(b"\x00\x00\x00\x05hello")) + with self.assertRaises(TypeError): + lz4_decompress("hello") + + def test_decompress_header_only_nonzero(self): + """A 4-byte header claiming non-zero size with no payload should fail.""" + header_only = struct.pack('>I', 10) # claims 10 bytes, but no data + with self.assertRaises(RuntimeError): + lz4_decompress(header_only) + + +@unittest.skipUnless(HAS_CYTHON_LZ4 and HAS_PYTHON_LZ4, + "Both cassandra.cython_lz4 and lz4 package required") +class CythonLZ4CrossCompatTest(unittest.TestCase): + """Verify wire-format compatibility between Cython and Python wrappers.""" + + def _check_cross_compat(self, data): + """Assert both directions of cross-compatibility.""" + 
py_compressed = _py_lz4_compress(data) + cy_compressed = lz4_compress(data) + + # Cython decompresses Python's output + self.assertEqual(lz4_decompress(py_compressed), data) + # Python decompresses Cython's output + self.assertEqual(_py_lz4_decompress(cy_compressed), data) + + def test_cross_compat_small(self): + self._check_cross_compat(b"Hello, world!" * 50) + + def test_cross_compat_1kb(self): + self._check_cross_compat(os.urandom(1024)) + + def test_cross_compat_8kb(self): + self._check_cross_compat(os.urandom(8192)) + + def test_cross_compat_64kb(self): + self._check_cross_compat(os.urandom(65536)) + + def test_cross_compat_empty(self): + self._check_cross_compat(b"") + + +if __name__ == "__main__": + unittest.main()