Skip to content

Commit 44ca104

Browse files
lwwmanning and claude
authored and committed
TurboQuant encoding for Vectors (#7167)
Lossy quantization for vector data (e.g., embeddings) based on TurboQuant (https://arxiv.org/abs/2504.19874). Supports both MSE-optimal and inner-product-optimal (Prod with QJL correction) variants at 1-8 bits per coordinate.

Key components:
- Single TurboQuant array encoding with optional QJL correction fields, storing quantized codes, norms, centroids, and rotation signs as children.
- Structured Random Hadamard Transform (SRHT) for O(d log d) rotation, fully self-contained with no external linear algebra library.
- Max-Lloyd centroid computation on Beta(d/2, d/2) distribution.
- Approximate cosine similarity and dot product compute directly on quantized arrays without full decompression.
- Pluggable TurboQuantScheme for BtrBlocks, exposed via WriteStrategyBuilder::with_vector_quantization().
- Benchmarks covering common embedding dimensions (128, 768, 1024, 1536).

Also refactors CompressingStrategy to a single constructor, and adds vortex_tensor::initialize() for session registration of tensor types, encodings, and scalar functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Will Manning <will@willmanning.io>
Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 1966144 commit 44ca104

35 files changed

Lines changed: 4005 additions & 70 deletions

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

_typos.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[default]
2-
extend-ignore-identifiers-re = ["ffor", "FFOR", "FoR", "typ", "ratatui"]
2+
extend-ignore-identifiers-re = ["ffor", "FFOR", "FoR", "typ", "ratatui", "wht", "WHT"]
33
# We support a few common special comments to tell the checker to ignore sections of code
44
extend-ignore-re = [
55
"(#|//)\\s*spellchecker:ignore-next-line\\n.*", # Ignore the next line

vortex-btrblocks/public-api.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,8 @@ pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::exclude(self, ids: impl cor
618618

619619
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::include(self, ids: impl core::iter::traits::collect::IntoIterator<Item = vortex_compressor::scheme::SchemeId>) -> Self
620620

621+
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::with_scheme(self, scheme: &'static dyn vortex_compressor::scheme::Scheme) -> Self
622+
621623
impl core::clone::Clone for vortex_btrblocks::BtrBlocksCompressorBuilder
622624

623625
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::clone(&self) -> vortex_btrblocks::BtrBlocksCompressorBuilder

vortex-btrblocks/src/builder.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,15 @@ impl BtrBlocksCompressorBuilder {
144144
self
145145
}
146146

147+
/// Adds an external compression scheme not in [`ALL_SCHEMES`].
148+
///
149+
/// This allows encoding crates outside of `vortex-btrblocks` to register
150+
/// their own schemes with the compressor.
151+
pub fn with_scheme(mut self, scheme: &'static dyn Scheme) -> Self {
152+
self.schemes.insert(scheme);
153+
self
154+
}
155+
147156
/// Excludes the specified compression schemes by their [`SchemeId`].
148157
pub fn exclude(mut self, ids: impl IntoIterator<Item = SchemeId>) -> Self {
149158
let ids: HashSet<_> = ids.into_iter().collect();

vortex-file/src/strategy.rs

Lines changed: 78 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ use vortex_array::arrays::VarBinView;
2828
use vortex_array::dtype::FieldPath;
2929
use vortex_array::session::ArrayRegistry;
3030
use vortex_array::session::ArraySession;
31+
use vortex_btrblocks::BtrBlocksCompressorBuilder;
3132
use vortex_bytebool::ByteBool;
3233
use vortex_datetime_parts::DateTimeParts;
3334
use vortex_decimal_byte_parts::DecimalByteParts;
35+
use vortex_error::vortex_panic;
3436
use vortex_fastlanes::BitPacked;
3537
use vortex_fastlanes::Delta;
3638
use vortex_fastlanes::FoR;
@@ -53,13 +55,14 @@ use vortex_pco::Pco;
5355
use vortex_runend::RunEnd;
5456
use vortex_sequence::Sequence;
5557
use vortex_sparse::Sparse;
58+
#[cfg(feature = "unstable_encodings")]
59+
use vortex_tensor::encodings::turboquant::TurboQuant;
5660
use vortex_utils::aliases::hash_map::HashMap;
5761
use vortex_zigzag::ZigZag;
5862

5963
#[rustfmt::skip]
6064
#[cfg(feature = "zstd")]
6165
use vortex_btrblocks::{
62-
BtrBlocksCompressorBuilder,
6366
SchemeExt,
6467
schemes::float,
6568
schemes::integer,
@@ -111,6 +114,8 @@ pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
111114
session.register(RunEnd);
112115
session.register(Sequence);
113116
session.register(Sparse);
117+
#[cfg(feature = "unstable_encodings")]
118+
session.register(TurboQuant);
114119
session.register(ZigZag);
115120

116121
#[cfg(feature = "zstd")]
@@ -127,23 +132,26 @@ pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
127132
/// repartitioning and compressing them to strike a balance between size on-disk,
128133
/// bulk decoding performance, and IOPS required to perform an indexed read.
129134
pub struct WriteStrategyBuilder {
130-
compressor: Option<Arc<dyn CompressorPlugin>>,
131135
row_block_size: usize,
132136
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
133137
allow_encodings: Option<ArrayRegistry>,
134138
flat_strategy: Option<Arc<dyn LayoutStrategy>>,
139+
// builder and compressor are mutually exclusive
140+
builder: Option<BtrBlocksCompressorBuilder>,
141+
compressor: Option<Arc<dyn CompressorPlugin>>,
135142
}
136143

137144
impl Default for WriteStrategyBuilder {
138145
/// Create a new empty builder. It can be further configured,
139146
/// and then finally built yielding the [`LayoutStrategy`].
140147
fn default() -> Self {
141148
Self {
142-
compressor: None,
143149
row_block_size: 8192,
144150
field_writers: HashMap::new(),
145151
allow_encodings: Some(ALLOWED_ENCODINGS.clone()),
146152
flat_strategy: None,
153+
builder: None,
154+
compressor: None,
147155
}
148156
}
149157
}
@@ -154,6 +162,9 @@ impl WriteStrategyBuilder {
154162
/// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance
155163
/// total size with decoding performance.
156164
pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
165+
if self.builder.is_some() {
166+
vortex_panic!("Cannot configure both a custom compressor and custom builder schemes");
167+
}
157168
self.compressor = Some(Arc::new(compressor));
158169
self
159170
}
@@ -198,7 +209,12 @@ impl WriteStrategyBuilder {
198209
/// GPU decompression. Without it, strings use interleaved Zstd compression.
199210
#[cfg(feature = "zstd")]
200211
pub fn with_cuda_compatible_encodings(mut self) -> Self {
201-
let mut builder = BtrBlocksCompressorBuilder::default().exclude([
212+
if self.compressor.is_some() {
213+
vortex_panic!(
214+
"Cannot configure both a custom compressor and CUDA compatible encodings"
215+
);
216+
}
217+
let b = self.builder.take().unwrap_or_default().exclude([
202218
integer::SparseScheme.id(),
203219
integer::RLE_INTEGER_SCHEME.id(),
204220
float::RLE_FLOAT_SCHEME.id(),
@@ -209,14 +225,13 @@ impl WriteStrategyBuilder {
209225

210226
#[cfg(feature = "unstable_encodings")]
211227
{
212-
builder = builder.include([string::ZstdBuffersScheme.id()]);
228+
self.builder = Some(b.include([string::ZstdBuffersScheme.id()]));
213229
}
214230
#[cfg(not(feature = "unstable_encodings"))]
215231
{
216-
builder = builder.include([string::ZstdScheme.id()]);
232+
self.builder = Some(b.include([string::ZstdScheme.id()]));
217233
}
218234

219-
self.compressor = Some(Arc::new(builder.build()));
220235
self
221236
}
222237

@@ -227,21 +242,47 @@ impl WriteStrategyBuilder {
227242
/// especially for floating-point heavy datasets.
228243
#[cfg(feature = "zstd")]
229244
pub fn with_compact_encodings(mut self) -> Self {
230-
let btrblocks = BtrBlocksCompressorBuilder::default()
231-
.include([
232-
string::ZstdScheme.id(),
233-
integer::PcoScheme.id(),
234-
float::PcoScheme.id(),
235-
])
236-
.build();
237-
238-
self.compressor = Some(Arc::new(btrblocks));
245+
if self.compressor.is_some() {
246+
vortex_panic!("Cannot configure both a custom compressor and compact encodings");
247+
}
248+
self.builder = Some(self.builder.take().unwrap_or_default().include([
249+
string::ZstdScheme.id(),
250+
integer::PcoScheme.id(),
251+
float::PcoScheme.id(),
252+
]));
253+
self
254+
}
255+
256+
/// Enable TurboQuant lossy vector quantization for tensor columns.
257+
///
258+
/// When enabled, `Vector` and `FixedShapeTensor` extension arrays are
259+
/// compressed using the TurboQuant algorithm with QJL correction for
260+
/// unbiased inner product estimation.
261+
///
262+
/// This augments any existing BtrBlocks scheme configuration rather than
263+
/// replacing it. If none has been set, the default BtrBlocks schemes are
264+
/// used with TurboQuant added. Panics if a custom compressor was set.
265+
#[cfg(feature = "unstable_encodings")]
266+
pub fn with_vector_quantization(mut self) -> Self {
267+
if self.compressor.is_some() {
268+
vortex_panic!("Cannot configure both a custom compressor and vector quantization");
269+
}
270+
use vortex_tensor::encodings::turboquant::scheme::TURBOQUANT_SCHEME;
271+
self.builder = Some(
272+
self.builder
273+
.take()
274+
.unwrap_or_default()
275+
.with_scheme(&TURBOQUANT_SCHEME),
276+
);
239277
self
240278
}
241279

242280
/// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
243281
/// applied.
244282
pub fn build(self) -> Arc<dyn LayoutStrategy> {
283+
use vortex_btrblocks::SchemeExt as _;
284+
use vortex_btrblocks::schemes::integer::IntDictScheme;
285+
245286
let flat: Arc<dyn LayoutStrategy> = if let Some(flat) = self.flat_strategy {
246287
flat
247288
} else if let Some(allow_encodings) = self.allow_encodings {
@@ -254,12 +295,24 @@ impl WriteStrategyBuilder {
254295
let chunked = ChunkedLayoutStrategy::new(flat.clone());
255296
// 6. buffer chunks so they end up with closer segment ids physically
256297
let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB
298+
257299
// 5. compress each chunk
258-
let compressing = if let Some(ref compressor) = self.compressor {
259-
CompressingStrategy::new_opaque(buffered, compressor.clone())
260-
} else {
261-
CompressingStrategy::new_btrblocks(buffered, true)
262-
};
300+
let data_compressor: Arc<dyn CompressorPlugin> =
301+
if let Some(ref compressor) = self.compressor {
302+
assert!(
303+
self.builder.is_none(),
304+
"Cannot configure both a custom compressor and custom builder schemes"
305+
);
306+
compressor.clone()
307+
} else {
308+
Arc::new(
309+
self.builder
310+
.unwrap_or_default()
311+
.exclude([IntDictScheme.id()])
312+
.build(),
313+
)
314+
};
315+
let compressing = CompressingStrategy::new(buffered, data_compressor.clone());
263316

264317
// 4. prior to compression, coalesce up to a minimum size
265318
let coalescing = RepartitionStrategy::new(
@@ -279,11 +332,12 @@ impl WriteStrategyBuilder {
279332
);
280333

281334
// 2.1. | 3.1. compress stats tables and dict values.
282-
let compress_then_flat = if let Some(ref compressor) = self.compressor {
283-
CompressingStrategy::new_opaque(flat, compressor.clone())
335+
let stats_compressor = if let Some(compressor) = self.compressor {
336+
compressor.clone()
284337
} else {
285-
CompressingStrategy::new_btrblocks(flat, false)
338+
Arc::new(BtrBlocksCompressorBuilder::default().build())
286339
};
340+
let compress_then_flat = CompressingStrategy::new(flat, stats_compressor);
287341

288342
// 3. apply dict encoding or fallback
289343
let dict = DictStrategy::new(

vortex-file/tests/test_write_table.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use vortex_array::field_path;
2020
use vortex_array::scalar_fn::session::ScalarFnSession;
2121
use vortex_array::session::ArraySession;
2222
use vortex_array::validity::Validity;
23+
use vortex_btrblocks::BtrBlocksCompressor;
2324
use vortex_buffer::ByteBuffer;
2425
use vortex_file::OpenOptionsSessionExt;
2526
use vortex_file::WriteOptionsSessionExt;
@@ -67,9 +68,9 @@ async fn test_file_roundtrip() {
6768

6869
// Create a writer which by default uses the BtrBlocks compressor for a.compressed, but leaves
6970
// the b and the a.raw columns uncompressed.
70-
let default_strategy = Arc::new(CompressingStrategy::new_btrblocks(
71+
let default_strategy = Arc::new(CompressingStrategy::new(
7172
FlatLayoutStrategy::default(),
72-
false,
73+
BtrBlocksCompressor::default(),
7374
));
7475

7576
let writer = Arc::new(

vortex-layout/public-api.lock

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,7 @@ pub struct vortex_layout::layouts::compressed::CompressingStrategy
168168

169169
impl vortex_layout::layouts::compressed::CompressingStrategy
170170

171-
pub fn vortex_layout::layouts::compressed::CompressingStrategy::new_btrblocks<S: vortex_layout::LayoutStrategy>(child: S, exclude_int_dict_encoding: bool) -> Self
172-
173-
pub fn vortex_layout::layouts::compressed::CompressingStrategy::new_opaque<S: vortex_layout::LayoutStrategy, C: vortex_layout::layouts::compressed::CompressorPlugin>(child: S, compressor: C) -> Self
171+
pub fn vortex_layout::layouts::compressed::CompressingStrategy::new<S: vortex_layout::LayoutStrategy, C: vortex_layout::layouts::compressed::CompressorPlugin>(child: S, compressor: C) -> Self
174172

175173
pub fn vortex_layout::layouts::compressed::CompressingStrategy::with_concurrency(self, concurrency: usize) -> Self
176174

vortex-layout/src/layouts/compressed.rs

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ use vortex_array::ArrayContext;
99
use vortex_array::ArrayRef;
1010
use vortex_array::expr::stats::Stat;
1111
use vortex_btrblocks::BtrBlocksCompressor;
12-
use vortex_btrblocks::BtrBlocksCompressorBuilder;
13-
use vortex_btrblocks::SchemeExt;
14-
use vortex_btrblocks::schemes::integer::IntDictScheme;
1512
use vortex_error::VortexResult;
1613
use vortex_io::runtime::Handle;
1714

@@ -60,32 +57,11 @@ pub struct CompressingStrategy {
6057
}
6158

6259
impl CompressingStrategy {
63-
/// Create a new writer that uses the BtrBlocks-style cascading compressor to compress chunks.
64-
///
65-
/// This provides a good balance between decoding speed and small file size.
66-
///
67-
/// Set `exclude_int_dict_encoding` to true to prevent dictionary encoding of integer arrays,
68-
/// which is useful when compressing dictionary codes to avoid recursive dictionary encoding.
69-
pub fn new_btrblocks<S: LayoutStrategy>(child: S, exclude_int_dict_encoding: bool) -> Self {
70-
let compressor = if exclude_int_dict_encoding {
71-
BtrBlocksCompressorBuilder::default()
72-
.exclude([IntDictScheme.id()])
73-
.build()
74-
} else {
75-
BtrBlocksCompressor::default()
76-
};
77-
Self::new(child, Arc::new(compressor))
78-
}
79-
80-
/// Create a new compressor from a plugin interface.
81-
pub fn new_opaque<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
82-
Self::new(child, Arc::new(compressor))
83-
}
84-
85-
fn new<S: LayoutStrategy>(child: S, compressor: Arc<dyn CompressorPlugin>) -> Self {
60+
/// Create a new compressing strategy that wraps a child strategy with a compressor plugin.
61+
pub fn new<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
8662
Self {
8763
child: Arc::new(child),
88-
compressor,
64+
compressor: Arc::new(compressor),
8965
concurrency: std::thread::available_parallelism()
9066
.map(|v| v.get())
9167
.unwrap_or(1),

vortex-layout/src/layouts/table.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,13 @@ impl TableStrategy {
8686
/// ```ignore
8787
/// # use std::sync::Arc;
8888
/// # use vortex_array::dtype::{field_path, Field, FieldPath};
89+
/// # use vortex_btrblocks::BtrBlocksCompressor;
8990
/// # use vortex_layout::layouts::compressed::CompressingStrategy;
9091
/// # use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
9192
/// # use vortex_layout::layouts::table::TableStrategy;
9293
///
9394
/// // A strategy for compressing data using the balanced BtrBlocks compressor.
94-
/// let compress = CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), true);
95+
/// let compress = CompressingStrategy::new(FlatLayoutStrategy::default(), BtrBlocksCompressor::default());
9596
///
9697
/// // Our combined strategy uses no compression for validity buffers, BtrBlocks compression
9798
/// // for most columns, and stores a nested binary column uncompressed (flat) because it

vortex-python/src/io.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ impl PyVortexWriteOptions {
291291
/// ```python
292292
/// >>> vx.io.VortexWriteOptions.compact().write(sprl, "tiny.vortex")
293293
/// >>> os.path.getsize('tiny.vortex')
294-
/// 55120
294+
/// 55460
295295
/// ```
296296
///
297297
/// Random numbers are not (usually) composed of random bytes!

0 commit comments

Comments
 (0)