From 4545e5ca32ee7117015c78cec7ba1a7ed3d5d031 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 Apr 2026 10:53:16 -0400 Subject: [PATCH 1/2] Normalize Execution Signed-off-by: Nicholas Gates --- vortex-array/src/array/erased.rs | 7 ++ vortex-array/src/normalize.rs | 140 ++++++++++++++++++++++++++++--- 2 files changed, 136 insertions(+), 11 deletions(-) diff --git a/vortex-array/src/array/erased.rs b/vortex-array/src/array/erased.rs index bdb67697c3a..9c3c9a16b62 100644 --- a/vortex-array/src/array/erased.rs +++ b/vortex-array/src/array/erased.rs @@ -383,6 +383,13 @@ impl ArrayRef { self.is::() } + /// Returns a new array with all slots replaced. + pub fn with_slots(self, slots: Vec>) -> VortexResult { + vortex_ensure!(self.slots().len() == slots.len(), "slot count mismatch"); + let vtable = self.vtable().clone_boxed(); + vtable.with_slots(self, slots) + } + /// Returns a new array with the slot at `slot_idx` replaced by `replacement`. /// /// Takes ownership to allow in-place mutation when the refcount is 1. diff --git a/vortex-array/src/normalize.rs b/vortex-array/src/normalize.rs index 9a796e5485c..6d81ec5ca7f 100644 --- a/vortex-array/src/normalize.rs +++ b/vortex-array/src/normalize.rs @@ -1,27 +1,27 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use itertools::Itertools; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_session::registry::Id; +use vortex_utils::aliases::hash_set::HashSet; use crate::ArrayRef; -use crate::session::ArrayRegistry; +use crate::ExecutionCtx; /// Options for normalizing an array. pub struct NormalizeOptions<'a> { /// The set of allowed array encodings (in addition to the canonical ones) that are permitted /// in the normalized array. - pub allowed: &'a ArrayRegistry, + pub allowed: &'a HashSet, /// The operation to perform when a non-allowed encoding is encountered. - pub operation: Operation, + pub operation: Operation<'a>, } /// The operation to perform when a non-allowed encoding is encountered. -pub enum Operation { +pub enum Operation<'a> { Error, - // TODO(joe): add into canonical variant + Execute(&'a mut ExecutionCtx), } impl ArrayRef { @@ -30,13 +30,17 @@ impl ArrayRef { /// This operation performs a recursive traversal of the array. Any non-allowed encoding is /// normalized per the configured operation. pub fn normalize(self, options: &mut NormalizeOptions) -> VortexResult { - let array_ids = options.allowed.ids().collect_vec(); - self.normalize_with_error(&array_ids)?; - // Note this takes ownership so we can at a later date remove non-allowed encodings. - Ok(self) + match &mut options.operation { + Operation::Error => { + self.normalize_with_error(options.allowed)?; + // Note this takes ownership so we can at a later date remove non-allowed encodings. + Ok(self) + } + Operation::Execute(ctx) => self.normalize_with_execution(options.allowed, *ctx), + } } - fn normalize_with_error(&self, allowed: &[Id]) -> VortexResult<()> { + fn normalize_with_error(&self, allowed: &HashSet) -> VortexResult<()> { if !allowed.contains(&self.encoding_id()) { vortex_bail!(AssertionFailed: "normalize forbids encoding ({})", self.encoding_id()) } @@ -46,4 +50,118 @@ impl ArrayRef { } Ok(()) } + + fn normalize_with_execution( + self, + allowed: &HashSet, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let mut normalized = self; + + // Top-first execute the array tree while we hit non-allowed encodings. + while !allowed.contains(&normalized.encoding_id()) { + normalized = normalized.execute(ctx)?; + } + + // Now we've normalized the root, we need to ensure the children are normalized also. + let slots = normalized.slots(); + let mut normalized_slots = Vec::with_capacity(slots.len()); + let mut any_slot_changed = false; + + for slot in slots { + match slot { + Some(child) => { + let normalized_child = child.clone().normalize(&mut NormalizeOptions { + allowed, + operation: Operation::Execute(ctx), + })?; + any_slot_changed |= !ArrayRef::ptr_eq(&child, &normalized_child); + normalized_slots.push(Some(normalized_child)); + } + None => normalized_slots.push(None), + } + } + + if any_slot_changed { + normalized = normalized.with_slots(normalized_slots)?; + } + + Ok(normalized) + } +} + +#[cfg(test)] +mod tests { + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use super::NormalizeOptions; + use super::Operation; + use crate::ArrayRef; + use crate::ExecutionCtx; + use crate::IntoArray; + use crate::arrays::PrimitiveArray; + use crate::arrays::SliceArray; + use crate::arrays::StructArray; + use crate::assert_arrays_eq; + use crate::validity::Validity; + + #[test] + fn normalize_with_execution_keeps_parent_when_children_are_unchanged() -> VortexResult<()> { + let field = PrimitiveArray::from_iter(0i32..4).into_array(); + let array = StructArray::try_new( + ["field"].into(), + vec![field.clone()], + field.len(), + Validity::NonNullable, + )? + .into_array(); + let allowed = HashSet::from_iter([array.encoding_id(), field.encoding_id()]); + let mut ctx = ExecutionCtx::new(VortexSession::empty()); + + let normalized = array.clone().normalize(&mut NormalizeOptions { + allowed: &allowed, + operation: Operation::Execute(&mut ctx), + })?; + + assert!(ArrayRef::ptr_eq(&array, &normalized)); + Ok(()) + } + + #[test] + fn normalize_with_execution_rebuilds_parent_when_a_child_changes() -> VortexResult<()> { + let unchanged = PrimitiveArray::from_iter(0i32..4).into_array(); + let sliced = + SliceArray::new(PrimitiveArray::from_iter(10i32..20).into_array(), 2..6).into_array(); + let array = StructArray::try_new( + ["lhs", "rhs"].into(), + vec![unchanged.clone(), sliced.clone()], + unchanged.len(), + Validity::NonNullable, + )? + .into_array(); + let allowed = HashSet::from_iter([array.encoding_id(), unchanged.encoding_id()]); + let mut ctx = ExecutionCtx::new(VortexSession::empty()); + + let normalized = array.clone().normalize(&mut NormalizeOptions { + allowed: &allowed, + operation: Operation::Execute(&mut ctx), + })?; + + assert!(!ArrayRef::ptr_eq(&array, &normalized)); + + let original_children = array.children(); + let normalized_children = normalized.children(); + assert!(ArrayRef::ptr_eq( + &original_children[0], + &normalized_children[0] + )); + assert!(!ArrayRef::ptr_eq( + &original_children[1], + &normalized_children[1] + )); + assert_arrays_eq!(normalized_children[1], PrimitiveArray::from_iter(12i32..16)); + + Ok(()) + } } From 58fd4cde2e253cc2da978a2c242a908b49387183 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 Apr 2026 11:03:24 -0400 Subject: [PATCH 2/2] Normalize Execution Signed-off-by: Nicholas Gates --- vortex-file/src/strategy.rs | 79 ++++++++++++------------ vortex-layout/src/layouts/flat/writer.rs | 18 +++--- 2 files changed, 49 insertions(+), 48 deletions(-) diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index c855a825c17..2189dc07752 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -10,6 +10,8 @@ use vortex_alp::ALP; // Compressed encodings from encoding crates // Canonical array encodings from vortex-array use vortex_alp::ALPRD; +use vortex_array::ArrayId; +use vortex_array::VTable; use vortex_array::arrays::Bool; use vortex_array::arrays::Chunked; use vortex_array::arrays::Constant; @@ -26,8 +28,6 @@ use vortex_array::arrays::Struct; use vortex_array::arrays::VarBin; use vortex_array::arrays::VarBinView; use vortex_array::dtype::FieldPath; -use vortex_array::session::ArrayRegistry; -use vortex_array::session::ArraySession; use vortex_btrblocks::BtrBlocksCompressor; use vortex_btrblocks::BtrBlocksCompressorBuilder; use vortex_btrblocks::SchemeExt; @@ -67,6 +67,7 @@ use vortex_btrblocks::{ schemes::integer, schemes::string, }; +use vortex_utils::aliases::hash_set::HashSet; #[cfg(feature = "zstd")] use vortex_zstd::Zstd; #[cfg(all(feature = "zstd", feature = "unstable_encodings"))] @@ -78,49 +79,49 @@ const ONE_MEG: u64 = 1 << 20; /// /// This includes all canonical encodings from vortex-array plus all compressed /// encodings from the various encoding crates. -pub static ALLOWED_ENCODINGS: LazyLock = LazyLock::new(|| { - let session = ArraySession::empty(); +pub static ALLOWED_ENCODINGS: LazyLock> = LazyLock::new(|| { + let mut allowed = HashSet::new(); // Canonical encodings from vortex-array - session.register(Null); - session.register(Bool); - session.register(Primitive); - session.register(Decimal); - session.register(VarBin); - session.register(VarBinView); - session.register(List); - session.register(ListView); - session.register(FixedSizeList); - session.register(Struct); - session.register(Extension); - session.register(Chunked); - session.register(Constant); - session.register(Masked); - session.register(Dict); + allowed.insert(Null.id()); + allowed.insert(Bool.id()); + allowed.insert(Primitive.id()); + allowed.insert(Decimal.id()); + allowed.insert(VarBin.id()); + allowed.insert(VarBinView.id()); + allowed.insert(List.id()); + allowed.insert(ListView.id()); + allowed.insert(FixedSizeList.id()); + allowed.insert(Struct.id()); + allowed.insert(Extension.id()); + allowed.insert(Chunked.id()); + allowed.insert(Constant.id()); + allowed.insert(Masked.id()); + allowed.insert(Dict.id()); // Compressed encodings from encoding crates - session.register(ALP); - session.register(ALPRD); - session.register(BitPacked); - session.register(ByteBool); - session.register(DateTimeParts); - session.register(DecimalByteParts); - session.register(Delta); - session.register(FoR); - session.register(FSST); - session.register(Pco); - session.register(RLE); - session.register(RunEnd); - session.register(Sequence); - session.register(Sparse); - session.register(ZigZag); + allowed.insert(ALP.id()); + allowed.insert(ALPRD.id()); + allowed.insert(BitPacked.id()); + allowed.insert(ByteBool.id()); + allowed.insert(DateTimeParts.id()); + allowed.insert(DecimalByteParts.id()); + allowed.insert(Delta.id()); + allowed.insert(FoR.id()); + allowed.insert(FSST.id()); + allowed.insert(Pco.id()); + allowed.insert(RLE.id()); + allowed.insert(RunEnd.id()); + allowed.insert(Sequence.id()); + allowed.insert(Sparse.id()); + allowed.insert(ZigZag.id()); #[cfg(feature = "zstd")] - session.register(Zstd); + allowed.insert(Zstd.id()); #[cfg(all(feature = "zstd", feature = "unstable_encodings"))] - session.register(ZstdBuffers); + allowed.insert(ZstdBuffers.id()); - session.registry().clone() + allowed }); /// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file. @@ -132,7 +133,7 @@ pub struct WriteStrategyBuilder { compressor_override: Option>, row_block_size: usize, field_writers: HashMap>, - allow_encodings: Option, + allow_encodings: Option>, flat_strategy: Option>, } @@ -169,7 +170,7 @@ impl WriteStrategyBuilder { } /// Override the allowed array encodings for normalization. - pub fn with_allow_encodings(mut self, allow_encodings: ArrayRegistry) -> Self { + pub fn with_allow_encodings(mut self, allow_encodings: HashSet) -> Self { self.allow_encodings = Some(allow_encodings); self } diff --git a/vortex-layout/src/layouts/flat/writer.rs b/vortex-layout/src/layouts/flat/writer.rs index 733b575b684..6552a302042 100644 --- a/vortex-layout/src/layouts/flat/writer.rs +++ b/vortex-layout/src/layouts/flat/writer.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use futures::StreamExt; use vortex_array::ArrayContext; +use vortex_array::ArrayId; use vortex_array::dtype::DType; use vortex_array::expr::stats::Precision; use vortex_array::expr::stats::Stat; @@ -15,7 +16,6 @@ use vortex_array::scalar::ScalarTruncation; use vortex_array::scalar::lower_bound; use vortex_array::scalar::upper_bound; use vortex_array::serde::SerializeOptions; -use vortex_array::session::ArrayRegistry; use vortex_array::stats::StatsSetRef; use vortex_buffer::BufferString; use vortex_buffer::ByteBuffer; @@ -24,6 +24,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_io::runtime::Handle; use vortex_session::registry::ReadContext; +use vortex_utils::aliases::hash_set::HashSet; use crate::IntoLayout; use crate::LayoutRef; @@ -42,7 +43,7 @@ pub struct FlatLayoutStrategy { pub max_variable_length_statistics_size: usize, /// Optional set of allowed array encodings for normalization. /// If None, then all are allowed. - pub allowed_encodings: Option, + pub allowed_encodings: Option>, } impl Default for FlatLayoutStrategy { @@ -69,7 +70,7 @@ impl FlatLayoutStrategy { } /// Set the allowed array encodings for normalization. - pub fn with_allow_encodings(mut self, allow_encodings: ArrayRegistry) -> Self { + pub fn with_allow_encodings(mut self, allow_encodings: HashSet) -> Self { self.allowed_encodings = Some(allow_encodings); self } @@ -226,6 +227,7 @@ mod tests { use vortex_io::runtime::single::block_on; use vortex_mask::AllOr; use vortex_mask::Mask; + use vortex_utils::aliases::hash_set::HashSet; use crate::LayoutStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; @@ -424,9 +426,8 @@ mod tests { let (layout, _segments) = { let segments = Arc::new(TestSegments::default()); let (ptr, eof) = SequenceId::root().split(); - // Only allow primitive encodings - filter arrays should fail. - let allowed = ArrayRegistry::default(); - allowed.register(Primitive::ID, Arc::new(Primitive) as DynVTableRef); + // Only allow canonical encodings - filter arrays should fail. + let allowed = HashSet::default(); let layout = FlatLayoutStrategy::default() .with_allow_encodings(allowed) .write_stream( @@ -466,9 +467,8 @@ mod tests { let segments = Arc::new(TestSegments::default()); let (ptr, eof) = SequenceId::root().split(); // Only allow primitive encodings - filter arrays should fail. - let allowed = ArrayRegistry::default(); - allowed.register(Primitive.id(), Arc::new(Primitive) as DynVTableRef); - allowed.register(Dict.id(), Arc::new(Dict) as DynVTableRef); + let mut allowed = HashSet::default(); + allowed.insert(Dict.id()); let layout = FlatLayoutStrategy::default() .with_allow_encodings(allowed) .write_stream(