From 5f88e4a4dbb0956b5c62b3ffd5071d945d2baeb7 Mon Sep 17 00:00:00 2001 From: Nemo Yu Date: Mon, 29 Jun 2026 09:46:47 -0400 Subject: [PATCH 1/2] feat: wire vortex-native into duckdb-bench Signed-off-by: Nemo Yu --- Cargo.lock | 3 + benchmarks/datafusion-bench/src/lib.rs | 7 +- benchmarks/duckdb-bench/src/lib.rs | 5 +- benchmarks/duckdb-bench/src/main.rs | 4 +- vortex-bench/Cargo.toml | 3 + vortex-bench/src/benchmark.rs | 20 +++ vortex-bench/src/lib.rs | 22 ++- vortex-bench/src/spatialbench/benchmark.rs | 120 +++++++++++-- vortex-bench/src/spatialbench/datagen/mod.rs | 7 +- .../src/spatialbench/datagen/native.rs | 165 ++++++++++++++++++ .../src/spatialbench/datagen/table.rs | 55 +++++- vortex-duckdb/src/convert/dtype.rs | 14 +- vortex-duckdb/src/exporter/extension.rs | 18 ++ vortex-duckdb/src/exporter/geo.rs | 34 ++++ 14 files changed, 445 insertions(+), 32 deletions(-) create mode 100644 vortex-bench/src/spatialbench/datagen/native.rs diff --git a/Cargo.lock b/Cargo.lock index fe315a74b7c..490623bc6c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9897,6 +9897,8 @@ dependencies = [ "bzip2", "clap", "futures", + "geoarrow", + "geoarrow-cast", "get_dir", "glob", "humansize", @@ -9930,6 +9932,7 @@ dependencies = [ "url", "uuid", "vortex", + "vortex-geo", "vortex-tensor", ] diff --git a/benchmarks/datafusion-bench/src/lib.rs b/benchmarks/datafusion-bench/src/lib.rs index 1100d38d24e..c45aa99be2c 100644 --- a/benchmarks/datafusion-bench/src/lib.rs +++ b/benchmarks/datafusion-bench/src/lib.rs @@ -111,10 +111,9 @@ pub fn format_to_df_format(format: Format) -> Arc { Format::Csv => Arc::new(CsvFormat::default()) as _, Format::Arrow => Arc::new(ArrowFormat), Format::Parquet => Arc::new(ParquetFormat::new()), - Format::OnDiskVortex | Format::VortexCompact => Arc::new(VortexFormat::new_with_options( - SESSION.clone(), - vortex_table_options(), - )), + Format::OnDiskVortex | Format::VortexCompact | Format::VortexNative => Arc::new( + VortexFormat::new_with_options(SESSION.clone(), 
vortex_table_options()), + ), Format::OnDiskDuckDB | Format::Lance => { unimplemented!("Format {format} cannot be turned into a DataFusion `FileFormat`") } diff --git a/benchmarks/duckdb-bench/src/lib.rs b/benchmarks/duckdb-bench/src/lib.rs index 4ec1efd0993..bf64f123956 100644 --- a/benchmarks/duckdb-bench/src/lib.rs +++ b/benchmarks/duckdb-bench/src/lib.rs @@ -169,7 +169,10 @@ impl DuckClient { file_format: Format, ) -> Result<()> { let object_type = match file_format { - Format::Parquet | Format::OnDiskVortex | Format::VortexCompact => "VIEW", + Format::Parquet + | Format::OnDiskVortex + | Format::VortexCompact + | Format::VortexNative => "VIEW", Format::OnDiskDuckDB => "TABLE", Format::Lance => { anyhow::bail!( diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs index cf4fa071067..95d43cce954 100644 --- a/benchmarks/duckdb-bench/src/main.rs +++ b/benchmarks/duckdb-bench/src/main.rs @@ -142,6 +142,7 @@ fn main() -> anyhow::Result<()> { // OnDiskDuckDB tables are created during register_tables by loading from Parquet _ => {} } + benchmark.prepare_format(format, &base_path).await?; } anyhow::Ok(()) @@ -197,7 +198,8 @@ fn main() -> anyhow::Result<()> { if !args.reuse { ctx.reopen()?; } - ctx.execute_query_result(query) + let query = benchmark.query_for_format(query, format); + ctx.execute_query_result(&query) }, )?; diff --git a/vortex-bench/Cargo.toml b/vortex-bench/Cargo.toml index 96180f8bcff..55028d8dbd0 100644 --- a/vortex-bench/Cargo.toml +++ b/vortex-bench/Cargo.toml @@ -24,6 +24,7 @@ vortex = { workspace = true, features = [ "zstd", ] } vortex-tensor = { workspace = true } # TODO(connor): In the future, this might be inside vortex. 
+vortex-geo = { workspace = true } anyhow = { workspace = true } arrow-array = { workspace = true } @@ -33,6 +34,8 @@ async-trait = { workspace = true } bzip2 = { workspace = true } clap = { workspace = true, features = ["derive"] } futures = { workspace = true } +geoarrow = { workspace = true } +geoarrow-cast = { workspace = true } get_dir = { workspace = true } glob = { workspace = true } humansize = { workspace = true } diff --git a/vortex-bench/src/benchmark.rs b/vortex-bench/src/benchmark.rs index fe16df5cd3d..a2eaa157e78 100644 --- a/vortex-bench/src/benchmark.rs +++ b/vortex-bench/src/benchmark.rs @@ -3,6 +3,8 @@ //! Core benchmark trait and types. +use std::path::Path; + use arrow_schema::Schema; use glob::Pattern; use url::Url; @@ -33,6 +35,11 @@ pub trait Benchmark: Send + Sync { /// Get all available queries for this benchmark fn queries(&self) -> anyhow::Result>; + /// Adapt a query to a specific storage `format` before execution. Default: unchanged. + fn query_for_format(&self, query: &str, _format: Format) -> String { + query.to_string() + } + /// SQL an `engine` must run before this benchmark's queries (e.g. loading engine /// extensions). Runners replay these after every (re)open. Default: none. fn engine_init_sql(&self, _engine: Engine) -> Vec { @@ -47,6 +54,13 @@ pub trait Benchmark: Send + Sync { /// call this method to ensure base data exists, then perform their own format conversion. async fn generate_base_data(&self) -> anyhow::Result<()>; + /// Prepare benchmark- and format-specific data beyond the Parquet base that + /// [`Benchmark::generate_base_data`] produced. Called once per requested format, after the base + /// data exists. Default: nothing. 
+ async fn prepare_format(&self, _format: Format, _base_path: &Path) -> anyhow::Result<()> { + Ok(()) + } + /// Get expected row counts for validation (optional) /// If None, no validation will be performed fn expected_row_counts(&self) -> Option> { @@ -80,4 +94,10 @@ pub trait Benchmark: Send + Sync { _ = format; None } + + /// SQL projection substituted into `SELECT {..} FROM read_{extension}(..)` when registering + /// `table_name` as a DuckDB view. Defaults to `*`. + fn view_projection(&self, _table_name: &str, _format: Format) -> String { + "*".to_string() + } } diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs index daef12c7e78..9bb6576fd08 100644 --- a/vortex-bench/src/lib.rs +++ b/vortex-bench/src/lib.rs @@ -76,8 +76,11 @@ use vortex::session::VortexSession; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -pub static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_tokio()); +pub static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::default().with_tokio(); + vortex_geo::initialize(&session); + session +}); #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)] pub struct Target { @@ -146,6 +149,9 @@ pub enum Format { #[clap(name = "vortex-compact")] #[serde(rename = "vortex-compact")] VortexCompact, + #[clap(name = "vortex-native")] + #[serde(rename = "vortex-native")] + VortexNative, #[clap(name = "duckdb")] #[serde(rename = "duckdb")] OnDiskDuckDB, @@ -185,6 +191,7 @@ impl Format { Format::Parquet => "parquet", Format::OnDiskVortex => "vortex-file-compressed", Format::VortexCompact => "vortex-compact", + Format::VortexNative => "vortex-native", Format::OnDiskDuckDB => "duckdb", Format::Lance => "lance", } @@ -197,6 +204,7 @@ impl Format { Format::Parquet => "parquet", Format::OnDiskVortex => "vortex", Format::VortexCompact => "vortex", + Format::VortexNative => "vortex", Format::OnDiskDuckDB => "duckdb", Format::Lance => "lance", } @@ -451,8 +459,16 @@ where 
object_type.to_lowercase() ); + let projection = benchmark.view_projection(name, load_format); + // SpatialBench's native and WKB lanes both register `trip` from the same db path but with different casts — + // so always replace views (cheap, metadata-only). Tables hold materialized data: keep them. + let create = if object_type == "VIEW" { + format!("CREATE OR REPLACE VIEW {name}") + } else { + format!("CREATE {object_type} IF NOT EXISTS {name}") + }; sql_statements.push(format!( - "CREATE {object_type} IF NOT EXISTS {name} AS SELECT * FROM read_{extension}('{base_dir}/{pattern}');\n", + "{create} AS SELECT {projection} FROM read_{extension}('{base_dir}/{pattern}');\n", )); } diff --git a/vortex-bench/src/spatialbench/benchmark.rs b/vortex-bench/src/spatialbench/benchmark.rs index 6ce786ea205..092c256621c 100644 --- a/vortex-bench/src/spatialbench/benchmark.rs +++ b/vortex-bench/src/spatialbench/benchmark.rs @@ -4,6 +4,7 @@ //! SpatialBench benchmark implementation use std::fs; +use std::path::Path; use url::Url; @@ -13,9 +14,13 @@ use crate::Engine; use crate::Format; use crate::TableSpec; use crate::spatialbench::datagen; +use crate::spatialbench::datagen::Table; use crate::utils::file::resolve_data_url; use crate::workspace_root; +/// Data-dir subfolder for the native-geometry Vortex files (the `vortex-native` lane). +pub const NATIVE_DIR: &str = "vortex-native"; + /// SpatialBench geospatial benchmark (Apache Sedona): a `trip` point table, `building` polygons, and /// a `customer` attribute table, queried with spatial filters and joins. `zone` polygons are sourced /// externally and registered when present. See . @@ -34,6 +39,21 @@ impl SpatialBenchBenchmark { scale_factor, }) } + + /// Tables to materialize and register: the in-process base tables (`trip`, `building`, + /// `customer`) plus the externally-sourced `zone` when its parquet is present. Shared by native + /// data-gen and table registration so both lanes cover the same set. 
+ fn base_tables(&self) -> Vec { + let mut tables = vec![Table::Trip, Table::Building, Table::Customer]; + let zone_present = match self.data_url.to_file_path() { + Ok(base) => zone_parquet_present(&base.join(Format::Parquet.name())), + Err(()) => true, + }; + if zone_present { + tables.push(Table::Zone); + } + tables + } } #[async_trait::async_trait] @@ -58,6 +78,16 @@ impl Benchmark for SpatialBenchBenchmark { .collect()) } + /// On the `vortex-native` lane, geometry columns surface as `GEOMETRY`, so drop the + /// `ST_GeomFromWKB(..)` wrappers and let DuckDB's `spatial` extension evaluate the `ST_*` + /// predicates directly on the native geometry. + fn query_for_format(&self, query: &str, format: Format) -> String { + match format { + Format::VortexNative => strip_wkb_wrappers(query), + _ => query.to_string(), + } + } + async fn generate_base_data(&self) -> anyhow::Result<()> { if self.data_url.scheme() != "file" { return Ok(()); @@ -66,7 +96,20 @@ impl Benchmark for SpatialBenchBenchmark { .data_url .to_file_path() .map_err(|_| anyhow::anyhow!("Invalid file URL: {}", self.data_url.as_str()))?; - datagen::generate_tables(&self.scale_factor, base_data_dir).await?; + datagen::generate_tables(&self.scale_factor, base_data_dir.clone()).await?; + Ok(()) + } + + /// The `vortex-native` lane decodes each table's WKB geometry to native GeoArrow once, into the + /// `vortex-native` dir, so its queries read DuckDB `GEOMETRY` directly. Idempotent. 
+ async fn prepare_format(&self, format: Format, base_path: &Path) -> anyhow::Result<()> { + if format == Format::VortexNative { + let parquet_dir = base_path.join(Format::Parquet.name()); + let native_dir = base_path.join(NATIVE_DIR); + for table in self.base_tables() { + datagen::write_native_vortex(table, &parquet_dir, &native_dir).await?; + } + } Ok(()) } @@ -74,6 +117,16 @@ impl Benchmark for SpatialBenchBenchmark { &self.data_url } + /// The `vortex-native` lane reads the native-geometry Vortex dir; every other format reads its + /// own `{format}` subfolder. + fn format_path(&self, format: Format, base_url: &Url) -> anyhow::Result { + let dir = match format { + Format::VortexNative => NATIVE_DIR, + other => other.name(), + }; + Ok(base_url.join(&format!("{dir}/"))?) + } + fn expected_row_counts(&self) -> Option> { // Indexed by `query_idx` (1-based), so index 0 is a dummy and Q1's count is at index 1 (TPC-H // convention). Only SF1.0 and SF10.0 are validated (like TPC-H); other scale factors return @@ -101,22 +154,32 @@ impl Benchmark for SpatialBenchBenchmark { format!("spatialbench(sf={})", self.scale_factor) } + /// Both lanes register the same tables (WKB reads `parquet`/`vortex`, native reads + /// `vortex-native`); `zone` is externally sourced and optional, registered only when present. fn table_specs(&self) -> Vec { - let mut specs = vec![ - TableSpec::new("trip", None), - TableSpec::new("building", None), - TableSpec::new("customer", None), - ]; - // `zone` is externally sourced and optional; register it only when present so queries that - // don't need it don't fail on the missing glob. 
- let zone_present = match self.data_url.to_file_path() { - Ok(base) => zone_parquet_present(&base.join(Format::Parquet.name())), - Err(()) => true, - }; - if zone_present { - specs.push(TableSpec::new("zone", None)); + self.base_tables() + .iter() + .map(|table| TableSpec::new(table.name(), None)) + .collect() + } + + /// DuckDB's view star-expansion drops native `GEOMETRY` columns down to `BLOB`, so `ST_*` fail to + /// bind. Re-cast every geometry column back to `GEOMETRY` in the view's projection. + fn view_projection(&self, table_name: &str, format: Format) -> String { + if format == Format::VortexNative + && let Some(table) = Table::from_name(table_name) + { + let geometry_columns = table.geometry_columns(); + if !geometry_columns.is_empty() { + let casts = geometry_columns + .iter() + .map(|column| format!("{name}::GEOMETRY AS {name}", name = column.name)) + .collect::>() + .join(", "); + return format!("* REPLACE ({casts})"); + } } - specs + "*".to_string() } /// Scope each table to its own `{table}_*.{ext}` files; the default globs every file in the @@ -141,8 +204,33 @@ impl Benchmark for SpatialBenchBenchmark { /// Whether an externally-sourced `zone_*.parquet` exists under `parquet_dir` (generated by the /// upstream `spatialbench-cli`; see the module docs). -fn zone_parquet_present(parquet_dir: &std::path::Path) -> bool { +fn zone_parquet_present(parquet_dir: &Path) -> bool { glob::glob(&parquet_dir.join("zone_*.parquet").to_string_lossy()) .map(|mut paths| paths.next().is_some()) .unwrap_or(false) } + +/// Strip `ST_GeomFromWKB(expr)` → `expr` so the native lane reads the already-`GEOMETRY` +/// column directly. Assumes the wrapped expression contains no inner `)` (true for our column refs). 
+fn strip_wkb_wrappers(sql: &str) -> String { + const OPEN: &str = "ST_GeomFromWKB("; + let mut out = String::with_capacity(sql.len()); + let mut rest = sql; + while let Some(pos) = rest.find(OPEN) { + out.push_str(&rest[..pos]); + let after = &rest[pos + OPEN.len()..]; + match after.find(')') { + Some(close) => { + out.push_str(&after[..close]); + rest = &after[close + 1..]; + } + // Unbalanced wrapper: emit it verbatim and stop rewriting. + None => { + out.push_str(OPEN); + rest = after; + } + } + } + out.push_str(rest); + out +} diff --git a/vortex-bench/src/spatialbench/datagen/mod.rs b/vortex-bench/src/spatialbench/datagen/mod.rs index 8ebd1b35d86..7808e06cbc3 100644 --- a/vortex-bench/src/spatialbench/datagen/mod.rs +++ b/vortex-bench/src/spatialbench/datagen/mod.rs @@ -1,11 +1,14 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -//! SpatialBench data preparation. [`wkb`] generates the canonical WKB base tables (Parquet + Vortex); -//! the [`table`] catalog is the single source of truth for the base tables. +//! SpatialBench data preparation. [`wkb`] generates the canonical WKB base tables; [`native`] derives +//! native-geometry Vortex files from them for `points=native`. The [`table`] catalog is the single +//! source of truth for the base tables both stages share. +pub mod native; pub mod table; pub mod wkb; +pub use native::write_native_vortex; pub use table::Table; pub use wkb::generate_tables; diff --git a/vortex-bench/src/spatialbench/datagen/native.rs b/vortex-bench/src/spatialbench/datagen/native.rs new file mode 100644 index 00000000000..1d600a8b8cf --- /dev/null +++ b/vortex-bench/src/spatialbench/datagen/native.rs @@ -0,0 +1,165 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Native-geometry prep for `points=native`: decode a table's WKB geometry to native +//! 
`vortex.geo.{point,polygon,multipolygon}` via `geoarrow_cast` (so Vortex never decodes WKB), then +//! write a Vortex file. A one-time cost; queries then see DuckDB `GEOMETRY` directly. + +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; + +use anyhow::Context; +use arrow_array::RecordBatch; +use arrow_schema::DataType; +use arrow_schema::Schema; +use futures::TryStreamExt; +use geoarrow::array::GenericWkbArray; +use geoarrow::array::GeoArrowArray; +use geoarrow::array::WkbViewArray; +use geoarrow::datatypes::CoordType; +use geoarrow::datatypes::Crs; +use geoarrow::datatypes::Dimension; +use geoarrow::datatypes::GeoArrowType; +use geoarrow::datatypes::Metadata; +use geoarrow::datatypes::MultiPolygonType; +use geoarrow::datatypes::PointType; +use geoarrow::datatypes::PolygonType; +use geoarrow::datatypes::WkbType; +use geoarrow_cast::cast::cast; +use parquet::arrow::ParquetRecordBatchStreamBuilder; +use tokio::fs::File as TokioFile; +use vortex::array::ArrayRef; +use vortex::array::IntoArray; +use vortex::array::arrays::ChunkedArray; +use vortex::array::arrow::ArrowSessionExt; +use vortex::file::WriteOptionsSessionExt; + +use super::table::GeometryKind; +use super::table::Table; +use crate::SESSION; +use crate::utils::file::idempotent_async; + +fn geo_metadata() -> Arc { + Arc::new(Metadata::new(Crs::default(), None)) +} + +/// Write `{native_dir}/{table}_0.vortex` with native geometry columns from the WKB parquet. Idempotent. 
+pub async fn write_native_vortex( + table: Table, + parquet_dir: &Path, + native_dir: &Path, +) -> anyhow::Result { + idempotent_async( + native_dir.join(format!("{}_0.vortex", table.name())), + |path| async move { + let chunks = map_source_batches(parquet_dir, table, |b| native_chunk(b, table)).await?; + + let dtype = chunks[0].dtype().clone(); + let chunked = ChunkedArray::try_new(chunks, dtype)?.into_array(); + let mut file = TokioFile::create(&path).await?; + SESSION + .write_options() + .write(&mut file, chunked.to_array_stream()) + .await?; + tracing::info!(path = %path.display(), table = table.name(), "wrote native geometry table"); + Ok(()) + }, + ) + .await +} + +/// Apply `f` to every batch of `table`'s base WKB parquet parts. All columns are kept; only the +/// geometry columns are rewritten to native types. +async fn map_source_batches( + parquet_dir: &Path, + table: Table, + mut f: impl FnMut(RecordBatch) -> anyhow::Result, +) -> anyhow::Result> { + let pattern = parquet_dir.join(format!("{}_*.parquet", table.name())); + let mut files: Vec = + glob::glob(&pattern.to_string_lossy())?.collect::>()?; + files.sort(); + anyhow::ensure!(!files.is_empty(), "no parquet matching {pattern:?}"); + + let mut out = Vec::new(); + for file in files { + let builder = ParquetRecordBatchStreamBuilder::new(TokioFile::open(&file).await?).await?; + let mut stream = builder.build()?; + while let Some(batch) = stream.try_next().await? { + out.push(f(batch)?); + } + } + Ok(out) +} + +/// Rewrite each of `table`'s WKB geometry columns to its native-lane type, tagging the field with +/// the matching `geoarrow.*` extension. 
+fn native_record_batch(batch: RecordBatch, table: Table) -> anyhow::Result { + let schema = batch.schema(); + let mut fields = schema.fields().to_vec(); + let mut columns = batch.columns().to_vec(); + + for geom in table.geometry_columns() { + let idx = schema.index_of(geom.name)?; + let column = batch.column(idx).as_ref(); + let wkb_type = WkbType::new(geo_metadata()); + + // Wrap the source WKB. SpatialBench tables emit `Binary`; the external `zone` parquet uses + // `BinaryView`. + let wkb: Box = match column.data_type() { + DataType::Binary => Box::new(GenericWkbArray::::try_from((column, wkb_type))?), + DataType::LargeBinary => { + Box::new(GenericWkbArray::::try_from((column, wkb_type))?) + } + DataType::BinaryView => Box::new(WkbViewArray::try_from((column, wkb_type))?), + other => anyhow::bail!("{}: unsupported WKB column type {other}", geom.name), + }; + + // Decode to a native, separated-XY GeoArrow type. The columnar round-trip also normalizes + // WKB endianness (Overture ships big-endian; native types carry none). + let native: Arc = match geom.kind { + GeometryKind::Point => cast( + wkb.as_ref(), + &GeoArrowType::Point( + PointType::new(Dimension::XY, geo_metadata()) + .with_coord_type(CoordType::Separated), + ), + )?, + GeometryKind::Polygon => cast( + wkb.as_ref(), + &GeoArrowType::Polygon( + PolygonType::new(Dimension::XY, geo_metadata()) + .with_coord_type(CoordType::Separated), + ), + )?, + // Polygon promotes to a one-element multipolygon, so this also covers the mixed + // `Polygon`/`MultiPolygon` zone boundaries. + GeometryKind::MultiPolygon => cast( + wkb.as_ref(), + &GeoArrowType::MultiPolygon( + MultiPolygonType::new(Dimension::XY, geo_metadata()) + .with_coord_type(CoordType::Separated), + ), + )?, + }; + + columns[idx] = native.to_array_ref(); + fields[idx] = Arc::new(native.data_type().to_field(geom.name, false)); + } + + Ok(RecordBatch::try_new( + Arc::new(Schema::new(fields)), + columns, + )?) 
+} + +/// Convert a WKB batch to a Vortex struct chunk with `table`'s geometry columns as native types. +fn native_chunk(batch: RecordBatch, table: Table) -> anyhow::Result { + let native_batch = native_record_batch(batch, table)?; + let native_schema = native_batch.schema(); + SESSION + .arrow() + .from_arrow_record_batch(native_batch, &native_schema) + .context("importing native batch") +} diff --git a/vortex-bench/src/spatialbench/datagen/table.rs b/vortex-bench/src/spatialbench/datagen/table.rs index c924428cc0e..c72e0907f37 100644 --- a/vortex-bench/src/spatialbench/datagen/table.rs +++ b/vortex-bench/src/spatialbench/datagen/table.rs @@ -1,8 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -//! The shared SpatialBench table catalog: one source of truth for the base tables generated by -//! [`super::wkb`]. +//! The shared SpatialBench table catalog: one source of truth for the base tables, used by both the +//! WKB generation ([`super::wkb`]) and the native geometry conversion ([`super::native`]). /// A SpatialBench base table. #[derive(Clone, Copy)] @@ -17,6 +17,20 @@ pub enum Table { /// externally (the `spatialbench` crate ships no zone generator). pub(crate) const TABLES: &[Table] = &[Table::Trip, Table::Building, Table::Customer]; +/// A geometry column and the geometry type its WKB bytes decode to. +pub(crate) struct GeometryColumn { + pub(crate) name: &'static str, + pub(crate) kind: GeometryKind, +} + +/// Geometry types a column can decode to on the native lane. +#[derive(Clone, Copy, Debug)] +pub(crate) enum GeometryKind { + Point, + Polygon, + MultiPolygon, +} + impl Table { /// File stem under a format directory, e.g. `Trip` → `trip_{part}.parquet`. pub(crate) fn name(self) -> &'static str { @@ -27,4 +41,41 @@ impl Table { Table::Zone => "zone", } } + + /// The [`Table`] for a registered table name, or `None` for an unknown name. + pub(crate) fn from_name(name: &str) -> Option
{ + match name { + "trip" => Some(Table::Trip), + "building" => Some(Table::Building), + "customer" => Some(Table::Customer), + "zone" => Some(Table::Zone), + _ => None, + } + } + + /// Geometry columns to decode from WKB to native, with their geometry type. Empty for tables with + /// no geometry (e.g. `customer`). + pub(crate) fn geometry_columns(self) -> &'static [GeometryColumn] { + match self { + Table::Trip => &[ + GeometryColumn { + name: "t_pickuploc", + kind: GeometryKind::Point, + }, + GeometryColumn { + name: "t_dropoffloc", + kind: GeometryKind::Point, + }, + ], + Table::Building => &[GeometryColumn { + name: "b_boundary", + kind: GeometryKind::Polygon, + }], + Table::Customer => &[], + Table::Zone => &[GeometryColumn { + name: "z_boundary", + kind: GeometryKind::MultiPolygon, + }], + } + } } diff --git a/vortex-duckdb/src/convert/dtype.rs b/vortex-duckdb/src/convert/dtype.rs index 4238b354182..624ced63f56 100644 --- a/vortex-duckdb/src/convert/dtype.rs +++ b/vortex-duckdb/src/convert/dtype.rs @@ -58,6 +58,9 @@ use vortex::extension::datetime::Time; use vortex::extension::datetime::TimeUnit; use vortex::extension::datetime::Timestamp; use vortex_geo::extension::GeoMetadata; +use vortex_geo::extension::MultiPolygon; +use vortex_geo::extension::Point; +use vortex_geo::extension::Polygon; use vortex_geo::extension::WellKnownBinary; use crate::cpp::DUCKDB_TYPE; @@ -245,9 +248,14 @@ impl TryFrom<&DType> for LogicalType { return temporal_to_duckdb(temporal); } - if let Some(wkb) = ext_dtype.metadata_opt::() { - let crs = wkb.crs.as_ref(); - return LogicalType::geometry_type(crs.map(|crs| crs.as_str())); + // Native Point/Polygon/MultiPolygon and WKB all surface to DuckDB as GEOMETRY so `ST_*` bind. 
+ if let Some(geo) = ext_dtype + .metadata_opt::() + .or_else(|| ext_dtype.metadata_opt::()) + .or_else(|| ext_dtype.metadata_opt::()) + .or_else(|| ext_dtype.metadata_opt::()) + { + return LogicalType::geometry_type(geo.crs.as_deref()); } vortex_bail!("Unsupported extension type \"{}\"", ext_dtype.id()); diff --git a/vortex-duckdb/src/exporter/extension.rs b/vortex-duckdb/src/exporter/extension.rs index 221dc92a85f..30eb3715cc2 100644 --- a/vortex-duckdb/src/exporter/extension.rs +++ b/vortex-duckdb/src/exporter/extension.rs @@ -8,6 +8,12 @@ use vortex::array::arrays::extension::ExtensionArrayExt; use vortex::array::extension::datetime::AnyTemporal; use vortex::error::VortexResult; use vortex::error::vortex_bail; +use vortex_geo::extension::MultiPolygon; +use vortex_geo::extension::MultiPolygonData; +use vortex_geo::extension::Point; +use vortex_geo::extension::PointData; +use vortex_geo::extension::Polygon; +use vortex_geo::extension::PolygonData; use vortex_geo::extension::WellKnownBinary; use vortex_geo::extension::WellKnownBinaryData; @@ -27,5 +33,17 @@ pub(crate) fn new_exporter( return geo::new_wkb_exporter(WellKnownBinaryData::try_from(ext)?, ctx); } + if ext.ext_dtype().is::() { + return geo::new_point_exporter(PointData::try_from(ext)?, ctx); + } + + if ext.ext_dtype().is::() { + return geo::new_polygon_exporter(PolygonData::try_from(ext)?, ctx); + } + + if ext.ext_dtype().is::() { + return geo::new_multipolygon_exporter(MultiPolygonData::try_from(ext)?, ctx); + } + vortex_bail!("no non-temporal extension exporter") } diff --git a/vortex-duckdb/src/exporter/geo.rs b/vortex-duckdb/src/exporter/geo.rs index 1287ed019e2..fc6634c121c 100644 --- a/vortex-duckdb/src/exporter/geo.rs +++ b/vortex-duckdb/src/exporter/geo.rs @@ -4,6 +4,9 @@ use vortex::array::ExecutionCtx; use vortex::array::arrays::VarBinViewArray; use vortex::error::VortexResult; +use vortex_geo::extension::MultiPolygonData; +use vortex_geo::extension::PointData; +use 
vortex_geo::extension::PolygonData; use vortex_geo::extension::WellKnownBinaryData; use crate::exporter::ColumnExporter; @@ -17,3 +20,34 @@ pub(crate) fn new_wkb_exporter( let values = array.wkb_values().clone().execute::(ctx)?; new_exporter(values, ctx) } + +/// Create an exporter for a native `Point` column. DuckDB `GEOMETRY` vectors carry WKB, so the +/// points are serialized to WKB via [`PointData::to_wkb`] (only for rows DuckDB materializes — +/// with predicate pushdown that's just the survivors). +pub(crate) fn new_point_exporter( + point: PointData, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let values = point.to_wkb(ctx)?.execute::(ctx)?; + new_exporter(values, ctx) +} + +/// Create an exporter for a native `Polygon` column. Like [`new_point_exporter`], DuckDB `GEOMETRY` +/// vectors carry WKB, so the polygons are serialized to WKB via [`PolygonData::to_wkb`]. +pub(crate) fn new_polygon_exporter( + polygon: PolygonData, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let values = polygon.to_wkb(ctx)?.execute::(ctx)?; + new_exporter(values, ctx) +} + +/// Create an exporter for a native `MultiPolygon` column, serialized to WKB via +/// [`MultiPolygonData::to_wkb`] (see [`new_point_exporter`]). 
+pub(crate) fn new_multipolygon_exporter( + multipolygon: MultiPolygonData, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let values = multipolygon.to_wkb(ctx)?.execute::(ctx)?; + new_exporter(values, ctx) +} From 69a4f900e32d4c65e649e193bccf23aaf00965bd Mon Sep 17 00:00:00 2001 From: Nemo Yu Date: Mon, 29 Jun 2026 10:09:36 -0400 Subject: [PATCH 2/2] feat: wire geo native into vx-bench Signed-off-by: Nemo Yu --- .../bench_orchestrator/config.py | 2 ++ bench-orchestrator/tests/test_config.py | 34 +++++++++++++++++++ bench-orchestrator/tests/test_executor.py | 13 +++++++ 3 files changed, 49 insertions(+) diff --git a/bench-orchestrator/bench_orchestrator/config.py b/bench-orchestrator/bench_orchestrator/config.py index e358bf18f01..8c8275bc81e 100644 --- a/bench-orchestrator/bench_orchestrator/config.py +++ b/bench-orchestrator/bench_orchestrator/config.py @@ -35,6 +35,7 @@ class Format(Enum): PARQUET = "parquet" VORTEX = "vortex" VORTEX_COMPACT = "vortex-compact" + VORTEX_NATIVE = "vortex-native" DUCKDB = "duckdb" LANCE = "lance" @@ -68,6 +69,7 @@ class Benchmark(Enum): Format.PARQUET, Format.VORTEX, Format.VORTEX_COMPACT, + Format.VORTEX_NATIVE, Format.DUCKDB, ], Engine.LANCE: [Format.LANCE], diff --git a/bench-orchestrator/tests/test_config.py b/bench-orchestrator/tests/test_config.py index f900048f87b..a8f09c6b04b 100644 --- a/bench-orchestrator/tests/test_config.py +++ b/bench-orchestrator/tests/test_config.py @@ -26,6 +26,23 @@ def test_parse_formats_json_accepts_ci_format_arrays() -> None: assert formats == [Format.PARQUET, Format.VORTEX, Format.DUCKDB] +def test_parse_formats_json_accepts_vortex_native() -> None: + formats = parse_formats_json('["parquet","vortex","vortex-native"]') + + assert formats == [Format.PARQUET, Format.VORTEX, Format.VORTEX_NATIVE] + + +def test_resolve_axis_targets_offers_vortex_native_on_duckdb_only() -> None: + # vortex-native is a DuckDB-only lane; the DataFusion axis is dropped as unsupported. 
+ targets, warnings = resolve_axis_targets( + [Engine.DATAFUSION, Engine.DUCKDB], + [Format.VORTEX_NATIVE], + ) + + assert targets == [BenchmarkTarget(engine=Engine.DUCKDB, format=Format.VORTEX_NATIVE)] + assert warnings == ["Format vortex-native is not supported by engine datafusion"] + + def test_resolve_axis_targets_filters_unsupported_combinations() -> None: targets, warnings = resolve_axis_targets( [Engine.DATAFUSION, Engine.DUCKDB], @@ -55,6 +72,23 @@ def test_resolve_axis_targets_skips_engines_a_benchmark_cannot_run() -> None: assert warnings == ["Benchmark spatialbench does not support engine datafusion"] +def test_resolve_axis_targets_expands_spatialbench_three_lanes() -> None: + # The single-command three-lane comparison: parquet, WKB vortex, and native-geometry vortex, all + # on DuckDB. + targets, warnings = resolve_axis_targets( + [Engine.DUCKDB], + [Format.PARQUET, Format.VORTEX, Format.VORTEX_NATIVE], + Benchmark.SPATIALBENCH, + ) + + assert targets == [ + BenchmarkTarget(engine=Engine.DUCKDB, format=Format.PARQUET), + BenchmarkTarget(engine=Engine.DUCKDB, format=Format.VORTEX), + BenchmarkTarget(engine=Engine.DUCKDB, format=Format.VORTEX_NATIVE), + ] + assert warnings == [] + + def test_validate_targets_rejects_engine_a_benchmark_cannot_run() -> None: errors = validate_targets( [BenchmarkTarget(engine=Engine.DATAFUSION, format=Format.PARQUET)], diff --git a/bench-orchestrator/tests/test_executor.py b/bench-orchestrator/tests/test_executor.py index dd3253a22ff..4a09d35d89e 100644 --- a/bench-orchestrator/tests/test_executor.py +++ b/bench-orchestrator/tests/test_executor.py @@ -33,6 +33,19 @@ def test_build_command_adds_duckdb_cleanup_flag() -> None: assert "scale-factor=1.0" in cmd +def test_build_command_serializes_vortex_native_format() -> None: + executor = BenchmarkExecutor(Path("/tmp/duckdb-bench"), Engine.DUCKDB) + + cmd = executor.build_command( + benchmark=Benchmark.SPATIALBENCH, + formats=[Format.PARQUET, Format.VORTEX, 
Format.VORTEX_NATIVE], + iterations=1, + options={"scale-factor": "1.0"}, + ) + + assert "parquet,vortex,vortex-native" in cmd + + def test_build_command_omits_formats_for_lance_backend() -> None: executor = BenchmarkExecutor(Path("/tmp/lance-bench"), Engine.LANCE)