Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions benchmarks/bench_lz4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# Copyright 2026 ScyllaDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Microbenchmark comparing the Python lz4 wrappers (connection.py) against
the Cython direct-C-linkage wrappers (cython_lz4.pyx) for the CQL binary
protocol v4 LZ4 compression path.

Usage (pin to one core for stable numbers):

taskset -c 0 python benchmarks/bench_lz4.py

Payload sizes tested: 1 KB, 8 KB, 64 KB.
"""

import os
import struct
import timeit

# ---------------------------------------------------------------------------
# Python wrappers (duplicated from connection.py to avoid import side-effects)
# ---------------------------------------------------------------------------
try:
import lz4.block as lz4_block
HAS_PYTHON_LZ4 = True
except ImportError:
HAS_PYTHON_LZ4 = False
print("WARNING: lz4 Python package not available, skipping Python benchmarks")

# Pre-compiled packer for a big-endian 32-bit signed int — the CQL
# protocol's uncompressed-length header format.
int32_pack = struct.Struct('>i').pack


def py_lz4_compress(byts):
    """Compress *byts*, emitting a big-endian length header.

    lz4.block.compress() prepends a 4-byte little-endian uncompressed
    length; the CQL protocol wants big-endian, so we strip lz4's header
    and prepend our own.
    """
    compressed = lz4_block.compress(byts)
    be_header = int32_pack(len(byts))
    return be_header + compressed[4:]


def py_lz4_decompress(byts):
    """Decompress a CQL LZ4 frame (big-endian length header).

    lz4.block.decompress() expects a little-endian length prefix, so the
    first four bytes are reversed (``byts[3::-1]``) before handing the
    frame over.
    """
    le_header = byts[3::-1]
    return lz4_block.decompress(le_header + byts[4:])


# ---------------------------------------------------------------------------
# Cython wrappers
# ---------------------------------------------------------------------------
try:
from cassandra.cython_lz4 import lz4_compress as cy_lz4_compress
from cassandra.cython_lz4 import lz4_decompress as cy_lz4_decompress
HAS_CYTHON = True
except ImportError:
HAS_CYTHON = False
print("WARNING: cassandra.cython_lz4 not available, skipping Cython benchmarks")


# ---------------------------------------------------------------------------
# Benchmark helpers
# ---------------------------------------------------------------------------
# Payload sizes exercised by the benchmark (label -> byte count).
SIZES = {
    "1KB": 1024,
    "8KB": 8 * 1024,
    "64KB": 64 * 1024,
}

# Number of inner-loop iterations per timeit.repeat() call.
INNER = 10_000
# Number of repetitions (we report the minimum).
REPEAT = 5


def make_payload(size):
    """Build a *size*-byte pseudo-realistic, compressible payload.

    Repeats a 16-byte unit — a fixed ``row_value_`` prefix plus 6 random
    bytes — so the data compresses well while still carrying some
    entropy, roughly simulating CQL result rows.
    """
    unit = b"row_value_" + os.urandom(6)
    repeats = size // 16 + 1
    return (unit * repeats)[:size]


def bench(label, func, arg, inner=INNER, repeat=REPEAT):
    """Time ``func(arg)`` and return the best per-call time in nanoseconds.

    Runs ``timeit.repeat`` with *inner* calls per sample and *repeat*
    samples, keeping the fastest sample to minimize scheduler noise.
    ``label`` is accepted purely for call-site readability.
    """
    samples = timeit.repeat(lambda: func(arg), number=inner, repeat=repeat)
    best_per_call = min(samples) / inner
    return best_per_call * 1e9


def main():
    """Run the compress/decompress benchmark matrix and print a table."""
    # Bail out early when neither implementation could be imported.
    if not HAS_PYTHON_LZ4 and not HAS_CYTHON:
        print("ERROR: Neither lz4 Python package nor cassandra.cython_lz4 available.")
        return

    # Build the header row dynamically: a column appears only for each
    # available implementation; the speedup column needs both.
    headers = f"{'Payload':<8} {'Operation':<12} "
    if HAS_PYTHON_LZ4:
        headers += f"{'Python (ns)':>12} "
    if HAS_CYTHON:
        headers += f"{'Cython (ns)':>12} "
    if HAS_PYTHON_LZ4 and HAS_CYTHON:
        headers += f"{'Speedup':>8}"
    print(headers)
    print("-" * len(headers))

    for size_label, size in SIZES.items():
        payload = make_payload(size)

        # -- compress --
        # Keep the compressed outputs: they feed both the cross-compat
        # checks and the decompress benchmarks below.
        py_compressed = None
        cy_compressed = None

        row = f"{size_label:<8} {'compress':<12} "
        if HAS_PYTHON_LZ4:
            py_ns = bench("py_compress", py_lz4_compress, payload)
            py_compressed = py_lz4_compress(payload)
            row += f"{py_ns:>12.1f} "
        if HAS_CYTHON:
            cy_ns = bench("cy_compress", cy_lz4_compress, payload)
            cy_compressed = cy_lz4_compress(payload)
            row += f"{cy_ns:>12.1f} "
            # Nested on purpose: py_ns only exists when HAS_PYTHON_LZ4,
            # so speedup is computed only when both timings were taken.
            if HAS_PYTHON_LZ4:
                speedup = py_ns / cy_ns if cy_ns > 0 else float('inf')
                row += f"{speedup:>7.2f}x"
        print(row)

        # Verify cross-compatibility: each implementation must be able to
        # decompress the other's output (identical wire format both ways).
        if HAS_PYTHON_LZ4 and HAS_CYTHON:
            assert cy_lz4_decompress(py_compressed) == payload, "cross-compat failed (py->cy)"
            assert py_lz4_decompress(cy_compressed) == payload, "cross-compat failed (cy->py)"

        # -- decompress --
        row = f"{size_label:<8} {'decompress':<12} "
        if HAS_PYTHON_LZ4:
            py_ns = bench("py_decompress", py_lz4_decompress, py_compressed)
            row += f"{py_ns:>12.1f} "
        if HAS_CYTHON:
            # Use Cython-compressed data for the Cython decompress benchmark
            cy_ns = bench("cy_decompress", cy_lz4_decompress, cy_compressed)
            row += f"{cy_ns:>12.1f} "
            if HAS_PYTHON_LZ4:
                speedup = py_ns / cy_ns if cy_ns > 0 else float('inf')
                row += f"{speedup:>7.2f}x"
        print(row)

    print()


# Standard entry-point guard so the module can also be imported safely.
if __name__ == "__main__":
    main()
16 changes: 14 additions & 2 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@
try:
import lz4
except ImportError:
log.debug("lz4 package could not be imported. LZ4 Compression will not be available")
pass
lz4 = None
else:
# The compress and decompress functions we need were moved from the lz4 to
# the lz4.block namespace, so we try both here.
Expand Down Expand Up @@ -102,6 +101,19 @@ def lz4_decompress(byts):
locally_supported_compressions['lz4'] = (lz4_compress, lz4_decompress)
segment_codec_lz4 = SegmentCodec(lz4_compress, lz4_decompress)

# Prefer the Cython wrappers that call liblz4 directly (no Python object
# allocation overhead for the byte-order conversion). This also enables
# LZ4 support when the Cython extension is available but the Python lz4
# package is not installed.
try:
from cassandra.cython_lz4 import lz4_compress, lz4_decompress
locally_supported_compressions['lz4'] = (lz4_compress, lz4_decompress)
segment_codec_lz4 = SegmentCodec(lz4_compress, lz4_decompress)
except ImportError:
if lz4 is None:
log.debug("Neither the lz4 package nor the cython_lz4 extension could "
"be imported. LZ4 Compression will not be available")

try:
import snappy
except ImportError:
Expand Down
212 changes: 212 additions & 0 deletions cassandra/cython_lz4.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
# Copyright 2026 ScyllaDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Cython-optimized LZ4 compression/decompression wrappers that call the LZ4 C
library directly, bypassing the Python lz4 module's overhead.

These functions produce output that is wire-compatible with the CQL binary
protocol v4 LZ4 compression format:

[4 bytes big-endian uncompressed length] [LZ4 compressed data]

The Cassandra/ScyllaDB protocol requires big-endian byte order for the
uncompressed length prefix, while the Python lz4 library uses little-endian.
The pure-Python wrappers in connection.py work around this mismatch by
byte-swapping and slicing Python bytes objects, which allocates intermediate
objects on every call. By calling LZ4_compress_default() and
LZ4_decompress_safe() through Cython's C interface we avoid all intermediate
Python object allocations and perform the byte-order conversion with simple
C pointer operations.
"""

