Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions encodings/parquet-variant/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use vortex_mask::Mask;

use crate::ParquetVariant;
use crate::array::ParquetVariantArray;
use crate::variant_get::VariantGetExecuteParent;

/// Parent kernels registered for the [`ParquetVariant`] encoding.
///
/// Contains the filter, slice, and take execute adaptors, plus the
/// `variant_get` execute-parent kernel from `crate::variant_get`.
pub(crate) static PARENT_KERNELS: ParentKernelSet<ParquetVariant> = ParentKernelSet::new(&[
ParentKernelSet::lift(&FilterExecuteAdaptor(ParquetVariant)),
ParentKernelSet::lift(&SliceExecuteAdaptor(ParquetVariant)),
ParentKernelSet::lift(&TakeExecuteAdaptor(ParquetVariant)),
ParentKernelSet::lift(&VariantGetExecuteParent),
]);

impl SliceKernel for ParquetVariant {
Expand Down
1 change: 1 addition & 0 deletions encodings/parquet-variant/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod array;
mod kernel;
mod operations;
mod validity;
mod variant_get;
mod vtable;

pub use array::ParquetVariantArray;
Expand Down
116 changes: 116 additions & 0 deletions encodings/parquet-variant/src/variant_get/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Execute-parent kernel for `variant_get` on `ParquetVariantArray`.
//!
//! Delegates to `parquet_variant_compute::variant_get` after converting to Arrow.

use std::sync::Arc;

use parquet_variant::VariantPathElement;
use parquet_variant_compute::GetOptions;
use parquet_variant_compute::VariantArray as ArrowVariantArray;
use vortex_array::ArrayRef;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::VariantArray;
use vortex_array::arrays::scalar_fn::ExactScalarFn;
use vortex_array::arrays::scalar_fn::ScalarFnArrayView;
use vortex_array::arrow::FromArrowArray;
use vortex_array::dtype::DType;
use vortex_array::dtype::FieldName;
use vortex_array::dtype::Nullability;
use vortex_array::kernel::ExecuteParentKernel;
use vortex_array::scalar_fn::fns::variant_get::VariantGet;
use vortex_array::validity::Validity;
use vortex_buffer::BitBuffer;
use vortex_error::VortexResult;
use vortex_error::vortex_err;

use crate::ParquetVariant;
use crate::array::ParquetVariantArray;

#[cfg(test)]
mod tests;

/// Execute-parent kernel that evaluates a `VariantGet` scalar function whose
/// child is a `ParquetVariant`-encoded array.
///
/// The type carries no state; it only exists to host the
/// `ExecuteParentKernel` implementation.
#[derive(Debug)]
pub(crate) struct VariantGetExecuteParent;

impl ExecuteParentKernel<ParquetVariant> for VariantGetExecuteParent {
    type Parent = ExactScalarFn<VariantGet>;

    /// Evaluates the parent `VariantGet` expression directly on the
    /// Parquet-variant child.
    ///
    /// The field to extract is taken from the parent's options. The child
    /// index is ignored. On success the result is always `Some`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `variant_get_impl`.
    fn execute_parent(
        &self,
        array: &ParquetVariantArray,
        parent: ScalarFnArrayView<'_, VariantGet>,
        _child_idx: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<ArrayRef>> {
        let requested_field: &FieldName = parent.options;
        let extracted = variant_get_impl(array, requested_field, ctx)?;
        Ok(Some(extracted))
    }
}

/// Extracts the field named `field_name` from `array` and returns the result
/// as a nullable variant array.
///
/// The work is delegated to `parquet_variant_compute::variant_get`: the input
/// is converted to its Arrow representation, queried with a single-field
/// path, and the Arrow result is converted back into a `ParquetVariantArray`
/// wrapped in a `VariantArray`.
///
/// # Errors
///
/// Returns an error if the conversion to Arrow fails, if the Arrow kernel
/// fails, if the kernel output is not a `StructArray` that forms a valid Arrow
/// variant array, or if any component cannot be converted back to Vortex.
fn variant_get_impl(
    array: &ParquetVariantArray,
    field_name: &FieldName,
    ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
    // Move the input into Arrow land and erase it to a dynamic array handle.
    let arrow_input = array.to_arrow(ctx)?;
    let arrow_input: Arc<dyn arrow_array::Array> = Arc::new(arrow_input.into_inner());

    // The path consists of exactly one step: a lookup of `field_name`.
    let path = vec![VariantPathElement::Field {
        name: field_name.as_ref().into(),
    }];
    let get_options = GetOptions::new_with_path(path.into());

    // No target type is requested, so the kernel output is itself a variant array.
    let arrow_output = parquet_variant_compute::variant_get(&arrow_input, get_options)
        .map_err(|e| vortex_err!("variant_get failed: {e}"))?;

    // Reinterpret the kernel output as an Arrow variant array.
    let Some(output_struct) = arrow_output
        .as_any()
        .downcast_ref::<arrow_array::StructArray>()
    else {
        return Err(vortex_err!("variant_get did not return a StructArray"));
    };
    let extracted = ArrowVariantArray::try_new(output_struct)
        .map_err(|e| vortex_err!("failed to create VariantArray from result: {e}"))?;

    // The result must always be nullable (matching variant_get's return dtype),
    // even though Arrow may omit the null buffer when no nulls are present.
    let validity = match extracted.nulls() {
        None => Validity::AllValid,
        Some(nulls) if nulls.null_count() == nulls.len() => Validity::AllInvalid,
        Some(nulls) => Validity::from(BitBuffer::from(nulls.inner().clone())),
    };

    // Convert each component back to Vortex: metadata is non-nullable, while
    // the optional value / typed_value columns are nullable.
    let metadata_field: &dyn arrow_array::Array = extracted.metadata_field();
    let metadata = ArrayRef::from_arrow(metadata_field, false)?;
    let value = extracted
        .value_field()
        .map(|v| ArrayRef::from_arrow(v as &dyn arrow_array::Array, true))
        .transpose()?;
    let typed_value = extracted
        .typed_value_field()
        .map(|tv| ArrayRef::from_arrow(tv.as_ref(), true))
        .transpose()?;

    let rebuilt = ParquetVariantArray::try_new(validity, metadata, value, typed_value)?;
    debug_assert_eq!(
        rebuilt.dtype,
        DType::Variant(Nullability::Nullable),
        "variant_get result must be nullable"
    );
    let wrapped = VariantArray::new(rebuilt.into_array());
    Ok(wrapped.into_array())
}
Loading
Loading