Skip to content

Commit 7c41837

Browse files
rustyconover and claude committed
Add optional zstd compression for HTTP transport
Standard HTTP content negotiation with Content-Encoding/Accept-Encoding. Server-side Falcon middleware handles transparent compression/decompression; client compresses outgoing requests. Default level 3, set to None to disable. httpx auto-decompresses responses; _SyncTestResponse mirrors this behavior. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ed275d0 commit 7c41837

7 files changed

Lines changed: 392 additions & 16 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "vgi-rpc"
3-
version = "0.1.2"
3+
version = "0.1.3"
44
description = "Vector Gateway Interface - RPC framework based on Apache Arrow"
55
readme = "README.md"
66
requires-python = ">=3.13"
@@ -25,7 +25,7 @@ Issues = "https://github.com/Query-farm/vgi-rpc-python/issues"
2525
Documentation = "https://vgi-rpc-python.query.farm/"
2626

2727
[project.optional-dependencies]
28-
http = ["falcon>=3.0", "httpx>=0.24", "waitress>=2.0"]
28+
http = ["falcon>=3.0", "httpx>=0.24", "waitress>=2.0", "zstandard>=0.20"]
2929
s3 = ["boto3>=1.28"]
3030
gcs = ["google-cloud-storage>=2.10"]
3131
cli = ["typer>=0.9", "httpx>=0.24"]