from cpython.bytes cimport (PyBytes_AS_STRING, PyBytes_GET_SIZE,
PyBytes_FromStringAndSize)
from libc.stdint cimport uint32_t, INT32_MAX
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy

cdef extern from "alloca.h":
void *alloca(size_t size) noexcept nogil

# Use htonl/ntohl for big-endian ↔ native conversion (single bswap
# instruction on x86). Same cross-platform pattern used in
# cython_marshal.pyx (see PR #732).
cdef extern from *:
"""
#ifdef _WIN32
#include <winsock2.h>
#else
#include <arpa/inet.h>
#endif
"""
uint32_t htonl(uint32_t hostlong) nogil
uint32_t ntohl(uint32_t netlong) nogil

# CQL native protocol v4 frames have a 32-bit body length, so the
# theoretical maximum is ~2 GiB. We use 256 MiB as a practical upper
# bound (matching the server's default frame size limit) to avoid
# accidentally allocating multi-GiB buffers on corrupt headers.
DEF MAX_DECOMPRESSED_LENGTH = 268435456 # 256 MiB

# LZ4_MAX_INPUT_SIZE from lz4.h — the LZ4 C API uses C int (32-bit
# signed) for sizes, so we must reject Python bytes objects that
# exceed this before casting Py_ssize_t down to int.
DEF LZ4_MAX_INPUT_SIZE = 0x7E000000 # 2 113 929 216 bytes

