Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 85 additions & 14 deletions vortex-cuda/src/arrow/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ fn gather_binary_values(
///
/// Returns `None` for the buffer when Arrow can omit validity because all rows are valid.
///
/// Returned buffers use zeroed 4-byte padding so cuDF's word-sized mask reads stay in bounds.
/// Returned buffers use zeroed cuDF-sized padding so mask reads stay in bounds.
/// Bits at positions `>= len + arrow_offset` within the final data byte are unspecified, as
/// Arrow permits.
pub(super) async fn export_arrow_validity_buffer(
Expand Down Expand Up @@ -798,21 +798,26 @@ pub(super) async fn export_arrow_validity_buffer(
let bitmap = ctx.ensure_on_device(bits).await?;
// ArrowDeviceArray uses ArrowArray layout with its buffers being device pointers.
//
// Validity is one bit per row, addressed via the Arrow array offset. Reuse the bitmap
// when Vortex's validity offset already matches Arrow's; otherwise repack on the GPU
// so row i is at Arrow bit `arrow_offset + i`.
let bitmap = if meta.offset() == arrow_offset {
bitmap
} else {
repack_arrow_validity_buffer(&bitmap, meta.offset(), len, arrow_offset, ctx)?
};
// Validity is one bit per row, addressed via the Arrow array offset. Repack on the GPU
// so row i is at Arrow bit `arrow_offset + i` and the backing allocation has the
// zeroed cuDF-sized padding expected by Arrow Device consumers.
let bitmap =
repack_arrow_validity_buffer(&bitmap, meta.offset(), len, arrow_offset, ctx)?;
// Keep nullable exports self-describing for consumers that require exact null counts.
let null_count = count_arrow_validity_nulls(&bitmap, len, arrow_offset, ctx)?;
Ok((Some(bitmap), null_count))
}
}
}

/// Minimum backing allocation quantum for Arrow validity buffers handed to cuDF.
///
/// Arrow exposes only the logical bitmap byte length, but cuDF imports null masks into 64-byte
/// padded buffers and its kernels may read through that padded extent. Vortex therefore keeps the
/// exported `BufferHandle` sliced to Arrow's logical length while zero-padding the underlying CUDA
/// allocation to this boundary.
const CUDF_VALIDITY_BUFFER_PADDING: usize = 64;

/// Return the byte length needed for `len` validity bits at the given bit offset.
fn validity_bitmap_byte_len(len: usize, arrow_offset: usize) -> VortexResult<usize> {
Ok(len
Expand All @@ -821,12 +826,25 @@ fn validity_bitmap_byte_len(len: usize, arrow_offset: usize) -> VortexResult<usi
.div_ceil(8))
}

/// Return the CUDA allocation size for a logical Arrow validity bitmap byte length.
///
/// The returned allocation length may be larger than `byte_len`; callers slice the exported
/// `BufferHandle` back to `byte_len` while retaining the padded backing allocation. A zero-length
/// bitmap still gets one byte so we never request a zero-sized CUDA allocation.
fn validity_bitmap_allocation_byte_len(byte_len: usize) -> usize {
if byte_len == 0 {
1
} else {
byte_len.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
}
}

