diff --git a/fuzz/src/array/mod.rs b/fuzz/src/array/mod.rs index 730791e3dd8..9b757d0a8f3 100644 --- a/fuzz/src/array/mod.rs +++ b/fuzz/src/array/mod.rs @@ -60,10 +60,6 @@ use vortex_array::search_sorted::SearchSorted; use vortex_array::search_sorted::SearchSortedSide; use vortex_btrblocks::BtrBlocksCompressor; use vortex_btrblocks::BtrBlocksCompressorBuilder; -use vortex_btrblocks::SchemeExt; -use vortex_btrblocks::schemes::float; -use vortex_btrblocks::schemes::integer; -use vortex_btrblocks::schemes::string; use vortex_error::VortexExpect; use vortex_error::vortex_panic; use vortex_mask::Mask; @@ -546,11 +542,7 @@ pub fn compress_array(array: &ArrayRef, strategy: CompressorStrategy) -> ArrayRe .compress(array) .vortex_expect("BtrBlocksCompressor compress should succeed in fuzz test"), CompressorStrategy::Compact => BtrBlocksCompressorBuilder::default() - .include([ - string::ZstdScheme.id(), - integer::PcoScheme.id(), - float::PcoScheme.id(), - ]) + .with_compact() .build() .compress(array) .vortex_expect("Compact compress should succeed in fuzz test"), diff --git a/vortex-btrblocks/public-api.lock b/vortex-btrblocks/public-api.lock index 37410874777..b2336456828 100644 --- a/vortex-btrblocks/public-api.lock +++ b/vortex-btrblocks/public-api.lock @@ -616,7 +616,9 @@ pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::build(self) -> vortex_btrbl pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::exclude(self, ids: impl core::iter::traits::collect::IntoIterator) -> Self -pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::include(self, ids: impl core::iter::traits::collect::IntoIterator) -> Self +pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::with_compact(self) -> Self + +pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::with_new_scheme(self, scheme: &'static dyn vortex_compressor::scheme::Scheme) -> Self impl core::clone::Clone for vortex_btrblocks::BtrBlocksCompressorBuilder @@ -633,5 +635,3 @@ pub fn 
vortex_btrblocks::BtrBlocksCompressorBuilder::fmt(&self, f: &mut core::fm pub const vortex_btrblocks::ALL_SCHEMES: &[&dyn vortex_compressor::scheme::Scheme] pub fn vortex_btrblocks::compress_patches(patches: vortex_array::patches::Patches) -> vortex_error::VortexResult - -pub fn vortex_btrblocks::default_excluded() -> vortex_utils::aliases::hash_set::HashSet diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index 6127f1e3910..3b6531a3941 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -41,8 +41,6 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ &integer::RunEndScheme, &integer::SequenceScheme, &rle::RLE_INTEGER_SCHEME, - #[cfg(feature = "pco")] - &integer::PcoScheme, //////////////////////////////////////////////////////////////////////////////////////////////// // Float schemes. //////////////////////////////////////////////////////////////////////////////////////////////// @@ -52,8 +50,6 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ &float::FloatDictScheme, &float::NullDominatedSparseScheme, &rle::RLE_FLOAT_SCHEME, - #[cfg(feature = "pco")] - &float::PcoScheme, //////////////////////////////////////////////////////////////////////////////////////////////// // String schemes. //////////////////////////////////////////////////////////////////////////////////////////////// @@ -61,36 +57,18 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ &string::FSSTScheme, &string::StringConstantScheme, &string::NullDominatedSparseScheme, - #[cfg(feature = "zstd")] - &string::ZstdScheme, - #[cfg(all(feature = "zstd", feature = "unstable_encodings"))] - &string::ZstdBuffersScheme, // Decimal schemes. &decimal::DecimalScheme, // Temporal schemes. &temporal::TemporalScheme, ]; -/// Returns the set of scheme IDs excluded by default (behind feature gates or known-expensive). 
-pub fn default_excluded() -> HashSet { - #[allow(unused_mut, reason = "depends on enabled feature flags")] - let mut excluded = HashSet::new(); - #[cfg(feature = "pco")] - { - excluded.insert(integer::PcoScheme.id()); - excluded.insert(float::PcoScheme.id()); - } - #[cfg(feature = "zstd")] - excluded.insert(string::ZstdScheme.id()); - #[cfg(all(feature = "zstd", feature = "unstable_encodings"))] - excluded.insert(string::ZstdBuffersScheme.id()); - excluded -} - /// Builder for creating configured [`BtrBlocksCompressor`] instances. /// -/// Use this builder to configure which compression schemes are allowed. -/// By default, all schemes are enabled except those in [`default_excluded`]. +/// By default, all schemes in [`ALL_SCHEMES`] are enabled. Feature-gated schemes (Pco, Zstd) +/// are not in `ALL_SCHEMES` and must be added explicitly via +/// [`with_new_scheme`](BtrBlocksCompressorBuilder::with_new_scheme) or +/// [`with_compact`](BtrBlocksCompressorBuilder::with_compact). /// /// # Examples /// @@ -98,52 +76,69 @@ pub fn default_excluded() -> HashSet { /// use vortex_btrblocks::{BtrBlocksCompressorBuilder, Scheme, SchemeExt}; /// use vortex_btrblocks::schemes::integer::IntDictScheme; /// -/// // Default compressor - all non-excluded schemes allowed. +/// // Default compressor with all schemes in ALL_SCHEMES. /// let compressor = BtrBlocksCompressorBuilder::default().build(); /// /// // Exclude specific schemes. /// let compressor = BtrBlocksCompressorBuilder::default() /// .exclude([IntDictScheme.id()]) /// .build(); -/// -/// // Exclude then re-include. 
-/// let compressor = BtrBlocksCompressorBuilder::default() -/// .exclude([IntDictScheme.id()]) -/// .include([IntDictScheme.id()]) -/// .build(); /// ``` #[derive(Debug, Clone)] pub struct BtrBlocksCompressorBuilder { - schemes: HashSet<&'static dyn Scheme>, + schemes: Vec<&'static dyn Scheme>, } impl Default for BtrBlocksCompressorBuilder { fn default() -> Self { - let excluded = default_excluded(); Self { - schemes: ALL_SCHEMES - .iter() - .copied() - .filter(|s| !excluded.contains(&s.id())) - .collect(), + schemes: ALL_SCHEMES.to_vec(), } } } impl BtrBlocksCompressorBuilder { - /// Includes the specified compression schemes by their [`SchemeId`]. + /// Adds an external compression scheme not in [`ALL_SCHEMES`]. /// - /// Only schemes present in [`ALL_SCHEMES`] can be included. - pub fn include(mut self, ids: impl IntoIterator) -> Self { - let ids: HashSet<_> = ids.into_iter().collect(); - for scheme in ALL_SCHEMES { - if ids.contains(&scheme.id()) { - self.schemes.insert(*scheme); - } - } + /// This allows encoding crates outside of `vortex-btrblocks` to register their own schemes with + /// the compressor. + /// + /// # Panics + /// + /// Panics if a scheme with the same [`SchemeId`] is already present. + pub fn with_new_scheme(mut self, scheme: &'static dyn Scheme) -> Self { + assert!( + !self.schemes.iter().any(|s| s.id() == scheme.id()), + "scheme {:?} is already present in the builder", + scheme.id(), + ); + + self.schemes.push(scheme); self } + /// Adds compact encoding schemes (Zstd for strings, Pco for numerics). + /// + /// This provides better compression ratios than the default, especially for floating-point + /// heavy datasets. Requires the `zstd` feature. When the `pco` feature is also enabled, + /// Pco schemes for integers and floats are included. + /// + /// # Panics + /// + /// Panics if any of the compact schemes are already present. 
+ #[cfg(feature = "zstd")] + pub fn with_compact(self) -> Self { + // This should be fast since we don't have that many schemes. + let builder = self.with_new_scheme(&string::ZstdScheme); + + #[cfg(feature = "pco")] + let builder = builder + .with_new_scheme(&integer::PcoScheme) + .with_new_scheme(&float::PcoScheme); + + builder + } + /// Excludes the specified compression schemes by their [`SchemeId`]. pub fn exclude(mut self, ids: impl IntoIterator) -> Self { let ids: HashSet<_> = ids.into_iter().collect(); @@ -152,15 +147,7 @@ impl BtrBlocksCompressorBuilder { } /// Builds the configured [`BtrBlocksCompressor`]. - /// - /// The resulting scheme list preserves the order of [`ALL_SCHEMES`] for deterministic - /// tie-breaking. pub fn build(self) -> BtrBlocksCompressor { - let schemes = ALL_SCHEMES - .iter() - .copied() - .filter(|s| self.schemes.contains(s)) - .collect(); - BtrBlocksCompressor(CascadingCompressor::new(schemes)) + BtrBlocksCompressor(CascadingCompressor::new(self.schemes)) } } diff --git a/vortex-btrblocks/src/lib.rs b/vortex-btrblocks/src/lib.rs index eac32412e4a..5ea55a40c66 100644 --- a/vortex-btrblocks/src/lib.rs +++ b/vortex-btrblocks/src/lib.rs @@ -63,7 +63,6 @@ pub mod schemes; // Btrblocks-specific exports. 
pub use builder::ALL_SCHEMES; pub use builder::BtrBlocksCompressorBuilder; -pub use builder::default_excluded; pub use canonical_compressor::BtrBlocksCompressor; pub use schemes::patches::compress_patches; pub use vortex_compressor::CascadingCompressor; diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index efd693c5ca1..c855a825c17 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -28,6 +28,10 @@ use vortex_array::arrays::VarBinView; use vortex_array::dtype::FieldPath; use vortex_array::session::ArrayRegistry; use vortex_array::session::ArraySession; +use vortex_btrblocks::BtrBlocksCompressor; +use vortex_btrblocks::BtrBlocksCompressorBuilder; +use vortex_btrblocks::SchemeExt; +use vortex_btrblocks::schemes::integer::IntDictScheme; use vortex_bytebool::ByteBool; use vortex_datetime_parts::DateTimeParts; use vortex_decimal_byte_parts::DecimalByteParts; @@ -59,8 +63,6 @@ use vortex_zigzag::ZigZag; #[rustfmt::skip] #[cfg(feature = "zstd")] use vortex_btrblocks::{ - BtrBlocksCompressorBuilder, - SchemeExt, schemes::float, schemes::integer, schemes::string, @@ -127,7 +129,7 @@ pub static ALLOWED_ENCODINGS: LazyLock = LazyLock::new(|| { /// repartitioning and compressing them to strike a balance between size on-disk, /// bulk decoding performance, and IOPS required to perform an indexed read. pub struct WriteStrategyBuilder { - compressor: Option>, + compressor_override: Option>, row_block_size: usize, field_writers: HashMap>, allow_encodings: Option, @@ -139,7 +141,7 @@ impl Default for WriteStrategyBuilder { /// and then finally built yielding the [`LayoutStrategy`]. 
fn default() -> Self { Self { - compressor: None, + compressor_override: None, row_block_size: 8192, field_writers: HashMap::new(), allow_encodings: Some(ALLOWED_ENCODINGS.clone()), @@ -149,15 +151,6 @@ impl Default for WriteStrategyBuilder { } impl WriteStrategyBuilder { - /// Override the [compressor][CompressorPlugin] used for compressing chunks in the file. - /// - /// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance - /// total size with decoding performance. - pub fn with_compressor(mut self, compressor: C) -> Self { - self.compressor = Some(Arc::new(compressor)); - self - } - /// Override the row block size used to determine the zone map sizes. pub fn with_row_block_size(mut self, row_block_size: usize) -> Self { self.row_block_size = row_block_size; @@ -190,14 +183,51 @@ impl WriteStrategyBuilder { self } + /// Override the [compressor](CompressorPlugin) used for compressing chunks in the file. + /// + /// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance + /// total size with decoding performance. + /// + /// # Panics + /// + /// Panics if a compressor has already been set via + /// [`with_compressor`](Self::with_compressor), + /// [`with_cuda_compatible_encodings`](Self::with_cuda_compatible_encodings), or + /// [`with_compact_encodings`](Self::with_compact_encodings). + /// + /// These methods are mutually exclusive. + pub fn with_compressor(mut self, compressor: C) -> Self { + assert!( + self.compressor_override.is_none(), + "A compressor has already been configured. `with_compressor`, \ + `with_cuda_compatible_encodings`, and `with_compact_encodings` are mutually exclusive." + ); + self.compressor_override = Some(Arc::new(compressor)); + self + } + /// Configure a write strategy that emits only CUDA-compatible encodings. /// + /// This method simply exists as a wrapper around [`with_compressor`]. 
+ /// + /// This configures BtrBlocks to exclude schemes without CUDA kernel support. /// With the `unstable_encodings` feature, strings use buffer-level Zstd compression /// (`ZstdBuffersArray`) which preserves the array buffer layout for zero-conversion /// GPU decompression. Without it, strings use interleaved Zstd compression. + /// + /// # Panics + /// + /// Panics if a compressor has already been set. See [`with_compressor`]. + /// + /// [`with_compressor`]: Self::with_compressor #[cfg(feature = "zstd")] pub fn with_cuda_compatible_encodings(mut self) -> Self { + assert!( + self.compressor_override.is_none(), + "A compressor has already been configured. `with_compressor`, \ + `with_cuda_compatible_encodings`, and `with_compact_encodings` are mutually exclusive." + ); + let mut builder = BtrBlocksCompressorBuilder::default().exclude([ integer::SparseScheme.id(), integer::RLE_INTEGER_SCHEME.id(), @@ -209,33 +239,41 @@ impl WriteStrategyBuilder { #[cfg(feature = "unstable_encodings")] { - builder = builder.include([string::ZstdBuffersScheme.id()]); + builder = builder.with_new_scheme(&string::ZstdBuffersScheme); } #[cfg(not(feature = "unstable_encodings"))] { - builder = builder.include([string::ZstdScheme.id()]); + builder = builder.with_new_scheme(&string::ZstdScheme); } - self.compressor = Some(Arc::new(builder.build())); + self.compressor_override = Some(Arc::new(builder.build())); self } /// Configure a write strategy that uses compact encodings (Pco for numerics, Zstd for /// strings/binary). /// + /// This method simply exists as a wrapper around [`with_compressor`]. + /// /// This provides better compression ratios than the default BtrBlocks strategy, /// especially for floating-point heavy datasets. + /// + /// # Panics + /// + /// Panics if a compressor has already been set. See [`with_compressor`]. + /// + /// [`with_compressor`]: Self::with_compressor
#[cfg(feature = "zstd")] pub fn with_compact_encodings(mut self) -> Self { - let btrblocks = BtrBlocksCompressorBuilder::default() - .include([ - string::ZstdScheme.id(), - integer::PcoScheme.id(), - float::PcoScheme.id(), - ]) - .build(); - - self.compressor = Some(Arc::new(btrblocks)); + assert!( + self.compressor_override.is_none(), + "A compressor has already been configured. `with_compressor`, \ + `with_cuda_compatible_encodings`, and `with_compact_encodings` are mutually exclusive." + ); + + self.compressor_override = Some(Arc::new( + BtrBlocksCompressorBuilder::default().with_compact().build(), + )); self } @@ -254,12 +292,22 @@ impl WriteStrategyBuilder { let chunked = ChunkedLayoutStrategy::new(flat.clone()); // 6. buffer chunks so they end up with closer segment ids physically let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB - // 5. compress each chunk - let compressing = if let Some(ref compressor) = self.compressor { - CompressingStrategy::new_opaque(buffered, compressor.clone()) - } else { - CompressingStrategy::new_btrblocks(buffered, true) - }; + + // 5. compress each chunk. + // Exclude IntDictScheme from the default compressor because DictStrategy (step 3) already + // dictionary-encodes columns. Allowing IntDictScheme here would redundantly + // dictionary-encode the integer codes produced by that earlier step. + let data_compressor: Arc = + if let Some(ref compressor) = self.compressor_override { + compressor.clone() + } else { + Arc::new( + BtrBlocksCompressorBuilder::default() + .exclude([IntDictScheme.id()]) + .build(), + ) + }; + let compressing = CompressingStrategy::new(buffered, data_compressor); // 4. prior to compression, coalesce up to a minimum size let coalescing = RepartitionStrategy::new( @@ -279,11 +327,13 @@ impl WriteStrategyBuilder { ); // 2.1. | 3.1. compress stats tables and dict values. 
- let compress_then_flat = if let Some(ref compressor) = self.compressor { - CompressingStrategy::new_opaque(flat, compressor.clone()) - } else { - CompressingStrategy::new_btrblocks(flat, false) - }; + let stats_compressor: Arc = + if let Some(ref compressor) = self.compressor_override { + compressor.clone() + } else { + Arc::new(BtrBlocksCompressor::default()) + }; + let compress_then_flat = CompressingStrategy::new(flat, stats_compressor); // 3. apply dict encoding or fallback let dict = DictStrategy::new( diff --git a/vortex-file/tests/test_write_table.rs b/vortex-file/tests/test_write_table.rs index db7b16feada..74f5d0951d2 100644 --- a/vortex-file/tests/test_write_table.rs +++ b/vortex-file/tests/test_write_table.rs @@ -20,6 +20,7 @@ use vortex_array::field_path; use vortex_array::scalar_fn::session::ScalarFnSession; use vortex_array::session::ArraySession; use vortex_array::validity::Validity; +use vortex_btrblocks::BtrBlocksCompressor; use vortex_buffer::ByteBuffer; use vortex_file::OpenOptionsSessionExt; use vortex_file::WriteOptionsSessionExt; @@ -67,9 +68,9 @@ async fn test_file_roundtrip() { // Create a writer which by default uses the BtrBlocks compressor for a.compressed, but leaves // the b and the a.raw columns uncompressed. 
- let default_strategy = Arc::new(CompressingStrategy::new_btrblocks( + let default_strategy = Arc::new(CompressingStrategy::new( FlatLayoutStrategy::default(), - false, + BtrBlocksCompressor::default(), )); let writer = Arc::new( diff --git a/vortex-layout/public-api.lock b/vortex-layout/public-api.lock index 88e1dc65a40..1b31f837935 100644 --- a/vortex-layout/public-api.lock +++ b/vortex-layout/public-api.lock @@ -168,9 +168,7 @@ pub struct vortex_layout::layouts::compressed::CompressingStrategy impl vortex_layout::layouts::compressed::CompressingStrategy -pub fn vortex_layout::layouts::compressed::CompressingStrategy::new_btrblocks(child: S, exclude_int_dict_encoding: bool) -> Self - -pub fn vortex_layout::layouts::compressed::CompressingStrategy::new_opaque(child: S, compressor: C) -> Self +pub fn vortex_layout::layouts::compressed::CompressingStrategy::new(child: S, compressor: C) -> Self pub fn vortex_layout::layouts::compressed::CompressingStrategy::with_concurrency(self, concurrency: usize) -> Self diff --git a/vortex-layout/src/layouts/compressed.rs b/vortex-layout/src/layouts/compressed.rs index 539e59982d6..1d5dac46b6d 100644 --- a/vortex-layout/src/layouts/compressed.rs +++ b/vortex-layout/src/layouts/compressed.rs @@ -9,9 +9,6 @@ use vortex_array::ArrayContext; use vortex_array::ArrayRef; use vortex_array::expr::stats::Stat; use vortex_btrblocks::BtrBlocksCompressor; -use vortex_btrblocks::BtrBlocksCompressorBuilder; -use vortex_btrblocks::SchemeExt; -use vortex_btrblocks::schemes::integer::IntDictScheme; use vortex_error::VortexResult; use vortex_io::runtime::Handle; @@ -60,32 +57,11 @@ pub struct CompressingStrategy { } impl CompressingStrategy { - /// Create a new writer that uses the BtrBlocks-style cascading compressor to compress chunks. - /// - /// This provides a good balance between decoding speed and small file size. 
- /// - /// Set `exclude_int_dict_encoding` to true to prevent dictionary encoding of integer arrays, - /// which is useful when compressing dictionary codes to avoid recursive dictionary encoding. - pub fn new_btrblocks(child: S, exclude_int_dict_encoding: bool) -> Self { - let compressor = if exclude_int_dict_encoding { - BtrBlocksCompressorBuilder::default() - .exclude([IntDictScheme.id()]) - .build() - } else { - BtrBlocksCompressor::default() - }; - Self::new(child, Arc::new(compressor)) - } - - /// Create a new compressor from a plugin interface. - pub fn new_opaque(child: S, compressor: C) -> Self { - Self::new(child, Arc::new(compressor)) - } - - fn new(child: S, compressor: Arc) -> Self { + /// Create a new compressing layout strategy with the given child strategy and compressor. + pub fn new(child: S, compressor: C) -> Self { Self { child: Arc::new(child), - compressor, + compressor: Arc::new(compressor), concurrency: std::thread::available_parallelism() .map(|v| v.get()) .unwrap_or(1), diff --git a/vortex-layout/src/layouts/table.rs b/vortex-layout/src/layouts/table.rs index 2f2bf9df863..db92059c58b 100644 --- a/vortex-layout/src/layouts/table.rs +++ b/vortex-layout/src/layouts/table.rs @@ -90,8 +90,9 @@ impl TableStrategy { /// # use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; /// # use vortex_layout::layouts::table::TableStrategy; /// + /// # use vortex_btrblocks::BtrBlocksCompressor; /// // A strategy for compressing data using the balanced BtrBlocks compressor. - /// let compress = CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), true); + /// let compress = CompressingStrategy::new(FlatLayoutStrategy::default(), BtrBlocksCompressor::default()); /// /// // Our combined strategy uses no compression for validity buffers, BtrBlocks compression /// // for most columns, and stores a nested binary column uncompressed (flat) because it