# Maximum LZ4_compressBound value for which we use alloca (stack
# allocation) instead of malloc. 128 KiB is well within the default
# 8 MiB thread stack size and covers CQL frames up to ~127 KiB
# uncompressed — the vast majority of real traffic. Larger frames
# fall back to heap allocation.
DEF STACK_ALLOC_THRESHOLD = 131072 # 128 KiB


cdef extern from "lz4.h":
int LZ4_compress_default(const char *src, char *dst,
int srcSize, int dstCapacity) nogil
int LZ4_decompress_safe(const char *src, char *dst,
int compressedSize, int dstCapacity) nogil
int LZ4_compressBound(int inputSize) nogil


cdef inline void _write_be32(char *dst, uint32_t value) noexcept nogil:
    """Write a 32-bit unsigned integer in big-endian byte order.

    htonl() is a no-op on big-endian hosts and a single byte-swap on
    little-endian ones; memcpy avoids any alignment assumptions on dst.
    """
    cdef uint32_t tmp = htonl(value)
    memcpy(dst, &tmp, 4)


cdef inline uint32_t _read_be32(const char *src) noexcept nogil:
    """Read a 32-bit unsigned integer in big-endian byte order.

    memcpy into a local avoids unaligned-load issues; ntohl() converts
    network (big-endian) order to host order.
    """
    cdef uint32_t tmp
    memcpy(&tmp, src, 4)
    return ntohl(tmp)


