Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions fuzz/fuzz_targets/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use vortex_array::dtype::StructFields;
use vortex_array::expr::lit;
use vortex_array::expr::root;
use vortex_array::scalar_fn::fns::operators::Operator;
use vortex_btrblocks::BtrBlocksCompressorBuilder;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexExpect;
use vortex_error::vortex_panic;
Expand Down Expand Up @@ -59,12 +60,11 @@ fuzz_target!(|fuzz: FuzzFileAction| -> Corpus {

let write_options = match compressor_strategy {
CompressorStrategy::Default => SESSION.write_options(),
CompressorStrategy::Compact => {
let strategy = WriteStrategyBuilder::default()
.with_compact_encodings()
.build();
SESSION.write_options().with_strategy(strategy)
}
CompressorStrategy::Compact => SESSION.write_options().with_strategy(
WriteStrategyBuilder::default()
.with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact())
.build(),
),
};

let mut full_buff = ByteBufferMut::empty();
Expand Down
3 changes: 2 additions & 1 deletion vortex-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tpcds::TpcDsBenchmark;
use tpch::benchmark::TpcHBenchmark;
pub use utils::file::*;
pub use utils::logging::*;
use vortex::compressor::BtrBlocksCompressorBuilder;
use vortex::error::VortexExpect;
use vortex::error::vortex_err;
use vortex::file::VortexWriteOptions;
Expand Down Expand Up @@ -231,7 +232,7 @@ impl CompactionStrategy {
match self {
CompactionStrategy::Compact => options.with_strategy(
WriteStrategyBuilder::default()
.with_compact_encodings()
.with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact())
.build(),
),
CompactionStrategy::Default => options,
Expand Down
4 changes: 3 additions & 1 deletion vortex-btrblocks/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,9 @@ impl vortex_btrblocks::BtrBlocksCompressorBuilder

pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::build(self) -> vortex_btrblocks::BtrBlocksCompressor

pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::exclude(self, ids: impl core::iter::traits::collect::IntoIterator<Item = vortex_compressor::scheme::SchemeId>) -> Self
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::exclude_schemes(self, ids: impl core::iter::traits::collect::IntoIterator<Item = vortex_compressor::scheme::SchemeId>) -> Self

pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::only_cuda_compatible(self) -> Self

pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::with_compact(self) -> Self

Expand Down
39 changes: 31 additions & 8 deletions vortex-btrblocks/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[
///
/// By default, all schemes in [`ALL_SCHEMES`] are enabled. Feature-gated schemes (Pco, Zstd)
/// are not in `ALL_SCHEMES` and must be added explicitly via
/// [`with_new_scheme`](BtrBlocksCompressorBuilder::with_new_scheme) or
/// [`with_new_scheme`](BtrBlocksCompressorBuilder::with_new_scheme) or
/// [`with_compact`](BtrBlocksCompressorBuilder::with_compact).
///
/// # Examples
Expand All @@ -79,9 +79,9 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[
/// // Default compressor with all schemes in ALL_SCHEMES.
/// let compressor = BtrBlocksCompressorBuilder::default().build();
///
/// // Exclude specific schemes.
/// // Remove specific schemes.
/// let compressor = BtrBlocksCompressorBuilder::default()
/// .exclude([IntDictScheme.id()])
/// .exclude_schemes([IntDictScheme.id()])
/// .build();
/// ```
#[derive(Debug, Clone)]
Expand All @@ -100,8 +100,8 @@ impl Default for BtrBlocksCompressorBuilder {
impl BtrBlocksCompressorBuilder {
/// Adds an external compression scheme not in [`ALL_SCHEMES`].
///
/// This allows encoding crates outside of `vortex-btrblocks` to register their own schemes with
/// the compressor.
/// This allows encoding crates outside of `vortex-btrblocks` to register their own schemes
/// with the compressor.
///
/// # Panics
///
Expand All @@ -128,7 +128,6 @@ impl BtrBlocksCompressorBuilder {
/// Panics if any of the compact schemes are already present.
#[cfg(feature = "zstd")]
pub fn with_compact(self) -> Self {
// This should be fast since we don't have that many schemes.
let builder = self.with_new_scheme(&string::ZstdScheme);

#[cfg(feature = "pco")]
Expand All @@ -139,8 +138,32 @@ impl BtrBlocksCompressorBuilder {
builder
}

/// Excludes the specified compression schemes by their [`SchemeId`].
pub fn exclude(mut self, ids: impl IntoIterator<Item = SchemeId>) -> Self {
/// Restricts the builder to schemes that can be decoded by CUDA kernels.
///
/// Six schemes are dropped: integer sparse, integer RLE, float RLE, null-dominated float
/// sparse, string dictionary, and FSST. A Zstd scheme is then registered for strings:
///
/// - with the `unstable_encodings` feature, `ZstdBuffersScheme` is registered (buffer-level
///   Zstd, which keeps the array buffer layout for zero-conversion GPU decompression);
/// - without it, `ZstdScheme` is registered (interleaved Zstd).
///
/// # Panics
///
/// NOTE(review): the Zstd scheme is registered through
/// [`with_new_scheme`](Self::with_new_scheme), which documents a panic condition; presumably
/// this panics if that scheme is already present in the builder — confirm.
#[cfg(feature = "zstd")]
pub fn only_cuda_compatible(self) -> Self {
    // Schemes with no CUDA decode path.
    let unsupported = [
        integer::SparseScheme.id(),
        rle::RLE_INTEGER_SCHEME.id(),
        rle::RLE_FLOAT_SCHEME.id(),
        float::NullDominatedSparseScheme.id(),
        string::StringDictScheme.id(),
        string::FSSTScheme.id(),
    ];
    let pruned = self.exclude_schemes(unsupported);

    // Exactly one of the two blocks below survives cfg-stripping and becomes the return value.
    #[cfg(feature = "unstable_encodings")]
    {
        pruned.with_new_scheme(&string::ZstdBuffersScheme)
    }
    #[cfg(not(feature = "unstable_encodings"))]
    {
        pruned.with_new_scheme(&string::ZstdScheme)
    }
}

/// Removes the specified compression schemes by their [`SchemeId`].
pub fn exclude_schemes(mut self, ids: impl IntoIterator<Item = SchemeId>) -> Self {
let ids: HashSet<_> = ids.into_iter().collect();
self.schemes.retain(|s| !ids.contains(&s.id()));
self
Expand Down
4 changes: 2 additions & 2 deletions vortex-btrblocks/src/canonical_compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use crate::CascadingCompressor;
/// // Default compressor - all schemes allowed.
/// let compressor = BtrBlocksCompressor::default();
///
/// // Exclude specific schemes using the builder.
/// // Remove specific schemes using the builder.
/// let compressor = BtrBlocksCompressorBuilder::default()
/// .exclude([IntDictScheme.id()])
/// .exclude_schemes([IntDictScheme.id()])
/// .build();
/// ```
#[derive(Clone)]
Expand Down
4 changes: 2 additions & 2 deletions vortex-btrblocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
//! // Default compressor with all schemes enabled.
//! let compressor = BtrBlocksCompressor::default();
//!
//! // Configure with builder to exclude specific schemes.
//! // Remove specific schemes using the builder.
//! let compressor = BtrBlocksCompressorBuilder::default()
//! .exclude([IntDictScheme.id()])
//! .exclude_schemes([IntDictScheme.id()])
//! .build();
//! ```
//!
Expand Down
3 changes: 2 additions & 1 deletion vortex-cuda/gpu-scan-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use vortex::VortexSessionDefault;
use vortex::array::ToCanonical;
use vortex::array::arrays::Dict;
use vortex::buffer::ByteBufferMut;
use vortex::compressor::BtrBlocksCompressorBuilder;
use vortex::error::VortexResult;
use vortex::file::OpenOptionsSessionExt;
use vortex::file::WriteOptionsSessionExt;
Expand Down Expand Up @@ -92,7 +93,7 @@ async fn main() -> VortexResult<()> {
#[cuda_available]
fn cuda_write_strategy() -> Arc<dyn vortex::layout::LayoutStrategy> {
WriteStrategyBuilder::default()
.with_cuda_compatible_encodings()
.with_btrblocks_builder(BtrBlocksCompressorBuilder::default().only_cuda_compatible())
.with_flat_strategy(Arc::new(CudaFlatLayoutStrategy::default()))
.build()
}
Expand Down
4 changes: 1 addition & 3 deletions vortex-file/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,10 @@ pub fn vortex_file::WriteStrategyBuilder::build(self) -> alloc::sync::Arc<dyn vo

pub fn vortex_file::WriteStrategyBuilder::with_allow_encodings(self, allow_encodings: vortex_array::session::ArrayRegistry) -> Self

pub fn vortex_file::WriteStrategyBuilder::with_compact_encodings(self) -> Self
pub fn vortex_file::WriteStrategyBuilder::with_btrblocks_builder(self, builder: vortex_btrblocks::builder::BtrBlocksCompressorBuilder) -> Self

pub fn vortex_file::WriteStrategyBuilder::with_compressor<C: vortex_layout::layouts::compressed::CompressorPlugin>(self, compressor: C) -> Self

pub fn vortex_file::WriteStrategyBuilder::with_cuda_compatible_encodings(self) -> Self

pub fn vortex_file::WriteStrategyBuilder::with_field_writer(self, field: impl core::convert::Into<vortex_array::dtype::field::FieldPath>, writer: alloc::sync::Arc<dyn vortex_layout::strategy::LayoutStrategy>) -> Self

pub fn vortex_file::WriteStrategyBuilder::with_flat_strategy(self, flat: alloc::sync::Arc<dyn vortex_layout::strategy::LayoutStrategy>) -> Self
Expand Down
152 changes: 37 additions & 115 deletions vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use vortex_array::arrays::VarBinView;
use vortex_array::dtype::FieldPath;
use vortex_array::session::ArrayRegistry;
use vortex_array::session::ArraySession;
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_btrblocks::BtrBlocksCompressorBuilder;
use vortex_btrblocks::SchemeExt;
use vortex_btrblocks::schemes::integer::IntDictScheme;
Expand Down Expand Up @@ -59,14 +58,6 @@ use vortex_sequence::Sequence;
use vortex_sparse::Sparse;
use vortex_utils::aliases::hash_map::HashMap;
use vortex_zigzag::ZigZag;

#[rustfmt::skip]
#[cfg(feature = "zstd")]
use vortex_btrblocks::{
schemes::float,
schemes::integer,
schemes::string,
};
#[cfg(feature = "zstd")]
use vortex_zstd::Zstd;
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
Expand Down Expand Up @@ -123,13 +114,24 @@ pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
session.registry().clone()
});

/// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file.
/// How the compressor was configured on [`WriteStrategyBuilder`].
///
/// [`WriteStrategyBuilder::build`] matches on this value twice: once to produce the compressor
/// for data chunks and once to produce the compressor for stats tables and dict values.
enum CompressorConfig {
/// A [`BtrBlocksCompressorBuilder`] that [`WriteStrategyBuilder::build`] will finalize.
///
/// For the data compressor, `build` clones the builder and excludes `IntDictScheme` before
/// building it, because the dictionary layout step has already dictionary-encoded the
/// columns and re-encoding the resulting integer codes would be redundant. For the stats
/// compressor, the builder is built as configured, without that exclusion.
BtrBlocks(BtrBlocksCompressorBuilder),
/// An opaque compressor used as-is for both data and stats compression.
///
/// The same `Arc` is shared between the two roles; no schemes are added or removed.
Opaque(Arc<dyn CompressorPlugin>),
}

/// Build a new [writer strategy](LayoutStrategy) to compress and reorganize chunks of a Vortex
/// file.
///
/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
/// repartitioning and compressing them to strike a balance between size on-disk,
/// bulk decoding performance, and IOPS required to perform an indexed read.
pub struct WriteStrategyBuilder {
compressor_override: Option<Arc<dyn CompressorPlugin>>,
compressor: CompressorConfig,
row_block_size: usize,
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
allow_encodings: Option<ArrayRegistry>,
Expand All @@ -141,7 +143,7 @@ impl Default for WriteStrategyBuilder {
/// and then finally built yielding the [`LayoutStrategy`].
fn default() -> Self {
Self {
compressor_override: None,
compressor: CompressorConfig::BtrBlocks(BtrBlocksCompressorBuilder::default()),
row_block_size: 8192,
field_writers: HashMap::new(),
allow_encodings: Some(ALLOWED_ENCODINGS.clone()),
Expand Down Expand Up @@ -183,97 +185,20 @@ impl WriteStrategyBuilder {
self
}

/// Override the [compressor](CompressorPlugin) used for compressing chunks in the file.
///
/// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance
/// total size with decoding performance.
///
/// # Panics
///
/// Panics if a compressor has already been set via
/// [`with_compressor`](Self::with_compressor),
/// [`with_cuda_compatible_encodings`](Self::with_cuda_compatible_encodings), or
/// [`with_compact_encodings`](Self::with_compact_encodings).
///
/// These methods are mutually exclusive.
pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
assert!(
self.compressor_override.is_none(),
"A compressor has already been configured. `with_compressor`, \
`with_cuda_compatible_encodings`, and `with_compact_encodings` are mutually exclusive."
);
self.compressor_override = Some(Arc::new(compressor));
self
}

/// Configure a write strategy that emits only CUDA-compatible encodings.
///
/// This method simply exists as a wrapper around [`with_compressor`].
///
/// This configures BtrBlocks to exclude schemes without CUDA kernel support.
/// With the `unstable_encodings` feature, strings use buffer-level Zstd compression
/// (`ZstdBuffersArray`) which preserves the array buffer layout for zero-conversion
/// GPU decompression. Without it, strings use interleaved Zstd compression.
///
/// # Panics
///
/// Panics if a compressor has already been set. See [`with_compressor`]
/// Override the default [`BtrBlocksCompressorBuilder`] used for compression.
///
/// [`with_compressor`]: Self::with_compressor.
#[cfg(feature = "zstd")]
pub fn with_cuda_compatible_encodings(mut self) -> Self {
assert!(
self.compressor_override.is_none(),
"A compressor has already been configured. `with_compressor`, \
`with_cuda_compatible_encodings`, and `with_compact_encodings` are mutually exclusive."
);

let mut builder = BtrBlocksCompressorBuilder::default().exclude([
integer::SparseScheme.id(),
integer::RLE_INTEGER_SCHEME.id(),
float::RLE_FLOAT_SCHEME.id(),
float::NullDominatedSparseScheme.id(),
string::StringDictScheme.id(),
string::FSSTScheme.id(),
]);

#[cfg(feature = "unstable_encodings")]
{
builder = builder.with_new_scheme(&string::ZstdBuffersScheme);
}
#[cfg(not(feature = "unstable_encodings"))]
{
builder = builder.with_new_scheme(&string::ZstdScheme);
}

self.compressor_override = Some(Arc::new(builder.build()));
/// The builder is finalized during [`build`](Self::build), producing two compressors: one for
/// data (with `IntDictScheme` excluded) and one for stats.
pub fn with_btrblocks_builder(mut self, builder: BtrBlocksCompressorBuilder) -> Self {
self.compressor = CompressorConfig::BtrBlocks(builder);
self
}

/// Configure a write strategy that uses compact encodings (Pco for numerics, Zstd for
/// strings/binary).
///
/// This method simply exists as a wrapper around [`with_compressor`].
///
/// This provides better compression ratios than the default BtrBlocks strategy,
/// especially for floating-point heavy datasets.
/// Set the compressor to an opaque [`CompressorPlugin`].
///
/// # Panics
///
/// Panics if a compressor has already been set. See [`with_compressor`]
///
/// [`with_compressor`]: Self::with_compressor.
#[cfg(feature = "zstd")]
pub fn with_compact_encodings(mut self) -> Self {
assert!(
self.compressor_override.is_none(),
"A compressor has already been configured. `with_compressor`, \
`with_cuda_compatible_encodings`, and `with_compact_encodings` are mutually exclusive."
);

self.compressor_override = Some(Arc::new(
BtrBlocksCompressorBuilder::default().with_compact().build(),
));
/// The compressor is used as-is for both data and stats compression.
pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
self.compressor = CompressorConfig::Opaque(Arc::new(compressor));
self
}

Expand All @@ -294,19 +219,18 @@ impl WriteStrategyBuilder {
let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB

// 5. compress each chunk.
// Exclude IntDictScheme from the default compressor because DictStrategy (step 3) already
// Exclude IntDictScheme from the data compressor because DictStrategy (step 3) already
// dictionary-encodes columns. Allowing IntDictScheme here would redundantly
// dictionary-encode the integer codes produced by that earlier step.
let data_compressor: Arc<dyn CompressorPlugin> =
if let Some(ref compressor) = self.compressor_override {
compressor.clone()
} else {
Arc::new(
BtrBlocksCompressorBuilder::default()
.exclude([IntDictScheme.id()])
.build(),
)
};
let data_compressor: Arc<dyn CompressorPlugin> = match &self.compressor {
CompressorConfig::BtrBlocks(builder) => Arc::new(
builder
.clone()
.exclude_schemes([IntDictScheme.id()])
.build(),
),
CompressorConfig::Opaque(compressor) => compressor.clone(),
};
let compressing = CompressingStrategy::new(buffered, data_compressor);

// 4. prior to compression, coalesce up to a minimum size
Expand All @@ -327,12 +251,10 @@ impl WriteStrategyBuilder {
);

// 2.1. | 3.1. compress stats tables and dict values.
let stats_compressor: Arc<dyn CompressorPlugin> =
if let Some(ref compressor) = self.compressor_override {
compressor.clone()
} else {
Arc::new(BtrBlocksCompressor::default())
};
let stats_compressor: Arc<dyn CompressorPlugin> = match self.compressor {
CompressorConfig::BtrBlocks(builder) => Arc::new(builder.build()),
CompressorConfig::Opaque(compressor) => compressor,
};
let compress_then_flat = CompressingStrategy::new(flat, stats_compressor);

// 3. apply dict encoding or fallback
Expand Down
Loading
Loading