diff --git a/Cargo.lock b/Cargo.lock index ccc1ff46344..04684f767fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10533,6 +10533,7 @@ dependencies = [ name = "vortex-ffi" version = "0.1.0" dependencies = [ + "arrow-array 58.0.0", "async-fs", "cbindgen", "futures", diff --git a/vortex-ffi/Cargo.toml b/vortex-ffi/Cargo.toml index 27f2c7b6140..b32915b6fa1 100644 --- a/vortex-ffi/Cargo.toml +++ b/vortex-ffi/Cargo.toml @@ -20,6 +20,7 @@ categories = { workspace = true } all-features = true [dependencies] +arrow-array = { workspace = true, features = ["ffi"] } async-fs = { workspace = true } futures = { workspace = true } itertools = { workspace = true } diff --git a/vortex-ffi/README.md b/vortex-ffi/README.md index 368aac7cb8c..5e2fb0c4b2d 100644 --- a/vortex-ffi/README.md +++ b/vortex-ffi/README.md @@ -110,6 +110,6 @@ cmake -Bbuild -DWITH_ASAN=1 -DTARGET_TRIPLE= 3. Run the tests (ctest doesn't output failures in detail): -``` +```sh ./build/test/vortex_ffi_test 2>& 1 | rustfilt -i- ``` diff --git a/vortex-ffi/cbindgen.toml b/vortex-ffi/cbindgen.toml index 12e484f093e..419188250df 100644 --- a/vortex-ffi/cbindgen.toml +++ b/vortex-ffi/cbindgen.toml @@ -8,10 +8,51 @@ style = "type" header = """ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +#include // // THIS FILE IS AUTO-GENERATED, DO NOT MAKE EDITS DIRECTLY // + +// https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions +// We don't want to bundle nanoarrow or similar just for these two definitions. +// If you use your own Arrow library, define this macro and +// typedef FFI_ArrowSchema ArrowSchema; +// typedef FFI_ArrowArrayStream ArrowArrayStream; +#ifndef USE_OWN_ARROW +struct ArrowSchema { + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct ArrowSchema** children; + struct ArrowSchema* dictionary; + void (*release)(struct ArrowSchema*); + void* private_data; +}; +struct ArrowArray { + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct ArrowArray** children; + struct ArrowArray* dictionary; + void (*release)(struct ArrowArray*); + void* private_data; +}; +struct ArrowArrayStream { + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + const char* (*get_last_error)(struct ArrowArrayStream*); + void (*release)(struct ArrowArrayStream*); + void* private_data; +}; +typedef ArrowSchema FFI_ArrowSchema; +typedef ArrowArrayStream FFI_ArrowArrayStream; +#endif """ [export.rename] diff --git a/vortex-ffi/cinclude/vortex.h b/vortex-ffi/cinclude/vortex.h index f1f25491b98..46496bcc2f7 100644 --- a/vortex-ffi/cinclude/vortex.h +++ b/vortex-ffi/cinclude/vortex.h @@ -1,10 +1,51 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +#include // // THIS FILE IS AUTO-GENERATED, DO NOT MAKE EDITS DIRECTLY // +// https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions +// We don't want to bundle nanoarrow or similar just for these two definitions. +// If you use your own Arrow library, define this macro and +// typedef FFI_ArrowSchema ArrowSchema; +// typedef FFI_ArrowArrayStream ArrowArrayStream; +#ifndef USE_OWN_ARROW +struct ArrowSchema { + const char *format; + const char *name; + const char *metadata; + int64_t flags; + int64_t n_children; + struct ArrowSchema **children; + struct ArrowSchema *dictionary; + void (*release)(struct ArrowSchema *); + void *private_data; +}; +struct ArrowArray { + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void **buffers; + struct ArrowArray **children; + struct ArrowArray *dictionary; + void (*release)(struct ArrowArray *); + void *private_data; +}; +struct ArrowArrayStream { + int (*get_schema)(struct ArrowArrayStream *, struct ArrowSchema *out); + int (*get_next)(struct ArrowArrayStream *, struct ArrowArray *out); + const char *(*get_last_error)(struct ArrowArrayStream *); + void (*release)(struct ArrowArrayStream *); + void *private_data; +}; +typedef ArrowSchema FFI_ArrowSchema; +typedef ArrowArrayStream FFI_ArrowArrayStream; +#endif + #pragma once #include @@ -134,6 +175,12 @@ typedef enum { VX_VALIDITY_ARRAY = 3, } vx_validity_type; +typedef enum { + VX_CARD_UNKNOWN = 0, + VX_CARD_ESTIMATE = 1, + VX_CARD_MAXIMUM = 2, +} vx_cardinality; + /** * Equalities, inequalities, and boolean operations over possibly null values. * For most operations, if either side is null, the result is null. @@ -224,6 +271,18 @@ typedef enum { LOG_LEVEL_TRACE = 5, } vx_log_level; +typedef enum { + VX_S_INCLUDE_ALL = 0, + VX_S_INCLUDE_RANGE = 1, + VX_S_EXCLUDE_RANGE = 2, +} vx_scan_selection_include; + +typedef enum { + VX_ESTIMATE_UNKNOWN = 0, + VX_ESTIMATE_EXACT = 1, + VX_ESTIMATE_INEXACT = 2, +} vx_estimate_boundary; + /** * Physical type enum, represents the in-memory physical layout but might represent a different logical type. */ @@ -315,6 +374,8 @@ typedef struct Nullability Nullability; typedef struct Primitive Primitive; +typedef struct VxFileHandle VxFileHandle; + /** * Arrays are reference-counted handles to owned memory buffers that hold * scalars. These buffers can be held in a number of physical encodings to @@ -369,6 +430,14 @@ typedef struct vx_array_sink vx_array_sink; */ typedef struct vx_binary vx_binary; +/** + * A data source is a reference to multiple possibly remote files. When + * created, it opens first file to determine the schema from DType, all + * other operations are deferred till a scan is requested. You can request + * multiple file scans from a data source + */ +typedef struct vx_data_source vx_data_source; + /** * A Vortex data type. * @@ -403,6 +472,14 @@ typedef struct vx_expression vx_expression; */ typedef struct vx_file vx_file; +/** + * A Partition is a unit of work for a worker thread from which you can + * get vx_arrays. + */ +typedef struct vx_partition vx_partition; + +typedef struct vx_scan vx_scan; + /** * A handle to a Vortex session. */ @@ -435,6 +512,67 @@ typedef struct { const vx_array *array; } vx_validity; +typedef int (*vx_fs_use_vortex)(const char *schema, const char *path); + +typedef void (*vx_fs_set_userdata)(void *userdata); + +typedef void (*vx_fs_open)(void *userdata, const char *path, vx_error **err); + +typedef void (*vx_fs_create)(void *userdata, const char *path, vx_error **err); + +typedef void (*vx_list_callback)(void *userdata, const char *name, int is_dir); + +typedef void (*vx_fs_list)(const void *userdata, + const char *path, + vx_list_callback callback, + vx_error **error); + +typedef const VxFileHandle *vx_file_handle; + +typedef void (*vx_fs_close)(vx_file_handle handle); + +typedef uint64_t (*vx_fs_size)(vx_file_handle handle, vx_error **err); + +typedef uint64_t ( + *vx_fs_read)(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, vx_error **err); + +typedef uint64_t ( + *vx_fs_write)(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, vx_error **err); + +typedef void (*vx_fs_sync)(vx_file_handle handle, vx_error **err); + +typedef void (*vx_glob_callback)(void *userdata, const char *file); + +typedef void (*vx_glob)(const char *glob, vx_glob_callback callback, vx_error **err); + +/** + * Host must either implement all or none of fs_* callbacks. + */ +typedef struct { + const char *files; + /** + * Whether to use Vortex filesystem or host's filesystem. + * Return 1 to use Vortex for a given schema ("file", "s3") and path. + * Return 0 to use host's filesystem. + */ + vx_fs_use_vortex fs_use_vortex; + vx_fs_set_userdata fs_set_userdata; + vx_fs_open fs_open; + vx_fs_create fs_create; + vx_fs_list fs_list; + vx_fs_close fs_close; + vx_fs_size fs_size; + vx_fs_read fs_read; + vx_fs_write fs_write; + vx_fs_sync fs_sync; + vx_glob glob; +} vx_data_source_options; + +typedef struct { + vx_cardinality cardinality; + uint64_t rows; +} vx_data_source_row_count; + /** * Options supplied for opening a file. */ @@ -497,6 +635,28 @@ typedef struct { unsigned long row_offset; } vx_file_scan_options; +typedef struct { + uint64_t *idx; + size_t idx_len; + vx_scan_selection_include include; +} vx_scan_selection; + +typedef struct { + const vx_expression *projection; + const vx_expression *filter; + uint64_t row_range_begin; + uint64_t row_range_end; + vx_scan_selection selection; + uint64_t limit; + uint64_t max_threads; + int ordered; +} vx_scan_options; + +typedef struct { + uint64_t estimate; + vx_estimate_boundary type; +} vx_estimate; + #ifdef __cplusplus extern "C" { #endif // __cplusplus @@ -711,6 +871,34 @@ size_t vx_binary_len(const vx_binary *ptr); */ const char *vx_binary_ptr(const vx_binary *ptr); +/** + * Clone a borrowed [`vx_data_source`], returning an owned [`vx_data_source`]. + * + * + * Must be released with [`vx_data_source_free`]. + */ +const vx_data_source *vx_data_source_clone(const vx_data_source *ptr); + +/** + * Free an owned [`vx_data_source`] object. + */ +void vx_data_source_free(const vx_data_source *ptr); + +/** + * Create a new owned datasource which must be freed by the caller + */ +const vx_data_source * +vx_data_source_new(const vx_session *session, const vx_data_source_options *opts, vx_error **err); + +/** + * Create a non-owned dtype referencing dataframe. + * This dtype's lifetime is bound to underlying data source. + * Caller should not free this dtype. + */ +const vx_dtype *vx_data_source_dtype(const vx_data_source *ds); + +void vx_data_source_get_row_count(const vx_data_source *ds, vx_data_source_row_count *rc); + /** * Clone a borrowed [`vx_dtype`], returning an owned [`vx_dtype`]. * @@ -854,6 +1042,8 @@ uint8_t vx_dtype_time_unit(const DType *dtype); */ const vx_string *vx_dtype_time_zone(const DType *dtype); +void vx_type_to_arrow_schema(const vx_dtype *_dtype, FFI_ArrowSchema *_schema, vx_error **_err); + /** * Free an owned [`vx_error`] object. */ @@ -1034,6 +1224,47 @@ vx_array_iterator *vx_file_scan(const vx_session *session, */ void vx_set_log_level(vx_log_level level); +/** + * Free an owned [`vx_scan`] object. + */ +void vx_scan_free(vx_scan *ptr); + +/** + * Free an owned [`vx_partition`] object. + */ +void vx_partition_free(vx_partition *ptr); + +vx_scan *vx_data_source_scan(const vx_data_source *data_source, + const vx_scan_options *options, + vx_estimate *estimate, + vx_error **err); + +/** + * Get next owned partition out of a scan request. + * Caller must free this partition using vx_partition_free. + * This method is thread-safe. + * If using in a sync multi-thread runtime, users are encouraged to create a + * worker thread per partition. + * Returns NULL and doesn't set err on exhaustion. + * Returns NULL and sets err on error. + */ +vx_partition *vx_scan_next(vx_scan *scan, vx_error **err); + +void vx_partition_row_count(const vx_partition *partition, vx_estimate *count, vx_error **err); + +void vx_partition_scan_arrow(const vx_partition *_partition, FFI_ArrowArrayStream *_stream, vx_error **err); + +/** + * Get next vx_array out of this partition. + * Thread-unsafe. + */ +const vx_array *vx_partition_next(vx_partition *partition, vx_error **err); + +/** + * Scan progress between 0.0 and 1.0 + */ +double vx_scan_progress(const vx_scan *_scan); + /** * Free an owned [`vx_session`] object. */ @@ -1115,6 +1346,7 @@ void vx_struct_column_builder_free(vx_struct_column_builder *ptr); /** * Create a new column-wise struct array builder with given validity and a * capacity hint. validity can't be NULL. + * Capacity hint is for the number of columns. * If you don't know capacity, pass 0. * if validity is NULL, returns NULL. */ diff --git a/vortex-ffi/src/array.rs b/vortex-ffi/src/array.rs index b692c3d58c3..b9b57a75f51 100644 --- a/vortex-ffi/src/array.rs +++ b/vortex-ffi/src/array.rs @@ -192,6 +192,9 @@ pub unsafe extern "C-unwind" fn vx_array_dtype(array: *const vx_array) -> *const vx_dtype::new_ref(vx_array::as_ref(array).dtype()) } +// Return an owned field for array at index. +// Returns NULL and sets error_out if index is out of bounds or array doesn't +// have dtype DTYPE_STRUCT. #[unsafe(no_mangle)] pub unsafe extern "C-unwind" fn vx_array_get_field( array: *const vx_array, diff --git a/vortex-ffi/src/data_source.rs b/vortex-ffi/src/data_source.rs new file mode 100644 index 00000000000..2b55267e5e5 --- /dev/null +++ b/vortex-ffi/src/data_source.rs @@ -0,0 +1,187 @@ +#![allow(non_camel_case_types)] + +use std::ffi::c_char; +use std::ffi::c_int; +use std::ffi::c_void; +use std::sync::Arc; + +use vortex::error::VortexResult; +use vortex::error::vortex_ensure; +use vortex::expr::stats::Precision::Exact; +use vortex::expr::stats::Precision::Inexact; +use vortex::file::multi::MultiFileDataSource; +use vortex::io::runtime::BlockingRuntime; +use vortex::scan::DataSource; +use vortex::scan::DataSourceRef; + +use crate::RUNTIME; +use crate::dtype::vx_dtype; +use crate::error::try_or_default; +use crate::error::vx_error; +use crate::session::vx_session; +use crate::to_string; + +crate::arc_dyn_wrapper!( + /// A data source is a reference to multiple possibly remote files. When + /// created, it opens first file to determine the schema from DType, all + /// other operations are deferred till a scan is requested. You can request + /// multiple file scans from a data source + dyn DataSource, + vx_data_source); + +pub struct VxFileHandle; +pub type vx_file_handle = *const VxFileHandle; + +pub type vx_list_callback = + Option; +pub type vx_glob_callback = + Option; + +pub type vx_fs_use_vortex = + Option c_int>; +pub type vx_fs_set_userdata = Option; + +pub type vx_fs_open = Option< + unsafe extern "C" fn(userdata: *mut c_void, path: *const c_char, err: *mut *mut vx_error), +>; +pub type vx_fs_create = Option< + unsafe extern "C" fn(userdata: *mut c_void, path: *const c_char, err: *mut *mut vx_error), +>; + +pub type vx_fs_list = Option< + unsafe extern "C" fn( + userdata: *const c_void, + path: *const c_char, + callback: vx_list_callback, + error: *mut *mut vx_error, + ), +>; + +pub type vx_fs_close = Option; +pub type vx_fs_size = + Option u64>; + +pub type vx_fs_read = Option< + unsafe extern "C" fn( + handle: vx_file_handle, + offset: u64, + len: usize, + buffer: *mut u8, + err: *mut *mut vx_error, + ) -> u64, +>; + +pub type vx_fs_write = Option< + unsafe extern "C" fn( + handle: vx_file_handle, + offset: u64, + len: usize, + buffer: *mut u8, + err: *mut *mut vx_error, + ) -> u64, +>; + +pub type vx_fs_sync = Option; + +pub type vx_glob = Option< + unsafe extern "C" fn(glob: *const c_char, callback: vx_glob_callback, err: *mut *mut vx_error), +>; + +#[repr(C)] +/// Host must either implement all or none of fs_* callbacks. +pub struct vx_data_source_options { + // TODO what if the program wants to read a Vortex file from an existing buffer? + files: *const c_char, + + /// Whether to use Vortex filesystem or host's filesystem. + /// Return 1 to use Vortex for a given schema ("file", "s3") and path. + /// Return 0 to use host's filesystem. + fs_use_vortex: vx_fs_use_vortex, + fs_set_userdata: vx_fs_set_userdata, + fs_open: vx_fs_open, + fs_create: vx_fs_create, + fs_list: vx_fs_list, + fs_close: vx_fs_close, + fs_size: vx_fs_size, + fs_read: vx_fs_read, + fs_write: vx_fs_write, + fs_sync: vx_fs_sync, + + glob: vx_glob, +} + +unsafe fn data_source_new( + session: *const vx_session, + opts: *const vx_data_source_options, +) -> VortexResult<*const vx_data_source> { + vortex_ensure!(!session.is_null()); + vortex_ensure!(!opts.is_null()); + + let session = vx_session::as_ref(session).clone(); + + let opts = unsafe { &*opts }; + vortex_ensure!(!opts.files.is_null()); + + let glob = unsafe { to_string(opts.files) }; + + RUNTIME.block_on(async { + let data_source = MultiFileDataSource::new(session) + //.with_filesystem(fs) + .with_glob(glob) + .build() + .await?; + Ok(vx_data_source::new(Arc::new(data_source) as DataSourceRef)) + }) +} + +/// Create a new owned datasource which must be freed by the caller +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_data_source_new( + session: *const vx_session, + opts: *const vx_data_source_options, + err: *mut *mut vx_error, +) -> *const vx_data_source { + try_or_default(err, || unsafe { data_source_new(session, opts) }) +} + +#[unsafe(no_mangle)] +/// Create a non-owned dtype referencing dataframe. +/// This dtype's lifetime is bound to underlying data source. +/// Caller should not free this dtype. +pub unsafe extern "C-unwind" fn vx_data_source_dtype(ds: *const vx_data_source) -> *const vx_dtype { + vx_dtype::new_ref(vx_data_source::as_ref(ds).dtype()) +} + +#[repr(C)] +enum vx_cardinality { + VX_CARD_UNKNOWN = 0, + VX_CARD_ESTIMATE = 1, + VX_CARD_MAXIMUM = 2, +} + +#[repr(C)] +pub struct vx_data_source_row_count { + cardinality: vx_cardinality, + rows: u64, +} + +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_data_source_get_row_count( + ds: *const vx_data_source, + rc: *mut vx_data_source_row_count, +) { + let rc = unsafe { &mut *rc }; + match vx_data_source::as_ref(ds).row_count() { + Some(Exact(rows)) => { + rc.cardinality = vx_cardinality::VX_CARD_MAXIMUM; + rc.rows = rows; + } + Some(Inexact(rows)) => { + rc.cardinality = vx_cardinality::VX_CARD_ESTIMATE; + rc.rows = rows; + } + None => { + rc.cardinality = vx_cardinality::VX_CARD_UNKNOWN; + } + } +} diff --git a/vortex-ffi/src/dtype.rs b/vortex-ffi/src/dtype.rs index 42291d47743..3c608fa7264 100644 --- a/vortex-ffi/src/dtype.rs +++ b/vortex-ffi/src/dtype.rs @@ -4,6 +4,7 @@ use std::ptr; use std::sync::Arc; +use arrow_array::ffi::FFI_ArrowSchema; use vortex::dtype::DType; use vortex::dtype::DecimalDType; use vortex::error::VortexExpect; @@ -14,6 +15,7 @@ use vortex::extension::datetime::Time; use vortex::extension::datetime::Timestamp; use crate::arc_wrapper; +use crate::error::vx_error; use crate::ptype::vx_ptype; use crate::string::vx_string; use crate::struct_fields::vx_struct_fields; @@ -323,6 +325,15 @@ pub unsafe extern "C-unwind" fn vx_dtype_time_zone(dtype: *const DType) -> *cons } } +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_type_to_arrow_schema( + _dtype: *const vx_dtype, + _schema: *mut FFI_ArrowSchema, + _err: *mut *mut vx_error, +) { + todo!(); +} + #[cfg(test)] #[allow(clippy::cast_possible_truncation)] mod tests { diff --git a/vortex-ffi/src/error.rs b/vortex-ffi/src/error.rs index b36ba480b66..4c7712595a0 100644 --- a/vortex-ffi/src/error.rs +++ b/vortex-ffi/src/error.rs @@ -28,6 +28,7 @@ pub(crate) fn write_error(error: *mut *mut vx_error, message: &str) { unsafe { error.write(err) }; } +#[inline] pub fn try_or_default( error_out: *mut *mut vx_error, function: impl FnOnce() -> VortexResult, @@ -38,10 +39,7 @@ pub fn try_or_default( value } Err(err) => { - let err = vx_error::new(Box::new(VortexError { - message: err.to_string().into(), - })); - unsafe { error_out.write(err) }; + write_error(error_out, &err.to_string()); T::default() } } diff --git a/vortex-ffi/src/lib.rs b/vortex-ffi/src/lib.rs index db387172150..808c42323e4 100644 --- a/vortex-ffi/src/lib.rs +++ b/vortex-ffi/src/lib.rs @@ -9,6 +9,7 @@ mod array; mod array_iterator; mod binary; +mod data_source; mod dtype; mod error; mod expression; @@ -16,6 +17,7 @@ mod file; mod log; mod macros; mod ptype; +mod scan; mod session; mod sink; mod string; diff --git a/vortex-ffi/src/scan.rs b/vortex-ffi/src/scan.rs new file mode 100644 index 00000000000..3aa503b0974 --- /dev/null +++ b/vortex-ffi/src/scan.rs @@ -0,0 +1,304 @@ +#![allow(non_camel_case_types)] + +use core::slice; +use std::ffi::c_int; +use std::ops::Range; +use std::ptr; +use std::sync::Arc; +use std::sync::Mutex; + +use arrow_array::ffi_stream::FFI_ArrowArrayStream; +use futures::StreamExt; +use vortex::array::expr::stats::Precision; +use vortex::array::stream::SendableArrayStream; +use vortex::buffer::Buffer; +use vortex::error::VortexResult; +use vortex::expr::root; +use vortex::io::runtime::BlockingRuntime; +use vortex::scan::DataSourceScan; +use vortex::scan::Partition; +use vortex::scan::PartitionStream; +use vortex::scan::ScanRequest; +use vortex::scan::selection::Selection; + +use crate::RUNTIME; +use crate::array::vx_array; +use crate::data_source::vx_data_source; +use crate::error::try_or_default; +use crate::error::vx_error; +use crate::error::write_error; +use crate::expression::vx_expression; + +pub enum VxScanState { + Pending(Box), + Started(PartitionStream), + Finished, +} +pub type VxScan = Mutex; +crate::box_wrapper!(VxScan, vx_scan); + +pub enum VxPartitionScan { + Pending(Box), + Started(SendableArrayStream), + Finished, +} +crate::box_wrapper!( + /// A Partition is a unit of work for a worker thread from which you can + /// get vx_arrays. + VxPartitionScan, + vx_partition); + +#[repr(C)] +// We parse Selection from vx_scan_selection[_include], so we don't need +// to instantiate VX_S_* items directly. +#[allow(dead_code)] +pub enum vx_scan_selection_include { + VX_S_INCLUDE_ALL = 0, + VX_S_INCLUDE_RANGE = 1, + VX_S_EXCLUDE_RANGE = 2, +} + +#[repr(C)] +pub struct vx_scan_selection { + pub idx: *mut u64, + pub idx_len: usize, + pub include: vx_scan_selection_include, +} + +// Distinct from ScanRequest for easier option handling from C +#[repr(C)] +pub struct vx_scan_options { + pub projection: *const vx_expression, + pub filter: *const vx_expression, + pub row_range_begin: u64, + pub row_range_end: u64, + pub selection: vx_scan_selection, + pub limit: u64, + pub max_threads: u64, + pub ordered: c_int, +} + +#[repr(C)] +pub enum vx_estimate_boundary { + VX_ESTIMATE_UNKNOWN = 0, + VX_ESTIMATE_EXACT = 1, + VX_ESTIMATE_INEXACT = 2, +} + +#[repr(C)] +pub struct vx_estimate { + estimate: u64, + r#type: vx_estimate_boundary, +} + +fn scan_request(opts: *const vx_scan_options) -> VortexResult { + if opts.is_null() { + return Ok(ScanRequest::default()); + } + let opts = unsafe { &*opts }; + + let projection = if opts.projection.is_null() { + root() + } else { + vx_expression::as_ref(opts.projection).clone() + }; + + let filter = if opts.filter.is_null() { + None + } else { + Some(vx_expression::as_ref(opts.filter).clone()) + }; + + let selection = &opts.selection; + let selection = match selection.include { + vx_scan_selection_include::VX_S_INCLUDE_ALL => Selection::All, + vx_scan_selection_include::VX_S_INCLUDE_RANGE => { + let buf = unsafe { slice::from_raw_parts(selection.idx, selection.idx_len) }; + let buf = Buffer::copy_from(buf); + Selection::IncludeByIndex(buf) + } + vx_scan_selection_include::VX_S_EXCLUDE_RANGE => { + let buf = unsafe { slice::from_raw_parts(selection.idx, selection.idx_len) }; + let buf = Buffer::copy_from(buf); + Selection::ExcludeByIndex(buf) + } + }; + + let ordered = opts.ordered == 1; + + let start = opts.row_range_begin; + let end = opts.row_range_end; + let row_range = (start > 0 || end > 0).then_some(Range { start, end }); + + let limit = (opts.limit != 0).then_some(opts.limit); + + Ok(ScanRequest { + projection, + filter, + row_range, + selection, + ordered, + limit, + }) +} + +fn write_estimate>(estimate: Option>, out: &mut vx_estimate) { + match estimate { + Some(Precision::Exact(value)) => { + out.r#type = vx_estimate_boundary::VX_ESTIMATE_EXACT; + out.estimate = value.into(); + } + Some(Precision::Inexact(value)) => { + out.r#type = vx_estimate_boundary::VX_ESTIMATE_INEXACT; + out.estimate = value.into(); + } + None => { + out.r#type = vx_estimate_boundary::VX_ESTIMATE_UNKNOWN; + } + } +} + +#[unsafe(no_mangle)] +// Create a new owned data source scan which must be freed by the caller. +// Scan can be consumed only once. +// Returns NULL and sets err on error. +// options may not be NULL. +pub unsafe extern "C-unwind" fn vx_data_source_scan( + data_source: *const vx_data_source, + options: *const vx_scan_options, + estimate: *mut vx_estimate, + err: *mut *mut vx_error, +) -> *mut vx_scan { + try_or_default(err, || { + let request = scan_request(options)?; + RUNTIME.block_on(async { + let scan = vx_data_source::as_ref(data_source).scan(request).await?; + if !estimate.is_null() { + write_estimate( + scan.partition_count().map(|x| match x { + Precision::Exact(v) => Precision::Exact(v as u64), + Precision::Inexact(v) => Precision::Inexact(v as u64), + }), + unsafe { &mut *estimate }, + ); + } + Ok(vx_scan::new(Box::new(Mutex::new(VxScanState::Pending( + scan, + ))))) + }) + }) +} + +#[unsafe(no_mangle)] +/// Get next owned partition out of a scan request. +/// Caller must free this partition using vx_partition_free. +/// This method is thread-safe. +/// If using in a sync multi-thread runtime, users are encouraged to create a +/// worker thread per partition. +/// Returns NULL and doesn't set err on exhaustion. +/// Returns NULL and sets err on error. +pub unsafe extern "C-unwind" fn vx_scan_next( + scan: *mut vx_scan, + err: *mut *mut vx_error, +) -> *mut vx_partition { + let scan = vx_scan::as_mut(scan); + let mut scan = scan.lock().expect("failed to lock mutex"); + let scan = &mut *scan; + unsafe { + let ptr = scan as *mut VxScanState; + + let on_finish = || -> VortexResult<*mut vx_partition> { + ptr::write(ptr, VxScanState::Finished); + Ok(ptr::null_mut()) + }; + + let on_stream = |mut stream: PartitionStream| -> VortexResult<*mut vx_partition> { + match RUNTIME.block_on(stream.next()) { + Some(partition) => { + let partition = VxPartitionScan::Pending(partition?); + let partition = vx_partition::new(Box::new(partition)); + ptr::write(ptr, VxScanState::Started(stream)); + Ok(partition) + } + None => on_finish(), + } + }; + + let owned = ptr::read(ptr); + try_or_default(err, || match owned { + VxScanState::Pending(scan) => on_stream(scan.partitions()), + VxScanState::Started(stream) => on_stream(stream), + VxScanState::Finished => on_finish(), + }) + } +} + +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_partition_row_count( + partition: *const vx_partition, + count: *mut vx_estimate, + err: *mut *mut vx_error, +) { + let partition = vx_partition::as_ref(partition); + let VxPartitionScan::Pending(partition) = partition else { + write_error( + err, + "can't get row count of a partition that's already started", + ); + return; + }; + write_estimate(partition.row_count(), unsafe { &mut *count }) +} + +// TODO export nanoarrow headers? + +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_partition_scan_arrow( + _partition: *const vx_partition, + _stream: *mut FFI_ArrowArrayStream, + err: *mut *mut vx_error, +) { + write_error(err, "failed to scan partition to Arrow"); +} + +#[unsafe(no_mangle)] +/// Get next vx_array out of this partition. +/// Thread-unsafe. +pub unsafe extern "C-unwind" fn vx_partition_next( + partition: *mut vx_partition, + err: *mut *mut vx_error, +) -> *const vx_array { + let partition = vx_partition::as_mut(partition); + unsafe { + let ptr = partition as *mut VxPartitionScan; + + let on_finish = || -> VortexResult<*const vx_array> { + ptr::write(ptr, VxPartitionScan::Finished); + Ok(ptr::null_mut()) + }; + + let on_stream = |mut stream: SendableArrayStream| -> VortexResult<*const vx_array> { + match RUNTIME.block_on(stream.next()) { + Some(array) => { + let array = vx_array::new(Arc::new(array?)); + ptr::write(ptr, VxPartitionScan::Started(stream)); + Ok(array) + } + None => on_finish(), + } + }; + + let owned = ptr::read(ptr); + try_or_default(err, || match owned { + VxPartitionScan::Pending(partition) => on_stream(partition.execute()?), + VxPartitionScan::Started(stream) => on_stream(stream), + VxPartitionScan::Finished => on_finish(), + }) + } +} + +#[unsafe(no_mangle)] +/// Scan progress between 0.0 and 1.0 +pub unsafe extern "C-unwind" fn vx_scan_progress(_scan: *const vx_scan) -> f64 { + 0.0 +} diff --git a/vortex-ffi/src/struct_array.rs b/vortex-ffi/src/struct_array.rs index d7c426a94ff..9a13c778601 100644 --- a/vortex-ffi/src/struct_array.rs +++ b/vortex-ffi/src/struct_array.rs @@ -28,6 +28,7 @@ crate::box_wrapper!(StructBuilder, vx_struct_column_builder); /// Create a new column-wise struct array builder with given validity and a /// capacity hint. validity can't be NULL. +/// Capacity hint is for the number of columns. /// If you don't know capacity, pass 0. /// if validity is NULL, returns NULL. #[unsafe(no_mangle)] diff --git a/vortex-ffi/test/common.h b/vortex-ffi/test/common.h index dbbcf56d318..3418cef0f87 100644 --- a/vortex-ffi/test/common.h +++ b/vortex-ffi/test/common.h @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors #pragma once - #include #include #include "vortex.h" diff --git a/vortex-ffi/test/scan.cpp b/vortex-ffi/test/scan.cpp new file mode 100644 index 00000000000..d9981ef768e --- /dev/null +++ b/vortex-ffi/test/scan.cpp @@ -0,0 +1,329 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors +#include +#include +#include +#include +#include +#include "common.h" + +namespace fs = std::filesystem; +using namespace std::string_literals; +using namespace std::string_view_literals; +using Catch::Matchers::ContainsSubstring; + +struct TempPath : fs::path { + ~TempPath() { + fs::remove(*this); + } +}; + +// StructArray { age: u8, height: u16? } +[[nodiscard]] const vx_dtype *sample_dtype() { + vx_struct_fields_builder *builder = vx_struct_fields_builder_new(); + + constexpr auto age = "age"sv; + const vx_string *age_name = vx_string_new(age.data(), age.size()); + const vx_dtype *age_type = vx_dtype_new_primitive(PTYPE_U8, false); + vx_struct_fields_builder_add_field(builder, age_name, age_type); + + constexpr auto height = "height"sv; + const vx_string *height_name = vx_string_new(height.data(), height.size()); + const vx_dtype *height_type = vx_dtype_new_primitive(PTYPE_U16, true); + vx_struct_fields_builder_add_field(builder, height_name, height_type); + + vx_struct_fields *fields = vx_struct_fields_builder_finalize(builder); + return vx_dtype_new_struct(fields, false); +} + +constexpr size_t SAMPLE_ROWS = 100; +[[nodiscard]] const vx_array *sample_array() { + vx_validity validity = {}; + validity.type = VX_VALIDITY_NON_NULLABLE; + + vx_struct_column_builder *builder = vx_struct_column_builder_new(&validity, SAMPLE_ROWS); + + vx_error *error = nullptr; + + std::vector age_buffer; + for (uint8_t age = 0; age < SAMPLE_ROWS; ++age) { + age_buffer.push_back(age); + } + const vx_array *age_array = + vx_array_new_primitive(PTYPE_U8, age_buffer.data(), age_buffer.size(), &validity, &error); + require_no_error(error); + + vx_struct_column_builder_add_field(builder, "age", age_array, &error); + require_no_error(error); + vx_array_free(age_array); + + std::vector height_buffer; + for (uint16_t height = 0; height < SAMPLE_ROWS; ++height) { + height_buffer.push_back(1 + rand() % (height + 1)); + } + validity.type = VX_VALIDITY_ALL_VALID; + const vx_array *height_array = + vx_array_new_primitive(PTYPE_U16, height_buffer.data(), height_buffer.size(), &validity, &error); + require_no_error(error); + + vx_struct_column_builder_add_field(builder, "height", height_array, &error); + require_no_error(error); + vx_array_free(height_array); + + const vx_array *array = vx_struct_column_builder_finalize(builder, &error); + require_no_error(error); + return array; +} + +[[nodiscard]] TempPath write_sample(vx_session *session, fs::path &&path) { + REQUIRE(path.is_absolute()); + + const vx_dtype *dtype = sample_dtype(); + + vx_error *error = nullptr; + vx_array_sink *sink = vx_array_sink_open_file(session, path.c_str(), dtype, &error); + REQUIRE(sink != nullptr); + require_no_error(error); + vx_dtype_free(dtype); + + const vx_array *array = sample_array(); + vx_array_sink_push(sink, array, &error); + require_no_error(error); + vx_array_free(array); + + vx_array_sink_close(sink, &error); + require_no_error(error); + + INFO("Written vortex file "s + path.generic_string()); + return {path}; +} + +TEST_CASE("Creating datasources", "[datasource]") { + vx_session *session = vx_session_new(); + vx_error *error = nullptr; + + const vx_data_source *ds = vx_data_source_new(session, nullptr, &error); + REQUIRE(ds == nullptr); + REQUIRE(error != nullptr); + vx_error_free(error); + + vx_data_source_options opts = {}; + ds = vx_data_source_new(session, &opts, &error); + REQUIRE(ds == nullptr); + REQUIRE(error != nullptr); + REQUIRE_THAT(to_string(error), ContainsSubstring("opts.files")); + vx_error_free(error); + + // First file is opened eagerly + opts.files = "nonexistent"; + ds = vx_data_source_new(session, &opts, &error); + REQUIRE(ds == nullptr); + REQUIRE(error != nullptr); + REQUIRE_THAT(to_string(error), ContainsSubstring("No such file or directory")); + vx_error_free(error); + + opts.files = "/tmp/*.vortex"; + ds = vx_data_source_new(session, &opts, &error); + REQUIRE(ds == nullptr); + REQUIRE(error != nullptr); + // TODO Object store error: Generic LocalFileSystem error: Unable to walk dir: File + // system loop found: /dev/fd/6 points to an ancestor / + // REQUIRE_THAT(to_string(error), ContainsSubstring("No such file or directory")); + vx_error_free(error); + + TempPath file = write_sample(session, fs::current_path() / "empty.vortex"); + + for (const char *files : + // TODO Object store error: Generic LocalFileSystem error: Unable to walk dir: File + // system loop found: /dev/fd/6 points to an ancestor / + //{ file.c_str(), "*.vortex"} + {file.c_str()}) { + INFO("reading "s + files); + opts.files = files; + ds = vx_data_source_new(session, &opts, &error); + require_no_error(error); + REQUIRE(ds != nullptr); + vx_data_source_free(ds); + } + + vx_session_free(session); +} + +TEST_CASE("Write file", "[datasource]") { + vx_session *session = vx_session_new(); + TempPath path = write_sample(session, fs::current_path() / "write-file.vortex"); + vx_session_free(session); +} + +TEST_CASE("Write file and read back types", "[datasource]") { + vx_session *session = vx_session_new(); + TempPath path = write_sample(session, fs::current_path() / "write-read-types.vortex"); + vx_error *error = nullptr; + + vx_data_source_options opts = {}; + opts.files = path.c_str(); + + const vx_data_source *ds = vx_data_source_new(session, &opts, &error); + require_no_error(error); + REQUIRE(ds != nullptr); + vx_session_free(session); + + vx_data_source_row_count row_count = {}; + vx_data_source_get_row_count(ds, &row_count); + + CHECK(row_count.cardinality == VX_CARD_MAXIMUM); + CHECK(row_count.rows == SAMPLE_ROWS); + + const vx_dtype *data_source_dtype = vx_data_source_dtype(ds); + REQUIRE(vx_dtype_get_variant(data_source_dtype) == DTYPE_STRUCT); + + const vx_struct_fields *fields = vx_dtype_struct_dtype(data_source_dtype); + const size_t len = vx_struct_fields_nfields(fields); + REQUIRE(len == 2); + + const vx_dtype *col1_dtype = vx_struct_fields_field_dtype(fields, 0); + const vx_string *col1_name = vx_struct_fields_field_name(fields, 0); + + REQUIRE(vx_dtype_get_variant(col1_dtype) == DTYPE_PRIMITIVE); + REQUIRE(vx_dtype_primitive_ptype(col1_dtype) == PTYPE_U8); + REQUIRE_FALSE(vx_dtype_is_nullable(col1_dtype)); + REQUIRE(to_string_view(col1_name) == "age"); + vx_dtype_free(col1_dtype); + + const vx_dtype *col2_dtype = vx_struct_fields_field_dtype(fields, 1); + const vx_string *col2_name = vx_struct_fields_field_name(fields, 1); + + REQUIRE(vx_dtype_get_variant(col2_dtype) == DTYPE_PRIMITIVE); + REQUIRE(vx_dtype_primitive_ptype(col2_dtype) == PTYPE_U16); + REQUIRE(vx_dtype_is_nullable(col2_dtype)); + REQUIRE(to_string_view(col2_name) == "height"); + vx_dtype_free(col2_dtype); + + vx_data_source_free(ds); +} + +void verify_sample_array(const vx_array *array) { + REQUIRE(vx_array_len(array) == SAMPLE_ROWS); + REQUIRE(vx_array_has_dtype(array, DTYPE_STRUCT)); + + const vx_struct_fields *fields = vx_dtype_struct_dtype(vx_array_dtype(array)); + size_t len = vx_struct_fields_nfields(fields); + REQUIRE(len == 2); + + const vx_dtype *age_dtype = vx_struct_fields_field_dtype(fields, 0); + REQUIRE(vx_dtype_get_variant(age_dtype) == DTYPE_PRIMITIVE); + REQUIRE(vx_dtype_primitive_ptype(age_dtype) == PTYPE_U8); + vx_dtype_free(age_dtype); + const vx_string *age_name = vx_struct_fields_field_name(fields, 0); + REQUIRE(to_string_view(age_name) == "age"); + + const vx_dtype *height_dtype = vx_struct_fields_field_dtype(fields, 1); + REQUIRE(vx_dtype_get_variant(height_dtype) == DTYPE_PRIMITIVE); + REQUIRE(vx_dtype_primitive_ptype(height_dtype) == PTYPE_U16); + vx_dtype_free(height_dtype); + const vx_string *height_name = vx_struct_fields_field_name(fields, 1); + REQUIRE(to_string_view(height_name) == "height"); + + vx_error *error = nullptr; + vx_validity validity = {}; + vx_array_get_validity(array, &validity, &error); + require_no_error(error); + REQUIRE(validity.type == VX_VALIDITY_NON_NULLABLE); + + const vx_array *age_field = vx_array_get_field(array, 0, &error); + require_no_error(error); + REQUIRE(vx_array_has_dtype(age_field, DTYPE_PRIMITIVE)); + REQUIRE(vx_dtype_primitive_ptype(vx_array_dtype(age_field)) == PTYPE_U8); + REQUIRE(vx_array_len(age_field) == SAMPLE_ROWS); + for (size_t i = 0; i < SAMPLE_ROWS; ++i) { + REQUIRE(vx_array_get_u8(age_field, i) == i); + } + vx_array_free(age_field); + + const vx_array *height_field = vx_array_get_field(array, 1, &error); + require_no_error(error); + REQUIRE(vx_array_has_dtype(height_field, DTYPE_PRIMITIVE)); + REQUIRE(vx_dtype_primitive_ptype(vx_array_dtype(height_field)) == PTYPE_U16); + REQUIRE(vx_array_len(height_field) == SAMPLE_ROWS); + for (size_t i = 0; i < SAMPLE_ROWS; ++i) { + REQUIRE(vx_array_get_u16(height_field, i) > 0); + } + vx_array_free(height_field); + + REQUIRE(vx_array_get_field(array, 2, &error) == nullptr); + REQUIRE(error != nullptr); + vx_error_free(error); +} + +TEST_CASE("Requesting scans", "[datasource]") { + vx_session *session = vx_session_new(); + TempPath path = write_sample(session, fs::current_path() / "write-file2.vortex"); + + vx_data_source_options ds_options = {}; + ds_options.files = path.c_str(); + + vx_error *error = nullptr; + const vx_data_source *ds = vx_data_source_new(session, &ds_options, &error); + require_no_error(error); + REQUIRE(ds != nullptr); + + vx_scan *scan = vx_data_source_scan(ds, nullptr, nullptr, &error); + require_no_error(error); + REQUIRE(scan != nullptr); + vx_scan_free(scan); + + vx_scan_options options = {}; + options.max_threads = 1; + scan = vx_data_source_scan(ds, &options, nullptr, &error); + require_no_error(error); + REQUIRE(scan != nullptr); + vx_scan_free(scan); + + vx_data_source_free(ds); + vx_session_free(session); +} + +TEST_CASE("Basic scan", "[datasource]") { + vx_session *session = vx_session_new(); + TempPath path = write_sample(session, fs::current_path() / "basic-scan.vortex"); + vx_error *error = nullptr; + + vx_data_source_options ds_options = {}; + ds_options.files = path.c_str(); + + const vx_data_source *ds = vx_data_source_new(session, &ds_options, &error); + require_no_error(error); + REQUIRE(ds != nullptr); + + vx_scan *scan = vx_data_source_scan(ds, nullptr, nullptr, &error); + require_no_error(error); + REQUIRE(scan != nullptr); + + vx_partition *partition = vx_scan_next(scan, &error); + require_no_error(error); + + vx_estimate estimate = {}; + vx_partition_row_count(partition, &estimate, &error); + require_no_error(error); + REQUIRE(estimate.type == VX_ESTIMATE_EXACT); + REQUIRE(estimate.estimate == SAMPLE_ROWS); + + REQUIRE(vx_scan_next(scan, &error) == nullptr); + require_no_error(error); + + const vx_array *array = vx_partition_next(partition, &error); + require_no_error(error); + REQUIRE(array != nullptr); + + REQUIRE(vx_partition_next(partition, &error) == nullptr); + require_no_error(error); + + verify_sample_array(array); + + vx_array_free(array); + vx_partition_free(partition); + vx_scan_free(scan); + + vx_data_source_free(ds); + vx_session_free(session); +}