def lz4_compress(bytes data not None):
    """Compress *data* using LZ4 for the CQL binary protocol.

    Returns a bytes object containing a 4-byte big-endian uncompressed-length
    header followed by the raw LZ4-compressed payload.

    Raises ``OverflowError`` if *data* exceeds ``LZ4_MAX_INPUT_SIZE``
    (~1.9 GiB), ``MemoryError`` if a heap buffer cannot be allocated, and
    ``RuntimeError`` if LZ4 compression itself fails.
    """
    cdef Py_ssize_t src_len = PyBytes_GET_SIZE(data)

    # The LZ4 C API takes C int sizes; reject anything that cannot be
    # represented before the narrowing cast below.
    if src_len > LZ4_MAX_INPUT_SIZE:
        raise OverflowError(
            "Input size %d exceeds LZ4_MAX_INPUT_SIZE (%d)" %
            (src_len, LZ4_MAX_INPUT_SIZE))

    cdef const char *src = PyBytes_AS_STRING(data)
    cdef int src_size = <int>src_len

    cdef int bound = LZ4_compressBound(src_size)
    if bound <= 0:
        raise RuntimeError(
            "LZ4_compressBound() returned non-positive value for input "
            "size %d; input may exceed LZ4_MAX_INPUT_SIZE" % src_size)

    # Compress into a temporary buffer to learn the exact output size,
    # then copy into an exact-size Python bytes object. For typical CQL
    # frames (bound <= 128 KiB) we use alloca to avoid heap malloc/free
    # overhead entirely — stack allocation is just a pointer decrement.
    # Rare oversized frames fall back to heap allocation.
    cdef char *tmp
    cdef bint heap_allocated = bound > STACK_ALLOC_THRESHOLD
    if heap_allocated:
        tmp = <char *>malloc(bound)
        if tmp == NULL:
            raise MemoryError(
                "Failed to allocate %d-byte LZ4 compression buffer" % bound)
    else:
        tmp = <char *>alloca(bound)

    cdef int compressed_size
    cdef Py_ssize_t final_size
    cdef bytes result
    cdef char *out_ptr
    try:
        with nogil:
            compressed_size = LZ4_compress_default(src, tmp, src_size, bound)
        if compressed_size <= 0:
            raise RuntimeError(
                "LZ4_compress_default() failed for input size %d" % src_size)

        # Build the final bytes: [4-byte BE header][compressed data].
        final_size = 4 + <Py_ssize_t>compressed_size
        result = PyBytes_FromStringAndSize(NULL, final_size)
        out_ptr = PyBytes_AS_STRING(result)
        _write_be32(out_ptr, <uint32_t>src_size)
        memcpy(out_ptr + 4, tmp, <size_t>compressed_size)
    finally:
        # Release the heap buffer on every exit path. Previously the
        # buffer leaked if PyBytes_FromStringAndSize() raised MemoryError,
        # because free() was only reached on the success and
        # compress-failure paths.
        if heap_allocated:
            free(tmp)

    return result


def lz4_decompress(bytes data not None):
    """Decompress a CQL-protocol LZ4 frame.

    Expects *data* to start with a 4-byte big-endian uncompressed-length header
    followed by raw LZ4-compressed payload (the format produced by
    :func:`lz4_compress` and by the Cassandra/ScyllaDB server).

    Raises ``ValueError`` if *data* is too short, the declared size exceeds
    the safety limit, or the compressed payload is too large. Raises
    ``RuntimeError`` if decompression fails (malformed payload).
    """
    cdef const char *src = PyBytes_AS_STRING(data)
    cdef Py_ssize_t src_len = PyBytes_GET_SIZE(data)

    if src_len < 4:
        raise ValueError(
            "LZ4-compressed frame too short: need at least 4 bytes for the "
            "length header, got %d" % src_len)

    cdef uint32_t uncompressed_size = _read_be32(src)

    # A declared size of zero means an empty body; short-circuit so we
    # never call LZ4 with a zero-capacity output buffer.
    if uncompressed_size == 0:
        return b""

    # The header is untrusted input: refuse to allocate more than the
    # safety limit on the strength of a possibly-corrupt length field.
    if uncompressed_size > MAX_DECOMPRESSED_LENGTH:
        raise ValueError(
            "Declared uncompressed size %d exceeds safety limit of %d bytes; "
            "frame header may be corrupt" % (uncompressed_size,
                                            MAX_DECOMPRESSED_LENGTH))

    # The LZ4 C API takes C int sizes; reject payloads that cannot be
    # represented before the narrowing cast below.
    cdef Py_ssize_t compressed_len = src_len - 4
    if compressed_len > <Py_ssize_t>INT32_MAX:
        raise ValueError(
            "Compressed payload size %d exceeds maximum supported size" %
            compressed_len)
    cdef int compressed_size = <int>compressed_len
    # Allocate the exact-size result up front and decompress directly into
    # it — no intermediate buffer is needed on the decompression path.
    cdef bytes out = PyBytes_FromStringAndSize(NULL, <Py_ssize_t>uncompressed_size)
    cdef char *out_ptr = PyBytes_AS_STRING(out)

    cdef int result
    with nogil:
        result = LZ4_decompress_safe(src + 4, out_ptr,
                                     compressed_size,
                                     <int>uncompressed_size)
    if result < 0:
        raise RuntimeError(
            "LZ4_decompress_safe() failed with error code %d; "
            "compressed payload may be malformed" % result)

    # LZ4_decompress_safe() returns the number of bytes actually written;
    # any mismatch with the declared size means the frame is inconsistent.
    if <uint32_t>result != uncompressed_size:
        raise RuntimeError(
            "LZ4_decompress_safe() produced %d bytes but header declared %d" %
            (result, uncompressed_size))

    return out
Loading
Loading