/// Allocate a zeroed device buffer with cuDF-safe padding for Arrow validity masks.
fn device_zeroed_byte_buffer(
byte_len: usize,
ctx: &mut CudaExecutionCtx,
) -> VortexResult<BufferHandle> {
let allocation_len = byte_len.next_multiple_of(size_of::<u32>()).max(1);
let allocation_len = validity_bitmap_allocation_byte_len(byte_len);
let mut buffer = ctx.device_alloc::<u8>(allocation_len)?;
ctx.stream()
.memset_zeros(&mut buffer)
Expand Down Expand Up @@ -894,8 +912,8 @@ pub fn count_arrow_validity_nulls(
///
/// Vortex bitmaps may start at any bit offset. Arrow exposes only a byte-addressed validity buffer
/// plus an array offset, so sliced compact exports need a GPU rewrite when either side has a
/// bit-level offset. The kernel writes the output one 64-bit word at a time, funnel-shifting two
/// adjacent input words, so the allocation is padded to whole words (zeroed by the edge masks).
/// bit-level offset. The output handle keeps Arrow's logical byte length, while the backing
/// allocation is zero-padded to cuDF's mask allocation size for consumers that read full masks.
pub fn repack_arrow_validity_buffer(
input_buffer: &BufferHandle,
input_offset: usize,
Expand All @@ -904,7 +922,13 @@ pub fn repack_arrow_validity_buffer(
ctx: &mut CudaExecutionCtx,
) -> VortexResult<BufferHandle> {
let output_bytes = validity_bitmap_byte_len(len, arrow_offset)?;
// The CUDA kernel writes the bitmap as u64 words, so round the logical byte length up to the
// number of words that cover the exported Arrow bytes.
let output_words = output_bytes.div_ceil(size_of::<u64>());
// `device_alloc::<u64>` takes a word count, while the padding policy is expressed in bytes.
// Round up so the padded byte allocation is fully represented by whole u64 words.
let allocation_words =
validity_bitmap_allocation_byte_len(output_bytes).div_ceil(size_of::<u64>());

// The kernel loads the input bitmap as 64-bit words.
if !input_buffer
Expand All @@ -914,7 +938,12 @@ pub fn repack_arrow_validity_buffer(
vortex_bail!("Arrow validity repack requires an 8-byte aligned device buffer");
}

let output = ctx.device_alloc::<u64>(output_words.max(1))?;
let mut output = ctx.device_alloc::<u64>(allocation_words.max(1))?;
// The repack kernel writes only the logical bitmap words. Zero the whole backing allocation so
// cuDF's padded mask reads see invalid rows, not uninitialized CUDA memory.
ctx.stream()
.memset_zeros(&mut output)
.map_err(|err| vortex_err!("Failed to zero Arrow validity buffer padding: {err}"))?;
let output_device = CudaDeviceBuffer::new(output);

if output_words > 0 {
Expand Down Expand Up @@ -1337,6 +1366,7 @@ mod tests {
use crate::arrow::ArrowDeviceArray;
use crate::arrow::DeviceArrayExt;
use crate::arrow::PrivateData;
use crate::arrow::canonical::CUDF_VALIDITY_BUFFER_PADDING;
use crate::arrow::canonical::export_arrow_validity_buffer;
use crate::arrow::canonical::repack_arrow_validity_buffer;
use crate::device_buffer::cuda_backing_allocation;
Expand Down Expand Up @@ -2955,7 +2985,43 @@ mod tests {
let backing_bytes = backing.to_host_sync();
assert_eq!(
backing_bytes.len(),
output_bytes.next_multiple_of(size_of::<u64>())
output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
);
assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0));

Ok(())
}

#[crate::test]
async fn test_export_validity_buffer_pads_matching_offset() -> VortexResult<()> {
let mut ctx = CudaSession::create_execution_ctx(&crate::cuda_session())
.vortex_expect("failed to create execution context");

let len = 3;
let arrow_offset = 0;
let (buffer, null_count) = export_arrow_validity_buffer(
Validity::from(BitBuffer::from_iter([true, false, true])),
len,
arrow_offset,
&mut ctx,
)
.await?;
ctx.synchronize_stream()?;

assert_eq!(null_count, 1);
let buffer = buffer.vortex_expect("nullable validity should export a null buffer");
let output_bytes = (len + arrow_offset).div_ceil(8);
assert_eq!(buffer.len(), output_bytes);
let actual = BitBuffer::new(buffer.to_host_sync(), len + arrow_offset)
.iter()
.collect::<Vec<_>>();
assert_eq!(actual, [true, false, true]);

let backing = cuda_backing_allocation(&buffer)?;
let backing_bytes = backing.to_host_sync();
assert_eq!(
backing_bytes.len(),
output_bytes.next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
);
assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0));

Expand Down Expand Up @@ -2983,6 +3049,11 @@ mod tests {
let bytes = buffer.to_host_sync();
assert_eq!(bytes.len(), (len + arrow_offset).div_ceil(8));
assert!(bytes.iter().all(|byte| *byte == 0));
let backing = cuda_backing_allocation(&buffer)?;
assert_eq!(
backing.len(),
bytes.len().next_multiple_of(CUDF_VALIDITY_BUFFER_PADDING)
);

Ok(())
}
Expand Down
61 changes: 61 additions & 0 deletions vortex-python-cuda/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# vortex-data-cuda

CUDA extension for [Vortex](https://vortex.dev). Exports a `vortex.Array` to
[RAPIDS cuDF](https://docs.rapids.ai/api/cudf/stable/) or any
[Arrow C Device](https://arrow.apache.org/docs/format/CDeviceDataInterface.html) consumer, on the
GPU. Imported as `vortex_cuda`.

## Install

```bash
pip install vortex-data vortex-data-cuda # versions must match; CUDA device required
```

`to_cudf` also needs RAPIDS `cudf` and `pylibcudf` in the environment.

## Export to cuDF

`to_cudf` converts via the Arrow C Device interface: struct arrays become a `cudf.DataFrame`,
everything else a `cudf.Series`. Importing `vortex_cuda` installs it as `vortex.Array.to_cudf`.

```python
import vortex, vortex_cuda
import pyarrow as pa

s = vortex.array([1, None, 3]).to_cudf() # -> cudf.Series
df = vortex_cuda.to_cudf( # struct -> cudf.DataFrame
vortex.Array.from_arrow(pa.table({"x": [1, None, 3], "y": [4.0, 5.0, 6.0]}))
)
```

Buffers are imported zero-copy; host arrays are moved to the GPU as part of the export. cuDF keeps
shared ownership for the lifetime of the result and any view derived from it, so no extra
bookkeeping is needed.

Signature: `to_cudf(obj, *, fallback="error")`. Only `fallback="error"` is supported
(`NotImplementedError` otherwise); raises `TypeError` for a non-`vortex.Array`, `RuntimeError`
without a CUDA device, `ImportError` if cuDF/pylibcudf are missing.

## Export an Arrow C Device array

`vortex.Array` exposes the standard `__arrow_c_device_array__` protocol (installed when CUDA is
available), so any Arrow-C-Device consumer can ingest it zero-copy:

```python
import vortex, vortex_cuda, pylibcudf

array = vortex.array([1, None, 3])
column = pylibcudf.Column.from_arrow(array) # via the protocol

schema_capsule, device_array_capsule = vortex_cuda.export_device_array(array) # raw capsules
```

`export_device_array` returns `PyCapsule`s named `"arrow_schema"` and `"arrow_device_array"`. The
consumer owns the exported structs and runs the Arrow release callbacks when done (libcudf does
this automatically); Vortex's device buffers stay alive until then.

## Notes

- Integer, float, bool, and string arrays (incl. nullable) and structs are supported; nulls are
preserved.
- A CUDA device is required; there is no CPU fallback.
98 changes: 97 additions & 1 deletion vortex-python-cuda/python/vortex_cuda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,107 @@
# SPDX-FileCopyrightText: Copyright the Vortex contributors
# pyright: reportMissingModuleSource=false, reportPrivateUsage=false

import importlib
from typing import Protocol, cast

from . import _lib

# Private debug hooks used by CUDA bridge tests.
_debug_array_metadata_dtype = _lib._debug_array_metadata_dtype
_debug_array_metadata_display_values = _lib._debug_array_metadata_display_values
_debug_arrow_device_array_capsule_summary = _lib._debug_arrow_device_array_capsule_summary
_debug_consume_arrow_device_array_capsules = _lib._debug_consume_arrow_device_array_capsules
cuda_available = _lib.cuda_available
export_device_array = _lib.export_device_array

__all__ = ["cuda_available", "export_device_array"]

class _FromArrow(Protocol):
def from_arrow(self, obj: object) -> object: ...


class _FromPylibcudf(Protocol):
def from_pylibcudf(self, obj: object) -> object: ...


class _DataFrameFromPylibcudf(Protocol):
def from_pylibcudf(self, obj: object, metadata: dict[str, object] | None = None) -> object: ...


class _PylibcudfModule(Protocol):
Column: _FromArrow
Table: _FromArrow


class _CudfModule(Protocol):
Series: _FromPylibcudf
DataFrame: _DataFrameFromPylibcudf


_SUPPORTED_FALLBACKS = frozenset({"error"})


def _Array_to_cudf(self: object, *, fallback: str = "error") -> object:
return to_cudf(self, fallback=fallback)


def _Array___arrow_c_device_array__(
self: object,
requested_schema: object | None = None,
**kwargs: object,
) -> tuple[object, object]:
return export_device_array(self, requested_schema, **kwargs)


def _install_vortex_array_methods() -> None:
import vortex

setattr(vortex.Array, "to_cudf", _Array_to_cudf)
if cuda_available():
setattr(vortex.Array, "__arrow_c_device_array__", _Array___arrow_c_device_array__)


def _import_cudf_modules() -> tuple[_CudfModule, _PylibcudfModule]:
try:
cudf = importlib.import_module("cudf")
pylibcudf = importlib.import_module("pylibcudf")
except ImportError as err:
raise ImportError("vortex_cuda.to_cudf requires RAPIDS cuDF and pylibcudf to be installed") from err
return cast(_CudfModule, cast(object, cudf)), cast(_PylibcudfModule, cast(object, pylibcudf))


def to_cudf(obj: object, *, fallback: str = "error") -> object:
"""Convert a Vortex array to a cuDF object through the Arrow Device interface.

pylibcudf imports the exported Arrow Device array zero-copy and keeps shared ownership of
Vortex's device buffers (via libcudf's ``arrow_column``) for the lifetime of the returned
cuDF object and any view derived from it, so no extra keepalive is required here.

``fallback`` is reserved for future policy choices. The initial implementation
supports only ``fallback="error"`` and never falls back to host Arrow conversion.
"""
if fallback not in _SUPPORTED_FALLBACKS:
raise NotImplementedError("vortex_cuda.to_cudf currently supports only fallback='error'")

import vortex

if not isinstance(obj, vortex.Array):
raise TypeError(f"vortex_cuda.to_cudf expected a vortex.Array, got {type(obj).__name__}")

if not cuda_available():
raise RuntimeError("CUDA is not available; vortex_cuda.to_cudf requires a CUDA device")

cudf, pylibcudf = _import_cudf_modules()

dtype = obj.dtype
if isinstance(dtype, vortex.StructDType):
table = pylibcudf.Table.from_arrow(obj)
return cudf.DataFrame.from_pylibcudf(table, metadata={"columns": dtype.names()})

column = pylibcudf.Column.from_arrow(obj)
return cudf.Series.from_pylibcudf(column)


_install_vortex_array_methods()


__all__ = ["cuda_available", "export_device_array", "to_cudf"]
4 changes: 4 additions & 0 deletions vortex-python-cuda/python/vortex_cuda/_lib.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

def _debug_array_metadata_dtype(array: object) -> str: ...
def _debug_array_metadata_display_values(array: object) -> str: ...
def _debug_arrow_device_array_capsule_summary(schema: object, device_array: object) -> dict[str, object]: ...
def _debug_consume_arrow_device_array_capsules(
schema: object, device_array: object
) -> tuple[bool, bool, bool, bool, bool, bool]: ...
def cuda_available() -> bool: ...
def export_device_array(
array: object, requested_schema: object | None = None, **kwargs: object
Expand Down
Loading
Loading