Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,4 @@ docs/static/viewer
docs/static/react
rust/perspective-server/build
target/
dist-gh-pages
41 changes: 41 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions rust/perspective-client/perspective.proto
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@ message MakeTableReq {
string make_index_table = 1;
uint32 make_limit_table = 2;
};

// Back this Table's canonical data with the on-disk storage backend
// (memory-mapped file on native; OPFS on WASM) instead of memory.
// Orthogonal to `make_table_type`, so it is a standalone field.
optional bool page_to_disk = 3;
}
}
message MakeTableResp {}
Expand Down
4 changes: 4 additions & 0 deletions rust/perspective-client/src/rust/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ impl Client {
ClientResp::MakeJoinTableResp(_) => Ok(Table::new(entity_id, client, TableOptions {
index: Some(on.to_owned()),
limit: None,
page_to_disk: None,
})),
resp => Err(resp.into()),
}
Expand Down Expand Up @@ -662,6 +663,9 @@ impl Client {
let options = TableOptions {
index: info.index,
limit: info.limit,
// `page_to_disk` is a server-side property not surfaced in table
// info; it does not affect client-side behavior.
page_to_disk: None,
};

let client = self.clone();
Expand Down
12 changes: 12 additions & 0 deletions rust/perspective-client/src/rust/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ pub struct TableInitOptions {
#[serde(default)]
#[ts(optional)]
pub limit: Option<u32>,

/// Back this [`Table`]'s canonical data with the on-disk storage backend
/// instead of memory. On native targets this is a memory-mapped file; on
/// WASM it is OPFS (Worker only). Defaults to in-memory.
#[serde(default)]
#[ts(optional)]
pub page_to_disk: Option<bool>,
}

impl TableInitOptions {
Expand All @@ -104,11 +111,14 @@ impl TryFrom<TableOptions> for MakeTableOptions {
type Error = ClientError;

fn try_from(value: TableOptions) -> Result<Self, Self::Error> {
let page_to_disk = value.page_to_disk;
Ok(MakeTableOptions {
page_to_disk,
make_table_type: match value {
TableOptions {
index: Some(_),
limit: Some(_),
..
} => Err(ClientError::BadTableOptions)?,
TableOptions {
index: Some(index), ..
Expand All @@ -126,13 +136,15 @@ impl TryFrom<TableOptions> for MakeTableOptions {
pub(crate) struct TableOptions {
pub index: Option<String>,
pub limit: Option<u32>,
pub page_to_disk: Option<bool>,
}

impl From<TableInitOptions> for TableOptions {
fn from(value: TableInitOptions) -> Self {
TableOptions {
index: value.index,
limit: value.limit,
page_to_disk: value.page_to_disk,
}
}
}
Expand Down
123 changes: 122 additions & 1 deletion rust/perspective-js/src/ts/perspective.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ export type * from "./virtual_server.ts";
import WebSocket, { WebSocketServer as HttpWebSocketServer } from "ws";
import stoppable from "stoppable";
import { promises as fs } from "node:fs";
import {
openSync,
readSync,
writeSync,
closeSync,
mkdirSync,
unlinkSync,
rmSync,
} from "node:fs";
import os from "node:os";
import http from "node:http";
import path from "node:path";
import { webcrypto } from "node:crypto";
Expand Down Expand Up @@ -56,12 +66,123 @@ const uncompressed_client_wasm = await fs
.then((buffer) => load_wasm_stage_0(buffer.buffer as ArrayBuffer));

await perspective_client.default({ module_or_path: uncompressed_client_wasm });

function make_node_disk_bridge({
heap,
toAddr,
readCString,
}: {
heap: () => Uint8Array;
toAddr: (p: number | bigint) => number;
readCString: (p: number | bigint) => string;
}) {
const root = path.join(os.tmpdir(), `perspective-${process.pid}`);
const resolve_path = (name: string) => {
const p = path.join(root, name);
if (!p.startsWith(root)) {
throw new Error(`refusing disk path outside root: ${name}`);
}
return p;
};

let cleaned = false;
const cleanup = () => {
if (cleaned) return;
cleaned = true;
try {
rmSync(root, { recursive: true, force: true });
} catch (e) {
/* best effort */
}
};
process.on("exit", cleanup);
process.on("SIGINT", () => {
cleanup();
process.exit(130);
});

return {
// node:fs is synchronous — nothing to pre-open between safepoint phases.
async ensureOpen(_name: string) {},
store(
namePtr: number | bigint,
dataPtr: number | bigint,
len: number,
): number {
try {
const p = resolve_path(readCString(namePtr));
mkdirSync(path.dirname(p), { recursive: true });
const fd = openSync(p, "w");
try {
if (len > 0) {
const addr = toAddr(dataPtr);
const view = heap().subarray(addr, addr + len);
let off = 0;
while (off < len) {
off += writeSync(fd, view, off, len - off, off);
}
}
} finally {
closeSync(fd);
}
return len;
} catch (e) {
console.error("node disk store failed", e);
return -1;
}
},
load(
namePtr: number | bigint,
dataPtr: number | bigint,
len: number,
): number {
try {
if (len <= 0) return 0;
let fd: number;
try {
fd = openSync(resolve_path(readCString(namePtr)), "r");
} catch (e) {
return 0; // never-flushed file reads as zeros
}
try {
const addr = toAddr(dataPtr);
const view = heap().subarray(addr, addr + len);
let off = 0;
let n: number;
while (
off < len &&
(n = readSync(fd, view, off, len - off, off)) > 0
) {
off += n;
}
return off;
} finally {
closeSync(fd);
}
} catch (e) {
return 0;
}
},
remove(namePtr: number | bigint) {
try {
unlinkSync(resolve_path(readCString(namePtr)));
} catch (e) {
/* already gone */
}
},
};
}

const SYNC_MODULE = await fs
.readFile(
resolve("@perspective-dev/server/dist/wasm/perspective-server.wasm"),
)
.then((buffer) => load_wasm_stage_0(buffer.buffer as ArrayBuffer))
.then((buffer) => compile_perspective(buffer.buffer as ArrayBuffer));
.then((buffer) =>
compile_perspective(buffer.buffer as ArrayBuffer, {
make_disk_bridge: make_node_disk_bridge,
}),
);

let SYNC_CLIENT: perspective_client.Client;

Expand Down
25 changes: 25 additions & 0 deletions rust/perspective-js/src/ts/wasm/emscripten_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,38 @@ import type * as perspective_server_t from "@perspective-dev/server/dist/wasm/pe
export type PspPtr = BigInt | number;
export type EmscriptenServer = bigint | number;

export interface DiskBridgeHelpers {
heap: () => Uint8Array;
toAddr: (p: number | bigint) => number;
readCString: (p: number | bigint) => string;
}

export interface CompileOptions {
make_disk_bridge?: (helpers: DiskBridgeHelpers) => {
store(
namePtr: number | bigint,
dataPtr: number | bigint,
len: number,
): number;
load(
namePtr: number | bigint,
dataPtr: number | bigint,
len: number,
): number;
remove(namePtr: number | bigint): void;
ensureOpen(name: string): Promise<void>;
};
}

export async function compile_perspective(
wasmBinary: ArrayBuffer,
opts?: CompileOptions,
): Promise<perspective_server_t.MainModule> {
const module = await perspective_server.default({
locateFile(x: any) {
return x;
},
make_disk_bridge: opts?.make_disk_bridge,
instantiateWasm: async (
imports: any,
receive: (_: WebAssembly.Instance) => void,
Expand Down
Loading
Loading