diff --git a/.gitignore b/.gitignore index 30e1695322..b889829b72 100644 --- a/.gitignore +++ b/.gitignore @@ -68,3 +68,4 @@ docs/static/viewer docs/static/react rust/perspective-server/build target/ +dist-gh-pages diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 312e7d9f13..d9f82b7d17 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1129,6 +1129,12 @@ importers: perspective-4-4-0: specifier: npm:@perspective-dev/client@4.4.0 version: '@perspective-dev/client@4.4.0' + perspective-4-5-0: + specifier: npm:@perspective-dev/client@4.5.0 + version: '@perspective-dev/client@4.5.0' + perspective-4-5-1: + specifier: npm:@perspective-dev/client@4.5.1 + version: '@perspective-dev/client@4.5.1' devDependencies: '@perspective-dev/client': specifier: 'workspace:' @@ -2054,12 +2060,21 @@ packages: '@perspective-dev/client@4.4.0': resolution: {integrity: sha512-AvfWskJJbfOZl6k2Gfntw/c2a+KDnCy5GfDvEQohdXUNMg+0uec//QuL0ZaqclQA/WfRvm4k+YHIUiBhT8EYew==} + '@perspective-dev/client@4.5.0': + resolution: {integrity: sha512-eTJqHvmZOFvr+QDOZlmg6vmYuL7CtDAawFSGWhzuYw50PdC0osdKIvQmcoTY5eUcekNRCrOwGaGlp+KihpCAkA==} + + '@perspective-dev/client@4.5.1': + resolution: {integrity: sha512-Yb/huLDHEQ9FFeTloipHKM2oVtgTkHLIEqh971eznOc0sNQGg9Sd7flju8xlKe8KMGe3VCo7/wlvxr4GqBi1Bw==} + '@perspective-dev/server@4.3.0': resolution: {integrity: sha512-UH3ADscynozVx42RF07DTBmPE/0PUwH+SS0cgvMLuLfBjtvnPm6msfOU0tHlgFnNZOHJO0q6RZ9WI5fD6wF07A==} '@perspective-dev/server@4.4.1': resolution: {integrity: sha512-eOWr0qy2T2KwClbYgRjfDrcnkR023gHn/7J/tQEPa6rEYTfqXwicWsObFU4mZHi3rTySxYf6qsY6O+SJIM+BzA==} + '@perspective-dev/server@4.5.1': + resolution: {integrity: sha512-VzR1Z+SdUlB4K1hTHFxVE7PLxFXtTb008fBGMg/uo3Y3qDbJh/9IcZjOwvpI86k6QUZ49VR2w3BDGdh7YzF/8A==} + '@playwright/experimental-ct-core@1.58.0': resolution: {integrity: sha512-YZsjApZmRE78Kp2E6OtAvFFVheUyZDfrlZMf+lfnSshmYHrrJUy3bhdCe7EPCWsE12XfCVVAv6G0btiyAx8d0w==} engines: {node: '>=18'} @@ -6684,10 +6699,36 @@ snapshots: - supports-color - utf-8-validate + '@perspective-dev/client@4.5.0': + dependencies: + '@perspective-dev/server': 4.5.1 + pro_self_extracting_wasm: 0.0.9 + stoppable: 1.1.0 + ws: 8.18.3 + transitivePeerDependencies: + - bufferutil + - debug + - supports-color + - utf-8-validate + + '@perspective-dev/client@4.5.1': + dependencies: + '@perspective-dev/server': 4.5.1 + pro_self_extracting_wasm: 0.0.9 + stoppable: 1.1.0 + ws: 8.18.3 + transitivePeerDependencies: + - bufferutil + - debug + - supports-color + - utf-8-validate + '@perspective-dev/server@4.3.0': {} '@perspective-dev/server@4.4.1': {} + '@perspective-dev/server@4.5.1': {} + '@playwright/experimental-ct-core@1.58.0(@types/node@24.9.1)(jiti@1.21.7)(less@4.4.2)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.1)': dependencies: playwright: 1.58.0 diff --git a/rust/perspective-client/perspective.proto b/rust/perspective-client/perspective.proto index 853ef78e48..515bc090d1 100644 --- a/rust/perspective-client/perspective.proto +++ b/rust/perspective-client/perspective.proto @@ -334,6 +334,11 @@ message MakeTableReq { string make_index_table = 1; uint32 make_limit_table = 2; }; + + // Back this Table's canonical data with the on-disk storage backend + // (memory-mapped file on native; OPFS on WASM) instead of memory. + // Orthogonal to `make_table_type`, so it is a standalone field. + optional bool page_to_disk = 3; } } message MakeTableResp {} diff --git a/rust/perspective-client/src/rust/client.rs b/rust/perspective-client/src/rust/client.rs index e845704b6d..b100160268 100644 --- a/rust/perspective-client/src/rust/client.rs +++ b/rust/perspective-client/src/rust/client.rs @@ -618,6 +618,7 @@ impl Client { ClientResp::MakeJoinTableResp(_) => Ok(Table::new(entity_id, client, TableOptions { index: Some(on.to_owned()), limit: None, + page_to_disk: None, })), resp => Err(resp.into()), } @@ -662,6 +663,9 @@ impl Client { let options = TableOptions { index: info.index, limit: info.limit, + // `page_to_disk` is a server-side property not surfaced in table + // info; it does not affect client-side behavior. + page_to_disk: None, }; let client = self.clone(); diff --git a/rust/perspective-client/src/rust/table.rs b/rust/perspective-client/src/rust/table.rs index 4ecf4ae49a..6ecc5117f6 100644 --- a/rust/perspective-client/src/rust/table.rs +++ b/rust/perspective-client/src/rust/table.rs @@ -92,6 +92,13 @@ pub struct TableInitOptions { #[serde(default)] #[ts(optional)] pub limit: Option, + + /// Back this [`Table`]'s canonical data with the on-disk storage backend + /// instead of memory. On native targets this is a memory-mapped file; on + /// WASM it is OPFS (Worker only). Defaults to in-memory. + #[serde(default)] + #[ts(optional)] + pub page_to_disk: Option, } impl TableInitOptions { @@ -104,11 +111,14 @@ impl TryFrom for MakeTableOptions { type Error = ClientError; fn try_from(value: TableOptions) -> Result { + let page_to_disk = value.page_to_disk; Ok(MakeTableOptions { + page_to_disk, make_table_type: match value { TableOptions { index: Some(_), limit: Some(_), + .. } => Err(ClientError::BadTableOptions)?, TableOptions { index: Some(index), .. @@ -126,6 +136,7 @@ impl TryFrom for MakeTableOptions { pub(crate) struct TableOptions { pub index: Option, pub limit: Option, + pub page_to_disk: Option, } impl From for TableOptions { @@ -133,6 +144,7 @@ impl From for TableOptions { TableOptions { index: value.index, limit: value.limit, + page_to_disk: value.page_to_disk, } } } diff --git a/rust/perspective-js/src/ts/perspective.node.ts b/rust/perspective-js/src/ts/perspective.node.ts index 1f64fd5523..b1a36e517f 100644 --- a/rust/perspective-js/src/ts/perspective.node.ts +++ b/rust/perspective-js/src/ts/perspective.node.ts @@ -16,6 +16,16 @@ export type * from "./virtual_server.ts"; import WebSocket, { WebSocketServer as HttpWebSocketServer } from "ws"; import stoppable from "stoppable"; import { promises as fs } from "node:fs"; +import { + openSync, + readSync, + writeSync, + closeSync, + mkdirSync, + unlinkSync, + rmSync, +} from "node:fs"; +import os from "node:os"; import http from "node:http"; import path from "node:path"; import { webcrypto } from "node:crypto"; @@ -56,12 +66,123 @@ const uncompressed_client_wasm = await fs .then((buffer) => load_wasm_stage_0(buffer.buffer as ArrayBuffer)); await perspective_client.default({ module_or_path: uncompressed_client_wasm }); + +function make_node_disk_bridge({ + heap, + toAddr, + readCString, +}: { + heap: () => Uint8Array; + toAddr: (p: number | bigint) => number; + readCString: (p: number | bigint) => string; +}) { + const root = path.join(os.tmpdir(), `perspective-${process.pid}`); + const resolve_path = (name: string) => { + const p = path.join(root, name); + if (!p.startsWith(root)) { + throw new Error(`refusing disk path outside root: ${name}`); + } + return p; + }; + + let cleaned = false; + const cleanup = () => { + if (cleaned) return; + cleaned = true; + try { + rmSync(root, { recursive: true, force: true }); + } catch (e) { + /* best effort */ + } + }; + process.on("exit", cleanup); + process.on("SIGINT", () => { + cleanup(); + process.exit(130); + }); + + return { + // node:fs is synchronous — nothing to pre-open between safepoint phases. + async ensureOpen(_name: string) {}, + store( + namePtr: number | bigint, + dataPtr: number | bigint, + len: number, + ): number { + try { + const p = resolve_path(readCString(namePtr)); + mkdirSync(path.dirname(p), { recursive: true }); + const fd = openSync(p, "w"); + try { + if (len > 0) { + const addr = toAddr(dataPtr); + const view = heap().subarray(addr, addr + len); + let off = 0; + while (off < len) { + off += writeSync(fd, view, off, len - off, off); + } + } + } finally { + closeSync(fd); + } + return len; + } catch (e) { + console.error("node disk store failed", e); + return -1; + } + }, + load( + namePtr: number | bigint, + dataPtr: number | bigint, + len: number, + ): number { + try { + if (len <= 0) return 0; + let fd: number; + try { + fd = openSync(resolve_path(readCString(namePtr)), "r"); + } catch (e) { + return 0; // never-flushed file reads as zeros + } + try { + const addr = toAddr(dataPtr); + const view = heap().subarray(addr, addr + len); + let off = 0; + let n: number; + while ( + off < len && + (n = readSync(fd, view, off, len - off, off)) > 0 + ) { + off += n; + } + return off; + } finally { + closeSync(fd); + } + } catch (e) { + return 0; + } + }, + remove(namePtr: number | bigint) { + try { + unlinkSync(resolve_path(readCString(namePtr))); + } catch (e) { + /* already gone */ + } + }, + }; +} + const SYNC_MODULE = await fs .readFile( resolve("@perspective-dev/server/dist/wasm/perspective-server.wasm"), ) .then((buffer) => load_wasm_stage_0(buffer.buffer as ArrayBuffer)) - .then((buffer) => compile_perspective(buffer.buffer as ArrayBuffer)); + .then((buffer) => + compile_perspective(buffer.buffer as ArrayBuffer, { + make_disk_bridge: make_node_disk_bridge, + }), + ); let SYNC_CLIENT: perspective_client.Client; diff --git a/rust/perspective-js/src/ts/wasm/emscripten_api.ts b/rust/perspective-js/src/ts/wasm/emscripten_api.ts index 9b9bce8c66..d9e43fb436 100644 --- a/rust/perspective-js/src/ts/wasm/emscripten_api.ts +++ b/rust/perspective-js/src/ts/wasm/emscripten_api.ts @@ -17,13 +17,38 @@ import type * as perspective_server_t from "@perspective-dev/server/dist/wasm/pe export type PspPtr = BigInt | number; export type EmscriptenServer = bigint | number; +export interface DiskBridgeHelpers { + heap: () => Uint8Array; + toAddr: (p: number | bigint) => number; + readCString: (p: number | bigint) => string; +} + +export interface CompileOptions { + make_disk_bridge?: (helpers: DiskBridgeHelpers) => { + store( + namePtr: number | bigint, + dataPtr: number | bigint, + len: number, + ): number; + load( + namePtr: number | bigint, + dataPtr: number | bigint, + len: number, + ): number; + remove(namePtr: number | bigint): void; + ensureOpen(name: string): Promise; + }; +} + export async function compile_perspective( wasmBinary: ArrayBuffer, + opts?: CompileOptions, ): Promise { const module = await perspective_server.default({ locateFile(x: any) { return x; }, + make_disk_bridge: opts?.make_disk_bridge, instantiateWasm: async ( imports: any, receive: (_: WebAssembly.Instance) => void, diff --git a/rust/perspective-js/src/ts/wasm/engine.ts b/rust/perspective-js/src/ts/wasm/engine.ts index 871208efef..a1764342b1 100644 --- a/rust/perspective-js/src/ts/wasm/engine.ts +++ b/rust/perspective-js/src/ts/wasm/engine.ts @@ -22,6 +22,33 @@ export interface PerspectiveServerOptions { on_poll_request?: (x: PerspectiveServer) => Promise; } +/** + * A server-wide FIFO async mutex. + */ +class AsyncLock { + private tail: Promise = Promise.resolve(); + run(fn: () => Promise | T): Promise { + const result = this.tail.then(fn, fn); + this.tail = result.then( + () => undefined, + () => undefined, + ); + + return result; + } +} + +async function residency_safepoint( + mod: MainModule, + lock: AsyncLock, + server: EmscriptenServer, +) { + const safepoint = (mod as any).psp_residency_safepoint; + if (safepoint) { + await lock.run(() => safepoint(server)); + } +} + export class PerspectivePollThread { private poll_handle?: Promise; private server: PerspectiveServer; @@ -63,10 +90,13 @@ export class PerspectiveServer { server: EmscriptenServer; module: MainModule; on_poll_request?: (x: PerspectiveServer) => Promise; + lock: AsyncLock; + constructor(module: MainModule, options?: PerspectiveServerOptions) { this.clients = new Map(); this.module = module; this.on_poll_request = options?.on_poll_request; + this.lock = new AsyncLock(); this.server = module._psp_new_server( !!options?.on_poll_request ? 1 : 0, ) as EmscriptenServer; @@ -85,12 +115,16 @@ export class PerspectiveServer { this.server, client_id, this.clients, + this.lock, this.on_poll_request && (() => this.on_poll_request!(this)), ); } async poll() { - const polled = this.module._psp_poll(this.server as any); + const polled = await this.lock.run(() => + this.module._psp_poll(this.server as any), + ); + await decode_api_responses( this.module, polled, @@ -98,6 +132,8 @@ export class PerspectiveServer { await this.clients.get(msg.client_id)!(msg.data); }, ); + + await residency_safepoint(this.module, this.lock, this.server); } delete() { @@ -111,6 +147,7 @@ export class PerspectiveSession { private server: EmscriptenServer, private client_id: number, private client_map: Map Promise>, + private lock: AsyncLock, private on_poll_request?: () => Promise, ) {} @@ -119,13 +156,17 @@ export class PerspectiveSession { this.mod, view, async (viewPtr) => { - return this.mod._psp_handle_request( - this.server as any, - this.client_id, - viewPtr as any, - this.mod._psp_is_memory64() - ? (BigInt(view.byteLength) as any as number) - : (view.byteLength as any), + const len = this.mod._psp_is_memory64() + ? (BigInt(view.byteLength) as any as number) + : (view.byteLength as any); + + return this.lock.run(() => + this.mod._psp_handle_request( + this.server as any, + this.client_id, + viewPtr as any, + len, + ), ); }, ); @@ -139,10 +180,14 @@ export class PerspectiveSession { } else { await this.poll(); } + + await residency_safepoint(this.mod, this.lock, this.server); } private async poll() { - const polled = this.mod._psp_poll(this.server as any); + const polled = await this.lock.run(() => + this.mod._psp_poll(this.server as any), + ); await decode_api_responses( this.mod, polled, @@ -172,9 +217,9 @@ async function convert_typed_array_to_pointer( : (array.byteLength as any), ); - core.HEAPU8.set(array, Number(ptr)); - const msg = await callback(ptr); - core._psp_free(ptr); + core.HEAPU8.set(array, Number(ptr) >>> 0); + const msg = await callback(Number(ptr) >>> 0); + core._psp_free(Number(ptr) >>> 0); return msg; } @@ -210,7 +255,7 @@ async function decode_api_responses( const is_64 = core._psp_is_memory64(); const response = new DataView( core.HEAPU8.buffer, - Number(ptr), + Number(ptr) >>> 0, is_64 ? 12 : 8, ); diff --git a/rust/perspective-js/src/ts/wasm/perspective-server.poly.ts b/rust/perspective-js/src/ts/wasm/perspective-server.poly.ts index 587f98eb23..b3787a0057 100644 --- a/rust/perspective-js/src/ts/wasm/perspective-server.poly.ts +++ b/rust/perspective-js/src/ts/wasm/perspective-server.poly.ts @@ -12,9 +12,6 @@ import type * as perspective_server from "@perspective-dev/server/dist/wasm/perspective-server.js"; -type PspPtr = bigint | number; -type EmscriptenServer = number; - var out = console.log.bind(console); var err = console.error.bind(console); @@ -50,6 +47,151 @@ export default async function (obj: any) { let psp_module: perspective_server.MainModule; let is_memory64 = false; let wasm_memory: WebAssembly.Memory; + + // A `BACKING_STORE_DISK` column's bytes live in wasm linear memory; the + // residency manager flushes/loads them to/from OPFS through these imports. + const heap = () => new Uint8Array(wasm_memory.buffer); + const toAddr = (p: number | bigint) => + is_memory64 ? Number(p as bigint) : (p as number) >>> 0; + + function readCString(p: number | bigint) { + const addr = toAddr(p); + const h = heap(); + let end = addr; + while (h[end]) ++end; + return UTF8Decoder.decode(h.subarray(addr, end)); + } + + // A `DiskBridge` backs `BACKING_STORE_DISK` columns: `store`/`load` flush and + // re-read a column buffer to/from disk synchronously (the only async step, + // `ensureOpen`, is hoisted into the JS safepoint between engine calls). The + // browser default is OPFS; a host can inject an alternative (e.g. `node:fs` + // in `perspective.node.ts`) via `obj.make_disk_bridge`. `store`/`load`/ + // `remove` receive raw wasm pointers and use the heap helpers; `ensureOpen` + // receives a resolved file name string (from `victim_fname`). + const disk_helpers = { heap, toAddr, readCString }; + + function makeOpfsBridge() { + async function opfsOpenFile(name: string, create: boolean) { + const parts = name.split("/").filter((s) => s.length > 0); + // @ts-ignore — navigator.storage.getDirectory is OPFS, Worker-only. + let dir = await navigator.storage.getDirectory(); + for (let i = 0; i < parts.length - 1; i++) { + dir = await dir.getDirectoryHandle(parts[i], { create }); + } + return await dir.getFileHandle(parts[parts.length - 1], { create }); + } + + // One OPFS `FileSystemSyncAccessHandle` is kept open per disk-backed file + // for its lifetime. + const handles = new Map(); + + return { + async ensureOpen(name: string) { + if (handles.has(name)) { + return; + } + const fh = await opfsOpenFile(name, true); + // @ts-ignore — createSyncAccessHandle is Worker-only OPFS. + const ah = await fh.createSyncAccessHandle(); + handles.set(name, ah); + }, + store( + namePtr: number | bigint, + dataPtr: number | bigint, + len: number, + ): number { + try { + const ah = handles.get(readCString(namePtr)); + if (!ah) { + // `commit` only evicts victims whose handle the safepoint + // just opened, so this should be unreachable. + return -1; + } + ah.truncate(len); + if (len > 0) { + const addr = toAddr(dataPtr); + ah.write(heap().subarray(addr, addr + len), { at: 0 }); + } + ah.flush(); + return len; + } catch (e) { + err("psp_opfs_store failed", e); + return -1; + } + }, + load( + namePtr: number | bigint, + dataPtr: number | bigint, + len: number, + ): number { + try { + if (len <= 0) { + return 0; + } + const ah = handles.get(readCString(namePtr)); + // A restore always follows an evict that left the handle open; + // a miss (or never-flushed file) reads as zeros. + if (!ah) { + return 0; + } + const addr = toAddr(dataPtr); + return ah.read(heap().subarray(addr, addr + len), { + at: 0, + }); + } catch (e) { + return 0; + } + }, + remove(namePtr: number | bigint) { + try { + const name = readCString(namePtr); + const ah = handles.get(name); + if (ah) { + try { + ah.close(); + } catch (e) { + /* already closed */ + } + handles.delete(name); + } + // Fire-and-forget the (async) file deletion; the handle is + // closed, so the bytes are no longer referenced. + void (async () => { + try { + const parts = name + .split("/") + .filter((s) => s.length > 0); + // @ts-ignore + let dir = await navigator.storage.getDirectory(); + for (let i = 0; i < parts.length - 1; i++) { + dir = await dir.getDirectoryHandle(parts[i], { + create: false, + }); + } + await dir.removeEntry(parts[parts.length - 1]); + } catch (e) { + /* already gone */ + } + })(); + } catch (e) { + /* already gone */ + } + }, + }; + } + + // The host may inject a disk bridge (e.g. `node:fs`); otherwise default to + // OPFS. Synchronous `store`/`load`/`remove` env imports — no JSPI. + const disk = obj.make_disk_bridge + ? obj.make_disk_bridge(disk_helpers) + : makeOpfsBridge(); + + const opfs_imports: Record = { + psp_opfs_store: disk.store, + psp_opfs_load: disk.load, + psp_opfs_remove: disk.remove, + }; const module = { HaveOffsetConverter() { console.error("HaveOffsetConverter"); @@ -211,6 +353,7 @@ export default async function (obj: any) { console.error("proc_exit", e); return 0; }, + ...opfs_imports, }; const x = await obj.instantiateWasm( @@ -228,14 +371,41 @@ export default async function (obj: any) { }, ); + // No JSPI: every export is exposed verbatim (emscripten `_`-prefixed alias). + // Restore is a synchronous import, so `psp_handle_request`/`psp_poll` stay + // synchronous WASM calls. const extensions: Record = {}; for (const [name, func] of Object.entries(x)) { extensions[`_${name}`] = func; } + // JS-driven residency safepoint. The only async OPFS step — opening each + // victim's sync access handle — happens HERE, between the two synchronous C++ + // phases: `prepare` picks the cold victims (no freeing), we open their + // handles, then `commit` flushes + frees them (now a synchronous write). The + // `engine.ts` server-wide lock serializes this against engine calls. A no-op + // when nothing is over budget. + const psp_residency_safepoint = async (server: number | bigint) => { + const mod = psp_module as any; + if (!mod.psp_residency_prepare) { + return; + } + const n = Number(mod.psp_residency_prepare(server)); + for (let i = 0; i < n; i++) { + const fname = readCString( + mod.psp_residency_victim_fname(server, i), + ); + if (fname) { + await disk.ensureOpen(fname); + } + } + mod.psp_residency_commit(server); + }; + return { ...x, ...extensions, + psp_residency_safepoint, get HEAPU8() { // @ts-ignore return new Uint8Array(wasm_memory.buffer); diff --git a/rust/perspective-js/test/js/page_to_disk.spec.js b/rust/perspective-js/test/js/page_to_disk.spec.js new file mode 100644 index 0000000000..8bb4e32f50 --- /dev/null +++ b/rust/perspective-js/test/js/page_to_disk.spec.js @@ -0,0 +1,136 @@ +// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +// ┃ Copyright (c) 2017, the Perspective Authors. ┃ +// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +// ┃ This file is part of the Perspective library, distributed under the terms ┃ +// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +import { test, expect } from "@perspective-dev/test"; +import perspective from "./perspective_client"; + +import * as fs from "node:fs"; +import * as os from "node:os"; +import * as path from "node:path"; + +const DISK_ROOT = path.join(os.tmpdir(), `perspective-${process.pid}`); + +function disk_file_count() { + const out = []; + const walk = (dir) => { + for (const ent of fs.readdirSync(dir, { withFileTypes: true })) { + const p = path.join(dir, ent.name); + if (ent.isDirectory()) walk(p); + else out.push(p); + } + }; + try { + walk(DISK_ROOT); + } catch (e) { + /* root not created yet */ + } + return out.length; +} + +const data = { + x: [1, 2, 3, 4], + y: ["a", "b", "c", "d"], + z: [1.5, 2.5, 3.5, 4.5], +}; + +test.describe("page_to_disk", function () { + test("produces results identical to an in-memory table", async function () { + const mem = await perspective.table(data); + const disk = await perspective.table(data, { page_to_disk: true }); + const vm = await mem.view(); + const vd = await disk.view(); + expect(await vd.to_columns()).toEqual(await vm.to_columns()); + expect(await disk.schema()).toEqual(await mem.schema()); + expect(await disk.size()).toEqual(await mem.size()); + await vm.delete(); + await vd.delete(); + await mem.delete(); + await disk.delete(); + }); + + test("a small page_to_disk table stays in-heap (no files spilled)", async function () { + const before = disk_file_count(); + const disk = await perspective.table(data, { page_to_disk: true }); + const view = await disk.view(); + await view.to_columns(); + expect(disk_file_count()).toEqual(before); + await view.delete(); + await disk.delete(); + }); + + test("supports update + aggregates", async function () { + const disk = await perspective.table(data, { page_to_disk: true }); + await disk.update({ x: [5, 6], y: ["e", "f"], z: [5.5, 6.5] }); + const view = await disk.view({ + group_by: ["y"], + aggregates: { z: "sum" }, + columns: ["z"], + }); + const cols = await view.to_columns(); + expect(cols.z[0]).toEqual(1.5 + 2.5 + 3.5 + 4.5 + 5.5 + 6.5); + await view.delete(); + await disk.delete(); + }); + + test("supports expression columns", async function () { + const disk = await perspective.table(data, { page_to_disk: true }); + const view = await disk.view({ + columns: ["w"], + expressions: { w: `"x" + "z"` }, + }); + const cols = await view.to_columns(); + expect(cols.w).toEqual([2.5, 4.5, 6.5, 8.5]); + await view.delete(); + await disk.delete(); + }); + + // Forces actual eviction (table > 1gb resident) so the `node:fs` bridge + // round-trips: columns are flushed to disk on eviction and re-read on access. + test("evicts to disk over budget and restores correctly", async function () { + const ROWS = 100_000; + const COLS = 256; + const UPDATES = 5; // 25M rows total + const chunk = {}; + for (let c = 0; c < COLS; c++) { + const col = new Array(ROWS); + for (let i = 0; i < ROWS; i++) col[i] = c * 1000 + (i % 1000) + 0.5; + chunk["c" + c] = col; + } + + const before = disk_file_count(); + const table = await perspective.table(chunk, { page_to_disk: true }); + for (let u = 1; u < UPDATES; u++) await table.update(chunk); + + expect(await table.size()).toEqual(ROWS * UPDATES); + + // Eviction must have spilled column buffers to disk. + expect(disk_file_count()).toBeGreaterThan(before); + + // Reading after eviction restores evicted columns from disk. Verify the + // head of every column matches the source (a broken restore would read + // zeros). `c[c][i] === c*1000 + i + 0.5` for i < 1000. + const view = await table.view(); + const head = await view.to_columns({ start_row: 0, end_row: 4 }); + for (let c = 0; c < COLS; c++) { + expect(head["c" + c]).toEqual([ + c * 1000 + 0.5, + c * 1000 + 1.5, + c * 1000 + 2.5, + c * 1000 + 3.5, + ]); + } + + await view.delete(); + await table.delete(); + expect(disk_file_count()).toEqual(before); + }); +}); diff --git a/rust/perspective-python/perspective/tests/table/test_table_page_to_disk.py b/rust/perspective-python/perspective/tests/table/test_table_page_to_disk.py new file mode 100644 index 0000000000..6a261f3a24 --- /dev/null +++ b/rust/perspective-python/perspective/tests/table/test_table_page_to_disk.py @@ -0,0 +1,304 @@ +# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +# ┃ Copyright (c) 2017, the Perspective Authors. ┃ +# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +# ┃ This file is part of the Perspective library, distributed under the terms ┃ +# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +import os +import tempfile + +import perspective as psp +from pytest import mark + +client = psp.Server().new_local_client() +Table = client.table + + +def _perspective_temp_dirs(): + """All `perspective_*` directories currently in the OS temp directory. + + `BACKING_STORE_DISK` columns are written to a unique + `/perspective_/` directory. + """ + tmp = tempfile.gettempdir() + try: + entries = os.listdir(tmp) + except OSError: + return set() + + print(tmp) + return { + os.path.join(tmp, e) + for e in entries + if e.startswith("perspective_") and os.path.isdir(os.path.join(tmp, e)) + } + + +class TestTableOnDisk: + def test_page_to_disk_schema_and_view(self): + data = { + "x": [1, 2, 3, 4], + "y": ["a", "b", "c", "d"], + "z": [True, False, True, False], + } + + mem = Table(data) + disk = Table(data, page_to_disk=True) + assert disk.schema() == mem.schema() + assert disk.view().to_columns() == mem.view().to_columns() + + def test_page_to_disk_csv(self): + data = "x,y,z\n1,a,true\n2,b,false\n3,c,true\n4,d,false" + tbl = Table(data, page_to_disk=True) + assert tbl.schema() == {"x": "integer", "y": "string", "z": "boolean"} + assert tbl.view().to_columns() == { + "x": [1, 2, 3, 4], + "y": ["a", "b", "c", "d"], + "z": [True, False, True, False], + } + + def test_page_to_disk_update_indexed(self): + mem = Table({"x": [1, 2, 3], "y": [1.0, 2.0, 3.0]}, index="x") + disk = Table( + {"x": [1, 2, 3], "y": [1.0, 2.0, 3.0]}, index="x", page_to_disk=True + ) + update = {"x": [2, 4], "y": [20.0, 40.0]} + mem.update(update) + disk.update(update) + assert disk.view().to_columns() == mem.view().to_columns() + assert disk.size() == mem.size() + + def test_page_to_disk_group_by_aggregation(self): + data = {"g": ["a", "b", "a", "b", "a"], "v": [1, 2, 3, 4, 5]} + mem = Table(data) + disk = Table(data, page_to_disk=True) + config = {"group_by": ["g"], "columns": ["v"], "aggregates": {"v": "sum"}} + assert disk.view(**config).to_columns() == mem.view(**config).to_columns() + + def test_page_to_disk_arrow_roundtrip(self): + data = {"x": list(range(100)), "y": [float(i) / 2 for i in range(100)]} + mem = Table(data) + disk = Table(data, page_to_disk=True) + assert disk.view().to_arrow() == mem.view().to_arrow() + + def test_page_to_disk_creates_backing_files(self): + before = _perspective_temp_dirs() + tbl = Table({"x": [1, 2, 3], "y": ["a", "b", "c"]}, page_to_disk=True) + + # Touch the table so it is not optimized away before we inspect the FS. + assert tbl.size() == 3 + after = _perspective_temp_dirs() + new_dirs = after - before + assert new_dirs, "expected a new perspective_ directory on disk" + assert any(os.listdir(d) for d in new_dirs), "expected column files on disk" + + def test_memory_table_creates_no_backing_files(self): + before = _perspective_temp_dirs() + tbl = Table({"x": [1, 2, 3]}) + assert tbl.size() == 3 + after = _perspective_temp_dirs() + assert after == before, "in-memory table must not write to disk" + + def test_page_to_disk_growth_forces_resize(self): + tbl = Table({"x": [0], "y": [0.0]}, index="x", page_to_disk=True) + n = 50000 + tbl.update({"x": list(range(n)), "y": [float(i) for i in range(n)]}) + assert tbl.size() == n + cols = tbl.view().to_columns() + assert cols["x"][0] == 0 + assert cols["x"][-1] == n - 1 + assert cols["y"][-1] == float(n - 1) + + def test_page_to_disk_clone_keeps_master_files(self): + # Cloning a disk-backed column (e.g. the masked clone that serializing a + # table with removed rows performs) must give the clone its OWN backing + # file. Otherwise the clone aliases — and on teardown `rmfile`s — the + # master's file, silently unlinking the master's named backing store. + before = _perspective_temp_dirs() + tbl = Table( + {"x": [1, 2, 3, 4], "y": [10.0, 20.0, 30.0, 40.0]}, + index="x", + page_to_disk=True, + ) + view = tbl.view() + new_dirs = _perspective_temp_dirs() - before + master_dirs = [ + d + for d in new_dirs + if not os.path.basename(d).startswith("perspective_expr_") + ] + assert len(master_dirs) == 1 + master_dir = master_dirs[0] + master_files = set(os.listdir(master_dir)) + assert master_files, "expected master backing files on disk" + + # Removing rows triggers a masked clone of the disk master columns. + tbl.remove([2, 3]) + assert view.to_columns()["x"] == [1, 4] + # The master's own backing files must survive the clone's teardown. + survived = set(os.listdir(master_dir)) + assert master_files <= survived, ( + "clone teardown unlinked master backing files: {}".format( + master_files - survived + ) + ) + + # And the master must remain usable for subsequent updates. + tbl.update({"x": [5, 6], "y": [50.0, 60.0]}) + assert sorted(view.to_columns()["x"]) == [1, 4, 5, 6] + + def test_page_to_disk_larger_dataset_matches_memory(self): + n = 20000 + data = { + "i": list(range(n)), + "f": [float(i) * 1.5 for i in range(n)], + "s": ["row_{}".format(i % 97) for i in range(n)], + } + + mem = Table(data) + disk = Table(data, page_to_disk=True) + assert disk.view().to_arrow() == mem.view().to_arrow() + + +def _perspective_expr_dirs(): + """`perspective_expr_*` directories — the on-disk expression `m_master`.""" + tmp = tempfile.gettempdir() + try: + entries = os.listdir(tmp) + except OSError: + return set() + + return { + os.path.join(tmp, e) + for e in entries + if e.startswith("perspective_expr_") and os.path.isdir(os.path.join(tmp, e)) + } + + +class TestTableOnDiskExpressions: + def test_expression_numeric_equivalence(self): + data = {"x": [1, 2, 3, 4], "y": [10.0, 20.0, 30.0, 40.0]} + mem = Table(data) + disk = Table(data, page_to_disk=True) + exprs = {"sum": '"x" + "y"', "prod": '"x" * "y"'} + assert ( + disk.view(expressions=exprs).to_columns() + == mem.view(expressions=exprs).to_columns() + ) + + def test_expression_string_equivalence(self): + data = {"a": ["foo", "bar", "baz"], "b": ["AA", "BB", "CC"]} + mem = Table(data) + disk = Table(data, page_to_disk=True) + exprs = {"up": 'upper("a")', "lo": 'lower("b")'} + assert ( + disk.view(expressions=exprs).to_columns() + == mem.view(expressions=exprs).to_columns() + ) + + def test_expression_with_group_by_equivalence(self): + data = {"g": ["a", "b", "a", "b"], "v": [1, 2, 3, 4]} + mem = Table(data) + disk = Table(data, page_to_disk=True) + config = { + "expressions": {"v2": '"v" * 2'}, + "group_by": ["g"], + "columns": ["v2"], + "aggregates": {"v2": "sum"}, + } + + assert disk.view(**config).to_columns() == mem.view(**config).to_columns() + + def test_expression_master_is_page_to_disk(self): + before = _perspective_expr_dirs() + tbl = Table({"x": [1, 2, 3], "y": [10.0, 20.0, 30.0]}, page_to_disk=True) + view = tbl.view(expressions={"e": '"x" + "y"'}) + assert view.to_columns()["e"] == [11.0, 22.0, 33.0] + new_dirs = _perspective_expr_dirs() - before + assert new_dirs, ( + "expected a perspective_expr_ dir for the expression master" + ) + + assert any(os.listdir(d) for d in new_dirs), ( + "expected expression backing files on disk" + ) + + def test_memory_expression_creates_no_expr_dir(self): + before = _perspective_expr_dirs() + tbl = Table({"x": [1, 2, 3], "y": [10.0, 20.0, 30.0]}) + view = tbl.view(expressions={"e": '"x" + "y"'}) + assert view.to_columns()["e"] == [11.0, 22.0, 33.0] + assert _perspective_expr_dirs() == before, ( + "in-memory table must not write expression data to disk" + ) + + def test_expression_page_to_disk_update_and_larger(self): + n = 10000 + tbl = Table({"x": [0], "y": [0.0]}, index="x", page_to_disk=True) + tbl.update({"x": list(range(n)), "y": [float(i) for i in range(n)]}) + cols = tbl.view(expressions={"e": '"x" + "y"'}).to_columns() + assert cols["e"][0] == 0.0 + assert cols["e"][-1] == float((n - 1) + (n - 1)) + + +class TestResidency: + # The residency manager evicts disk-backed column buffers to their files + # when over the `PSP_MEMORY_BUDGET` and restores them transparently on + # access. Under a tiny budget, eviction fires aggressively and data must + # still round-trip identically to an in-memory table. + + @mark.skip(reason="No secret hooks in the engine") + def test_residency_evicts_and_data_is_correct(self): + stats_fd, stats_path = tempfile.mkstemp(prefix="psp_residency_") + os.close(stats_fd) + os.environ["PSP_MEMORY_BUDGET"] = "1024" + os.environ["PSP_RESIDENCY_STATS_FILE"] = stats_path + try: + n = 5000 + data = { + "x": list(range(n)), + "y": [float(i) * 1.5 for i in range(n)], + "s": ["row_{}".format(i % 50) for i in range(n)], + } + mem = Table(data) + disk = Table(data, page_to_disk=True) + + # Each request is a safepoint that trims to budget; data read back + # must be correct (round-tripped through evict -> restore). + assert disk.view().to_columns() == mem.view().to_columns() + assert disk.view().to_arrow() == mem.view().to_arrow() + + upd = { + "x": list(range(n, n + 200)), + "y": [float(i) for i in range(n, n + 200)], + "s": ["upd_{}".format(i) for i in range(200)], + } + mem.update(upd) + disk.update(upd) + assert disk.view().to_columns() == mem.view().to_columns() + + # An expression view on an evicted disk table must still compute. + exprs = {"e": '"x" + "y"'} + assert ( + disk.view(expressions=exprs).to_columns() + == mem.view(expressions=exprs).to_columns() + ) + + # Confirm eviction actually occurred (otherwise the test is vacuous). + with open(stats_path) as f: + stats = f.read() + evictions = int(stats.split("evictions=")[1].split()[0]) + assert evictions > 0, "expected evictions under a tiny budget: " + stats + finally: + os.environ.pop("PSP_MEMORY_BUDGET", None) + os.environ.pop("PSP_RESIDENCY_STATS_FILE", None) + os.remove(stats_path) + # Drain a safepoint with residency disabled so any evicted stores + # from other live tables are restored before subsequent tests. + Table({"_": [0]}).view().to_columns() diff --git a/rust/perspective-python/src/client/client_async.rs b/rust/perspective-python/src/client/client_async.rs index d53be5b0c1..d6f5eba281 100644 --- a/rust/perspective-python/src/client/client_async.rs +++ b/rust/perspective-python/src/client/client_async.rs @@ -167,6 +167,8 @@ impl AsyncClient { /// `"json"`, `"columns"`, `"csv"` or `"arrow"`. This overrides /// language-specific type dispatch behavior, which allows stringified /// and byte array alternative inputs. + /// - `page_to_disk` - Back this [`Table`]'s canonical data with the + /// on-disk (memory-mapped) storage backend instead of memory. /// /// # Python Examples /// @@ -175,7 +177,7 @@ impl AsyncClient { /// ```python /// table = await client.table("x,y\n1,2\n3,4") /// ``` - #[pyo3(signature=(input, limit=None, index=None, name=None, format=None))] + #[pyo3(signature=(input, limit=None, index=None, name=None, format=None, page_to_disk=None))] pub async fn table( &self, input: Py, @@ -183,12 +185,14 @@ impl AsyncClient { index: Option>, name: Option>, format: Option>, + page_to_disk: Option, ) -> PyResult { let client = self.client.clone(); let py_client = Python::with_gil(|_| self.clone()); let table = Python::with_gil(|py| { let mut options = TableInitOptions { name: name.map(|x| x.extract::(py)).transpose()?, + page_to_disk, ..TableInitOptions::default() }; diff --git a/rust/perspective-python/src/client/client_sync.rs b/rust/perspective-python/src/client/client_sync.rs index f0d448bc5b..46b9cac9a1 100644 --- a/rust/perspective-python/src/client/client_sync.rs +++ b/rust/perspective-python/src/client/client_sync.rs @@ -165,7 +165,7 @@ impl Client { /// ```python /// table = client.table("x,y\n1,2\n3,4") /// ``` - #[pyo3(signature = (input, limit=None, index=None, name=None, format=None))] + #[pyo3(signature = (input, limit=None, index=None, name=None, format=None, page_to_disk=None))] pub fn table( &self, py: Python<'_>, @@ -174,10 +174,11 @@ impl Client { index: Option>, name: Option>, format: Option>, + page_to_disk: Option, ) -> PyResult { Ok(Table( self.0 - .table(input, limit, index, name, format) + .table(input, limit, index, name, format, page_to_disk) .py_block_on(py)?, )) } diff --git a/rust/perspective-server/cpp/perspective/CMakeLists.txt b/rust/perspective-server/cpp/perspective/CMakeLists.txt index c7c74376bc..e2dc3f3b16 100644 --- a/rust/perspective-server/cpp/perspective/CMakeLists.txt +++ b/rust/perspective-server/cpp/perspective/CMakeLists.txt @@ -32,6 +32,7 @@ set(CMAKE_MODULE_PATH "${PSP_CMAKE_MODULE_PATH}/modules" ${CMAKE_MODULE_PATH}) set(MSVC_RUNTIME_LIBRARY MultiThreaded) option(PSP_WASM64 "Enable WASM Memory64 support" OFF) +option(PSP_WASM_OPFS "Enable on-disk (OPFS) tables in the browser via the custom `psp_opfs_*` bridge in `perspective-server.poly.ts` (NO_FILESYSTEM + JSPI; no WasmFS, no SharedArrayBuffer). Backwards compatible: the module loads and runs in-memory tables on non-JSPI browsers; on-disk activates only in a JSPI-capable Worker." ON) if(${CMAKE_SYSTEM_NAME} MATCHES "Windows") set(WIN32 ON) @@ -494,10 +495,12 @@ set(SOURCE_FILES ${PSP_CPP_SRC}/src/cpp/sort_specification.cpp ${PSP_CPP_SRC}/src/cpp/sparse_tree.cpp ${PSP_CPP_SRC}/src/cpp/sparse_tree_node.cpp + ${PSP_CPP_SRC}/src/cpp/residency.cpp ${PSP_CPP_SRC}/src/cpp/step_delta.cpp ${PSP_CPP_SRC}/src/cpp/storage.cpp ${PSP_CPP_SRC}/src/cpp/storage_impl_linux.cpp ${PSP_CPP_SRC}/src/cpp/storage_impl_osx.cpp + ${PSP_CPP_SRC}/src/cpp/storage_impl_wasm.cpp ${PSP_CPP_SRC}/src/cpp/storage_impl_win.cpp ${PSP_CPP_SRC}/src/cpp/sym_table.cpp ${PSP_CPP_SRC}/src/cpp/table.cpp @@ -544,6 +547,9 @@ set(PSP_EXPORTED_FUNCTIONS _psp_is_memory64 _psp_num_cpus _psp_set_num_cpus + _psp_residency_prepare + _psp_residency_victim_fname + _psp_residency_commit ) if(PSP_HEAP_INSTRUMENTS) @@ -566,10 +572,30 @@ if(PSP_PYODIDE) else() # -s MEMORY_GROWTH_GEOMETRIC_STEP=1.0 \ # -s MEMORY_GROWTH_GEOMETRIC_CAP=536870912 \ + + # On-disk tables in the browser are backed by OPFS via emscripten's WasmFS. + # `PSP_WASM_OPFS` swaps the default `NO_FILESYSTEM` (memory-only) build for + # `WASMFS` + `FORCE_FILESYSTEM`, and enables the OPFS backend. `JSPI` provides + # the synchronous-looking OPFS access the WasmFS OPFS backend needs without + # SharedArrayBuffer/cross-origin-isolation (`-pthread`). `JSPI` is backwards + # compatible: the module loads and runs in-memory tables fine on non-JSPI + # browsers (it has no `main`/async imports wrapped at instantiation); the JSPI + # API is only reached when an actual OPFS op runs, i.e. when `page_to_disk` is used + # (the JS client gates that on JSPI being present). Closure is incompatible + # with the WasmFS JS glue, so it is disabled in that case. + if(PSP_WASM_OPFS) + set(PSP_FILESYSTEM_FLAGS "-s NO_FILESYSTEM=1 ") + set(PSP_CLOSURE_FLAGS "--closure=1 ") + add_compile_definitions(PSP_WASM_OPFS_PERSIST=1) + else() + set(PSP_FILESYSTEM_FLAGS "-s NO_FILESYSTEM=1 ") + set(PSP_CLOSURE_FLAGS "--closure=1 ") + endif() + set(PSP_WASM_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} \ --no-entry \ - --closure=1 \ - -s NO_FILESYSTEM=1 \ + ${PSP_CLOSURE_FLAGS} \ + ${PSP_FILESYSTEM_FLAGS} \ -s ALLOW_MEMORY_GROWTH=1 \ -s MODULARIZE=1 \ -s WASM_BIGINT=1 \ diff --git a/rust/perspective-server/cpp/perspective/src/cpp/arrow_loader.cpp b/rust/perspective-server/cpp/perspective/src/cpp/arrow_loader.cpp index 083bba1704..c8201f873c 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/arrow_loader.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/arrow_loader.cpp @@ -62,9 +62,9 @@ load_stream( const uint32_t length, std::shared_ptr& table ) { - arrow::io::BufferReader buffer_reader( + arrow::io::BufferReader buffer_reader(std::make_shared( reinterpret_cast(ptr), length - ); + )); auto status = arrow::ipc::RecordBatchStreamReader::Open(&buffer_reader); if (!status.ok()) { @@ -92,9 +92,9 @@ load_file( const uint32_t length, std::shared_ptr& table ) { - arrow::io::BufferReader buffer_reader( + arrow::io::BufferReader buffer_reader(std::make_shared( reinterpret_cast(ptr), length - ); + )); auto status = arrow::ipc::RecordBatchFileReader::Open(&buffer_reader); if (!status.ok()) { diff --git a/rust/perspective-server/cpp/perspective/src/cpp/binding_api.cpp b/rust/perspective-server/cpp/perspective/src/cpp/binding_api.cpp index f13cbccf55..4d5ae6baba 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/binding_api.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/binding_api.cpp @@ -102,6 +102,24 @@ psp_poll(ProtoServer* server) { return encode_api_responses(responses); } +PERSPECTIVE_EXPORT +std::size_t +psp_residency_prepare(ProtoServer* server) { + return server->residency_prepare(); +} + +PERSPECTIVE_EXPORT +const char* +psp_residency_victim_fname(ProtoServer* server, std::size_t i) { + return server->residency_victim_fname(i); +} + +PERSPECTIVE_EXPORT +void +psp_residency_commit(ProtoServer* server) { + server->residency_commit(); +} + PERSPECTIVE_EXPORT std::uint32_t psp_new_session(ProtoServer* server) { diff --git a/rust/perspective-server/cpp/perspective/src/cpp/column.cpp b/rust/perspective-server/cpp/perspective/src/cpp/column.cpp index cc42e63d91..c9983ce577 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/column.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/column.cpp @@ -73,12 +73,14 @@ t_column::column_copy_helper(const t_column& other) { m_dtype = other.m_dtype; m_init = false; m_isvlen = other.m_isvlen; - m_data = std::make_shared(other.m_data->get_recipe()); + // Use clone recipes so that copying a disk-backed column produces a column + // with its own independent backing files rather than aliasing `other`'s. + m_data = std::make_shared(other.m_data->get_clone_recipe()); m_vocab = std::make_shared( - other.m_vocab->get_vlendata()->get_recipe(), - other.m_vocab->get_extents()->get_recipe() + other.m_vocab->get_vlendata()->get_clone_recipe(), + other.m_vocab->get_extents()->get_clone_recipe() ); - m_status = std::make_shared(other.m_status->get_recipe()); + m_status = std::make_shared(other.m_status->get_clone_recipe()); m_size = other.m_size; m_status_enabled = other.m_status_enabled; @@ -861,6 +863,38 @@ t_column::clone(const t_mask& mask) const { return rval; } +void +t_column::ensure_resident() { + if (m_data) { + m_data->ensure_resident(); + } + if (m_status) { + m_status->ensure_resident(); + } + if (m_isvlen && m_vocab) { + m_vocab->get_vlendata()->ensure_resident(); + m_vocab->get_extents()->ensure_resident(); + } +} + +void +t_column::copy_from(const t_column& other) { + set_size(other.size()); + m_data->fill(*other.m_data); + + if (is_status_enabled() && other.is_status_enabled()) { + m_status->fill(*other.m_status); + } + + if (is_vlen_dtype(get_dtype())) { + m_vocab->clone(*other.m_vocab); + } + +#ifdef PSP_COLUMN_VERIFY + verify(); +#endif +} + void t_column::valid_raw_fill() { m_status->raw_fill(STATUS_VALID); diff --git a/rust/perspective-server/cpp/perspective/src/cpp/compat_impl_wasm.cpp b/rust/perspective-server/cpp/perspective/src/cpp/compat_impl_wasm.cpp index fc750f18ad..174f3f87bf 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/compat_impl_wasm.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/compat_impl_wasm.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -43,12 +44,22 @@ static void map_file_internal_( t_uindex file_size(t_handle h) { - PSP_COMPLAIN_AND_ABORT("Not implemented"); + struct stat st {}; + if (fstat(h, &st) != 0) { + return 0; + } + return static_cast(st.st_size); } void close_file(t_handle h) { - PSP_COMPLAIN_AND_ABORT("Not implemented"); + // On WASM (NO_FILESYSTEM) disk stores are backed by the OPFS/node-fs bridge + // keyed by filename, not real fds — `create_file()` returns a sentinel (1). + // Closing it issued `fd_close(1)` (stdout) on every disk-column free; never + // touch the std streams (0/1/2). Real fds (none under NO_FILESYSTEM) close. + if (h > 2) { + close(h); + } } void @@ -120,7 +131,14 @@ set_thread_name(const std::string& name) { void rmfile(const std::string& fname) { +#ifdef PSP_HAS_OPFS_BRIDGE + // Close the kept-open OPFS sync access handle and delete the backing file. + // (`unlink` is a no-op under `NO_FILESYSTEM`, and would leak the open handle + // the residency bridge holds for this file.) + psp_opfs_remove(fname.c_str()); +#else unlink(fname.c_str()); +#endif } void diff --git a/rust/perspective-server/cpp/perspective/src/cpp/context_grouped_pkey.cpp b/rust/perspective-server/cpp/perspective/src/cpp/context_grouped_pkey.cpp index 58a655c698..aaf62e3751 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/context_grouped_pkey.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/context_grouped_pkey.cpp @@ -54,7 +54,9 @@ t_ctx_grouped_pkey::init() { // `t_data_table`s so that each context's expressions are isolated // and do not affect other contexts when they are calculated. const auto& expressions = m_config.get_expressions(); - m_expression_tables = std::make_shared(expressions); + m_expression_tables = std::make_shared( + expressions, m_config.get_backing_store() + ); m_init = true; } diff --git a/rust/perspective-server/cpp/perspective/src/cpp/context_one.cpp b/rust/perspective-server/cpp/perspective/src/cpp/context_one.cpp index 1da048e35e..951e7431b5 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/context_one.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/context_one.cpp @@ -48,7 +48,9 @@ t_ctx1::init() { // `t_data_table`s so that each context's expressions are isolated // and do not affect other contexts when they are calculated. const auto& expressions = m_config.get_expressions(); - m_expression_tables = std::make_shared(expressions); + m_expression_tables = std::make_shared( + expressions, m_config.get_backing_store() + ); m_init = true; } diff --git a/rust/perspective-server/cpp/perspective/src/cpp/context_two.cpp b/rust/perspective-server/cpp/perspective/src/cpp/context_two.cpp index 3606e1e0bc..8f0d6d595e 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/context_two.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/context_two.cpp @@ -100,7 +100,9 @@ t_ctx2::init() { // `t_data_table`s so that each context's expressions are isolated // and do not affect other contexts when they are calculated. const auto& expressions = m_config.get_expressions(); - m_expression_tables = std::make_shared(expressions); + m_expression_tables = std::make_shared( + expressions, m_config.get_backing_store() + ); m_init = true; } diff --git a/rust/perspective-server/cpp/perspective/src/cpp/context_zero.cpp b/rust/perspective-server/cpp/perspective/src/cpp/context_zero.cpp index e2aea1e14d..2dde9d2f09 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/context_zero.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/context_zero.cpp @@ -40,7 +40,9 @@ t_ctx0::init() { // `t_data_table`s so that each context's expressions are isolated // and do not affect other contexts when they are calculated. const auto& expressions = m_config.get_expressions(); - m_expression_tables = std::make_shared(expressions); + m_expression_tables = std::make_shared( + expressions, m_config.get_backing_store() + ); m_init = true; } diff --git a/rust/perspective-server/cpp/perspective/src/cpp/data_table.cpp b/rust/perspective-server/cpp/perspective/src/cpp/data_table.cpp index a50c129fa5..213ac72884 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/data_table.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/data_table.cpp @@ -179,6 +179,7 @@ t_data_table::get_column_safe(std::string_view colname) { if (idx == -1) { return nullptr; } + ensure_col_resident(idx); return m_columns[idx]; } @@ -187,6 +188,7 @@ t_data_table::get_column(std::string_view colname) { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); t_uindex idx = m_schema.get_colidx(colname.data()); + ensure_col_resident(idx); return m_columns[idx]; } @@ -195,6 +197,7 @@ t_data_table::get_column(std::string_view colname) const { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); t_uindex idx = m_schema.get_colidx(colname.data()); + ensure_col_resident(idx); return m_columns[idx]; } @@ -206,6 +209,7 @@ t_data_table::get_column_safe(std::string_view colname) const { if (idx == -1) { return nullptr; } + ensure_col_resident(idx); return m_columns[idx]; } @@ -214,6 +218,7 @@ t_data_table::get_const_column(std::string_view colname) const { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); t_uindex idx = m_schema.get_colidx(colname.data()); + ensure_col_resident(idx); return m_columns[idx]; } @@ -225,6 +230,7 @@ t_data_table::get_const_column_safe(std::string_view colname) const { if (idx == -1) { return nullptr; } + ensure_col_resident(idx); return m_columns[idx]; } @@ -232,6 +238,7 @@ std::shared_ptr t_data_table::get_const_column(t_uindex idx) const { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); + ensure_col_resident(idx); return m_columns[idx]; } @@ -242,6 +249,7 @@ t_data_table::get_const_column_safe(t_uindex idx) const { if (idx == -1) { return nullptr; } + ensure_col_resident(idx); return m_columns[idx]; } @@ -250,6 +258,7 @@ t_data_table::get_columns() { std::vector rval(m_columns.size()); t_uindex idx = 0; for (const auto& c : m_columns) { + ensure_col_resident(idx); rval[idx] = c.get(); ++idx; } @@ -261,6 +270,7 @@ t_data_table::get_const_columns() const { std::vector rval(m_columns.size()); t_uindex idx = 0; for (const auto& c : m_columns) { + ensure_col_resident(idx); rval[idx] = c.get(); ++idx; } @@ -273,6 +283,9 @@ t_data_table::extend(t_uindex nelems) { PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); PSP_VERBOSE_ASSERT(m_init, "Table not inited"); for (t_uindex idx = 0, loop_end = m_schema.size(); idx < loop_end; ++idx) { + // Restore before reallocating: extend/reserve realloc `m_base`, which on + // an evicted store would discard its disk-resident data. + ensure_col_resident(idx); m_columns[idx]->extend_dtype(nelems); } m_size = std::max(nelems, m_size); @@ -283,6 +296,7 @@ void t_data_table::set_size(t_uindex size) { PSP_TRACE_SENTINEL(); for (t_uindex idx = 0, loop_end = m_schema.size(); idx < loop_end; ++idx) { + ensure_col_resident(idx); m_columns[idx]->set_size(size); } m_size = size; @@ -298,6 +312,7 @@ t_data_table::reserve(t_uindex capacity) { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); for (t_uindex idx = 0, loop_end = m_schema.size(); idx < loop_end; ++idx) { + ensure_col_resident(idx); m_columns[idx]->reserve(capacity); } set_capacity(std::max(capacity, m_capacity)); @@ -308,6 +323,7 @@ t_data_table::_get_column(std::string_view colname) { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); t_uindex idx = m_schema.get_colidx(colname.data()); + ensure_col_resident(idx); return m_columns[idx].get(); } @@ -316,6 +332,7 @@ t_data_table::_get_const_column(std::string_view colname) const { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); t_uindex idx = m_schema.get_colidx(colname.data()); + ensure_col_resident(idx); return m_columns[idx].get(); } @@ -323,6 +340,7 @@ const t_column* t_data_table::_get_const_column(t_uindex idx) const { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); + ensure_col_resident(idx); return m_columns[idx].get(); } @@ -334,6 +352,7 @@ t_data_table::_get_const_column_safe(std::string_view colname) const { if (idx == t_uindex(-1)) { return nullptr; } + ensure_col_resident(idx); return m_columns[idx].get(); } diff --git a/rust/perspective-server/cpp/perspective/src/cpp/expression_tables.cpp b/rust/perspective-server/cpp/perspective/src/cpp/expression_tables.cpp index 9a42056cd0..988337a36e 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/expression_tables.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/expression_tables.cpp @@ -11,11 +11,15 @@ // ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ #include +#include + +#include namespace perspective { t_expression_tables::t_expression_tables( - const std::vector>& expressions + const std::vector>& expressions, + t_backing_store backing_store ) { t_schema schema; t_schema transitions_schema; @@ -26,8 +30,17 @@ t_expression_tables::t_expression_tables( transitions_schema.add_column(alias, DTYPE_UINT8); } + // Only the persistent `m_master` table honors on-disk backing; the + // transitional tables are per-update scratch (cleared every update, sized + // to the update batch) so disk-backing them is pure I/O churn with no + // memory-relief benefit, and they stay in memory. + std::string master_dirname; + if (backing_store == BACKING_STORE_DISK && !expressions.empty()) { + master_dirname = create_backing_store_dir("perspective_expr_"); + } + m_master = std::make_shared( - "", "", schema, DEFAULT_EMPTY_CAPACITY, BACKING_STORE_MEMORY + "", master_dirname, schema, DEFAULT_EMPTY_CAPACITY, backing_store ); m_flattened = std::make_shared( "", "", schema, DEFAULT_EMPTY_CAPACITY, BACKING_STORE_MEMORY @@ -68,8 +81,14 @@ t_expression_tables::set_flattened( const t_schema& schema = m_flattened->get_schema(); const std::vector& column_names = schema.m_columns; for (const auto& colname : column_names) { - m_flattened->set_column( - colname, flattened->get_column(colname)->clone() + // Deep-copy into the (in-memory) transitional column in place rather + // than `clone()`-ing the source. `clone()` inherits the source's + // backing store via its recipe, and for an on-disk expression + // `m_master` that produces a broken disk clone which aliases the + // master's backing file. `copy_from` preserves `m_flattened`'s own + // (memory) backing store. + m_flattened->get_column(colname)->copy_from( + *flattened->get_column(colname) ); } } diff --git a/rust/perspective-server/cpp/perspective/src/cpp/gnode.cpp b/rust/perspective-server/cpp/perspective/src/cpp/gnode.cpp index 358d21a445..eb7e55151f 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/gnode.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/gnode.cpp @@ -49,7 +49,11 @@ calc_negate(t_tscalar val) { return val.negate(); } -t_gnode::t_gnode(t_schema input_schema, t_schema output_schema) : +t_gnode::t_gnode( + t_schema input_schema, + t_schema output_schema, + t_backing_store backing_store +) : m_mode(NODE_PROCESSING_SIMPLE_DATAFLOW) #ifdef PSP_PARALLEL_FOR , @@ -62,6 +66,7 @@ t_gnode::t_gnode(t_schema input_schema, t_schema output_schema) : m_init(false), m_id(0), m_last_input_port_id(0), + m_backing_store(backing_store), m_pool_cleanup([]() {}) { PSP_TRACE_SENTINEL(); LOG_CONSTRUCTOR("t_gnode"); @@ -98,7 +103,9 @@ void t_gnode::init() { PSP_TRACE_SENTINEL(); - m_gstate = std::make_shared(m_input_schema, m_output_schema); + m_gstate = std::make_shared( + m_input_schema, m_output_schema, m_backing_store + ); m_gstate->init(); // Create and store the main input port, which is always port 0. The next diff --git a/rust/perspective-server/cpp/perspective/src/cpp/gnode_state.cpp b/rust/perspective-server/cpp/perspective/src/cpp/gnode_state.cpp index 097c66009f..e2e8ad0022 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/gnode_state.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/gnode_state.cpp @@ -19,14 +19,20 @@ #include #include #include +#include #include namespace perspective { -t_gstate::t_gstate(t_schema input_schema, t_schema output_schema) : +t_gstate::t_gstate( + t_schema input_schema, + t_schema output_schema, + t_backing_store backing_store +) : m_input_schema(std::move(input_schema)), m_output_schema(std::move(output_schema)), + m_backing_store(backing_store), m_init(false) { LOG_CONSTRUCTOR("t_gstate"); } @@ -35,8 +41,16 @@ t_gstate::~t_gstate() { LOG_DESTRUCTOR("t_gstate"); } void t_gstate::init() { + // The master `t_data_table` is the canonical copy of the dataset and is the + // only table eligible for on-disk backing. When `BACKING_STORE_DISK` is + // requested, the column files live in a unique per-table directory under + // the OS temp directory (native only; WASM/OPFS is handled separately). + std::string dirname; + if (m_backing_store == BACKING_STORE_DISK) { + dirname = create_backing_store_dir("perspective_"); + } m_table = std::make_shared( - "", "", m_input_schema, DEFAULT_EMPTY_CAPACITY, BACKING_STORE_MEMORY + "", dirname, m_input_schema, DEFAULT_EMPTY_CAPACITY, m_backing_store ); m_table->init(); m_pkcol = m_table->get_column("psp_pkey"); @@ -133,18 +147,27 @@ t_gstate::fill_master_table(const std::shared_ptr& flattened) { t_uindex ncols = m_table->num_columns(); auto* master_table = m_table.get(); + const bool page_to_disk = m_backing_store == BACKING_STORE_DISK; parallel_for( int(ncols), - [&master_table, &master_table_schema, &flattened](int idx) { - // Alias each column `shared_ptr` from flattened into `m_table` - // rather than deep-cloning. + [&master_table, &master_table_schema, &flattened, page_to_disk](int idx) { const std::string& column_name = master_table_schema.m_columns[idx]; auto flattened_column = flattened->get_column_safe(column_name); if (!flattened_column) { return; } - master_table->set_column(idx, std::move(flattened_column)); + if (page_to_disk) { + // The master table is on-disk; preserve its disk-backed column + // and deep-copy the flattened (in-memory) column's data into + // it rather than aliasing the in-memory `shared_ptr`. + master_table->_get_column(column_name) + ->copy_from(*flattened_column); + } else { + // Alias each column `shared_ptr` from flattened into `m_table` + // rather than deep-cloning. + master_table->set_column(idx, std::move(flattened_column)); + } } ); @@ -198,16 +221,23 @@ t_gstate::init_from_table(const std::shared_ptr& source) { const t_schema& master_table_schema = m_table->get_schema(); t_uindex ncols = m_table->num_columns(); auto* master_table = m_table.get(); + const bool page_to_disk = m_backing_store == BACKING_STORE_DISK; parallel_for( int(ncols), - [&master_table, &master_table_schema, &source](int idx) { + [&master_table, &master_table_schema, &source, page_to_disk](int idx) { const std::string& column_name = master_table_schema.m_columns[idx]; auto src_col = source->get_column_safe(column_name); if (!src_col) { return; } - master_table->set_column(idx, std::move(src_col)); + if (page_to_disk) { + // Preserve the disk-backed master column; deep-copy rather than + // aliasing the in-memory source column. + master_table->_get_column(column_name)->copy_from(*src_col); + } else { + master_table->set_column(idx, std::move(src_col)); + } } ); diff --git a/rust/perspective-server/cpp/perspective/src/cpp/residency.cpp b/rust/perspective-server/cpp/perspective/src/cpp/residency.cpp new file mode 100644 index 0000000000..bc1123591b --- /dev/null +++ b/rust/perspective-server/cpp/perspective/src/cpp/residency.cpp @@ -0,0 +1,194 @@ +// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +// ┃ Copyright (c) 2017, the Perspective Authors. ┃ +// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +// ┃ This file is part of the Perspective library, distributed under the terms ┃ +// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +#include +#include +#include +#include +#include +#include +#include + +namespace perspective { + +bool g_residency_active = false; +std::uint64_t g_residency_tick = 0; + +t_residency_manager& +t_residency_manager::inst() { + static t_residency_manager s_inst; + return s_inst; +} + +void +t_residency_manager::register_store(t_lstore* store) { + std::lock_guard lk(m_mutex); + m_stores.insert(store); +} + +void +t_residency_manager::unregister_store(t_lstore* store) { + std::lock_guard lk(m_mutex); + m_stores.erase(store); +} + +// Hard-coded residency budget (bytes) for WASM. A browser has no environment, +// so `PSP_MEMORY_BUDGET` is unreachable there — without a budget, residency is +// inert, on-disk columns never evict to OPFS, and the heap can grow past the +// 2GB signed-pointer ceiling. This caps resident disk-backed column buffers so +// the cold set is flushed to OPFS. Tunable via `-DPSP_WASM_MEMORY_BUDGET=...`. +#ifndef PSP_WASM_MEMORY_BUDGET +#define PSP_WASM_MEMORY_BUDGET (1024ull * 1024ull * 1024ull) // 1 GiB +#endif + +void +t_residency_manager::refresh_config() { + std::size_t budget = 0; +#ifdef PSP_ENABLE_WASM + budget = static_cast(PSP_WASM_MEMORY_BUDGET); +#else + const char* budget_env = std::getenv("PSP_MEMORY_BUDGET"); + if (budget_env != nullptr) { + budget = static_cast(std::strtoull(budget_env, nullptr, 10)); + } +#endif + + bool was_active = g_residency_active; + m_budget = budget; + g_residency_active = budget > 0; + + // If residency was just disabled, restore every evicted store so the lazy + // `ensure_resident()` hook ( + // now a no-op) never sees a null `m_base`. + if (was_active && !g_residency_active) { + for (auto* store : m_stores) { + store->restore(); + } + } +} + +std::size_t +t_residency_manager::resident_bytes() { + std::lock_guard lk(m_mutex); + std::size_t total = 0; + for (auto* store : m_stores) { + if (store->is_resident()) { + total += store->capacity(); + } + } + + return total; +} + +std::size_t +t_residency_manager::prepare() { + m_pending.clear(); + m_pending_fnames.clear(); + + refresh_config(); + if (!g_residency_active) { + return 0; + } + + std::lock_guard lk(m_mutex); + g_residency_tick = ++m_tick; + std::size_t resident = 0; + std::vector candidates; + candidates.reserve(m_stores.size()); + for (auto* store : m_stores) { + if (store->is_resident()) { + resident += store->capacity(); + candidates.push_back(store); + } + } + + if (resident <= m_budget) { + return 0; + } + + std::sort( + candidates.begin(), + candidates.end(), + [](const t_lstore* a, const t_lstore* b) { + return a->residency_tick() < b->residency_tick(); + } + ); + + // Select victims to bring under budget, but do NOT evict yet — the JS driver + // must open each victim's OPFS handle before `commit()` can flush it. + for (auto* store : candidates) { + if (resident <= m_budget) { + break; + } + + resident -= store->capacity(); + m_pending.push_back(store); + m_pending_fnames.push_back(store->get_fname()); + } + + return m_pending.size(); +} + +const char* +t_residency_manager::victim_fname(std::size_t i) const { + if (i >= m_pending_fnames.size()) { + return ""; + } + return m_pending_fnames[i].c_str(); +} + +void +t_residency_manager::commit() { + std::size_t n = 0; + { + std::lock_guard lk(m_mutex); + for (auto* store : m_pending) { + store->evict(); + ++m_evictions; + } + + n = m_pending.size(); + } + + m_pending.clear(); + m_pending_fnames.clear(); + if (n == 0) { + return; + } + + // // TODO: No diagnostics hooks + // // Test/diagnostic hook: dump cumulative stats so a harness can confirm + // // eviction is actually occurring. + // const char* stats_file = std::getenv("PSP_RESIDENCY_STATS_FILE"); + // if (stats_file != nullptr) { + // FILE* f = std::fopen(stats_file, "w"); + // if (f != nullptr) { + // std::fprintf( + // f, + // "evictions=%llu restores=%llu budget=%zu\n", + // static_cast(m_evictions), + // static_cast(m_restores), + // m_budget + // ); + // std::fclose(f); + // } + // } +} + +void +t_residency_manager::safepoint() { + // Native (mmap) path: no async handle setup, so both phases run inline. + prepare(); + commit(); +} + +} // namespace perspective diff --git a/rust/perspective-server/cpp/perspective/src/cpp/server.cpp b/rust/perspective-server/cpp/perspective/src/cpp/server.cpp index 0ea0410110..4955833b8b 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/server.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/server.cpp @@ -31,6 +31,8 @@ #include #include #include +#include +#include #include #include #include @@ -105,6 +107,7 @@ make_context( auto expressions = view_config->get_used_expressions(); auto cfg = t_config(columns, fterm, filter_op, expressions); + cfg.set_backing_store(table->get_backing_store()); auto ctx0 = std::make_shared(*schema, cfg); ctx0->init(); ctx0->sort_by(sortspec); @@ -137,6 +140,7 @@ make_context( auto expressions = view_config->get_used_expressions(); auto cfg = t_config(row_pivots, aggspecs, fterm, filter_op, expressions); + cfg.set_backing_store(table->get_backing_store()); auto ctx1 = std::make_shared(*schema, cfg); ctx1->init(); @@ -196,6 +200,7 @@ make_context( expressions, column_only ); + cfg.set_backing_store(table->get_backing_store()); auto ctx2 = std::make_shared(*schema, cfg); ctx2->init(); @@ -876,6 +881,15 @@ ProtoServer::handle_request( std::chrono::duration_cast(end - start) .count(); +#ifndef PSP_ENABLE_WASM + // Request safepoint: the response is fully serialized, so no raw column + // pointer is live. Trim resident disk-backed buffers to the memory budget. + // Native (mmap) evicts inline here. On WASM/OPFS eviction needs an async + // handle-open step, so the JS engine drives it (`prepare`/open/`commit`) + // after this synchronous call returns — see `engine.ts`/the poly. + t_residency_manager::inst().safepoint(); +#endif + return serialized_responses; } @@ -934,9 +948,32 @@ ProtoServer::poll() { std::chrono::duration_cast(end - start) .count(); + // Request safepoint (see `handle_request`). +#ifndef PSP_ENABLE_WASM + t_residency_manager::inst().safepoint(); +#endif + return out; } +// WASM/OPFS safepoint, driven from JS so the async OPFS handle-open step can run +// between the two synchronous phases. `prepare` selects victims, the JS driver +// opens each `residency_victim_fname(i)` handle, then `commit` flushes + frees. +std::size_t +ProtoServer::residency_prepare() { + return t_residency_manager::inst().prepare(); +} + +const char* +ProtoServer::residency_victim_fname(std::size_t i) { + return t_residency_manager::inst().victim_fname(i); +} + +void +ProtoServer::residency_commit() { + t_residency_manager::inst().commit(); +} + proto::ColumnType dtype_to_column_type(const t_dtype& t) { switch (t) { @@ -1575,6 +1612,13 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) { break; } + // On-disk backing: native uses a memory-mapped file; WASM uses the + // WasmFS/OPFS backend (`storage_impl_wasm.cpp`). The browser JS + // bootstrap mounts OPFS at `/perspective` before any table is built. + t_backing_store backing_store = r.options().page_to_disk() + ? BACKING_STORE_DISK + : BACKING_STORE_MEMORY; + switch (r.data().data_case()) { case proto::MakeTableData::kFromView: { auto view = m_resources.get_view(r.data().from_view()); @@ -1594,42 +1638,54 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) { dims.end_col ); - table = Table::from_arrow(index, std::move(*arrow), limit); + table = Table::from_arrow( + index, std::move(*arrow), limit, backing_store + ); break; } case proto::MakeTableData::kFromArrow: { std::string data = r.data().from_arrow(); { auto _ = std::move(req); } - table = Table::from_arrow(index, std::move(data), limit); + table = Table::from_arrow( + index, std::move(data), limit, backing_store + ); break; } case proto::MakeTableData::kFromCsv: { std::string data = r.data().from_csv(); { auto _ = std::move(req); } - table = Table::from_csv(index, std::move(data), limit); + table = Table::from_csv( + index, std::move(data), limit, backing_store + ); break; } case proto::MakeTableData::kFromCols: { std::string data = r.data().from_cols(); { auto _ = std::move(req); } - table = Table::from_cols(index, std::move(data), limit); + table = Table::from_cols( + index, std::move(data), limit, backing_store + ); break; } case proto::MakeTableData::kFromRows: { std::string data = r.data().from_rows(); { auto _ = std::move(req); } - table = Table::from_rows(index, std::move(data), limit); + table = Table::from_rows( + index, std::move(data), limit, backing_store + ); break; } case proto::MakeTableData::kFromNdjson: { std::string data = r.data().from_ndjson(); { auto _ = std::move(req); } - table = Table::from_ndjson(index, std::move(data), limit); + table = Table::from_ndjson( + index, std::move(data), limit, backing_store + ); break; } case proto::MakeTableData::kFromSchema: { @@ -1642,7 +1698,9 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) { } t_schema table_schema(columns, types); - table = Table::from_schema(index, table_schema, limit); + table = Table::from_schema( + index, table_schema, limit, backing_store + ); break; } case proto::MakeTableData::DATA_NOT_SET: { diff --git a/rust/perspective-server/cpp/perspective/src/cpp/storage.cpp b/rust/perspective-server/cpp/perspective/src/cpp/storage.cpp index 7581ec8a49..9ae9b513e5 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/storage.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/storage.cpp @@ -24,6 +24,7 @@ SUPPRESS_WARNINGS_VC(4505) #include #include #include +#include #include #include #include @@ -243,6 +244,7 @@ t_lstore::~t_lstore() { switch (m_backing_store) { case BACKING_STORE_DISK: { + t_residency_manager::inst().unregister_store(this); destroy_mapping(); close_file(m_fd); @@ -344,6 +346,11 @@ t_lstore::init() { } m_init = true; + + if (m_backing_store == BACKING_STORE_DISK) { + m_residency_tick = g_residency_tick; + t_residency_manager::inst().register_store(this); + } } void @@ -360,6 +367,7 @@ void t_lstore::reserve_impl(t_uindex capacity, bool allow_shrink) { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); + ensure_resident(); if ((capacity < m_capacity) && !allow_shrink) { return; } @@ -517,6 +525,7 @@ t_lstore::get_fname() const { void t_lstore::push_back(const void* ptr, t_uindex len) { PSP_TRACE_SENTINEL(); + ensure_resident(); if (m_size + len >= m_capacity) { reserve(static_cast(m_size + len) ); // reserve() will multiply by m_resize_factor internally @@ -535,11 +544,13 @@ t_lstore::push_back(const void* ptr, t_uindex len) { void* t_lstore::get_ptr(t_uindex offset) { + ensure_resident(); return static_cast(static_cast(m_base) + offset); } const void* t_lstore::get_ptr(t_uindex offset) const { + const_cast(this)->ensure_resident(); return static_cast(static_cast(m_base) + offset); } @@ -561,6 +572,7 @@ void t_lstore::append(const t_lstore& other) { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); + const_cast(other).ensure_resident(); push_back(other.m_base, other.size()); } @@ -569,6 +581,7 @@ t_lstore::clear() { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); #ifndef PSP_ENABLE_WASM + ensure_resident(); memset(m_base, 0, size_t(capacity())); #endif { @@ -597,10 +610,26 @@ t_lstore::get_recipe() const { return rval; } +t_lstore_recipe +t_lstore::get_clone_recipe() const { + t_lstore_recipe rval = get_recipe(); + // `get_recipe()` produces a `from_recipe` recipe that re-maps *this* store's + // backing file — correct for serialization/reconstruction, but wrong for + // cloning a `BACKING_STORE_DISK` store: the copy would map (and, on + // destruction, `rmfile`) the source's file. Clearing `m_from_recipe` makes + // the `t_lstore` constructor mint a fresh, independent backing file for the + // clone. (Memory stores ignore the file entirely, so this is a no-op there.) + if (m_backing_store == BACKING_STORE_DISK) { + rval.m_from_recipe = false; + } + return rval; +} + void t_lstore::fill(const t_lstore& other) { PSP_TRACE_SENTINEL(); PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); + const_cast(other).ensure_resident(); reserve(other.size()); memcpy(m_base, const_cast(other.m_base), size_t(other.size())); set_size(other.size()); @@ -647,7 +676,7 @@ t_lstore::pprint() const { std::shared_ptr t_lstore::clone() const { - auto recipe = get_recipe(); + auto recipe = get_clone_recipe(); std::shared_ptr rval(new t_lstore(recipe)); rval->init(); rval->set_size(m_size); @@ -655,4 +684,51 @@ t_lstore::clone() const { return rval; } +void +t_lstore::evict() { + if (m_backing_store != BACKING_STORE_DISK || !m_init || m_base == nullptr) { + return; + } +#ifdef PSP_HAS_OPFS_BRIDGE + // Flush the resident buffer to OPFS (suspending), then free it. Only valid + // from a `promising` safepoint — never mid-request. + psp_opfs_store( + m_fname.c_str(), m_base, static_cast(capacity()) + ); + free(m_base); + m_base = nullptr; +#else + // Native: `destroy_mapping()` (`munmap`) writes back the `MAP_SHARED` pages; + // the file `m_fd` stays open for a later `restore()`. + destroy_mapping(); + m_base = nullptr; +#endif + ++m_version; +} + +void +t_lstore::restore() { + if (m_backing_store != BACKING_STORE_DISK || !m_init || m_base != nullptr) { + return; + } + // May be called from parallel-for workers within a request (native); the + // OPFS path is only entered from the single-threaded promising safepoint. + std::lock_guard lk(t_residency_manager::inst().mutex()); + if (m_base != nullptr) { + return; + } +#ifdef PSP_HAS_OPFS_BRIDGE + auto cap = static_cast(capacity()); + m_base = calloc(std::max(cap, static_cast(1)), 1); + psp_opfs_load(m_fname.c_str(), m_base, static_cast(capacity())); +#else + m_base = create_mapping(); +#endif + // Stamp LRU recency at bring-in time. Moved here from `ensure_resident()` so + // the per-access hot path reads no residency global (see `storage.h`). + m_residency_tick = g_residency_tick; + t_residency_manager::inst().note_restore(); + ++m_version; +} + } // end namespace perspective diff --git a/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_linux.cpp b/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_linux.cpp index 7c0f6ebd8c..ce1477373f 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_linux.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_linux.cpp @@ -12,7 +12,10 @@ #include -#ifdef __linux__ +// NB: `first.h` force-defines `__linux__` on any non-mac/non-win platform, +// *including emscripten*. Exclude WASM here so `storage_impl_wasm.cpp` (and not +// this `open`/`mmap` implementation) provides the storage backend on WASM. +#if defined(__linux__) && !defined(PSP_ENABLE_WASM) #include #include #include @@ -107,6 +110,9 @@ t_lstore::resize_mapping(t_uindex cap_new) { void t_lstore::destroy_mapping() { + if (m_base == nullptr) { + return; // already evicted + } t_index rc = munmap(m_base, capacity()); PSP_VERBOSE_ASSERT(!rc, "Failed to destroy mapping"); } diff --git a/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_osx.cpp b/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_osx.cpp index 473adf4264..082af9a586 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_osx.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_osx.cpp @@ -113,6 +113,9 @@ t_lstore::resize_mapping(t_uindex cap_new) { void t_lstore::destroy_mapping() { + if (m_base == nullptr) { + return; // already evicted + } t_index rc = munmap(m_base, capacity()); PSP_VERBOSE_ASSERT(rc, == 0, "Failed to destroy mapping"); } diff --git a/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_wasm.cpp b/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_wasm.cpp new file mode 100644 index 0000000000..8c81b36762 --- /dev/null +++ b/rust/perspective-server/cpp/perspective/src/cpp/storage_impl_wasm.cpp @@ -0,0 +1,127 @@ +// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +// ┃ Copyright (c) 2017, the Perspective Authors. ┃ +// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +// ┃ This file is part of the Perspective library, distributed under the terms ┃ +// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +#include + +#ifdef PSP_ENABLE_WASM + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace perspective { + +// ┌─────────────────────────────────────────────────────────────────────────┐ +// │ Emulated mmap over WasmFS/OPFS │ +// │ │ +// │ WasmFS has no usable file-backed `mmap` (and even where one exists it │ +// │ copies the whole file into linear memory), so `BACKING_STORE_DISK` on │ +// │ WASM keeps a resident `malloc` buffer (`m_base`) that mirrors the backing │ +// │ file. Reads/writes go through `m_base`; the file is the durable copy that │ +// │ is (re)read via `pread` on mapping and written via `pwrite` on flush. │ +// │ │ +// │ Within a session `m_base` is the source of truth — the file is sized to │ +// │ match (so a later read/restore is correct) but is only written when the │ +// │ buffer is flushed. This is the substrate the residency manager (Phase 4) │ +// │ evicts: `pwrite` the buffer, `free` it, keep the file; restore by │ +// │ re-`pread`ing into a fresh buffer. │ +// └─────────────────────────────────────────────────────────────────────────┘ + +t_lstore::t_lstore(const t_lstore_recipe& a) : + m_base(nullptr), + m_dirname(a.m_dirname), + m_colname(a.m_colname), + m_fd(-1), + m_capacity(a.m_capacity), + m_size(0), + m_alignment(a.m_alignment), + m_fflags(a.m_fflags), + m_fmode(a.m_fmode), + m_creation_disposition(a.m_creation_disposition), + m_mprot(a.m_mprot), + m_mflags(a.m_mflags), + m_backing_store(a.m_backing_store), + m_init(false), + m_resize_factor(1.3), + m_version(0), + m_from_recipe(a.m_from_recipe) { + if (m_from_recipe) { + m_fname = a.m_fname; + return; + } + + if (m_backing_store == BACKING_STORE_DISK) { + std::stringstream ss; + ss << a.m_dirname << "/" + << "_col_" << a.m_colname << "_" << this; + m_fname = unique_path(ss.str()); + } +} + +// A disk-backed column on WASM is just a resident `malloc` buffer; there is no +// OS file (the build is `NO_FILESYSTEM`). Persistence to OPFS is performed by +// the residency manager (`t_lstore::evict`/`restore` in storage.cpp) via the +// `psp_opfs_*` bridge, at promising safepoints. `m_fname` is the OPFS key. + +t_handle +t_lstore::create_file() { + // No OS file; return a sentinel handle. + return 1; +} + +void* +t_lstore::create_mapping() { + auto cap = static_cast(capacity()); + void* base = calloc(std::max(cap, static_cast(1)), 1); + PSP_VERBOSE_ASSERT(base != nullptr, "calloc failed"); + return base; +} + +void +t_lstore::resize_mapping(t_uindex cap_new) { + void* base = realloc(m_base, static_cast(cap_new)); + PSP_VERBOSE_ASSERT(base != nullptr, "realloc failed"); + m_base = base; + m_capacity = cap_new; +} + +void +t_lstore::destroy_mapping() { + if (m_base != nullptr) { + free(m_base); + m_base = nullptr; + } +} + +void +t_lstore::freeze_impl() { + PSP_COMPLAIN_AND_ABORT("Not implemented"); +} + +void +t_lstore::unfreeze_impl() { + PSP_COMPLAIN_AND_ABORT("Not implemented"); +} + +} // end namespace perspective + +#endif diff --git a/rust/perspective-server/cpp/perspective/src/cpp/table.cpp b/rust/perspective-server/cpp/perspective/src/cpp/table.cpp index 9c26f479b1..86b9ddcae4 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/table.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/table.cpp @@ -39,7 +39,8 @@ Table::Table( std::vector column_names, std::vector data_types, std::uint32_t limit, - std::string index + std::string index, + t_backing_store backing_store ) : m_init(false), m_id(GLOBAL_TABLE_ID++), @@ -49,7 +50,8 @@ Table::Table( m_offset(0), m_limit(limit), m_index(std::move(index)), - m_gnode_set(false) { + m_gnode_set(false), + m_backing_store(backing_store) { validate_columns(m_column_names); } @@ -206,7 +208,8 @@ Table::validate_expressions( std::shared_ptr Table::make_gnode(const t_schema& in_schema) { t_schema out_schema = in_schema.drop({"psp_pkey", "psp_op"}); - auto gnode = std::make_shared(in_schema, out_schema); + auto gnode = + std::make_shared(in_schema, out_schema, m_backing_store); gnode->init(); return gnode; } @@ -300,6 +303,11 @@ Table::get_limit() const { return m_limit; } +t_backing_store +Table::get_backing_store() const { + return m_backing_store; +} + void Table::set_column_names(const std::vector& column_names) { validate_columns(column_names); @@ -382,7 +390,10 @@ Table::update_csv(const std::string_view& data, std::uint32_t port_id) { std::shared_ptr
Table::from_csv( - const std::string& index, std::string&& data, std::uint32_t limit + const std::string& index, + std::string&& data, + std::uint32_t limit, + t_backing_store backing_store ) { auto map = std::unordered_map>(); @@ -423,8 +434,9 @@ Table::from_csv( auto pool = std::make_shared(); pool->init(); - auto tbl = - std::make_shared
(pool, column_names, data_types, limit, index); + auto tbl = std::make_shared
( + pool, column_names, data_types, limit, index, backing_store + ); // `psp_pkey` is guaranteed unique only when the index is implicit (a // generated row-number). Explicit indexes or `__INDEX__` columns can @@ -1029,7 +1041,10 @@ Table::update_cols(const std::string_view& data, std::uint32_t port_id) { std::shared_ptr
Table::from_cols( - const std::string& index, std::string&& data, std::uint32_t limit + const std::string& index, + std::string&& data, + std::uint32_t limit, + t_backing_store backing_store ) { // 1.) Infer schema rapidjson::Document document; @@ -1136,7 +1151,7 @@ Table::from_cols( auto pool = std::make_shared(); pool->init(); auto tbl = std::make_shared
( - pool, schema.columns(), schema.types(), limit, index + pool, schema.columns(), schema.types(), limit, index, backing_store ); tbl->init(*data_table, nrows, t_op::OP_INSERT, 0); @@ -1249,7 +1264,10 @@ Table::update_rows(const std::string_view& data, std::uint32_t port_id) { std::shared_ptr
Table::from_rows( - const std::string& index, std::string&& data, std::uint32_t limit + const std::string& index, + std::string&& data, + std::uint32_t limit, + t_backing_store backing_store ) { // 1.) Infer schema rapidjson::Document document; @@ -1371,7 +1389,7 @@ Table::from_rows( auto pool = std::make_shared(); pool->init(); auto tbl = std::make_shared
( - pool, schema.columns(), schema.types(), limit, index + pool, schema.columns(), schema.types(), limit, index, backing_store ); tbl->init(*data_table, document.Size(), t_op::OP_INSERT, 0); @@ -1490,7 +1508,10 @@ Table::update_ndjson(const std::string_view& data, std::uint32_t port_id) { std::shared_ptr
Table::from_ndjson( - const std::string& index, std::string&& data, std::uint32_t limit + const std::string& index, + std::string&& data, + std::uint32_t limit, + t_backing_store backing_store ) { // 1.) Infer schema rapidjson::Document document; @@ -1625,7 +1646,7 @@ Table::from_ndjson( auto pool = std::make_shared(); pool->init(); auto tbl = std::make_shared
( - pool, schema.columns(), schema.types(), limit, index + pool, schema.columns(), schema.types(), limit, index, backing_store ); tbl->init(*data_table, ii, t_op::OP_INSERT, 0); @@ -1636,7 +1657,10 @@ Table::from_ndjson( std::shared_ptr
Table::from_schema( - const std::string& index, const t_schema& schema, std::uint32_t limit + const std::string& index, + const t_schema& schema, + std::uint32_t limit, + t_backing_store backing_store ) { auto pool = std::make_shared(); pool->init(); @@ -1661,7 +1685,7 @@ Table::from_schema( } auto tbl = std::make_shared
( - pool, schema.columns(), schema.types(), limit, index + pool, schema.columns(), schema.types(), limit, index, backing_store ); tbl->init(data_table, 0, t_op::OP_INSERT, 0); @@ -1703,7 +1727,10 @@ Table::update_arrow(const std::string_view& data, std::uint32_t port_id) { std::shared_ptr
Table::from_arrow( - const std::string& index, std::string&& data, std::uint32_t limit + const std::string& index, + std::string&& data, + std::uint32_t limit, + t_backing_store backing_store ) { apachearrow::ArrowLoader arrow_loader; @@ -1744,7 +1771,8 @@ Table::from_arrow( // Make Table auto pool = std::make_shared(); pool->init(); - auto table = std::make_shared
(pool, columns, types, limit, index); + auto table = + std::make_shared
(pool, columns, types, limit, index, backing_store); table->init(*data_table, data_table->num_rows(), t_op::OP_INSERT, 0); data_table.reset(); pool->_process(); @@ -1757,7 +1785,8 @@ Table::make_table( const std::vector& data_types, std::uint32_t limit, const std::string& index, - const std::string_view& data + const std::string_view& data, + t_backing_store backing_store ) { auto pool = std::make_shared(); pool->init(); @@ -1781,7 +1810,9 @@ Table::make_table( } auto columns = data_table.get_schema().columns(); auto dtypes = data_table.get_schema().types(); - auto table = std::make_shared
(pool, columns, dtypes, limit, index); + auto table = std::make_shared
( + pool, columns, dtypes, limit, index, backing_store + ); table->init(data_table, data_table.num_rows(), t_op::OP_INSERT, 0); pool->_process(); return table; diff --git a/rust/perspective-server/cpp/perspective/src/cpp/utils.cpp b/rust/perspective-server/cpp/perspective/src/cpp/utils.cpp index 8f6916897a..5ac5a72bd5 100644 --- a/rust/perspective-server/cpp/perspective/src/cpp/utils.cpp +++ b/rust/perspective-server/cpp/perspective/src/cpp/utils.cpp @@ -17,18 +17,61 @@ #include #include #include +#include #include #include #include +#ifdef PSP_ENABLE_WASM +#include +#include +#else +#include +#endif #include namespace perspective { std::string unique_path(const std::string& path_prefix) { +#ifdef PSP_ENABLE_WASM + // `boost::uuids::random_generator()` seeds itself by opening `/dev/urandom` + // directly, which aborts under emscripten's `NO_FILESYSTEM`. We don't need + // cryptographic randomness for an OPFS key — a process-unique counter, + // seeded from wall-clock time (via `clock_time_get`, no filesystem) so keys + // differ across reloads, is sufficient. + static std::atomic s_counter{ + static_cast( + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch() + ) + .count() + ) + }; + std::stringstream ss; + ss << path_prefix << s_counter.fetch_add(1); + return ss.str(); +#else std::stringstream ss; ss << path_prefix << boost::uuids::random_generator()(); return ss.str(); +#endif +} + +std::string +create_backing_store_dir(const std::string& prefix) { +#ifdef PSP_ENABLE_WASM + // On WASM there is no OS filesystem (NO_FILESYSTEM): this is just a unique + // logical path used as the OPFS key prefix for a table's column files. The + // actual OPFS directory/file is created lazily by the JS bridge + // (`perspective-server.poly.ts` `opfsOpenFile`) when the residency manager + // first flushes a column. `/perspective` is the OPFS root namespace. + return unique_path(std::string("/perspective/") + prefix); +#else + namespace fs = std::filesystem; + fs::path dir = fs::temp_directory_path() / unique_path(prefix); + fs::create_directories(dir); + return dir.string(); +#endif } template diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/column.h b/rust/perspective-server/cpp/perspective/src/include/perspective/column.h index a3a593d4b3..a5f298bae5 100644 --- a/rust/perspective-server/cpp/perspective/src/include/perspective/column.h +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/column.h @@ -198,6 +198,23 @@ class PERSPECTIVE_EXPORT t_column { std::shared_ptr clone(const t_mask& mask) const; + /** + * @brief Deep-copy the contents of `other` into this column, preserving + * *this* column's backing store (e.g. `BACKING_STORE_DISK`). Unlike + * `clone()`, which inherits the source column's backing store, this fills + * an already-initialized destination column in place. + * + * @param other + */ + void copy_from(const t_column& other); + + // Ensure this column's backing stores (data, status, string vocab) are + // resident, restoring any the residency manager evicted and stamping LRU + // recency. No-op for resident / non-disk columns. Called once when the column + // is fetched (`t_data_table::get_column`), so per-element accessors stay + // check-free; safe because eviction only happens at request safepoints. + void ensure_resident(); + void valid_raw_fill(); void invalid_raw_fill(); diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/config.h b/rust/perspective-server/cpp/perspective/src/include/perspective/config.h index 3128d786e5..9ec4f6dbfa 100644 --- a/rust/perspective-server/cpp/perspective/src/include/perspective/config.h +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/config.h @@ -205,6 +205,19 @@ class PERSPECTIVE_EXPORT t_config { return m_grand_agg_str; } + // The backing store for any persistent storage created by the context built + // from this config (currently the expression `m_master` table). Inherited + // from the parent `Table`'s backing store. Defaults to memory. + inline void + set_backing_store(t_backing_store backing_store) { + m_backing_store = backing_store; + } + + inline t_backing_store + get_backing_store() const { + return m_backing_store; + } + protected: void populate_sortby(const std::vector& pivots); @@ -235,6 +248,7 @@ class PERSPECTIVE_EXPORT t_config { std::string m_grand_agg_str; t_fmode m_fmode; bool m_has_pkey_agg; + t_backing_store m_backing_store = BACKING_STORE_MEMORY; }; } // end namespace perspective diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/data_table.h b/rust/perspective-server/cpp/perspective/src/include/perspective/data_table.h index e6a0146d1e..f200767240 100644 --- a/rust/perspective-server/cpp/perspective/src/include/perspective/data_table.h +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/data_table.h @@ -249,6 +249,19 @@ class PERSPECTIVE_EXPORT t_data_table { std::string repr() const; private: + // Residency chokepoint: ensure column `idx` is resident before it is handed + // out. Disk-backed tables only (one predicted-not-taken branch otherwise); + // every disk-column access fetches the column by name/index first and caches + // it (the schema hash-lookup forbids per-row re-fetch), so this runs once per + // column per operation — keeping per-element accessors check-free while + // preserving fine-grained, per-column lazy restore. + inline void + ensure_col_resident(t_uindex idx) const { + if (m_backing_store == BACKING_STORE_DISK) { + m_columns[idx]->ensure_resident(); + } + } + std::string m_name; std::string m_dirname; t_schema m_schema; diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/expression_tables.h b/rust/perspective-server/cpp/perspective/src/include/perspective/expression_tables.h index 5debcc7746..0ff88af888 100644 --- a/rust/perspective-server/cpp/perspective/src/include/perspective/expression_tables.h +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/expression_tables.h @@ -30,7 +30,8 @@ struct t_expression_tables { PSP_NON_COPYABLE(t_expression_tables); t_expression_tables( - const std::vector>& expressions + const std::vector>& expressions, + t_backing_store backing_store = BACKING_STORE_MEMORY ); /** diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/gnode.h b/rust/perspective-server/cpp/perspective/src/include/perspective/gnode.h index c585a17a96..772b58ed2b 100644 --- a/rust/perspective-server/cpp/perspective/src/include/perspective/gnode.h +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/gnode.h @@ -93,7 +93,11 @@ class PERSPECTIVE_EXPORT t_gnode { * @param input_schema * @param output_schema */ - t_gnode(t_schema input_schema, t_schema output_schema); + t_gnode( + t_schema input_schema, + t_schema output_schema, + t_backing_store backing_store = BACKING_STORE_MEMORY + ); ~t_gnode(); void init(); @@ -410,6 +414,7 @@ class PERSPECTIVE_EXPORT t_gnode { std::vector> m_oports; tsl::ordered_map m_contexts; std::shared_ptr m_gstate; + t_backing_store m_backing_store; std::chrono::high_resolution_clock::time_point m_epoch; std::function m_pool_cleanup; diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/gnode_state.h b/rust/perspective-server/cpp/perspective/src/include/perspective/gnode_state.h index 79e00a2d46..148ee9be05 100644 --- a/rust/perspective-server/cpp/perspective/src/include/perspective/gnode_state.h +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/gnode_state.h @@ -47,7 +47,11 @@ class PERSPECTIVE_EXPORT t_gstate { * @param input_schema * @param output_schema */ - t_gstate(t_schema input_schema, t_schema output_schema); + t_gstate( + t_schema input_schema, + t_schema output_schema, + t_backing_store backing_store = BACKING_STORE_MEMORY + ); ~t_gstate(); @@ -308,6 +312,7 @@ class PERSPECTIVE_EXPORT t_gstate { t_schema m_input_schema; // pkeyed t_schema m_output_schema; // tblschema + t_backing_store m_backing_store; bool m_init; std::shared_ptr m_table; diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/opfs.h b/rust/perspective-server/cpp/perspective/src/include/perspective/opfs.h new file mode 100644 index 0000000000..282c58da38 --- /dev/null +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/opfs.h @@ -0,0 +1,43 @@ +// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +// ┃ Copyright (c) 2017, the Perspective Authors. ┃ +// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +// ┃ This file is part of the Perspective library, distributed under the terms ┃ +// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +#pragma once + +// ┌─────────────────────────────────────────────────────────────────────────┐ +// │ Custom OPFS bridge (WASM, path 3) │ +// │ │ +// │ On WASM, a `BACKING_STORE_DISK` column's bytes live in a resident `malloc`│ +// │ buffer (`m_base`). The residency manager flushes that buffer to / reloads │ +// │ it from the browser's OPFS through these three imports, implemented in JS │ +// │ by `perspective-server.poly.ts` using `FileSystemSyncAccessHandle`. │ +// │ │ +// │ The OPFS *open* (`createSyncAccessHandle`) is async, so these imports are │ +// │ JSPI-**suspending**: they may ONLY be called from a `WebAssembly.promising`│ +// │ export — i.e. the residency safepoint / pre-request restore entry points, │ +// │ never from the synchronous `psp_handle_request`. This keeps the engine's │ +// │ request path synchronous while avoiding SharedArrayBuffer/COOP-COEP. │ +// │ │ +// │ `name` is the column's OPFS file key (`m_fname`); `data`/`len` is the │ +// │ resident buffer. Return value is bytes transferred, or negative on error. │ +// └─────────────────────────────────────────────────────────────────────────┘ + +#if defined(PSP_ENABLE_WASM) && defined(PSP_WASM_OPFS_PERSIST) + +extern "C" { +int psp_opfs_store(const char* name, const void* data, int len); +int psp_opfs_load(const char* name, void* data, int len); +void psp_opfs_remove(const char* name); +} + +#define PSP_HAS_OPFS_BRIDGE 1 + +#endif diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/residency.h b/rust/perspective-server/cpp/perspective/src/include/perspective/residency.h new file mode 100644 index 0000000000..4ae9616e15 --- /dev/null +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/residency.h @@ -0,0 +1,93 @@ +// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +// ┃ Copyright (c) 2017, the Perspective Authors. ┃ +// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +// ┃ This file is part of the Perspective library, distributed under the terms ┃ +// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace perspective { + +class t_lstore; + +// Hot-path flags read by `t_lstore::ensure_resident()`. Plain globals so the +// common (residency-disabled) case is a single predicted-not-taken branch with +// no indirection. Owned/updated by `t_residency_manager`. +PERSPECTIVE_EXPORT extern bool g_residency_active; +PERSPECTIVE_EXPORT extern std::uint64_t g_residency_tick; + +/** + * @brief Bounds the resident (in linear-memory) footprint of `BACKING_STORE_DISK` + * column buffers, evicting the coldest ones to their backing files when over a + * configured byte budget and transparently restoring them on next access. + * + * This is what turns on-disk backing into actual *memory relief* on WASM, where + * there is no demand paging: an evicted store's `m_base` buffer is freed (its + * data flushed to the backing file) and re-read on demand. Eviction runs only at + * request *safepoints* (between requests, when no live raw column pointer + * exists — see the safepoint audit), and restoration is lazy via + * `ensure_resident()` on every `t_lstore` data accessor. + * + * Disabled by default (zero overhead). Enabled by setting the `PSP_MEMORY_BUDGET` + * environment variable (bytes), re-read at each safepoint. + */ +class PERSPECTIVE_EXPORT t_residency_manager { +public: + static t_residency_manager& inst(); + + // Disk-backed stores register on `init()` and unregister on destruction. + void register_store(t_lstore* store); + void unregister_store(t_lstore* store); + + std::size_t prepare(); + const char* victim_fname(std::size_t i) const; + void commit(); + void safepoint(); + + // Total bytes currently resident across registered disk-backed stores. + std::size_t resident_bytes(); + + bool active() const { return g_residency_active; } + std::size_t budget() const { return m_budget; } + std::uint64_t evictions() const { return m_evictions; } + std::uint64_t restores() const { return m_restores; } + + // Serializes lazy restores (which may occur from parallel-for workers within + // a request) against each other and the safepoint eviction pass. + std::mutex& mutex() { return m_mutex; } + void note_restore() { ++m_restores; } + +private: + t_residency_manager() = default; + void refresh_config(); + + std::size_t m_budget = 0; + std::uint64_t m_tick = 0; + std::uint64_t m_evictions = 0; + std::uint64_t m_restores = 0; + std::unordered_set m_stores; + // Victims selected by `prepare()`, evicted by `commit()`. Held between the + // two phases while the JS driver opens their OPFS handles. `m_pending_fnames` + // owns stable C-strings for `victim_fname()` (a `t_lstore::get_fname()` temp + // would dangle). + std::vector m_pending; + std::vector m_pending_fnames; + std::mutex m_mutex; +}; + +} // namespace perspective diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/server.h b/rust/perspective-server/cpp/perspective/src/include/perspective/server.h index 2ae17f4f90..aecdaa8588 100644 --- a/rust/perspective-server/cpp/perspective/src/include/perspective/server.h +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/server.h @@ -656,6 +656,10 @@ namespace server { handle_request(std::uint32_t client_id, const std::string_view& data); std::vector> poll(); + std::size_t residency_prepare(); + const char* residency_victim_fname(std::size_t i); + void residency_commit(); + private: void handle_process_table( const Request& req, diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/storage.h b/rust/perspective-server/cpp/perspective/src/include/perspective/storage.h index 19631e85bd..a4f293d790 100644 --- a/rust/perspective-server/cpp/perspective/src/include/perspective/storage.h +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/storage.h @@ -18,6 +18,7 @@ #include #include #include +#include #include /* @@ -204,6 +205,11 @@ class PERSPECTIVE_MPROTECT_EXPORT t_lstore : public t_debug_helper t_lstore_recipe get_recipe() const; + // Like `get_recipe()`, but produces a recipe suitable for an *independent* + // copy: a `BACKING_STORE_DISK` clone gets its own fresh backing file rather + // than aliasing this store's file. Used by the clone paths. + t_lstore_recipe get_clone_recipe() const; + void fill(const t_lstore& other); void fill(const t_lstore& other, const t_mask& mask, t_uindex elem_size); @@ -227,6 +233,39 @@ class PERSPECTIVE_MPROTECT_EXPORT t_lstore : public t_debug_helper return size() == 0; } + // ── Residency (memory relief for `BACKING_STORE_DISK`) ────────────────── + // Flush this store's resident buffer to its backing file and free it. + void evict(); + // Re-read this store's buffer from its backing file (if currently evicted). + void restore(); + + bool + is_resident() const { + return m_init && m_backing_store == BACKING_STORE_DISK + && m_base != nullptr; + } + + std::uint64_t + residency_tick() const { + return m_residency_tick; + } + + // Lazily restore an evicted store on access. Called at the top of every + // method that dereferences `m_base`. + // Coarse residency hook: called once per column-FETCH (column-outer — see + // `t_data_table::get_column`), NOT per element. Restores an evicted disk + // store and stamps LRU recency. The per-element accessors are check-free, so + // tight flatten/read loops vectorize as they did before on-disk existed. + inline void + ensure_resident() { + if (m_backing_store == BACKING_STORE_DISK) { + if (m_base == nullptr) { + restore(); + } + m_residency_tick = g_residency_tick; + } + } + #ifdef PSP_ENABLE_PYTHON /* Python bits */ // py::array _as_numpy(t_dtype dtype); @@ -264,6 +303,7 @@ class PERSPECTIVE_MPROTECT_EXPORT t_lstore : public t_debug_helper double m_resize_factor; t_uindex m_version; bool m_from_recipe; + std::uint64_t m_residency_tick = 0; #ifdef PSP_MPROTECT // size of padding + size of fields above diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/table.h b/rust/perspective-server/cpp/perspective/src/include/perspective/table.h index 02eae64c49..3fd2d3ff74 100644 --- a/rust/perspective-server/cpp/perspective/src/include/perspective/table.h +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/table.h @@ -53,7 +53,8 @@ class PERSPECTIVE_EXPORT Table { std::vector column_names, std::vector data_types, std::uint32_t limit, - std::string index + std::string index, + t_backing_store backing_store = BACKING_STORE_MEMORY ); /** @@ -207,6 +208,7 @@ class PERSPECTIVE_EXPORT Table { std::uint32_t get_offset() const; std::uint32_t get_limit() const; const std::string& get_index() const; + t_backing_store get_backing_store() const; // Setters void set_column_names(const std::vector& column_names); @@ -225,37 +227,43 @@ class PERSPECTIVE_EXPORT Table { static std::shared_ptr
from_csv( const std::string& index, std::string&& data, - std::uint32_t limit = std::numeric_limits::max() + std::uint32_t limit = std::numeric_limits::max(), + t_backing_store backing_store = BACKING_STORE_MEMORY ); static std::shared_ptr
from_cols( const std::string& index, std::string&& data, - std::uint32_t limit = std::numeric_limits::max() + std::uint32_t limit = std::numeric_limits::max(), + t_backing_store backing_store = BACKING_STORE_MEMORY ); static std::shared_ptr
from_rows( const std::string& index, std::string&& data, - std::uint32_t limit = std::numeric_limits::max() + std::uint32_t limit = std::numeric_limits::max(), + t_backing_store backing_store = BACKING_STORE_MEMORY ); static std::shared_ptr
from_ndjson( const std::string& index, std::string&& data, - std::uint32_t limit = std::numeric_limits::max() + std::uint32_t limit = std::numeric_limits::max(), + t_backing_store backing_store = BACKING_STORE_MEMORY ); static std::shared_ptr
from_schema( const std::string& index, const t_schema& schema, - std::uint32_t limit = std::numeric_limits::max() + std::uint32_t limit = std::numeric_limits::max(), + t_backing_store backing_store = BACKING_STORE_MEMORY ); static std::shared_ptr
from_arrow( const std::string& index, std::string&& data, - std::uint32_t limit = std::numeric_limits::max() + std::uint32_t limit = std::numeric_limits::max(), + t_backing_store backing_store = BACKING_STORE_MEMORY ); static std::shared_ptr
make_table( @@ -263,7 +271,8 @@ class PERSPECTIVE_EXPORT Table { const std::vector& data_types, std::uint32_t limit, const std::string& index, - const std::string_view& data + const std::string_view& data, + t_backing_store backing_store = BACKING_STORE_MEMORY ); private: @@ -314,6 +323,7 @@ class PERSPECTIVE_EXPORT Table { */ const std::string m_index; bool m_gnode_set; + const t_backing_store m_backing_store; }; } // namespace perspective \ No newline at end of file diff --git a/rust/perspective-server/cpp/perspective/src/include/perspective/utils.h b/rust/perspective-server/cpp/perspective/src/include/perspective/utils.h index 82f756db9a..1dd0db0849 100644 --- a/rust/perspective-server/cpp/perspective/src/include/perspective/utils.h +++ b/rust/perspective-server/cpp/perspective/src/include/perspective/utils.h @@ -50,6 +50,15 @@ str_(const T& value) { std::string unique_path(const std::string& path_prefix); +/** + * @brief Create (and return the path of) a unique directory to hold the column + * backing files of a `BACKING_STORE_DISK` table. On native this lives under the + * OS temp directory; on WASM it lives under the WasmFS mount root + * (`/perspective`, where OPFS is mounted in the browser). `prefix` (e.g. + * "perspective_" or "perspective_expr_") names the directory. + */ +std::string create_backing_store_dir(const std::string& prefix); + template void set_to_vec(const std::set& s, std::vector& out_v) { diff --git a/tools/bench/package.json b/tools/bench/package.json index bfc225eb86..d8afd6f9a0 100644 --- a/tools/bench/package.json +++ b/tools/bench/package.json @@ -29,6 +29,8 @@ "zx": "catalog:" }, "dependencies": { + "perspective-4-5-1": "npm:@perspective-dev/client@4.5.1", + "perspective-4-5-0": "npm:@perspective-dev/client@4.5.0", "perspective-4-4-0": "npm:@perspective-dev/client@4.4.0", "perspective-4-3-0": "npm:@perspective-dev/client@4.3.0", "perspective-4-2-0": "npm:@perspective-dev/client@4.2.0",