tests/test_http.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,3 +1241,185 @@ def test_request_id_in_client_log_extra(self, client: _SyncTestClient) -> None:
12411241
assert msg.extra is not None
12421242
assert "request_id" in msg.extra
12431243
assert len(str(msg.extra["request_id"])) == 16
1244+
1245+
1246+
# ---------------------------------------------------------------------------
1247+
# Tests: Zstd compression
1248+
# ---------------------------------------------------------------------------
1249+
1250+
# Zstd frame magic number (first 4 bytes of any zstd-compressed data)
1251+
_ZSTD_MAGIC = b"\x28\xb5\x2f\xfd"
1252+
1253+
1254+
class TestZstdCompression:
    """Tests for transparent zstd compression on the HTTP transport.

    Each round-trip test builds its own sync client and releases it in a
    ``finally`` clause, so a failing assertion or a raised RPC error cannot
    leave the client open.
    """

    @staticmethod
    def _greet_request_body() -> bytes:
        """Return a minimal unary ``greet`` request serialized as an Arrow IPC stream.

        The stream holds a single one-row batch (``name == "Test"``) whose
        custom metadata carries the method name and request version.
        """
        schema = pa.schema([pa.field("name", pa.utf8())])
        request_metadata = pa.KeyValueMetadata({b"vgi_rpc.method": b"greet", b"vgi_rpc.request_version": b"1"})
        req_buf = BytesIO()
        with ipc.new_stream(req_buf, schema) as writer:
            batch = pa.RecordBatch.from_pydict({"name": ["Test"]}, schema=schema)
            writer.write_batch(batch, custom_metadata=request_metadata)
        return req_buf.getvalue()

    def test_unary_round_trip_compressed(self) -> None:
        """Unary call works with compression on both client and server."""
        client = make_sync_client(
            RpcServer(RpcFixtureService, RpcFixtureServiceImpl()),
            signing_key=b"test-key",
            compression_level=3,
        )
        try:
            with http_connect(RpcFixtureService, client=client, compression_level=3) as proxy:
                result = proxy.greet(name="World")
                assert result == "Hello, World!"
        finally:
            # Close even when the call or the assertion above fails.
            client.close()

    def test_stream_init_exchange_compressed(self) -> None:
        """Stream init + exchange round-trip with compression."""
        client = make_sync_client(
            RpcServer(RpcFixtureService, RpcFixtureServiceImpl()),
            signing_key=b"test-key",
            compression_level=3,
        )
        try:
            with http_connect(RpcFixtureService, client=client, compression_level=3) as proxy:
                session = proxy.transform(factor=2.0)
                assert isinstance(session, HttpStreamSession)
                input_batch = AnnotatedBatch.from_pydict(
                    {"value": [1.0, 2.0, 3.0]},
                    schema=pa.schema([pa.field("value", pa.float64())]),
                )
                result = session.exchange(input_batch)
                assert result.batch.num_rows == 3
                assert result.batch.column("value")[0].as_py() == 2.0
                assert result.batch.column("value")[1].as_py() == 4.0
        finally:
            client.close()

    def test_producer_stream_continuation_compressed(self) -> None:
        """Producer stream with continuation works under compression."""
        client = make_sync_client(
            RpcServer(RpcFixtureService, RpcFixtureServiceImpl()),
            signing_key=b"test-key",
            max_stream_response_bytes=200,
            compression_level=3,
        )
        try:
            with http_connect(RpcFixtureService, client=client, compression_level=3) as proxy:
                session = proxy.generate(count=10)
                batches = list(session)
                assert len(batches) == 10
                values = [ab.batch.column("i")[0].as_py() for ab in batches]
                assert values == list(range(10))
        finally:
            client.close()

    def test_no_compression(self) -> None:
        """Full round-trip with compression_level=None on both sides."""
        client = make_sync_client(
            RpcServer(RpcFixtureService, RpcFixtureServiceImpl()),
            signing_key=b"test-key",
            compression_level=None,
        )
        try:
            with http_connect(RpcFixtureService, client=client, compression_level=None) as proxy:
                result = proxy.greet(name="NoCompression")
                assert result == "Hello, NoCompression!"
        finally:
            client.close()

    def test_server_compressed_client_uncompressed(self) -> None:
        """Server compresses responses; client sends uncompressed requests.

        httpx (and _SyncTestResponse) auto-decompress zstd responses, so
        this works transparently.
        """
        client = make_sync_client(
            RpcServer(RpcFixtureService, RpcFixtureServiceImpl()),
            signing_key=b"test-key",
            compression_level=3,
        )
        try:
            with http_connect(RpcFixtureService, client=client, compression_level=None) as proxy:
                result = proxy.greet(name="ServerOnly")
                assert result == "Hello, ServerOnly!"
        finally:
            client.close()

    def test_uncompressed_request_to_compressed_server(self) -> None:
        """Server with compression middleware handles uncompressed requests fine."""
        client = make_sync_client(
            RpcServer(RpcFixtureService, RpcFixtureServiceImpl()),
            signing_key=b"test-key",
            compression_level=3,
        )
        try:
            # Client does NOT compress requests (compression_level=None)
            # but server has compression middleware active — should work fine
            # because the middleware only decompresses when Content-Encoding: zstd is present
            with http_connect(RpcFixtureService, client=client, compression_level=None) as proxy:
                result = proxy.greet(name="PlainRequest")
                assert result == "Hello, PlainRequest!"
        finally:
            client.close()

    def test_compressed_response_has_content_encoding_header(self) -> None:
        """Verify Content-Encoding: zstd header is present on compressed responses."""
        server = RpcServer(RpcFixtureService, RpcFixtureServiceImpl())
        app = make_wsgi_app(server, signing_key=b"test-key", compression_level=3)
        falcon_client = falcon.testing.TestClient(app)

        # Send with Accept-Encoding: zstd to trigger response compression
        result = falcon_client.simulate_post(
            "/vgi/greet",
            body=self._greet_request_body(),
            headers={
                "Content-Type": _ARROW_CONTENT_TYPE,
                "Accept-Encoding": "zstd",
            },
        )
        assert result.headers.get("content-encoding") == "zstd"
        # Verify the raw body starts with zstd magic bytes
        assert result.content[:4] == _ZSTD_MAGIC

    def test_compressed_request_body_has_zstd_magic(self) -> None:
        """Verify client request body is actually zstd-compressed."""
        from vgi_rpc.http._common import _compress_body

        raw = self._greet_request_body()
        compressed = _compress_body(raw, 3)
        # Must start with zstd magic
        assert compressed[:4] == _ZSTD_MAGIC
        # Compressed should be different from raw
        assert compressed != raw

    def test_default_compression_level(self) -> None:
        """Default compression_level=3 is used when not specified.

        Since fixtures use default compression_level=3, all existing tests
        run with compression. This test just verifies the default value works.
        """
        client = make_sync_client(
            RpcServer(RpcFixtureService, RpcFixtureServiceImpl()),
            signing_key=b"test-key",
        )
        try:
            # Default compression_level=3 on both sides
            with http_connect(RpcFixtureService, client=client) as proxy:
                result = proxy.greet(name="Default")
                assert result == "Hello, Default!"
        finally:
            client.close()

    def test_stream_exchange_no_compression(self) -> None:
        """Stream exchange works without compression."""
        client = make_sync_client(
            RpcServer(RpcFixtureService, RpcFixtureServiceImpl()),
            signing_key=b"test-key",
            compression_level=None,
        )
        try:
            with http_connect(RpcFixtureService, client=client, compression_level=None) as proxy:
                session = proxy.transform(factor=3.0)
                input_batch = AnnotatedBatch.from_pydict(
                    {"value": [10.0]},
                    schema=pa.schema([pa.field("value", pa.float64())]),
                )
                result = session.exchange(input_batch)
                assert result.batch.column("value")[0].as_py() == 30.0
        finally:
            client.close()

uv.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)