Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions vortex-array/src/array/erased.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,13 @@ impl ArrayRef {
self.is::<AnyCanonical>()
}

/// Returns a new array with all slots replaced.
///
/// `slots` must contain exactly one entry per existing slot of this array; the replacement
/// itself is delegated to the array's vtable.
///
/// # Errors
///
/// Returns an error when `slots.len()` differs from the current number of slots, and
/// propagates any error returned by the vtable's `with_slots`.
pub fn with_slots(self, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef> {
    let expected = self.slots().len();
    let actual = slots.len();
    // Report both counts so a mismatch can be diagnosed from the error alone.
    vortex_ensure!(
        expected == actual,
        "slot count mismatch: expected {}, got {}",
        expected,
        actual
    );
    // Clone the vtable handle first so that `self` can be moved into the call below.
    let vtable = self.vtable().clone_boxed();
    vtable.with_slots(self, slots)
}

/// Returns a new array with the slot at `slot_idx` replaced by `replacement`.
///
/// Takes ownership to allow in-place mutation when the refcount is 1.
Expand Down
140 changes: 129 additions & 11 deletions vortex-array/src/normalize.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use itertools::Itertools;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_session::registry::Id;
use vortex_utils::aliases::hash_set::HashSet;

use crate::ArrayRef;
use crate::session::ArrayRegistry;
use crate::ExecutionCtx;

/// Options for normalizing an array.
///
/// Both the allowed set and any execution context carried by `operation` are borrowed for `'a`.
pub struct NormalizeOptions<'a> {
// NOTE(review): the membership checks visible in this file consult only `allowed`; confirm
// whether callers are expected to list the canonical encodings in the set themselves.
/// The set of allowed array encodings (in addition to the canonical ones) that are permitted
/// in the normalized array.
pub allowed: &'a ArrayRegistry,
pub allowed: &'a HashSet<Id>,
/// The operation to perform when a non-allowed encoding is encountered.
pub operation: Operation,
pub operation: Operation<'a>,
}

