From c1c2c5cda3d7647b2fe8edd185dd26b0925a7e40 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Fri, 20 Mar 2026 17:40:25 +0000 Subject: [PATCH 1/2] C Scan API: initial --- proposed/0030-c-scan-api.md | 393 ++++++++++++++++++++++++++++++++++++ 1 file changed, 393 insertions(+) create mode 100644 proposed/0030-c-scan-api.md diff --git a/proposed/0030-c-scan-api.md b/proposed/0030-c-scan-api.md new file mode 100644 index 0000000..706deb5 --- /dev/null +++ b/proposed/0030-c-scan-api.md @@ -0,0 +1,393 @@ +- Start Date: 2026-03-13 +- Authors: Mikhail Kot + +# C Scan API + +There is a scan API for Rust-compatible code available at +https://github.com/vortex-data/vortex/tree/develop/vortex-scan. + +The goal of introducing C scan API is to make integration with non-Rust query +engines like Velox easier. In theory, such engines can use cxx.rs, but it +requires a lot of binding code and runtime bridging (see below). + +There exists a partial scan API for C exposed over files [1], but it's limited +to single-file URIs without globs, and it's also not thread-safe. Its main +flaws, however, are: + +- Inability to export to well-known format like ArrowArrayStream, +- Lack of introspection over produced `vx_array`s, and +- Inability to control scan on a level lower than just getting partitions and + `vx_array`s with filters and projections pre-configured. + + Why does Scan API need to expose `vx_array`s? What's the benefit of using + own format over ArrowArrayStream? + + The answer is "compression". Vortex DTypes don't exactly match with Arrow + physical encodings, so if you have i.e. a ConstantArray, you need to + decompress it into something Arrow-compatible. This was a major regression + in Duckdb integration. + +C++ API works it out by allowing to produce an ArrowArrayStream interface out of +ScanBuilder, but it uses Rust code directly via cxx.rs which we want to avoid +while adding C interfaces. 
C++ API future is outside of scope of this proposal +but it's expected to wrap C API directly over time, removing dependency on +cxx.rs for vortex-cxx. + +## Customization points + +Main goal of providing customization points is to do as little work as possible +in Vortex code and as much work as possible in the query engine. Some engines +may request control over scan execution like pruning. Engines like Duckdb have +own remote storage, caching, and globbing implementations. API still needs an +ability to fall back to own implementation. + +Still, Scan API is a relatively high-level concept, and if its level is not +suffifient, engines can resort to using a layout reader plan and executing it +directly. + +## Datasource + +A Datasource is a reference to multiple possibly remote files. When created, it +opens first file to determine the schema from DType, all other operations are +deferred till a scan is requested. You can request multiple file scans from a +Datasource. + +```c +// Opaque, generated by bindgen +typedef struct vx_data_source vx_data_source; +typedef struct vx_file_handle vx_file_handle; + +typedef void (*vx_list_callback)(void* userdata, const char* name, int is_dir); +typedef void (*vx_glob_callback)(void* userdata, const char* file); + +typedef struct vx_data_source_options { + // (1) Filesystem customization + + bool (*fs_use_vortex)(const char* schema, const char* path); + void (*fs_set_userdata)(void* userdata); + + // should be called after glob expansion, single-file mode + vx_file_handle (*fs_open)(void* userdata, const char* path, vx_error** err); + vx_file_handle (*fs_create)(void* userdata, const char* path, vx_error** err); + + // non-recursive, callback is invoked with each path + void fs_list(void* userdata, const char* path, vx_list_callback cb, vx_error *err); + + void fs_close(vx_file_handle handle); + uint64_t fs_size(vx_file_handle handle, vx_error *err); + + // positional read, doesn't change file open cursor + void 
fs_read(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, + vx_error *err); + + // not needed for scanning but makes FS API complete + void fs_write(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, + vx_error *err); + void fs_sync(vx_file_handle handle, vx_error *err); + + // (2) Globbing customization + + void (*glob)(const char* glob, vx_glob_callback cb, vx_error* err); + + /// (3) Cache customization + + void* (*cache_init)(vx_error* err); + void (*cache_free)(void* cache, vx_error* err); + void (*cache_get)(void* cache, const char* key, void** value, vx_error* err); + void (*cache_put)(void* cache, const char* key, void* value, vx_error* err); + void (*cache_delete)(void* cache, const char* key, vx_error* err); +} vx_data_source_options; + +// Addition to existing DType API, returns owned ArrowSchema which needs to +// be freed by the caller using release callback. If err is populated, out +// is not set. +void vx_dtype_to_arrow_schema(const vx_dtype* dtype, ArrowSchema* out, vx_error* err); + +/// Create a new owned datasource which must be freed by the caller. +const vx_data_source * +vx_data_source_new(const vx_session *session, const vx_data_source_options *opts, vx_error_out err); + +// datasource is Arc'd inside so a clone creates another reference rather +// than cloning the state fully. +const vx_data_source *vx_data_source_clone(const vx_data_source *ptr); + +// vx_dtype's lifetime is bound to datasource's lifetime, caller doesn't need +// to free it +const vx_dtype *vx_data_source_dtype(const vx_data_source *ds); + +typedef enum { + VX_CARD_UNKNOWN = 0, + VX_CARD_ESTIMATE = 1, + VX_CARD_MAXIMUM = 2, +} vx_cardinality; +typedef struct { + vx_cardinality cardinality; + uint64_t rows; +} vx_data_source_row_count; + +void vx_data_source_get_row_count(const vx_data_source *ds, vx_data_source_row_count *rc); +void vx_data_source_free(const vx_data_source *ptr); +``` + +1. Open local or remote file. 
Allow using vortex's filesystem or query engine's + filesystem e.g. duckdb fs. Allow partial customization e.g. duckdb fs for + local reads, but vortex fs for remote reads. Remote filesystem customization + point has the benefit of not duplicating credentials e.g. S3 access key + between query engine and vortex. Local implementation may be more performant. + Vortex rolled back full duckdb fs usage due to performance implications. +2. Open single file or multiple files. Query engines may have their own glob + expansion [4] which does HTTP IO. +3. Cache intermediate results. Main use case for Vortex is caching schema in + memory for footer cache and conversion results. Benefits are in no + requirement to open first file in a glob eagerly if there's a cache hit. + Vortex had had an integration with Duckdb object cache which was deleted in + favor of own implementation which led to a performance regression. + +When all three customization points are implemented, Vortex offloads all IO +to a query engine. + + Why not expose API to consume byte ranges or emit byte range requests for + the query engine to read and populate buffers? + + This approach is indeed easier than using a vtable with a specific + implementation, and requires slightly more scaffolding on the query engine + side, but it's significantly easier to implement on Vortex side and it is + coherent with current Rust implementation. + Similar implementation can be found in Arrow's RandomAccessFile or + parquet-rs's AsyncFileReader. + + However, as we're thinking of changing our Rust API, we can try to invest + time into this approach as well. + +Coupled with https://github.com/vortex-data/vortex/pull/7012 it also allows +Duckdb integration to abstract memory allocations to the database. + +## Runtime bridging + +In Rust API, a Datasource produces a stream of Partitions. A Partition produces +a stream of Arrays. API is required to be used in an async runtime, current +runtime for Vortex is tokio. 
+ +Velox uses a non-coroutine but async runtime based on Folly executors. Engines +like CoroBase use a coroutine-based runtime. Duckdb and ClickHouse runtimes are sync +based on thread pools. Postgres runtime is sync based on processes. Some of +engines may use OS-specific IO like `io_uring`. + +All potential usages of our API may be grouped into 4 cases: + +- sync, single-thread runtime, trivial. (1) +- sync, multi-thread runtime. (2) +- async, multi-thread/coroutine. (3) +- async, single-thread. (4) + +Scan/Partition suggestions outlined below play well with (2) but not with (3) +and (4) because Vortex has its own runtime which will block on current thread +when i.e. getting an Array out of Partition. An async-friendly API basically +means exposing a coroutine/state machine which hands control over to the host on +IO. + +As Joe mentioned, we want to get away from the concept of partitions and emit +chunks of vx_array's directly from the scan. In this case, such state machine +may be expressed with roughly the following states: + +``` + Passed a file handle +START -> NEED_IO (offset, len) -> EXECUTE -> DONE + ^ When passed a file handle, instructs host to read following byte + range into buffer and return a handle to this buffer. + ^ Decompresses the buffer (executes the Array) + one step into other buffer + ^ + Array is executed/canonicalized to the form host can work with. + Host now transfers data from buffers to its own output format. +``` + +However, as the future of such approach is unclear, a async-unfriendly option is +described below + +## Scan + +Scan iterators: + +```c +``` + +Scan options: + +```c +typedef enum { + VX_S_INCLUDE_ALL = 0, + VX_S_INCLUDE_RANGE = 1, + VX_S_EXCLUDE_RANGE = 2, +} vx_scan_selection_include; + +typedef struct { + uint64_t *idx; + size_t idx_len; + // Roaring bitmaps won't be supported as for now + // If selection is VX_S_INCLUDE_ALL, these are not read. 
idx is copied by query + // engine on scan invocation and can be freed after a scan iterator is requested + vx_scan_selection_include include; +} vx_scan_selection; + +typedef struct vx_scan_selection { + const size_t* idx; + size_t idx_len; +} vx_scan_selection; + +typedef struct vx_scan_options { + // May be NULL which means "return all columns" + const vx_expression *projection; + + // May be NULL which means "no filter" + const vx_expression *filter; + + // Set both to 0 to indicate no range request + // Inclusive + uint64_t row_range_begin; + // Exclusive + uint64_t row_range_end; + + vx_scan_selection selection; + + // 0 for no limit + uint64_t limit; + int ordered; +} vx_scan_options; +``` + +Scan interface: + +```c +typedef struct vx_scan vx_scan; + +/** + * A partition is a contiguous chunk of memory from which you can interatively + * get vx_arrays. + */ +typedef struct vx_partition vx_partition; + +typedef enum { + VX_ESTIMATE_UNKNOWN = 0, + VX_ESTIMATE_EXACT = 1, + VX_ESTIMATE_INEXACT = 2, +} vx_estimate_boundary; + +typedef struct { + // If type is VX_P_ESTIMATE_UNKNOWN, estimate field is not populated + uint64_t estimate; + vx_estimate_boundary boundary; +} vx_estimate; + +// Users are encouraged to create worker threads depending on est->estimate to +// distribute work. +// opts and est may be nullptr. +// Requesting a scan doesn't do anything unless vx_partition_next is called. +vx_scan * +vx_data_source_scan(const vx_data_source *data_source, const vx_scan_options *options, vx_error_out err); + +/** + * Get next owned partition out of a scan request. + * Caller must free this partition using vx_partition_free. + * This method is thread-safe. + * If using in a sync multi-thread runtime, users are encouraged to create a + * worker thread per partition. + * Returns NULL and doesn't set err on exhaustion. + * Returns NULL and sets err on error. 
+ */ +vx_partition *vx_scan_next(vx_scan *scan, vx_error_out err); + +// Request an array stream in Arrow format from a partition, consuming it +// fully. Not thread-safe, should be called once. +// stream is owned and must be freed using the release callback +void vx_partition_scan_arrow(const vx_partition *partition, FFI_ArrowArrayStream *stream, vx_error_out err); + +// Thread-unsafe. Get an owned vx_array of an iterator. +// Returns NULL if iterator is exhausted. Array is owned and must be +// freed by caller. +const vx_array *vx_partition_next(vx_partition *partition, vx_error_out err); +``` + +There are examples of APIs also exposing batch reads, but I doubt this is a good +option as for every ArrayRef the work that needs to be done to execute it may be +significant, and if you want to parallelize work, you can use this with +partitions, so each thread will be still busy with one ArrayRef at a time. +It can be introduced in the future. + +Scan functions are lazy as they operate on streams and it is +consumer's code responsibility to use parallelism at the desired degree. + +## What to do with `vx_array` + +The main question is how to transform outputs of iteration, vx_array, into +something query engines can operate with. You need to execute the array +iteratively till you recognize data and start exporting it. Duckdb integration +is mostly written in Rust with C++ code calling Rust's vtable functions. Rust +code does all data export. PoC implementation moves Duckdb to use C API but +leaves existing Rust code for exporting `vx_array` into DataChunk. + +However, the goal is not to interface with Rust code, so as a baseline the API +provides a way to scan partitions directly into ArrowArrayStream which should be +good enough for most consumers. + +## Cancellation + +There will be no option to cancel the scan as this isn't possibe on Rust API +either and this is a low priority task. + +## Testing + +C API doesn't have any testing. 
I suggest setting up a Catch3 testing target and +a CMake library for C API using FetchContent to download Catch. This way people +not working on Duckdb integration or FFI wouldn't need CMake and Catch. To +integrate C tests with `cargo test`, we can write a `build.rs` extension which +parses C test names and codegenerates rust tests targets calling to Catch. + +## Duckdb integration PoC + +``` +before: +Duckdb side Vortex side + +C++ C++ Rust +duckdb -> TableFunction vtable -> ffi wrapping -> vtable implementation + +after: + +C++ C++ C +duckdb -> TableFunction vtable -> ffi_wrapping -> vx_scan()* + +* - vx_array -> DataChunk reuses existing Rust code +``` + +https://github.com/vortex-data/vortex/tree/myrrc/scan-api-duckdb has an +implementation of using C Scan API for Duckdb scan integration. Duckdb has a +sync multi-threaded runtime, and table function is called from multiple threads +simultaneously. Users can save a per-thread state. + +The integration splits into following parts: +- DType -> LogicalType integration, done sans Temporal extension. +- Table function binding (creating a DataSource), done. +- Global state initialization (creating a Scan), done sans filter pushdown. +- Local state initialization (export batch id), done. +- Utility functions like cardinality estimates, done. +- vx_array -> DataChunk export, delegated to existing Rust code. + +On filter pushdown: projection pushdown requires exposing only `select()` +expression. On the other hand, filter pushdown requires `TableFilter -> Vortex +Expression` conversion which is significant porting so left out. + +On DataChunk export: it requires exposing features like array optimization, +validity masks, and other features, so left out. + +Table function uses Vortex _partition_ concepts as a work splitting term only, +i.e. one worker thread operating on one or multiple partitions. 
Each thread +pulls out partitions from `vx_scan_next` (thus it's thread-safe) and then +works on its own partition without synchronization. + +[1] `vx_file_scan` + +[2] Need to control pruning + https://spiraldb.slack.com/archives/C0AJS0HDS6R/p1773068549282999 + +[4] e.g. Duckdb MultiFileReader / MultiFileList From ae6ccfdd14d4e7b37cad1b7e7de85c17be775f62 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Tue, 31 Mar 2026 11:57:47 +0100 Subject: [PATCH 2/2] split API into 3 layers --- proposed/0030-c-scan-api.md | 483 +++++++++++++++++++----------------- 1 file changed, 251 insertions(+), 232 deletions(-) diff --git a/proposed/0030-c-scan-api.md b/proposed/0030-c-scan-api.md index 706deb5..0dcc0fc 100644 --- a/proposed/0030-c-scan-api.md +++ b/proposed/0030-c-scan-api.md @@ -2,110 +2,153 @@ - Authors: Mikhail Kot # C Scan API +## Summary + +Provide a layered C Scan API for non-Rust clients[^1], each layer exposing more +capabilities to the host: + +- High level layer is intended for sync multi-threaded or multi-process runtimes + like DuckDB, ClickHouse, or Postgres. It provides a `DataSource -> + Partition -> Array` chain with thread safety. It manages the event loop inside + Vortex but allows offloading IO and memory allocations to the host. This layer + roughly matches scan API + [exposed](https://github.com/vortex-data/vortex/tree/develop/vortex-scan) + in Rust. +- Middle level layer is intended for async runtimes like Velox which want to run + their own event loop. In addition to offloaded IO, it doesn't do any things + eagerly[^2] but returns control on IO or CPU work for host to schedule. + This layer exposes a coroutine-like state machine. +- Low level layer is intended for clients which need greater control over + scan scheduling. This layer roughly matches a + [LayoutReader](https://github.com/vortex-data/vortex/tree/develop/vortex-layout/src/reader.rs) + in Rust. 
+
+| Level | Allocations | IO | Event loop | Scan planning |
+| ----- | ----------- | -- | ---------- | ------------- |
+| High | Host/Vortex | Host/Vortex | Vortex | Vortex |
+| Middle | Host | Host | Host | Vortex |
+| Low | Host | Host | Host | Host |
+
+## Motivation
+
+Vortex is a Rust project, and thus integration with non-Rust clients requires a
+decent amount of scaffolding[^3] which lowers the probability of such
+integration. As reading files is the most important feature for a file format,
+exposing it to C-compatible users lowers the integration costs.
+
+Additional motivation is our DuckDB integration which requires C++-to-Rust
+and Rust-to-C++ bridging. Replacing it with direct calls to C API would make
+the project much smaller.
+
+[Current](https://github.com/vortex-data/vortex/tree/develop/vortex-ffi/src/file.rs)
+C Scan API in the form of `vx_file_scan` is incomplete for the following
+reasons:
+
+- Thread unsafety: host needs to serialize calls to `vx_array_iterator`.
+- Lack of support for file globs.
+- Lack of support for exporting to `ArrowSchema` and `ArrowArrayStream`.
+- Lack of introspection over produced `vx_array` objects.
+
+  Vortex also has a C++ API in vortex-cxx which wraps Rust code. Its future is
+  outside of scope of this proposal but it's expected to wrap C scan API,
+  removing dependency on cxx.rs for vortex-cxx.
+
+## Motivation for layered API
+
+Velox uses a non-coroutine but async runtime based on Folly executors. CoroBase
+uses a coroutine-based runtime. DuckDB and ClickHouse runtimes are sync based on
+thread pools. Postgres runtime is sync based on processes. Some engines may
+use OS-specific IO like `io_uring`.
+
+Scan API usage may be grouped into two distinct cases:
+
+- sync, single-thread/multi-thread/multi-process runtime, and
+- async, multi-thread/coroutine runtime.
+
+Exposing just the high level would not work well with async hosts as there's
+no way to suspend to host on IO operations. 
Exposing only the middle layer +requires sync hosts to manage their own event loop which is tedious. + +Another benefit of splitting API into layers is that it can be discussed and +implemented separately. + +## High level API -There is a scan API for Rust-compatible code available at -https://github.com/vortex-data/vortex/tree/develop/vortex-scan. - -The goal of introducing C scan API is to make integration with non-Rust query -engines like Velox easier. In theory, such engines can use cxx.rs, but it -requires a lot of binding code and runtime bridging (see below). - -There exists a partial scan API for C exposed over files [1], but it's limited -to single-file URIs without globs, and it's also not thread-safe. Its main -flaws, however, are: - -- Inability to export to well-known format like ArrowArrayStream, -- Lack of introspection over produced `vx_array`s, and -- Inability to control scan on a level lower than just getting partitions and - `vx_array`s with filters and projections pre-configured. - - Why does Scan API need to expose `vx_array`s? What's the benefit of using - own format over ArrowArrayStream? - - The answer is "compression". Vortex DTypes don't exactly match with Arrow - physical encodings, so if you have i.e. a ConstantArray, you need to - decompress it into something Arrow-compatible. This was a major regression - in Duckdb integration. - -C++ API works it out by allowing to produce an ArrowArrayStream interface out of -ScanBuilder, but it uses Rust code directly via cxx.rs which we want to avoid -while adding C interfaces. C++ API future is outside of scope of this proposal -but it's expected to wrap C API directly over time, removing dependency on -cxx.rs for vortex-cxx. - -## Customization points - -Main goal of providing customization points is to do as little work as possible -in Vortex code and as much work as possible in the query engine. Some engines -may request control over scan execution like pruning. 
Engines like Duckdb have -own remote storage, caching, and globbing implementations. API still needs an -ability to fall back to own implementation. - -Still, Scan API is a relatively high-level concept, and if its level is not -suffifient, engines can resort to using a layout reader plan and executing it -directly. +``` +┌──────────┐ +│DataSource│ +└─┬────────┘ + ▼ produces a +┌────┐ +│Scan│ +└┬┬┬─┘ + │││ worker threads pull a + ▼▼▼ +┌───────────┐ +│ Partition │ (thread safe) +└┬──────────┘ + │ which produces an + └─►Array (thread unsafe) +``` -## Datasource +### DataSource -A Datasource is a reference to multiple possibly remote files. When created, it +A DataSource is a reference to multiple possibly remote files. When created, it opens first file to determine the schema from DType, all other operations are -deferred till a scan is requested. You can request multiple file scans from a -Datasource. +deferred till a scan is requested and executed. You can request multiple file +scans from a DataSource. 
```c -// Opaque, generated by bindgen +// bindgen typedef struct vx_data_source vx_data_source; typedef struct vx_file_handle vx_file_handle; +typedef struct vx_cache_handle *vx_cache_handle; typedef void (*vx_list_callback)(void* userdata, const char* name, int is_dir); typedef void (*vx_glob_callback)(void* userdata, const char* file); typedef struct vx_data_source_options { // (1) Filesystem customization - bool (*fs_use_vortex)(const char* schema, const char* path); void (*fs_set_userdata)(void* userdata); - // should be called after glob expansion, single-file mode - vx_file_handle (*fs_open)(void* userdata, const char* path, vx_error** err); - vx_file_handle (*fs_create)(void* userdata, const char* path, vx_error** err); + // Called after glob expansion, single-file mode + vx_file_handle (*fs_open)(void* userdata, const char* path, + vx_error** err); + vx_file_handle (*fs_create)(void* userdata, const char* path, + vx_error** err); // non-recursive, callback is invoked with each path - void fs_list(void* userdata, const char* path, vx_list_callback cb, vx_error *err); + void fs_list(void* userdata, const char* path, vx_list_callback cb, + vx_error** err); void fs_close(vx_file_handle handle); uint64_t fs_size(vx_file_handle handle, vx_error *err); // positional read, doesn't change file open cursor - void fs_read(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, - vx_error *err); + void fs_read(vx_file_handle handle, uint64_t offset, size_t len, + uint8_t *buffer, vx_error** err); // not needed for scanning but makes FS API complete - void fs_write(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, - vx_error *err); - void fs_sync(vx_file_handle handle, vx_error *err); - - // (2) Globbing customization - - void (*glob)(const char* glob, vx_glob_callback cb, vx_error* err); + void fs_write(vx_file_handle handle, uint64_t offset, size_t len, + uint8_t *buffer, vx_error** err); - /// (3) Cache customization + void 
fs_sync(vx_file_handle handle, vx_error** err); - void* (*cache_init)(vx_error* err); - void (*cache_free)(void* cache, vx_error* err); - void (*cache_get)(void* cache, const char* key, void** value, vx_error* err); - void (*cache_put)(void* cache, const char* key, void* value, vx_error* err); - void (*cache_delete)(void* cache, const char* key, vx_error* err); + // (2) Globbing customization + void (*glob)(const char* glob, vx_glob_callback cb, vx_error** err); } vx_data_source_options; // Addition to existing DType API, returns owned ArrowSchema which needs to // be freed by the caller using release callback. If err is populated, out // is not set. -void vx_dtype_to_arrow_schema(const vx_dtype* dtype, ArrowSchema* out, vx_error* err); +void vx_dtype_to_arrow_schema(const vx_dtype* dtype, ArrowSchema* out, + vx_error* err); /// Create a new owned datasource which must be freed by the caller. const vx_data_source * -vx_data_source_new(const vx_session *session, const vx_data_source_options *opts, vx_error_out err); +vx_data_source_new(const vx_session *session, + const vx_data_source_options *opts, vx_error_out err); // datasource is Arc'd inside so a clone creates another reference rather // than cloning the state fully. @@ -125,92 +168,38 @@ typedef struct { uint64_t rows; } vx_data_source_row_count; -void vx_data_source_get_row_count(const vx_data_source *ds, vx_data_source_row_count *rc); -void vx_data_source_free(const vx_data_source *ptr); +void vx_data_source_get_row_count(const vx_data_source *ds, + vx_data_source_row_count *rc); +void vx_data_source_free(const vx_data_source *ds); ``` -1. Open local or remote file. Allow using vortex's filesystem or query engine's - filesystem e.g. duckdb fs. Allow partial customization e.g. duckdb fs for - local reads, but vortex fs for remote reads. Remote filesystem customization +1. Open local or remote file. Allow using Vortex's filesystem or host's + filesystem e.g. DuckDB fs. Allow partial customization e.g. 
DuckDB fs for + local reads, but Vortex fs for remote reads. Host filesystem customization point has the benefit of not duplicating credentials e.g. S3 access key - between query engine and vortex. Local implementation may be more performant. - Vortex rolled back full duckdb fs usage due to performance implications. -2. Open single file or multiple files. Query engines may have their own glob - expansion [4] which does HTTP IO. -3. Cache intermediate results. Main use case for Vortex is caching schema in - memory for footer cache and conversion results. Benefits are in no - requirement to open first file in a glob eagerly if there's a cache hit. - Vortex had had an integration with Duckdb object cache which was deleted in - favor of own implementation which led to a performance regression. - -When all three customization points are implemented, Vortex offloads all IO -to a query engine. - - Why not expose API to consume byte ranges or emit byte range requests for - the query engine to read and populate buffers? - - This approach is indeed easier than using a vtable with a specific - implementation, and requires slightly more scaffolding on the query engine - side, but it's significantly easier to implement on Vortex side and it is - coherent with current Rust implementation. - Similar implementation can be found in Arrow's RandomAccessFile or - parquet-rs's AsyncFileReader. - - However, as we're thinking of changing our Rust API, we can try to invest - time into this approach as well. - -Coupled with https://github.com/vortex-data/vortex/pull/7012 it also allows -Duckdb integration to abstract memory allocations to the database. - -## Runtime bridging - -In Rust API, a Datasource produces a stream of Partitions. A Partition produces -a stream of Arrays. API is required to be used in an async runtime, current -runtime for Vortex is tokio. + between host and Vortex. Local implementation may be more performant. +2. Open single file or multiple files. 
Hosts may have their own glob expansion + [^4] which does HTTP IO. -Velox uses a non-coroutine but async runtime based on Folly executors. Engines -like CoroBase use a coroutine-based runtime. Duckdb and ClickHouse runtimes are sync -based on thread pools. Postgres runtime is sync based on processes. Some of -engines may use OS-specific IO like `io_uring`. - -All potential usages of our API may be grouped into 4 cases: - -- sync, single-thread runtime, trivial. (1) -- sync, multi-thread runtime. (2) -- async, multi-thread/coroutine. (3) -- async, single-thread. (4) - -Scan/Partition suggestions outlined below play well with (2) but not with (3) -and (4) because Vortex has its own runtime which will block on current thread -when i.e. getting an Array out of Partition. An async-friendly API basically -means exposing a coroutine/state machine which hands control over to the host on -IO. - -As Joe mentioned, we want to get away from the concept of partitions and emit -chunks of vx_array's directly from the scan. In this case, such state machine -may be expressed with roughly the following states: +When both customization points are implemented, Vortex offloads all IO +to a query engine. -``` - Passed a file handle -START -> NEED_IO (offset, len) -> EXECUTE -> DONE - ^ When passed a file handle, instructs host to read following byte - range into buffer and return a handle to this buffer. - ^ Decompresses the buffer (executes the Array) - one step into other buffer - ^ - Array is executed/canonicalized to the form host can work with. - Host now transfers data from buffers to its own output format. -``` + Memory allocation customization is out of scope of this proposal, but it's + possible for Vortex to expose bringing allocator from outside for buffers. -However, as the future of such approach is unclear, a async-unfriendly option is -described below +### Scan -## Scan +A Scan is a one-time traversal of files in a DataSource. A Scan can't be +restarted once requested. 
Hosts are encouraged to utilize multiple threads for +scanning. Scan thread-safely produces Partitions[^5] which are chunks of work +for a thread[^6]. -Scan iterators: +Host may instruct a Scan about its parallelism degree by passing `max_threads`. +Vortex may return less partitions than there are `max_threads` but won't return +more. -```c -``` +There will be no option to cancel a scan as this isn't possible on Rust side +either and this is a low priority task. Scan options: @@ -221,50 +210,34 @@ typedef enum { VX_S_EXCLUDE_RANGE = 2, } vx_scan_selection_include; +// Roaring bitmaps won't be supported. +// If selection is VX_S_INCLUDE_ALL, idx is not read. idx is copied +// by host on scan invocation and can be freed after first partition is +// requested typedef struct { uint64_t *idx; size_t idx_len; - // Roaring bitmaps won't be supported as for now - // If selection is VX_S_INCLUDE_ALL, these are not read. idx is copied by query - // engine on scan invocation and can be freed after a scan iterator is requested vx_scan_selection_include include; } vx_scan_selection; -typedef struct vx_scan_selection { - const size_t* idx; - size_t idx_len; -} vx_scan_selection; - typedef struct vx_scan_options { - // May be NULL which means "return all columns" - const vx_expression *projection; - - // May be NULL which means "no filter" - const vx_expression *filter; - - // Set both to 0 to indicate no range request - // Inclusive - uint64_t row_range_begin; - // Exclusive - uint64_t row_range_end; - + const vx_expression *projection; // NULL means "return all columns" + const vx_expression *filter; // NULL means "no filter" + uint64_t row_range_begin; // Inclusive, [0; 0) means "all rows" + uint64_t row_range_end; // Exclusive vx_scan_selection selection; - - // 0 for no limit - uint64_t limit; - int ordered; + uint64_t limit; // 0 means no limit + uint64_t max_threads; // 0 means no limit. 
+  bool ordered; // 0 means unordered scan
 } vx_scan_options;
 ```
 
 Scan interface:
 
 ```c
+// bindgen
 typedef struct vx_scan vx_scan;
-
-/**
- * A partition is a contiguous chunk of memory from which you can interatively
- * get vx_arrays.
- */
+// A Partition is a chunk of work for a thread.
 typedef struct vx_partition vx_partition;
 
 typedef enum {
@@ -273,86 +246,121 @@ typedef enum {
   VX_ESTIMATE_INEXACT = 2,
 } vx_estimate_boundary;
 
+// If type is VX_P_ESTIMATE_UNKNOWN, estimate is not populated.
 typedef struct {
-  // If type is VX_P_ESTIMATE_UNKNOWN, estimate field is not populated
   uint64_t estimate;
-  vx_estimate_boundary boundary;
+  vx_estimate_boundary type;
 } vx_estimate;
 
-// Users are encouraged to create worker threads depending on est->estimate to
-// distribute work.
-// opts and est may be nullptr.
-// Requesting a scan doesn't do anything unless vx_partition_next is called.
+// Users are encouraged to create worker threads depending on
+// estimate->estimate to distribute work.
+// options and estimate fields may be NULL.
+// Calling vx_data_source_scan doesn't do IO unless vx_scan_next is called.
+// vx_scan can outlive vx_data_source.
 vx_scan *
-vx_data_source_scan(const vx_data_source *data_source, const vx_scan_options *options, vx_error_out err);
-
-/**
- * Get next owned partition out of a scan request.
- * Caller must free this partition using vx_partition_free.
- * This method is thread-safe.
- * If using in a sync multi-thread runtime, users are encouraged to create a
- * worker thread per partition.
- * Returns NULL and doesn't set err on exhaustion.
- * Returns NULL and sets err on error.
- */
-vx_partition *vx_scan_next(vx_scan *scan, vx_error_out err);
+vx_data_source_scan(const vx_data_source *ds, const vx_scan_options *options,
+                    vx_estimate* estimate, vx_error **err);
+```
+
+### Partition
+
+A Partition allows a worker thread to produce Arrays thread-unsafely. 
+Partitions also allow exporting the data to ArrowArrayStream for hosts +that don't want to introspect Arrays. Hosts should call `vx_scan_next` +and `vx_partition_next` till they return NULL which signals there's no more +data. + +```c +// Get next owned partition out of a scan request. +// Caller must free this partition using vx_partition_free. +// This method is thread-safe. +// Hosts are encouraged to create a worker thread per partition. +// Returns NULL and doesn't set err on exhaustion. +// Returns NULL and sets err on error. +vx_partition *vx_scan_next(vx_scan *scan, vx_error **err); // Request an array stream in Arrow format from a partition, consuming it // fully. Not thread-safe, should be called once. // stream is owned and must be freed using the release callback -void vx_partition_scan_arrow(const vx_partition *partition, FFI_ArrowArrayStream *stream, vx_error_out err); +void vx_partition_scan_arrow(const vx_partition *partition, + ArrowArrayStream *stream, vx_error_out err); // Thread-unsafe. Get an owned vx_array of an iterator. -// Returns NULL if iterator is exhausted. Array is owned and must be -// freed by caller. -const vx_array *vx_partition_next(vx_partition *partition, vx_error_out err); +// Returns NULL and sets err to NULL if iterator is exhausted. +// Array must be freed by caller. +const vx_array *vx_partition_next(vx_partition *partition, vx_error **err); ``` -There are examples of APIs also exposing batch reads, but I doubt this is a good -option as for every ArrayRef the work that needs to be done to execute it may be -significant, and if you want to parallelize work, you can use this with -partitions, so each thread will be still busy with one ArrayRef at a time. -It can be introduced in the future. +### Array introspection -Scan functions are lazy as they operate on streams and it is -consumer's code responsibility to use parallelism at the desired degree. 
+The main question is how to transform outputs of iteration, `vx_array`, into +something query engines can operate with. You need to execute the array +iteratively till you recognize data and start exporting it. Thus API provides a +way to scan partitions directly into ArrowArrayStream which should be good +enough for most hosts. -## What to do with `vx_array` +## Middle level -The main question is how to transform outputs of iteration, vx_array, into -something query engines can operate with. You need to execute the array -iteratively till you recognize data and start exporting it. Duckdb integration -is mostly written in Rust with C++ code calling Rust's vtable functions. Rust -code does all data export. PoC implementation moves Duckdb to use C API but -leaves existing Rust code for exporting `vx_array` into DataChunk. +TODO -However, the goal is not to interface with Rust code, so as a baseline the API -provides a way to scan partitions directly into ArrowArrayStream which should be -good enough for most consumers. +## Low level -## Cancellation +TODO -There will be no option to cancel the scan as this isn't possibe on Rust API -either and this is a low priority task. +## Compatibility + +No impact. Existing C Scan API will exist for some time but will be removed +eventually. + +## Drawbacks + +TODO + +## Alternatives -## Testing +TODO -C API doesn't have any testing. I suggest setting up a Catch3 testing target and -a CMake library for C API using FetchContent to download Catch. This way people -not working on Duckdb integration or FFI wouldn't need CMake and Catch. To -integrate C tests with `cargo test`, we can write a `build.rs` extension which -parses C test names and codegenerates rust tests targets calling to Catch. +## Prior Art -## Duckdb integration PoC +- Dividing scan requests into Partitions for threads is taken from DataFusion's + [partitioned streams](https://docs.rs/datafusion/latest/datafusion/physical_plan/enum.Partitioning.html). 
+- Making Partitions produce Arrays without synchronization mimics
+  [Arrow Stream](https://arrow.apache.org/docs/format/CStreamInterface.html)
+  interface.
+
+## Unresolved Questions
+
+- Should high level API have a way to use host's persistent caching options?
+  In-memory caching may be implemented using host allocator only but for
+  persistent caching we need additional filesystem customizations i.e. cache
+  location path.
+- Should high level API expose batch reads from Partition? There are plans to
+  deprecate Partitions on Rust side.
+- What introspection should high level API have for hosts which aren't satisfied
+  with `Vortex -> ArrowArrayStream` conversion? Should there be iterative
+  execution API?
+- How should API expose definitions of `ArrowSchema` and `ArrowArrayStream`?
+  Rust's implementation exposes `FFI_` structs and you need to typedef
+  them manually. Should API bundle `nanoarrow` or let hosts handle typedefs
+  themselves since the writer is ABI-compatible? Should we gate it behind a
+  macro?
+
+  ```c
+  typedef FFI_ArrowSchema ArrowSchema;
+  #include "vortex.h"
+  ```
+
+## High level API integration example: DuckDB
 
 ```
-before:
-Duckdb side Vortex side
+Before:
+
+DuckDB Vortex
 C++ C++ Rust
 duckdb -> TableFunction vtable -> ffi wrapping -> vtable implementation
 
-after:
+After:
 C++ C++ C
 duckdb -> TableFunction vtable -> ffi_wrapping -> vx_scan()*
 
@@ -385,9 +393,20 @@ i.e. one worker thread operating on one or multiple partitions.
 Each thread pulls out partitions from `vx_scan_next` (thus it's thread-safe)
 and then works on its own partition without synchronization.
 
-[1] `vx_file_scan`
-
-[2] Need to control pruning
-    https://spiraldb.slack.com/archives/C0AJS0HDS6R/p1773068549282999
-
-[4] e.g. Duckdb MultiFileReader / MultiFileList
+## Footnotes
+
+[^1]: Clients of this API would mostly be query engines like Velox or
+      ClickHouse, but may as well be our own integrations like vortex-duckdb. 
+[^2]: Like opening the first file in a glob expression to determine schema. +[^3]: Exposed Rust ABI is not stable, so clients can't use cbindgen. C++ clients + can use cxx.rs but this still requires writing manual bindings and runtime + bridging. +[^4]: DuckDB MultiFileReader and MultiFileList. +[^5]: The name may be misleading as it doesn't correspond to Rust side's + Partitions. +[^6]: DuckDB integration currently hides Partitions and Arrays behind a single + [thread-safe iterator](https://github.com/vortex-data/vortex/blob/e8cd130c8ccac45082a0b458b1f843c4313555bf/vortex-duckdb/src/datasource.rs#L151) + which implies unnecessary intra-thread synchronization on pulling data. + On the other hand, the producer, an async crossbeam queue, allows smoothing + out uneven workloads from the Vortex side, and if that's removed, Vortex's + partition scheduling must be precise.