/// The operation to perform when a non-allowed encoding is encountered.
pub enum Operation {
pub enum Operation<'a> {
/// Fail normalization with an error that names the forbidden encoding.
Error,
// TODO(joe): add into canonical variant
/// Execute non-allowed arrays using the borrowed execution context until their encoding is
/// a member of the allowed set.
Execute(&'a mut ExecutionCtx),
}

impl ArrayRef {
Expand All @@ -30,13 +30,17 @@ impl ArrayRef {
/// This operation performs a recursive traversal of the array. Any non-allowed encoding is
/// normalized per the configured operation.
pub fn normalize(self, options: &mut NormalizeOptions) -> VortexResult<ArrayRef> {
let array_ids = options.allowed.ids().collect_vec();
self.normalize_with_error(&array_ids)?;
// Note this takes ownership so we can at a later date remove non-allowed encodings.
Ok(self)
// Dispatch on the configured operation. The operation is matched through `&mut` so that the
// `Execute` variant can lend out the execution context it holds.
match &mut options.operation {
// Validation only: on success the array is handed back unchanged.
Operation::Error => {
self.normalize_with_error(options.allowed)?;
// Note this takes ownership so we can at a later date remove non-allowed encodings.
Ok(self)
}
// Rewriting: non-allowed encodings are executed with the caller-supplied context.
Operation::Execute(ctx) => self.normalize_with_execution(options.allowed, *ctx),
}
}

fn normalize_with_error(&self, allowed: &[Id]) -> VortexResult<()> {
fn normalize_with_error(&self, allowed: &HashSet<Id>) -> VortexResult<()> {
if !allowed.contains(&self.encoding_id()) {
vortex_bail!(AssertionFailed: "normalize forbids encoding ({})", self.encoding_id())
}
Expand All @@ -46,4 +50,118 @@ impl ArrayRef {
}
Ok(())
}

/// Normalizes this array by executing every non-allowed encoding it contains.
///
/// The root is executed repeatedly until its encoding is a member of `allowed`. Every populated
/// slot is then normalized recursively with the same `allowed` set and `ctx`. The parent is
/// rebuilt through `with_slots` only when at least one child came back as a different array
/// (compared by pointer identity); otherwise the already-normalized root is returned as is.
///
/// # Errors
///
/// Propagates any error returned by `execute`, by the recursive normalization of a child, or by
/// `with_slots`.
// NOTE(review): the root loop exits only once the encoding is in `allowed`; this presumably
// relies on `execute` eventually yielding an allowed encoding — confirm that callers always
// include the encodings execution can terminate in.
fn normalize_with_execution(
    self,
    allowed: &HashSet<Id>,
    ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    // Work top-down: settle the root on an allowed encoding before looking at its children.
    let mut root = self;
    loop {
        if allowed.contains(&root.encoding_id()) {
            break;
        }
        root = root.execute(ctx)?;
    }

    // Normalize each populated slot, tracking whether any child was actually replaced.
    let current_slots = root.slots();
    let mut rebuilt_slots = Vec::with_capacity(current_slots.len());
    let mut rebuild_needed = false;

    for slot in current_slots {
        let Some(child) = slot else {
            rebuilt_slots.push(None);
            continue;
        };

        // Same recursion `normalize` performs for `Operation::Execute`, invoked directly.
        let replacement = child
            .clone()
            .normalize_with_execution(allowed, &mut *ctx)?;
        if !ArrayRef::ptr_eq(&child, &replacement) {
            rebuild_needed = true;
        }
        rebuilt_slots.push(Some(replacement));
    }

    // Keep the existing parent when nothing changed so pointer identity is preserved.
    if !rebuild_needed {
        return Ok(root);
    }
    root.with_slots(rebuilt_slots)
}
}

/// Tests for execution-based normalization.
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;
    // `HashSet` is used below to build the allowed sets; a child module does not inherit the
    // parent's `use` declarations, so it has to be imported here explicitly.
    use vortex_utils::aliases::hash_set::HashSet;

    use super::NormalizeOptions;
    use super::Operation;
    use crate::ArrayRef;
    use crate::ExecutionCtx;
    use crate::IntoArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::SliceArray;
    use crate::arrays::StructArray;
    use crate::assert_arrays_eq;
    use crate::validity::Validity;

    /// When every encoding in the tree is already allowed, normalization must hand back the
    /// very same array (pointer-equal), i.e. the parent is not rebuilt.
    #[test]
    fn normalize_with_execution_keeps_parent_when_children_are_unchanged() -> VortexResult<()> {
        let field = PrimitiveArray::from_iter(0i32..4).into_array();
        let array = StructArray::try_new(
            ["field"].into(),
            vec![field.clone()],
            field.len(),
            Validity::NonNullable,
        )?
        .into_array();
        let allowed = HashSet::from_iter([array.encoding_id(), field.encoding_id()]);
        let mut ctx = ExecutionCtx::new(VortexSession::empty());

        let normalized = array.clone().normalize(&mut NormalizeOptions {
            allowed: &allowed,
            operation: Operation::Execute(&mut ctx),
        })?;

        assert!(ArrayRef::ptr_eq(&array, &normalized));
        Ok(())
    }

    /// When one child uses a non-allowed encoding (a slice), only that child is replaced: the
    /// parent is rebuilt, the untouched sibling stays pointer-equal, and the replaced child
    /// holds the sliced values.
    #[test]
    fn normalize_with_execution_rebuilds_parent_when_a_child_changes() -> VortexResult<()> {
        let unchanged = PrimitiveArray::from_iter(0i32..4).into_array();
        let sliced =
            SliceArray::new(PrimitiveArray::from_iter(10i32..20).into_array(), 2..6).into_array();
        let array = StructArray::try_new(
            ["lhs", "rhs"].into(),
            vec![unchanged.clone(), sliced.clone()],
            unchanged.len(),
            Validity::NonNullable,
        )?
        .into_array();
        // The slice encoding is deliberately left out of the allowed set.
        let allowed = HashSet::from_iter([array.encoding_id(), unchanged.encoding_id()]);
        let mut ctx = ExecutionCtx::new(VortexSession::empty());

        let normalized = array.clone().normalize(&mut NormalizeOptions {
            allowed: &allowed,
            operation: Operation::Execute(&mut ctx),
        })?;

        assert!(!ArrayRef::ptr_eq(&array, &normalized));

        let original_children = array.children();
        let normalized_children = normalized.children();
        assert!(ArrayRef::ptr_eq(
            &original_children[0],
            &normalized_children[0]
        ));
        assert!(!ArrayRef::ptr_eq(
            &original_children[1],
            &normalized_children[1]
        ));
        assert_arrays_eq!(normalized_children[1], PrimitiveArray::from_iter(12i32..16));

        Ok(())
    }
}
79 changes: 40 additions & 39 deletions vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use vortex_alp::ALP;
// Compressed encodings from encoding crates
// Canonical array encodings from vortex-array
use vortex_alp::ALPRD;
use vortex_array::ArrayId;
use vortex_array::VTable;
use vortex_array::arrays::Bool;
use vortex_array::arrays::Chunked;
use vortex_array::arrays::Constant;
Expand All @@ -26,8 +28,6 @@ use vortex_array::arrays::Struct;
use vortex_array::arrays::VarBin;
use vortex_array::arrays::VarBinView;
use vortex_array::dtype::FieldPath;
use vortex_array::session::ArrayRegistry;
use vortex_array::session::ArraySession;
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_btrblocks::BtrBlocksCompressorBuilder;
use vortex_btrblocks::SchemeExt;
Expand Down Expand Up @@ -67,6 +67,7 @@ use vortex_btrblocks::{
schemes::integer,
schemes::string,
};
use vortex_utils::aliases::hash_set::HashSet;
#[cfg(feature = "zstd")]
use vortex_zstd::Zstd;
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
Expand All @@ -78,49 +79,49 @@ const ONE_MEG: u64 = 1 << 20;
///
/// This includes all canonical encodings from vortex-array plus all compressed
/// encodings from the various encoding crates.
pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
let session = ArraySession::empty();
pub static ALLOWED_ENCODINGS: LazyLock<HashSet<ArrayId>> = LazyLock::new(|| {
// The set is built lazily, on first access, by the closure passed to `LazyLock::new`.
let mut allowed = HashSet::new();

// Canonical encodings from vortex-array
session.register(Null);
session.register(Bool);
session.register(Primitive);
session.register(Decimal);
session.register(VarBin);
session.register(VarBinView);
session.register(List);
session.register(ListView);
session.register(FixedSizeList);
session.register(Struct);
session.register(Extension);
session.register(Chunked);
session.register(Constant);
session.register(Masked);
session.register(Dict);
allowed.insert(Null.id());
allowed.insert(Bool.id());
allowed.insert(Primitive.id());
allowed.insert(Decimal.id());
allowed.insert(VarBin.id());
allowed.insert(VarBinView.id());
allowed.insert(List.id());
allowed.insert(ListView.id());
allowed.insert(FixedSizeList.id());
allowed.insert(Struct.id());
allowed.insert(Extension.id());
allowed.insert(Chunked.id());
allowed.insert(Constant.id());
allowed.insert(Masked.id());
allowed.insert(Dict.id());

// Compressed encodings from encoding crates
session.register(ALP);
session.register(ALPRD);
session.register(BitPacked);
session.register(ByteBool);
session.register(DateTimeParts);
session.register(DecimalByteParts);
session.register(Delta);
session.register(FoR);
session.register(FSST);
session.register(Pco);
session.register(RLE);
session.register(RunEnd);
session.register(Sequence);
session.register(Sparse);
session.register(ZigZag);
allowed.insert(ALP.id());
allowed.insert(ALPRD.id());
allowed.insert(BitPacked.id());
allowed.insert(ByteBool.id());
allowed.insert(DateTimeParts.id());
allowed.insert(DecimalByteParts.id());
allowed.insert(Delta.id());
allowed.insert(FoR.id());
allowed.insert(FSST.id());
allowed.insert(Pco.id());
allowed.insert(RLE.id());
allowed.insert(RunEnd.id());
allowed.insert(Sequence.id());
allowed.insert(Sparse.id());
allowed.insert(ZigZag.id());

// Zstd-backed encodings are only added when the matching Cargo features are enabled.
#[cfg(feature = "zstd")]
session.register(Zstd);
allowed.insert(Zstd.id());
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
session.register(ZstdBuffers);
allowed.insert(ZstdBuffers.id());

session.registry().clone()
allowed
});

/// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file.
Expand All @@ -132,7 +133,7 @@ pub struct WriteStrategyBuilder {
compressor_override: Option<Arc<dyn CompressorPlugin>>,
row_block_size: usize,
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
allow_encodings: Option<ArrayRegistry>,
allow_encodings: Option<HashSet<ArrayId>>,
flat_strategy: Option<Arc<dyn LayoutStrategy>>,
}

Expand Down Expand Up @@ -169,7 +170,7 @@ impl WriteStrategyBuilder {
}

/// Override the allowed array encodings for normalization.
///
/// Stores `allow_encodings` on the builder, replacing any previously configured set, and
/// returns the builder so that calls can be chained.
pub fn with_allow_encodings(mut self, allow_encodings: ArrayRegistry) -> Self {
pub fn with_allow_encodings(mut self, allow_encodings: HashSet<ArrayId>) -> Self {
self.allow_encodings = Some(allow_encodings);
self
}
Expand Down
18 changes: 9 additions & 9 deletions vortex-layout/src/layouts/flat/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use async_trait::async_trait;
use futures::StreamExt;
use vortex_array::ArrayContext;
use vortex_array::ArrayId;
use vortex_array::dtype::DType;
use vortex_array::expr::stats::Precision;
use vortex_array::expr::stats::Stat;
Expand All @@ -15,7 +16,6 @@ use vortex_array::scalar::ScalarTruncation;
use vortex_array::scalar::lower_bound;
use vortex_array::scalar::upper_bound;
use vortex_array::serde::SerializeOptions;
use vortex_array::session::ArrayRegistry;
use vortex_array::stats::StatsSetRef;
use vortex_buffer::BufferString;
use vortex_buffer::ByteBuffer;
Expand All @@ -24,6 +24,7 @@ use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_io::runtime::Handle;
use vortex_session::registry::ReadContext;
use vortex_utils::aliases::hash_set::HashSet;

use crate::IntoLayout;
use crate::LayoutRef;
Expand All @@ -42,7 +43,7 @@ pub struct FlatLayoutStrategy {
pub max_variable_length_statistics_size: usize,
/// Optional set of allowed array encodings for normalization.
/// If None, then all are allowed.
pub allowed_encodings: Option<ArrayRegistry>,
pub allowed_encodings: Option<HashSet<ArrayId>>,
}

impl Default for FlatLayoutStrategy {
Expand All @@ -69,7 +70,7 @@ impl FlatLayoutStrategy {
}

/// Set the allowed array encodings for normalization.
///
/// Stores the given set in `allowed_encodings` as `Some(..)`, replacing any earlier value, and
/// returns the strategy so that calls can be chained.
pub fn with_allow_encodings(mut self, allow_encodings: ArrayRegistry) -> Self {
pub fn with_allow_encodings(mut self, allow_encodings: HashSet<ArrayId>) -> Self {
self.allowed_encodings = Some(allow_encodings);
self
}
Expand Down Expand Up @@ -226,6 +227,7 @@ mod tests {
use vortex_io::runtime::single::block_on;
use vortex_mask::AllOr;
use vortex_mask::Mask;
use vortex_utils::aliases::hash_set::HashSet;

use crate::LayoutStrategy;
use crate::layouts::flat::writer::FlatLayoutStrategy;
Expand Down Expand Up @@ -424,9 +426,8 @@ mod tests {
let (layout, _segments) = {
let segments = Arc::new(TestSegments::default());
let (ptr, eof) = SequenceId::root().split();
// Only allow primitive encodings - filter arrays should fail.
let allowed = ArrayRegistry::default();
allowed.register(Primitive::ID, Arc::new(Primitive) as DynVTableRef);
// Only allow canonical encodings - filter arrays should fail.
let allowed = HashSet::default();
let layout = FlatLayoutStrategy::default()
.with_allow_encodings(allowed)
.write_stream(
Expand Down Expand Up @@ -466,9 +467,8 @@ mod tests {
let segments = Arc::new(TestSegments::default());
let (ptr, eof) = SequenceId::root().split();
// Only allow primitive encodings - filter arrays should fail.
let allowed = ArrayRegistry::default();
allowed.register(Primitive.id(), Arc::new(Primitive) as DynVTableRef);
allowed.register(Dict.id(), Arc::new(Dict) as DynVTableRef);
let mut allowed = HashSet::default();
allowed.insert(Dict.id());
let layout = FlatLayoutStrategy::default()
.with_allow_encodings(allowed)
.write_stream(
Expand Down
Loading