From bd68abf3553f7be67307d0da09f9ff83e8a25528 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 22 Mar 2026 14:24:05 -0700 Subject: [PATCH 01/10] Add experimental WebAssembly support for BMP/BGP/MRT parsing Add a `wasm` feature flag that compiles the parsing core to WebAssembly, published as `@bgpkit/parser` on npm. Supports three targets: Node.js (CommonJS), bundlers (ES modules), and browsers/workers (ES modules with manual init). Exported functions: - parseOpenBmpMessage: OpenBMP-wrapped BMP frames (RouteViews Kafka) - parseBmpMessage: raw BMP frames - parseMrtFile: decompressed MRT files - parseBgpUpdate: single BGP UPDATE messages Includes JS wrapper with JSON deserialization, TypeScript types, multi-target build script, and two Node.js examples (Kafka OpenBMP stream consumer and MRT file parser). --- .gitignore | 9 +- CHANGELOG.md | 9 + Cargo.toml | 16 + examples/wasm/kafka-openbmp-stream/.gitignore | 3 + examples/wasm/kafka-openbmp-stream/README.md | 59 +++ .../wasm/kafka-openbmp-stream/kafka-stream.js | 85 +++++ .../wasm/kafka-openbmp-stream/package.json | 17 + examples/wasm/parse-mrt-file/.gitignore | 5 + examples/wasm/parse-mrt-file/README.md | 49 +++ examples/wasm/parse-mrt-file/package.json | 15 + examples/wasm/parse-mrt-file/parse-mrt.js | 64 ++++ src/lib.rs | 2 + src/wasm.rs | 352 ++++++++++++++++++ src/wasm/README.md | 200 ++++++++++ src/wasm/build.sh | 70 ++++ src/wasm/js/index.d.ts | 163 ++++++++ src/wasm/js/index.js | 59 +++ src/wasm/js/index.mjs | 18 + src/wasm/js/package.json | 57 +++ src/wasm/js/web.d.ts | 27 ++ src/wasm/js/web.mjs | 30 ++ src/wasm/test/.gitignore | 2 + src/wasm/test/package.json | 11 + src/wasm/test/test-published.js | 201 ++++++++++ 24 files changed, 1522 insertions(+), 1 deletion(-) create mode 100644 examples/wasm/kafka-openbmp-stream/.gitignore create mode 100644 examples/wasm/kafka-openbmp-stream/README.md create mode 100644 examples/wasm/kafka-openbmp-stream/kafka-stream.js create mode 100644 examples/wasm/kafka-openbmp-stream/package.json create mode 100644 examples/wasm/parse-mrt-file/.gitignore create mode 100644 examples/wasm/parse-mrt-file/README.md create mode 100644 examples/wasm/parse-mrt-file/package.json create mode 100644 examples/wasm/parse-mrt-file/parse-mrt.js create mode 100644 src/wasm.rs create mode 100644 src/wasm/README.md create mode 100755 src/wasm/build.sh create mode 100644 src/wasm/js/index.d.ts create mode 100644 src/wasm/js/index.js create mode 100644 src/wasm/js/index.mjs create mode 100644 src/wasm/js/package.json create mode 100644 src/wasm/js/web.d.ts create mode 100644 src/wasm/js/web.mjs create mode 100644 src/wasm/test/.gitignore create mode 100644 src/wasm/test/package.json create mode 100644 src/wasm/test/test-published.js diff --git a/.gitignore b/.gitignore index 6325d92..160c7ca 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,11 @@ examples/debug-file.rs # local benchmark results benchmarks docs/ -*.md \ No newline at end of file +*.md +!examples/**/*.md +!src/wasm/**/*.md + +# WASM build artifacts +pkg/ +pkg-*/ +node_modules/ \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 666d1cb..407de6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,15 @@ All notable changes to this project will be documented in this file. ### New features +* **WebAssembly (WASM) support (experimental)**: New `wasm` feature flag compiles the BMP/BGP/MRT parsing core to WebAssembly for use in JavaScript environments. Published as [`@bgpkit/parser`](https://www.npmjs.com/package/@bgpkit/parser) on npm with support for Node.js (CommonJS), bundlers (ES modules), and browsers/workers (ES modules with manual init). This feature is experimental and the API may change in future releases. + - `parseOpenBmpMessage(data)`: parses OpenBMP-wrapped BMP frames (e.g. RouteViews Kafka stream). Returns a discriminated union on the `type` field covering all 7 BMP message types. + - `parseBmpMessage(data, timestamp)`: parses raw BMP frames (without an OpenBMP header). + - `parseMrtFile(data)`: parses decompressed MRT files (TABLE_DUMP, TABLE_DUMP_V2, BGP4MP) into `BgpElem[]`. + - `parseBgpUpdate(data)`: parses a single BGP UPDATE message into `BgpElem[]`. + - Multi-target build script (`src/wasm/build.sh`) produces nodejs, bundler, and web targets in a single npm package. + - JS wrapper handles JSON deserialization; TypeScript types included. + - Node.js examples in `examples/wasm/`: Kafka OpenBMP stream consumer and MRT file parser. + * **`BgpElem::peer_bgp_id` field**: `BgpElem` now exposes an optional `peer_bgp_id: Option` containing the peer's BGP Identifier (Router ID) when available. Populated from the PEER_INDEX_TABLE in TableDumpV2/RIB records; `None` for BGP4MP records. ### Performance improvements diff --git a/Cargo.toml b/Cargo.toml index 4046fde..c6b7ea6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,12 @@ name = "bgpkit-parser" path = "src/bin/main.rs" required-features = ["cli"] +# cdylib is required by wasm-pack for WASM builds; rlib is the default for +# normal Rust library usage. Both are listed so the crate works for native +# consumers and wasm-pack alike. +[lib] +crate-type = ["cdylib", "rlib"] + [dependencies] ############## @@ -44,6 +50,7 @@ oneio = { version = "0.20.0", default-features = false, features = ["http", "gz" regex = { version = "1", optional = true } # used in parser filter chrono = { version = "0.4.38", optional = true } # parser filter serde_json = { version = "1.0", optional = true } # RIS Live parsing +wasm-bindgen = { version = "0.2", optional = true } #################### # CLI dependencies # @@ -76,6 +83,15 @@ rislive = [ "serde_json", "hex", ] + +# WebAssembly bindings for browser/Node.js usage. +# Build with: wasm-pack build --target nodejs --no-default-features --features wasm +wasm = [ + "parser", + "serde", + "serde_json", + "dep:wasm-bindgen", +] serde = [ "dep:serde", "ipnet/serde", diff --git a/examples/wasm/kafka-openbmp-stream/.gitignore b/examples/wasm/kafka-openbmp-stream/.gitignore new file mode 100644 index 0000000..2d26f7b --- /dev/null +++ b/examples/wasm/kafka-openbmp-stream/.gitignore @@ -0,0 +1,3 @@ +node_modules/ +pkg/ +package-lock.json diff --git a/examples/wasm/kafka-openbmp-stream/README.md b/examples/wasm/kafka-openbmp-stream/README.md new file mode 100644 index 0000000..9e12fa1 --- /dev/null +++ b/examples/wasm/kafka-openbmp-stream/README.md @@ -0,0 +1,59 @@ +# RouteViews Kafka Stream — Node.js Example + +> **Experimental**: The `@bgpkit/parser` npm package is experimental. The API +> surface, output format, and build process may change in future releases. + +This example consumes the [RouteViews](http://www.routeviews.org/routeviews/) +real-time Kafka stream of OpenBMP messages and parses them into JSON using +[`@bgpkit/parser`](https://www.npmjs.com/package/@bgpkit/parser). + +## Prerequisites + +- [Node.js](https://nodejs.org/) >= 18 + +## Run + +```sh +cd examples/wasm/kafka-openbmp-stream +npm install +npm start +``` + +## Configuration + +Edit the constants at the top of `kafka-stream.js`: + +| Variable | Default | Description | +|---|---|---| +| `BROKER` | `stream.routeviews.org:9092` | Kafka broker address | +| `TOPIC_PATTERN` | `/^routeviews\.amsix\..+\.bmp_raw$/` | Regex to filter Kafka topics | +| `GROUP_ID` | `bgpkit-parser-nodejs-example` | Kafka consumer group ID | + +### Selecting collectors + +Topics follow the naming pattern `routeviews...bmp_raw`: + +```js +// All AMS-IX topics +const TOPIC_PATTERN = /^routeviews\.amsix\..+\.bmp_raw$/; + +// A specific collector/peer +const TOPIC_PATTERN = /^routeviews\.amsix\.ams\.6777\.bmp_raw$/; + +// All collectors (high volume!) +const TOPIC_PATTERN = /^routeviews\..+\.bmp_raw$/; +``` + +## Output + +Each line is a JSON object representing a parsed BMP message. RouteMonitoring +messages include an `elems` array of BGP elements (announcements/withdrawals). + +## How it works + +1. `@bgpkit/parser` is a WebAssembly build of bgpkit-parser's BMP/BGP parsing + core, published as an npm package. +2. `parseOpenBmpMessage(bytes)` accepts raw Kafka message bytes (an OpenBMP + header + BMP frame) and returns a parsed JavaScript object. +3. [KafkaJS](https://kafka.js.org/) is used to consume messages from the + RouteViews broker. diff --git a/examples/wasm/kafka-openbmp-stream/kafka-stream.js b/examples/wasm/kafka-openbmp-stream/kafka-stream.js new file mode 100644 index 0000000..4a9cf10 --- /dev/null +++ b/examples/wasm/kafka-openbmp-stream/kafka-stream.js @@ -0,0 +1,85 @@ +'use strict'; + +/** + * Route-views Kafka stream consumer using @bgpkit/parser. + * + * Prerequisites: + * 1. Install Node.js dependencies: + * npm install + * 2. Run: + * npm start + * + * The RouteViews Kafka stream is publicly accessible at stream.routeviews.org:9092. + * Topics follow the pattern: routeviews...bmp_raw + * Each message value is an OpenBMP-wrapped BMP frame (binary). + */ + +const { Kafka, logLevel } = require('kafkajs'); +const { parseOpenBmpMessage } = require('@bgpkit/parser'); + +// ── Configuration ──────────────────────────────────────────────────────────── + +const BROKER = 'stream.routeviews.org:9092'; + +// Regex filter applied to Kafka topic names. Adjust to taste: +// All collectors: /^routeviews\..+\.bmp_raw$/ +// Specific collector: /^routeviews\.amsix\.ams\..+\.bmp_raw$/ +const TOPIC_PATTERN = /^routeviews\.amsix\..+\.bmp_raw$/; + +// Consumer group ID. Change this to start reading from the latest offset +// in a fresh group, or reuse a group to resume from where you left off. +const GROUP_ID = 'bgpkit-parser-nodejs-example'; + +// ── Main ───────────────────────────────────────────────────────────────────── + +async function run() { + const kafka = new Kafka({ + clientId: 'bgpkit-parser-example', + brokers: [BROKER], + logLevel: logLevel.NOTHING, + }); + + const admin = kafka.admin(); + await admin.connect(); + const allTopics = await admin.listTopics(); + await admin.disconnect(); + + const topics = allTopics.filter((t) => TOPIC_PATTERN.test(t)); + if (topics.length === 0) { + console.error(`No topics matching ${TOPIC_PATTERN} found on ${BROKER}`); + process.exit(1); + } + process.stderr.write(`Subscribed to ${topics.length} topics on ${BROKER}\n`); + + const consumer = kafka.consumer({ + groupId: GROUP_ID, + sessionTimeout: 30000, + }); + await consumer.connect(); + + for (const topic of topics) { + await consumer.subscribe({ topic, fromBeginning: false }); + } + + await consumer.run({ + eachMessage: async ({ message }) => { + if (!message.value) return; + + let msg; + try { + msg = parseOpenBmpMessage(message.value); + } catch { + return; + } + + if (!msg) return; + + console.log(JSON.stringify(msg)); + }, + }); +} + +run().catch((err) => { + console.error('Fatal error:', err); + process.exit(1); +}); diff --git a/examples/wasm/kafka-openbmp-stream/package.json b/examples/wasm/kafka-openbmp-stream/package.json new file mode 100644 index 0000000..04069c0 --- /dev/null +++ b/examples/wasm/kafka-openbmp-stream/package.json @@ -0,0 +1,17 @@ +{ + "name": "bgpkit-parser-kafka-example", + "version": "1.0.0", + "description": "Route-views Kafka stream consumer using @bgpkit/parser (WebAssembly)", + "main": "kafka-stream.js", + "scripts": { + "start": "node kafka-stream.js", + "start:quiet": "node kafka-stream.js 2>/dev/null" + }, + "dependencies": { + "@bgpkit/parser": "^0.15.0", + "kafkajs": "^2.2.4" + }, + "engines": { + "node": ">=18" + } +} diff --git a/examples/wasm/parse-mrt-file/.gitignore b/examples/wasm/parse-mrt-file/.gitignore new file mode 100644 index 0000000..085a82d --- /dev/null +++ b/examples/wasm/parse-mrt-file/.gitignore @@ -0,0 +1,5 @@ +node_modules/ +package-lock.json +*.gz +*.bz2 +*.mrt diff --git a/examples/wasm/parse-mrt-file/README.md b/examples/wasm/parse-mrt-file/README.md new file mode 100644 index 0000000..ce1d4f6 --- /dev/null +++ b/examples/wasm/parse-mrt-file/README.md @@ -0,0 +1,49 @@ +# Parse MRT File — Node.js Example + +> **Experimental**: The `@bgpkit/parser` npm package is experimental. The API +> surface, output format, and build process may change in future releases. + +This example parses an MRT file into BGP elements using +[`@bgpkit/parser`](https://www.npmjs.com/package/@bgpkit/parser) and outputs +one JSON object per line. It accepts both URLs and local files. + +## Prerequisites + +- [Node.js](https://nodejs.org/) >= 18 + +## Run + +```sh +cd examples/wasm/parse-mrt-file +npm install + +# Parse directly from a URL (fetches and decompresses in memory) +node parse-mrt.js https://data.ris.ripe.net/rrc06/2026.03/updates.20260322.2105.gz + +# Or parse a local file +node parse-mrt.js updates.20260322.2105.gz +``` + +## Output + +Each line is a JSON object representing a single BGP element: + +```json +{"timestamp":1742677500.0,"type":"ANNOUNCE","peer_ip":"2001:7f8:4::...","peer_asn":6939,...} +{"timestamp":1742677500.0,"type":"WITHDRAW","peer_ip":"80.249.211.155","peer_asn":34549,...} +``` + +You can pipe the output through `jq` for pretty-printing or filtering: + +```sh +URL=https://data.ris.ripe.net/rrc06/2026.03/updates.20260322.2105.gz + +# Pretty-print first element +node parse-mrt.js $URL | head -1 | jq . + +# Count announcements vs withdrawals +node parse-mrt.js $URL | jq -r '.type' | sort | uniq -c + +# Filter by prefix +node parse-mrt.js $URL | jq -r 'select(.prefix == "1.0.0.0/24")' +``` diff --git a/examples/wasm/parse-mrt-file/package.json b/examples/wasm/parse-mrt-file/package.json new file mode 100644 index 0000000..76dca3d --- /dev/null +++ b/examples/wasm/parse-mrt-file/package.json @@ -0,0 +1,15 @@ +{ + "name": "bgpkit-parser-mrt-example", + "version": "1.0.0", + "description": "Parse MRT files into BGP elements using @bgpkit/parser (WebAssembly)", + "main": "parse-mrt.js", + "scripts": { + "start": "node parse-mrt.js" + }, + "dependencies": { + "@bgpkit/parser": "^0.15.0" + }, + "engines": { + "node": ">=18" + } +} diff --git a/examples/wasm/parse-mrt-file/parse-mrt.js b/examples/wasm/parse-mrt-file/parse-mrt.js new file mode 100644 index 0000000..c817ab2 --- /dev/null +++ b/examples/wasm/parse-mrt-file/parse-mrt.js @@ -0,0 +1,64 @@ +'use strict'; + +/** + * Parse an MRT file and output BGP elements as JSON, one per line. + * + * Usage: + * node parse-mrt.js + * node parse-mrt.js https://data.ris.ripe.net/rrc06/2026.03/updates.20260322.2105.gz + * node parse-mrt.js updates.20260322.2105.gz + * + * Supports .gz (gzip) compressed and uncompressed MRT files. + * URLs are fetched and decompressed in memory (no download needed). + */ + +const fs = require('fs'); +const zlib = require('zlib'); +const path = require('path'); +const { parseMrtFile } = require('@bgpkit/parser'); + +const input = process.argv[2]; +if (!input) { + console.error('Usage: node parse-mrt.js '); + console.error(' node parse-mrt.js https://data.ris.ripe.net/rrc06/2026.03/updates.20260322.2105.gz'); + console.error(' node parse-mrt.js updates.20260322.2105.gz'); + process.exit(1); +} + +async function fetchUrl(url) { + process.stderr.write(`Fetching ${url}...\n`); + const res = await fetch(url); + if (!res.ok) throw new Error(`HTTP ${res.status}: ${res.statusText}`); + return Buffer.from(await res.arrayBuffer()); +} + +function decompress(buf, name) { + if (name.endsWith('.gz')) { + process.stderr.write('Decompressing gzip...\n'); + return zlib.gunzipSync(buf); + } + if (name.endsWith('.bz2')) { + console.error('bzip2 is not supported by Node.js zlib. Decompress the file first.'); + process.exit(1); + } + return buf; +} + +async function main() { + const isUrl = input.startsWith('http://') || input.startsWith('https://'); + const buf = isUrl ? await fetchUrl(input) : fs.readFileSync(input); + const raw = decompress(buf, input); + + process.stderr.write(`Parsing ${raw.length} bytes of MRT data...\n`); + const elems = parseMrtFile(raw); + process.stderr.write(`Parsed ${elems.length} BGP elements\n`); + + for (const elem of elems) { + console.log(JSON.stringify(elem)); + } +} + +main().catch((err) => { + console.error('Error:', err.message); + process.exit(1); +}); diff --git a/src/lib.rs b/src/lib.rs index 6948100..5e1acb6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -834,6 +834,8 @@ pub mod error; pub mod models; #[cfg(feature = "parser")] pub mod parser; +#[cfg(feature = "wasm")] +pub mod wasm; pub use models::BgpElem; pub use models::MrtRecord; diff --git a/src/wasm.rs b/src/wasm.rs new file mode 100644 index 0000000..c648153 --- /dev/null +++ b/src/wasm.rs @@ -0,0 +1,352 @@ +//! WebAssembly bindings for BMP/BGP/MRT message parsing. +//! +//! This module is compiled only when the `wasm` feature is enabled. Use +//! [`wasm-pack`](https://rustwasm.github.io/wasm-pack/) to build the package: +//! +//! ```sh +//! # Node.js target (CommonJS, synchronous WASM loading) +//! wasm-pack build --target nodejs --no-default-features --features wasm +//! +//! # Bundler target (for webpack/vite/rollup) +//! wasm-pack build --target bundler --no-default-features --features wasm +//! ``` +//! +//! ## Exported functions +//! +//! - [`parseOpenBmpMessage`](parse_openbmp_message) — parse OpenBMP-wrapped BMP +//! messages (e.g. from the RouteViews Kafka stream) +//! - [`parseBmpMessage`](parse_bmp_message) — parse raw BMP messages +//! - [`parseMrtFile`](parse_mrt_file) — parse a decompressed MRT file into BGP elements +//! - [`parseBgpUpdate`](parse_bgp_update) — parse a single BGP UPDATE message + +use crate::models::*; +use crate::parser::bgp::messages::parse_bgp_message; +use crate::parser::bmp::error::ParserBmpError; +use crate::parser::bmp::messages::*; +use crate::parser::bmp::{parse_bmp_msg, parse_openbmp_header}; +use crate::parser::mrt::mrt_elem::Elementor; +use crate::parser::mrt::mrt_record::parse_mrt_record; +use bytes::Bytes; +use std::io::Cursor; +use std::net::{IpAddr, Ipv4Addr}; +use wasm_bindgen::prelude::*; + +// ── Serialization types ────────────────────────────────────────────────────── +// +// These structs use camelCase `#[serde(rename)]` to produce JS-friendly JSON. +// The `WasmBmpMessage` enum is internally tagged on `"type"` so JS consumers +// can discriminate with `msg.type === "RouteMonitoring"` etc. + +#[derive(serde::Serialize)] +#[serde(tag = "type")] +enum WasmBmpMessage { + RouteMonitoring { + #[serde(flatten)] + base: WasmMessageBase, + #[serde(rename = "peerHeader")] + peer_header: WasmPeerHeader, + elems: Vec, + }, + PeerUpNotification { + #[serde(flatten)] + base: WasmMessageBase, + #[serde(rename = "peerHeader")] + peer_header: WasmPeerHeader, + #[serde(rename = "localIp")] + local_ip: String, + #[serde(rename = "localPort")] + local_port: u16, + #[serde(rename = "remotePort")] + remote_port: u16, + }, + PeerDownNotification { + #[serde(flatten)] + base: WasmMessageBase, + #[serde(rename = "peerHeader")] + peer_header: WasmPeerHeader, + reason: String, + }, + InitiationMessage { + #[serde(flatten)] + base: WasmMessageBase, + tlvs: Vec, + }, + TerminationMessage { + #[serde(flatten)] + base: WasmMessageBase, + tlvs: Vec, + }, + StatisticsReport { + #[serde(flatten)] + base: WasmMessageBase, + #[serde(rename = "peerHeader")] + peer_header: WasmPeerHeader, + }, + RouteMirroringMessage { + #[serde(flatten)] + base: WasmMessageBase, + #[serde(rename = "peerHeader")] + peer_header: WasmPeerHeader, + }, +} + +#[derive(serde::Serialize)] +struct WasmMessageBase { + #[serde(rename = "openBmpHeader")] + openbmp_header: Option, + timestamp: f64, +} + +#[derive(serde::Serialize)] +struct WasmOpenBmpHeader { + #[serde(rename = "routerIp")] + router_ip: String, + #[serde(rename = "routerGroup")] + router_group: Option, + #[serde(rename = "adminId")] + admin_id: String, + timestamp: f64, +} + +#[derive(serde::Serialize)] +struct WasmPeerHeader { + #[serde(rename = "peerIp")] + peer_ip: String, + #[serde(rename = "peerAsn")] + peer_asn: u32, + #[serde(rename = "peerBgpId")] + peer_bgp_id: String, + #[serde(rename = "peerType")] + peer_type: String, + #[serde(rename = "isPostPolicy")] + is_post_policy: bool, + #[serde(rename = "isAdjRibOut")] + is_adj_rib_out: bool, + timestamp: f64, +} + +#[derive(serde::Serialize)] +struct WasmTlv { + #[serde(rename = "type")] + tlv_type: String, + value: String, +} + +// ── Conversion helpers ─────────────────────────────────────────────────────── + +fn make_openbmp_header(header: &crate::parser::bmp::openbmp::OpenBmpHeader) -> WasmOpenBmpHeader { + WasmOpenBmpHeader { + router_ip: header.router_ip.to_string(), + router_group: header.router_group.clone(), + admin_id: header.admin_id.clone(), + timestamp: header.timestamp, + } +} + +fn make_peer_header(pph: &BmpPerPeerHeader) -> WasmPeerHeader { + let (is_post_policy, is_adj_rib_out) = match pph.peer_flags { + PerPeerFlags::PeerFlags(f) => (f.is_post_policy(), f.is_adj_rib_out()), + PerPeerFlags::LocalRibPeerFlags(_) => (false, false), + }; + WasmPeerHeader { + peer_ip: pph.peer_ip.to_string(), + peer_asn: pph.peer_asn.into(), + peer_bgp_id: pph.peer_bgp_id.to_string(), + peer_type: format!("{:?}", pph.peer_type), + is_post_policy, + is_adj_rib_out, + timestamp: pph.timestamp, + } +} + +fn build_bmp_result( + bmp_msg: BmpMessage, + openbmp_header: Option, + timestamp: f64, +) -> WasmBmpMessage { + let base = WasmMessageBase { + openbmp_header, + timestamp, + }; + + match (bmp_msg.per_peer_header, bmp_msg.message_body) { + (Some(pph), BmpMessageBody::RouteMonitoring(m)) => { + let elems = + Elementor::bgp_to_elems(m.bgp_message, timestamp, &pph.peer_ip, &pph.peer_asn); + WasmBmpMessage::RouteMonitoring { + base, + peer_header: make_peer_header(&pph), + elems, + } + } + (Some(pph), BmpMessageBody::PeerUpNotification(m)) => WasmBmpMessage::PeerUpNotification { + base, + peer_header: make_peer_header(&pph), + local_ip: m.local_addr.to_string(), + local_port: m.local_port, + remote_port: m.remote_port, + }, + (Some(pph), BmpMessageBody::PeerDownNotification(m)) => { + WasmBmpMessage::PeerDownNotification { + base, + peer_header: make_peer_header(&pph), + reason: format!("{:?}", m.reason), + } + } + (_, BmpMessageBody::InitiationMessage(m)) => WasmBmpMessage::InitiationMessage { + base, + tlvs: m + .tlvs + .into_iter() + .map(|t| WasmTlv { + tlv_type: format!("{:?}", t.info_type), + value: t.info, + }) + .collect(), + }, + (_, BmpMessageBody::TerminationMessage(m)) => WasmBmpMessage::TerminationMessage { + base, + tlvs: m + .tlvs + .into_iter() + .map(|t| WasmTlv { + tlv_type: format!("{:?}", t.info_type), + value: format!("{:?}", t.info_value), + }) + .collect(), + }, + (Some(pph), BmpMessageBody::StatsReport(_)) => WasmBmpMessage::StatisticsReport { + base, + peer_header: make_peer_header(&pph), + }, + (Some(pph), BmpMessageBody::RouteMirroring(_)) => WasmBmpMessage::RouteMirroringMessage { + base, + peer_header: make_peer_header(&pph), + }, + // Messages without a per-peer header that aren't Initiation/Termination + // should not occur per RFC 7854, but handle gracefully. + (None, BmpMessageBody::StatsReport(_)) => WasmBmpMessage::StatisticsReport { + base, + peer_header: make_peer_header(&BmpPerPeerHeader::default()), + }, + (None, BmpMessageBody::RouteMirroring(_)) => WasmBmpMessage::RouteMirroringMessage { + base, + peer_header: make_peer_header(&BmpPerPeerHeader::default()), + }, + (None, BmpMessageBody::RouteMonitoring(_)) => WasmBmpMessage::RouteMonitoring { + base, + peer_header: make_peer_header(&BmpPerPeerHeader::default()), + elems: vec![], + }, + (None, BmpMessageBody::PeerUpNotification(m)) => WasmBmpMessage::PeerUpNotification { + base, + peer_header: make_peer_header(&BmpPerPeerHeader::default()), + local_ip: m.local_addr.to_string(), + local_port: m.local_port, + remote_port: m.remote_port, + }, + (None, BmpMessageBody::PeerDownNotification(m)) => WasmBmpMessage::PeerDownNotification { + base, + peer_header: make_peer_header(&BmpPerPeerHeader::default()), + reason: format!("{:?}", m.reason), + }, + } +} + +// ── Exported WASM functions ────────────────────────────────────────────────── + +/// Parse an OpenBMP-wrapped BMP message as received from the RouteViews Kafka stream. +/// +/// Returns a JSON string representing a `BmpParsedMessage` discriminated on `"type"`. +/// Returns an empty string for non-router OpenBMP frames (the JS wrapper converts +/// this to `null`). +/// +/// Throws a JavaScript `Error` on malformed data. +#[wasm_bindgen(js_name = "parseOpenBmpMessage")] +pub fn parse_openbmp_message(data: &[u8]) -> Result { + let mut bytes = Bytes::from(data.to_vec()); + + let header = match parse_openbmp_header(&mut bytes) { + Ok(h) => h, + Err(ParserBmpError::UnsupportedOpenBmpMessage) => return Ok(String::new()), + Err(e) => return Err(JsError::new(&e.to_string())), + }; + + let bmp_msg = parse_bmp_msg(&mut bytes).map_err(|e| JsError::new(&e.to_string()))?; + + let result = build_bmp_result( + bmp_msg, + Some(make_openbmp_header(&header)), + header.timestamp, + ); + + serde_json::to_string(&result).map_err(|e| JsError::new(&e.to_string())) +} + +/// Parse a raw BMP message (no OpenBMP wrapper). +/// +/// The `timestamp` parameter provides the collection time in seconds since +/// Unix epoch. +/// +/// Returns a JSON string representing a `BmpParsedMessage`. +/// Throws a JavaScript `Error` on malformed data. +#[wasm_bindgen(js_name = "parseBmpMessage")] +pub fn parse_bmp_message(data: &[u8], timestamp: f64) -> Result { + let mut bytes = Bytes::from(data.to_vec()); + + let bmp_msg = parse_bmp_msg(&mut bytes).map_err(|e| JsError::new(&e.to_string()))?; + + let result = build_bmp_result(bmp_msg, None, timestamp); + + serde_json::to_string(&result).map_err(|e| JsError::new(&e.to_string())) +} + +/// Parse a decompressed MRT file into BGP elements. +/// +/// Accepts the raw bytes of a fully decompressed MRT file (the caller is +/// responsible for bzip2/gzip decompression). Returns a JSON array of +/// `BgpElem` objects. +/// +/// Handles TABLE_DUMP, TABLE_DUMP_V2, and BGP4MP record types. For +/// TABLE_DUMP_V2, the PeerIndexTable record is consumed internally to +/// resolve peer information. +/// +/// Throws a JavaScript `Error` if the file cannot be parsed at all. +#[wasm_bindgen(js_name = "parseMrtFile")] +pub fn parse_mrt_file(data: &[u8]) -> Result { + let mut cursor = Cursor::new(data); + let mut elementor = Elementor::default(); + let mut all_elems: Vec = Vec::new(); + + while let Ok(record) = parse_mrt_record(&mut cursor) { + let elems = elementor.record_to_elems(record); + all_elems.extend(elems); + } + + serde_json::to_string(&all_elems).map_err(|e| JsError::new(&e.to_string())) +} + +/// Parse a single BGP UPDATE message into BGP elements. +/// +/// Expects a full BGP message including the 16-byte marker, 2-byte length, +/// and 1-byte type header. Assumes 4-byte ASN encoding (the modern default). +/// +/// The returned elements will have a timestamp of `0.0` and unspecified +/// peer IP / peer ASN since those are not part of the BGP message itself — +/// the caller should populate these from external context if needed. +/// +/// Returns a JSON array of `BgpElem` objects. +/// Throws a JavaScript `Error` if the message cannot be parsed. +#[wasm_bindgen(js_name = "parseBgpUpdate")] +pub fn parse_bgp_update(data: &[u8]) -> Result { + let mut bytes = Bytes::from(data.to_vec()); + let msg = parse_bgp_message(&mut bytes, false, &AsnLength::Bits32) + .map_err(|e| JsError::new(&e.to_string()))?; + let elems = Elementor::bgp_to_elems( + msg, + 0.0, + &IpAddr::V4(Ipv4Addr::UNSPECIFIED), + &Asn::default(), + ); + serde_json::to_string(&elems).map_err(|e| JsError::new(&e.to_string())) +} diff --git a/src/wasm/README.md b/src/wasm/README.md new file mode 100644 index 0000000..0489d4d --- /dev/null +++ b/src/wasm/README.md @@ -0,0 +1,200 @@ +# @bgpkit/parser — WebAssembly Bindings + +> **Experimental**: The WASM bindings are experimental. The API surface, output +> format, and build process may change in future releases. + +This module compiles bgpkit-parser's BGP/BMP/MRT parsing code to WebAssembly +for use in JavaScript and TypeScript environments (Node.js, bundlers, Cloudflare +Workers). + +## Quick Start + +### Prerequisites + +- [Rust](https://rustup.rs/) (stable toolchain) +- [`wasm-pack`](https://rustwasm.github.io/wasm-pack/installer/): + ```sh + cargo install wasm-pack + ``` +- The `wasm32-unknown-unknown` target: + ```sh + rustup target add wasm32-unknown-unknown + ``` + +### Building and Publishing + +The build script compiles all three targets (Node.js, bundler, web) and +assembles a single npm package in `pkg/`: + +```sh +# From the repository root +bash src/wasm/build.sh + +# Publish to npm +cd pkg && npm publish +``` + +The resulting `pkg/` directory contains: + +``` +pkg/ +├── nodejs/ # CommonJS, sync WASM loading (Node.js) +├── bundler/ # ES modules (webpack, vite, rollup) +├── web/ # ES modules + init() (browsers, Cloudflare Workers) +├── index.js # CJS entry point +├── index.mjs # ESM entry point +├── index.d.ts # TypeScript types +├── web.mjs # Web entry point (requires init()) +├── web.d.ts # Web TypeScript types +└── package.json # npm package manifest with conditional exports +``` + +The `package.json` uses [conditional exports](https://nodejs.org/api/packages.html#conditional-exports) +so that `require('@bgpkit/parser')` loads the Node.js target and +`import '@bgpkit/parser'` loads the bundler target automatically. The web +target is available as a separate subpath import (`@bgpkit/parser/web`). + +### Building a single target + +If you only need one target (e.g. for the Kafka example), you can build it +directly: + +```sh +wasm-pack build --target nodejs --no-default-features --features wasm +``` + +## API + +Four parsing functions are exported, all accepting `Uint8Array` input and +returning parsed JavaScript objects: + +### `parseOpenBmpMessage(data: Uint8Array): BmpParsedMessage | null` + +Parse an OpenBMP-wrapped BMP message as received from the +[RouteViews Kafka stream](http://www.routeviews.org/routeviews/). +Returns `null` for non-router OpenBMP frames (e.g. collector heartbeats). + +```js +const { parseOpenBmpMessage } = require('@bgpkit/parser'); + +// `raw` is a Buffer/Uint8Array from a Kafka message value +const msg = parseOpenBmpMessage(raw); +if (msg && msg.type === 'RouteMonitoring') { + for (const elem of msg.elems) { + console.log(elem.type, elem.prefix, elem.as_path); + } +} +``` + +### `parseBmpMessage(data: Uint8Array, timestamp: number): BmpParsedMessage` + +Parse a raw BMP message without an OpenBMP wrapper. The `timestamp` parameter +provides the collection time in seconds since Unix epoch. + +```js +const { parseBmpMessage } = require('@bgpkit/parser'); + +const msg = parseBmpMessage(bmpBytes, Date.now() / 1000); +switch (msg.type) { + case 'RouteMonitoring': + console.log(`${msg.elems.length} BGP elements`); + break; + case 'PeerUpNotification': + console.log(`Peer up: ${msg.peerHeader.peerIp}`); + break; + case 'PeerDownNotification': + console.log(`Peer down: ${msg.peerHeader.peerIp} (${msg.reason})`); + break; +} +``` + +### `parseMrtFile(data: Uint8Array): BgpElem[]` + +Parse a fully decompressed MRT file into an array of BGP elements. Handles +TABLE_DUMP, TABLE_DUMP_V2, and BGP4MP record types. The caller is responsible +for bzip2/gzip decompression before passing the raw bytes. + +```js +const fs = require('fs'); +const zlib = require('zlib'); +const { parseMrtFile } = require('@bgpkit/parser'); + +const compressed = fs.readFileSync('rib.20260101.0000.bz2'); +const raw = zlib.bunzip2Sync(compressed); // or use a bzip2 library +const elems = parseMrtFile(raw); + +console.log(`Parsed ${elems.length} BGP elements`); +for (const elem of elems.slice(0, 10)) { + console.log(elem.type, elem.prefix, elem.as_path); +} +``` + +### `parseBgpUpdate(data: Uint8Array): BgpElem[]` + +Parse a single BGP UPDATE message into BGP elements. Expects the full BGP +message including the 16-byte marker, 2-byte length, and 1-byte type header. +Assumes 4-byte ASN encoding. + +The returned elements have `timestamp: 0` and unspecified peer IP/ASN since +those are not part of the BGP message itself. + +```js +const { parseBgpUpdate } = require('@bgpkit/parser'); + +// bgpBytes includes the 16-byte marker + length + type header +const elems = parseBgpUpdate(bgpBytes); +for (const elem of elems) { + console.log(elem.prefix, elem.next_hop); +} +``` + +## BMP Message Types + +The BMP parsing functions return a discriminated union on the `type` field. +All message types include a `timestamp` and an optional `openBmpHeader` +(present only when parsed via `parseOpenBmpMessage`). + +| `type` | Additional fields | +|---|---| +| `RouteMonitoring` | `peerHeader`, `elems` (array of `BgpElem`) | +| `PeerUpNotification` | `peerHeader`, `localIp`, `localPort`, `remotePort` | +| `PeerDownNotification` | `peerHeader`, `reason` | +| `InitiationMessage` | `tlvs` (array of `{type, value}`) | +| `TerminationMessage` | `tlvs` (array of `{type, value}`) | +| `StatisticsReport` | `peerHeader` | +| `RouteMirroringMessage` | `peerHeader` | + +## Platform Support + +| Platform | Import | Notes | +|---|---|---| +| Node.js (CJS) | `require('@bgpkit/parser')` | Uses `nodejs` target, sync WASM loading | +| Node.js (ESM) | `import { ... } from '@bgpkit/parser'` | Uses `bundler` target | +| Bundler | `import { ... } from '@bgpkit/parser'` | webpack, vite, rollup | +| Browser / CF Workers | `import { init, ... } from '@bgpkit/parser/web'` | Must call `await init()` first | + +### Web target usage + +The web target requires explicit initialization before calling any parsing +functions: + +```js +import { init, parseOpenBmpMessage } from '@bgpkit/parser/web'; + +await init(); // load and compile the WASM module + +const msg = parseOpenBmpMessage(data); +``` + +You can optionally pass a URL or `ArrayBuffer` of the `.wasm` file to `init()` +if the default path doesn't work in your environment: + +```js +await init(new URL('./bgpkit_parser_bg.wasm', import.meta.url)); +``` + +### Cloudflare Workers + +Use the `@bgpkit/parser/web` entry point. Note that Workers cannot connect to +Kafka directly (no raw TCP sockets), so BMP message bytes must arrive via HTTP +(e.g. from a proxy service). diff --git a/src/wasm/build.sh b/src/wasm/build.sh new file mode 100755 index 0000000..7e78439 --- /dev/null +++ b/src/wasm/build.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash +# +# Build bgpkit-parser WASM package for all three targets (nodejs, bundler, web) +# and assemble a single publishable npm package in ./pkg. +# +# Usage: +# cd +# bash src/wasm/build.sh +# +# Prerequisites: +# - wasm-pack: cargo install wasm-pack +# - wasm32-unknown-unknown target: rustup target add wasm32-unknown-unknown +# +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)" +cd "$REPO_ROOT" + +WASM_DIR="src/wasm/js" +OUT_DIR="$REPO_ROOT/pkg" + +echo "==> Building nodejs target..." +wasm-pack build --target nodejs --no-default-features --features wasm +mv pkg pkg-nodejs + +echo "==> Building bundler target..." +wasm-pack build --target bundler --no-default-features --features wasm +mv pkg pkg-bundler + +echo "==> Building web target..." +wasm-pack build --target web --no-default-features --features wasm +mv pkg pkg-web + +echo "==> Assembling package..." +rm -rf "$OUT_DIR" +mkdir -p "$OUT_DIR/nodejs" "$OUT_DIR/bundler" "$OUT_DIR/web" + +# nodejs target: main glue + wasm binary +cp pkg-nodejs/bgpkit_parser.js "$OUT_DIR/nodejs/" +cp pkg-nodejs/bgpkit_parser_bg.wasm "$OUT_DIR/nodejs/" +cp pkg-nodejs/bgpkit_parser_bg.wasm.d.ts "$OUT_DIR/nodejs/" 2>/dev/null || true + +# bundler target: main glue + bg bindings + wasm binary +cp pkg-bundler/bgpkit_parser.js "$OUT_DIR/bundler/" +cp pkg-bundler/bgpkit_parser_bg.js "$OUT_DIR/bundler/" +cp pkg-bundler/bgpkit_parser_bg.wasm "$OUT_DIR/bundler/" +cp pkg-bundler/bgpkit_parser_bg.wasm.d.ts "$OUT_DIR/bundler/" 2>/dev/null || true +cp pkg-bundler/bgpkit_parser.d.ts "$OUT_DIR/bundler/" 2>/dev/null || true + +# web target: main glue + wasm binary (no _bg.js for web target) +cp pkg-web/bgpkit_parser.js "$OUT_DIR/web/" +cp pkg-web/bgpkit_parser_bg.wasm "$OUT_DIR/web/" +cp pkg-web/bgpkit_parser_bg.wasm.d.ts "$OUT_DIR/web/" 2>/dev/null || true +cp pkg-web/bgpkit_parser.d.ts "$OUT_DIR/web/" 2>/dev/null || true + +# JS wrappers and types +cp "$WASM_DIR/index.js" "$OUT_DIR/" +cp "$WASM_DIR/index.mjs" "$OUT_DIR/" +cp "$WASM_DIR/index.d.ts" "$OUT_DIR/" +cp "$WASM_DIR/web.mjs" "$OUT_DIR/" +cp "$WASM_DIR/web.d.ts" "$OUT_DIR/" +cp "$WASM_DIR/package.json" "$OUT_DIR/" + +# Cleanup temp directories +rm -rf pkg-nodejs pkg-bundler pkg-web + +echo "==> Done. Package ready in $OUT_DIR/" +echo "" +echo "To publish:" +echo " cd pkg && npm publish" diff --git a/src/wasm/js/index.d.ts b/src/wasm/js/index.d.ts new file mode 100644 index 0000000..cd6450f --- /dev/null +++ b/src/wasm/js/index.d.ts @@ -0,0 +1,163 @@ +// TypeScript type definitions for @bgpkit/parser WebAssembly bindings. +// +// These types describe the JSON output produced by the Rust WASM functions. + +// ── Parsing functions ──────────────────────────────────────────────── + +/** + * Parse an OpenBMP-wrapped BMP message (e.g. from the RouteViews Kafka stream). + * Returns null for non-router OpenBMP frames (collector heartbeats). + */ +export function parseOpenBmpMessage(data: Uint8Array): BmpParsedMessage | null; + +/** + * Parse a raw BMP message (no OpenBMP wrapper). + * @param timestamp Collection time in seconds since Unix epoch. + */ +export function parseBmpMessage( + data: Uint8Array, + timestamp: number +): BmpParsedMessage; + +/** + * Parse a decompressed MRT file into BGP elements. + * Handles TABLE_DUMP, TABLE_DUMP_V2, and BGP4MP record types. + */ +export function parseMrtFile(data: Uint8Array): BgpElem[]; + +/** + * Parse a single BGP UPDATE message (with 16-byte marker + 2-byte length + * + type header) into BGP elements. Assumes 4-byte ASN encoding. + */ +export function parseBgpUpdate(data: Uint8Array): BgpElem[]; + +// ── BMP message types (discriminated union on `type`) ──────────────── + +export type BmpParsedMessage = + | BmpRouteMonitoringMessage + | BmpPeerUpMessage + | BmpPeerDownMessage + | BmpInitiationMessage + | BmpTerminationMessage + | BmpStatsReportMessage + | BmpRouteMirroringMessage; + +interface BmpMessageBase { + openBmpHeader: OpenBmpHeader | null; + timestamp: number; +} + +export interface BmpRouteMonitoringMessage extends BmpMessageBase { + type: "RouteMonitoring"; + peerHeader: BmpPeerHeader; + elems: BgpElem[]; +} + +export interface BmpPeerUpMessage extends BmpMessageBase { + type: "PeerUpNotification"; + peerHeader: BmpPeerHeader; + localIp: string; + localPort: number; + remotePort: number; +} + +export interface BmpPeerDownMessage extends BmpMessageBase { + type: "PeerDownNotification"; + peerHeader: BmpPeerHeader; + reason: string; +} + +export interface BmpInitiationMessage extends BmpMessageBase { + type: "InitiationMessage"; + tlvs: Array<{ type: string; value: string }>; +} + +export interface BmpTerminationMessage extends BmpMessageBase { + type: "TerminationMessage"; + tlvs: Array<{ type: string; value: string }>; +} + +export interface BmpStatsReportMessage extends BmpMessageBase { + type: "StatisticsReport"; + peerHeader: BmpPeerHeader; +} + +export interface BmpRouteMirroringMessage extends BmpMessageBase { + type: "RouteMirroringMessage"; + peerHeader: BmpPeerHeader; +} + +// ── Shared types ───────────────────────────────────────────────────── + +export interface OpenBmpHeader { + routerIp: string; + routerGroup: string | null; + adminId: string; + timestamp: number; +} + +export interface BmpPeerHeader { + peerIp: string; + peerAsn: number; + peerBgpId: string; + peerType: string; + isPostPolicy: boolean; + isAdjRibOut: boolean; + timestamp: number; +} + +export interface BgpElem { + timestamp: number; + type: "ANNOUNCE" | "WITHDRAW"; + peer_ip: string; + peer_asn: number; + peer_bgp_id: string | null; + prefix: string; + next_hop: string | null; + as_path: AsPath | null; + origin_asns: number[] | null; + origin: string | null; + local_pref: number | null; + med: number | null; + communities: MetaCommunity[] | null; + atomic: boolean; + aggr_asn: number | null; + aggr_ip: string | null; + only_to_customer: number | null; +} + +// ── AS path types ──────────────────────────────────────────────────── + +/** + * AS path in simplified format: a flat array where numbers are ASNs in + * AS_SEQUENCE segments and nested arrays are AS_SET members. + * Example: [6447, 39120, [643, 836], 352] + * + * Falls back to verbose format with confederation segments: + * [{ ty: "AS_CONFED_SEQUENCE", values: [123, 942] }, ...] + */ +export type AsPath = AsPathElement[]; +export type AsPathElement = number | number[] | AsPathVerboseSegment; +export interface AsPathVerboseSegment { + ty: "AS_SET" | "AS_SEQUENCE" | "AS_CONFED_SEQUENCE" | "AS_CONFED_SET"; + values: number[]; +} + +// ── Community types ────────────────────────────────────────────────── + +/** Discriminated union of all BGP community types (serde untagged). */ +export type MetaCommunity = PlainCommunity | LargeCommunity | ExtendedCommunity; + +export type PlainCommunity = + | { Custom: [number, number] } + | "NoExport" + | "NoAdvertise" + | "NoExportSubConfed"; + +export interface LargeCommunity { + global_admin: number; + local_data: [number, number]; +} + +/** Extended community — many variants, treat as opaque JSON object. */ +export type ExtendedCommunity = Record; diff --git a/src/wasm/js/index.js b/src/wasm/js/index.js new file mode 100644 index 0000000..0d0c040 --- /dev/null +++ b/src/wasm/js/index.js @@ -0,0 +1,59 @@ +'use strict'; + +const wasm = require('./nodejs/bgpkit_parser.js'); + +/** + * Parse an OpenBMP-wrapped BMP message (e.g. from the RouteViews Kafka stream). + * + * Returns null for non-router OpenBMP frames (collector heartbeats). + * + * @param {Uint8Array} data - Raw OpenBMP message bytes + * @returns {object|null} Parsed BMP message with discriminated `type` field + */ +function parseOpenBmpMessage(data) { + const json = wasm.parseOpenBmpMessage(data); + return json ? JSON.parse(json) : null; +} + +/** + * Parse a raw BMP message (no OpenBMP wrapper). + * + * @param {Uint8Array} data - Raw BMP message bytes + * @param {number} timestamp - Collection time in seconds since Unix epoch + * @returns {object} Parsed BMP message with discriminated `type` field + */ +function parseBmpMessage(data, timestamp) { + return JSON.parse(wasm.parseBmpMessage(data, timestamp)); +} + +/** + * Parse a decompressed MRT file into BGP elements. + * + * Handles TABLE_DUMP, TABLE_DUMP_V2, and BGP4MP record types. + * The caller is responsible for decompressing the file (bzip2/gzip) before + * passing the raw bytes. + * + * @param {Uint8Array} data - Decompressed MRT file bytes + * @returns {object[]} Array of BgpElem objects + */ +function parseMrtFile(data) { + return JSON.parse(wasm.parseMrtFile(data)); +} + +/** + * Parse a single BGP UPDATE message into BGP elements. + * + * Expects a full BGP message including the 16-byte marker, 2-byte length, + * and 1-byte type header. Assumes 4-byte ASN encoding. + * + * The returned elements have timestamp=0 and unspecified peer IP/ASN since + * those are not part of the BGP message itself. + * + * @param {Uint8Array} data - Raw BGP message bytes + * @returns {object[]} Array of BgpElem objects + */ +function parseBgpUpdate(data) { + return JSON.parse(wasm.parseBgpUpdate(data)); +} + +module.exports = { parseOpenBmpMessage, parseBmpMessage, parseMrtFile, parseBgpUpdate }; diff --git a/src/wasm/js/index.mjs b/src/wasm/js/index.mjs new file mode 100644 index 0000000..a43eba4 --- /dev/null +++ b/src/wasm/js/index.mjs @@ -0,0 +1,18 @@ +import * as wasm from './bundler/bgpkit_parser.js'; + +export function parseOpenBmpMessage(data) { + const json = wasm.parseOpenBmpMessage(data); + return json ? JSON.parse(json) : null; +} + +export function parseBmpMessage(data, timestamp) { + return JSON.parse(wasm.parseBmpMessage(data, timestamp)); +} + +export function parseMrtFile(data) { + return JSON.parse(wasm.parseMrtFile(data)); +} + +export function parseBgpUpdate(data) { + return JSON.parse(wasm.parseBgpUpdate(data)); +} diff --git a/src/wasm/js/package.json b/src/wasm/js/package.json new file mode 100644 index 0000000..c7afc19 --- /dev/null +++ b/src/wasm/js/package.json @@ -0,0 +1,57 @@ +{ + "name": "@bgpkit/parser", + "version": "0.15.0", + "description": "BGP/BMP/MRT message parser compiled to WebAssembly", + "license": "MIT", + "repository": { + "type": "git", + "url": "https://github.com/bgpkit/bgpkit-parser" + }, + "main": "./index.js", + "module": "./index.mjs", + "types": "./index.d.ts", + "exports": { + ".": { + "node": { + "require": { + "types": "./index.d.ts", + "default": "./index.js" + }, + "import": { + "types": "./index.d.ts", + "default": "./index.mjs" + } + }, + "default": { + "types": "./index.d.ts", + "default": "./index.mjs" + } + }, + "./web": { + "types": "./web.d.ts", + "default": "./web.mjs" + } + }, + "files": [ + "nodejs/", + "bundler/", + "web/", + "index.js", + "index.mjs", + "index.d.ts", + "web.mjs", + "web.d.ts" + ], + "keywords": [ + "bgp", + "bmp", + "mrt", + "wasm", + "parser", + "routeviews", + "bgpkit" + ], + "engines": { + "node": ">=18" + } +} diff --git a/src/wasm/js/web.d.ts b/src/wasm/js/web.d.ts new file mode 100644 index 0000000..e7dd7c5 --- /dev/null +++ b/src/wasm/js/web.d.ts @@ -0,0 +1,27 @@ +// TypeScript type definitions for the web target entry point. +// The web target requires calling init() before using any parsing functions. + +export { + BmpParsedMessage, + BmpRouteMonitoringMessage, + BmpPeerUpMessage, + BmpPeerDownMessage, + BmpInitiationMessage, + BmpTerminationMessage, + BmpStatsReportMessage, + BmpRouteMirroringMessage, + OpenBmpHeader, + BmpPeerHeader, + BgpElem, +} from './index'; + +export { parseOpenBmpMessage, parseBmpMessage, parseMrtFile, parseBgpUpdate } from './index'; + +/** + * Initialize the WASM module. Must be called (and awaited) before using + * any parsing functions. + * + * @param input - Optional URL, Request, or BufferSource of the .wasm file. + * If omitted, the default path is used. + */ +export function init(input?: RequestInfo | URL | BufferSource): Promise; diff --git a/src/wasm/js/web.mjs b/src/wasm/js/web.mjs new file mode 100644 index 0000000..3a8bb0b --- /dev/null +++ b/src/wasm/js/web.mjs @@ -0,0 +1,30 @@ +import initWasm from './web/bgpkit_parser.js'; +import * as wasm from './web/bgpkit_parser.js'; + +/** + * Initialize the WASM module. Must be called (and awaited) before using + * any parsing functions. + * + * @param {RequestInfo | URL | BufferSource} [input] - Optional URL or buffer + * of the .wasm file. If omitted, the default path is used. + */ +export async function init(input) { + await initWasm(input); +} + +export function parseOpenBmpMessage(data) { + const json = wasm.parseOpenBmpMessage(data); + return json ? JSON.parse(json) : null; +} + +export function parseBmpMessage(data, timestamp) { + return JSON.parse(wasm.parseBmpMessage(data, timestamp)); +} + +export function parseMrtFile(data) { + return JSON.parse(wasm.parseMrtFile(data)); +} + +export function parseBgpUpdate(data) { + return JSON.parse(wasm.parseBgpUpdate(data)); +} diff --git a/src/wasm/test/.gitignore b/src/wasm/test/.gitignore new file mode 100644 index 0000000..504afef --- /dev/null +++ b/src/wasm/test/.gitignore @@ -0,0 +1,2 @@ +node_modules/ +package-lock.json diff --git a/src/wasm/test/package.json b/src/wasm/test/package.json new file mode 100644 index 0000000..7fefdc6 --- /dev/null +++ b/src/wasm/test/package.json @@ -0,0 +1,11 @@ +{ + "name": "bgpkit-parser-wasm-test", + "version": "1.0.0", + "private": true, + "scripts": { + "test": "node test-published.js" + }, + "dependencies": { + "@bgpkit/parser": "0.15.0" + } +} diff --git a/src/wasm/test/test-published.js b/src/wasm/test/test-published.js new file mode 100644 index 0000000..cd72aa1 --- /dev/null +++ b/src/wasm/test/test-published.js @@ -0,0 +1,201 @@ +#!/usr/bin/env node +'use strict'; + +/** + * Test script for the published @bgpkit/parser npm package. + * + * Usage: + * cd src/wasm/test + * npm install @bgpkit/parser + * node test-published.js + */ + +const https = require('https'); +const { parseOpenBmpMessage, parseBmpMessage, parseBgpUpdate, parseMrtFile } = require('@bgpkit/parser'); + +let passed = 0; +let failed = 0; + +function assert(condition, msg) { + if (condition) { + console.log(` ✓ ${msg}`); + passed++; + } else { + console.error(` ✗ ${msg}`); + failed++; + } +} + +// ── Test 1: parseOpenBmpMessage (PeerDownNotification) ─────────────────────── + +function testOpenBmpPeerDown() { + console.log('\n[1/5] parseOpenBmpMessage (PeerDownNotification)'); + + // Complete OpenBMP PeerDownNotification from the bgpkit-parser test suite + const hex = + '4f424d500107006400000033800c6184b9c2000c602cbf4f072f3ae149d23486024bc3dadfc4000a' + + '69732d63632d626d7031c677060bdd020a9e92be000200de2e3180df3369000000000000000000000' + + '000000c726f7574652d76696577733500000001030000003302000000000000000000000000000000' + + '000000000000003fda060e00000da30000000061523c36000c0e1c0200000a'; + + const data = Buffer.from(hex, 'hex'); + const msg = parseOpenBmpMessage(data); + + assert(msg !== null, 'returns non-null for valid message'); + assert(msg.type === 'PeerDownNotification', `type is PeerDownNotification (got ${msg.type})`); + assert(msg.openBmpHeader !== null, 'openBmpHeader is present'); + assert(typeof msg.openBmpHeader.routerIp === 'string', `routerIp: ${msg.openBmpHeader.routerIp}`); + assert(msg.peerHeader !== undefined, 'peerHeader is present'); + assert(typeof msg.peerHeader.peerAsn === 'number', `peerAsn: ${msg.peerHeader.peerAsn}`); + assert(typeof msg.reason === 'string', `reason: ${msg.reason}`); + assert(typeof msg.timestamp === 'number', `timestamp: ${msg.timestamp}`); +} + +// ── Test 2: parseOpenBmpMessage — null for non-router frames ───────────────── + +function testOpenBmpNull() { + console.log('\n[2/5] parseOpenBmpMessage (non-router frame)'); + + // OpenBMP header with object_type = 0x00 (not 0x0C = router message) + const hex = + '4f424d500100005c000000b0800c618881530002f643fef880938d19e9d632c815d1e95a87e1000a' + + '69732d61682d626d7031eb4de4e596b282c6a995b067df4abc8cc342f192'; + const data = Buffer.from(hex, 'hex'); + + let result; + try { + result = parseOpenBmpMessage(data); + } catch { + result = 'threw'; + } + assert(result === null, 'returns null for non-router OpenBMP frame'); +} + +// ── Test 3: parseBmpMessage ────────────────────────────────────────────────── + +function testBmpMessage() { + console.log('\n[3/5] parseBmpMessage'); + + // Extract the BMP portion from the PeerDown test (skip the OpenBMP header). + // The OpenBMP header is the first 100 bytes (0x64 hex in the length field). + // BMP message starts after the OpenBMP header. + const fullHex = + '4f424d500107006400000033800c6184b9c2000c602cbf4f072f3ae149d23486024bc3dadfc4000a' + + '69732d63632d626d7031c677060bdd020a9e92be000200de2e3180df3369000000000000000000000' + + '000000c726f7574652d76696577733500000001030000003302000000000000000000000000000000' + + '000000000000003fda060e00000da30000000061523c36000c0e1c0200000a'; + + // OpenBMP header: magic(4) + ver(2) + hdr_len(2) + msg_len(4) + flags(1) + + // obj_type(1) + timestamp_sec(4) + timestamp_usec(4) + collector_hash(16) + + // admin_id_len(2) + admin_id(variable) + router_hash(16) + router_ip(4/16) + + // router_group_len(2) + router_group(variable) + row_count(4) + // The msg_len field (bytes 6-9) tells us BMP message length: 0x00000033 = 51 bytes + // The hdr_len field (bytes 4-5) tells us OpenBMP header length: 0x0064 = 100 bytes + const fullData = Buffer.from(fullHex, 'hex'); + const hdrLen = fullData.readUInt16BE(6); + const bmpData = fullData.subarray(hdrLen); + const now = Date.now() / 1000; + + const msg = parseBmpMessage(bmpData, now); + + assert(msg !== null, 'returns non-null'); + assert(msg.type === 'PeerDownNotification', `type is PeerDownNotification (got ${msg.type})`); + assert(msg.openBmpHeader === null, 'openBmpHeader is null (no OpenBMP wrapper)'); + assert(typeof msg.timestamp === 'number', 'timestamp is a number'); +} + +// ── Test 4: parseBgpUpdate ─────────────────────────────────────────────────── + +function testBgpUpdate() { + console.log('\n[4/5] parseBgpUpdate'); + + // Minimal BGP UPDATE announcing 198.51.100.0/24 via next-hop 192.0.2.1 + // 16-byte marker + 2-byte length + 1-byte type(2=UPDATE) + // + withdrawn_len(0) + path_attr_len + attrs + NLRI + const hex = + 'ffffffffffffffffffffffffffffffff' + // marker + '00380200' + // length=56, type=UPDATE + '0000' + // withdrawn routes length = 0 + '001b' + // total path attr length = 27 + '40010100' + // ORIGIN: IGP + '40020a0202000000fd00000065' + // AS_PATH: 253 101 + '400304c0000201' + // NEXT_HOP: 192.0.2.1 + '18c63364'; // NLRI: 198.51.100.0/24 + + const data = Buffer.from(hex, 'hex'); + const elems = parseBgpUpdate(data); + + assert(Array.isArray(elems), 'returns an array'); + assert(elems.length === 1, `has ${elems.length} element(s)`); + + const elem = elems[0]; + assert(elem.type === 'ANNOUNCE', `type is ANNOUNCE (got ${elem.type})`); + assert(elem.prefix === '198.51.100.0/24', `prefix is ${elem.prefix}`); + assert(elem.next_hop === '192.0.2.1', `next_hop is ${elem.next_hop}`); +} + +// ── Test 5: parseMrtFile ───────────────────────────────────────────────────── + +function testMrtFile() { + return new Promise((resolve) => { + console.log('\n[5/5] parseMrtFile'); + console.log(' ↓ downloading test MRT file...'); + + const url = 'https://spaces.bgpkit.org/parser/update-example'; + + https.get(url, (res) => { + if (res.statusCode >= 300 && res.statusCode < 400 && res.headers.location) { + https.get(res.headers.location, (res2) => handleResponse(res2, resolve)); + return; + } + handleResponse(res, resolve); + }).on('error', (err) => { + console.log(` ⚠ skipped (download failed: ${err.message})`); + resolve(); + }); + }); +} + +function handleResponse(res, resolve) { + const chunks = []; + res.on('data', (chunk) => chunks.push(chunk)); + res.on('end', () => { + const raw = Buffer.concat(chunks); + console.log(` ↓ downloaded ${(raw.length / 1024).toFixed(0)} KB`); + + const elems = parseMrtFile(raw); + + assert(Array.isArray(elems), 'returns an array'); + assert(elems.length > 0, `parsed ${elems.length} elements`); + + const announce = elems.find((e) => e.type === 'ANNOUNCE'); + if (announce) { + assert(typeof announce.prefix === 'string', `sample prefix: ${announce.prefix}`); + assert(typeof announce.peer_ip === 'string', `sample peer_ip: ${announce.peer_ip}`); + assert(typeof announce.peer_asn === 'number', `sample peer_asn: ${announce.peer_asn}`); + } + + resolve(); + }); + res.on('error', (err) => { + console.log(` ⚠ skipped (download failed: ${err.message})`); + resolve(); + }); +} + +// ── Run ────────────────────────────────────────────────────────────────────── + +async function main() { + console.log('Testing @bgpkit/parser npm package\n'); + + testOpenBmpPeerDown(); + testOpenBmpNull(); + testBmpMessage(); + testBgpUpdate(); + await testMrtFile(); + + console.log(`\n${passed + failed} assertions: ${passed} passed, ${failed} failed`); + process.exit(failed > 0 ? 1 : 0); +} + +main(); From 8b8c28f1b2d8a2cde057045fd96864308b95e095 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 22 Mar 2026 14:25:07 -0700 Subject: [PATCH 02/10] Add explicit lib path for cargo-readme compatibility --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index c6b7ea6..e5b5526 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ required-features = ["cli"] # normal Rust library usage. Both are listed so the crate works for native # consumers and wasm-pack alike. [lib] +path = "src/lib.rs" crate-type = ["cdylib", "rlib"] [dependencies] From e01d98c4813d477c37564c08001d0f8c560566e6 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 22 Mar 2026 14:37:38 -0700 Subject: [PATCH 03/10] Add unit tests for WASM parsing functions Extract core parsing logic into testable functions that return Result instead of JsError, with thin wasm_bindgen wrappers on top. Tests cover BMP initiation/peer-up parsing, BGP UPDATE parsing, MRT empty file, invalid input handling, and peer header construction. --- src/wasm.rs | 319 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 278 insertions(+), 41 deletions(-) diff --git a/src/wasm.rs b/src/wasm.rs index c648153..1d5f8d3 100644 --- a/src/wasm.rs +++ b/src/wasm.rs @@ -253,26 +253,20 @@ fn build_bmp_result( } } -// ── Exported WASM functions ────────────────────────────────────────────────── +// ── Core parsing functions (testable without wasm_bindgen) ─────────────────── -/// Parse an OpenBMP-wrapped BMP message as received from the RouteViews Kafka stream. -/// -/// Returns a JSON string representing a `BmpParsedMessage` discriminated on `"type"`. -/// Returns an empty string for non-router OpenBMP frames (the JS wrapper converts -/// this to `null`). -/// -/// Throws a JavaScript `Error` on malformed data. -#[wasm_bindgen(js_name = "parseOpenBmpMessage")] -pub fn parse_openbmp_message(data: &[u8]) -> Result { +/// Parse an OpenBMP message, returning a JSON string or `Ok(None)` for +/// unsupported (non-router) messages. +fn parse_openbmp_message_core(data: &[u8]) -> Result, String> { let mut bytes = Bytes::from(data.to_vec()); let header = match parse_openbmp_header(&mut bytes) { Ok(h) => h, - Err(ParserBmpError::UnsupportedOpenBmpMessage) => return Ok(String::new()), - Err(e) => return Err(JsError::new(&e.to_string())), + Err(ParserBmpError::UnsupportedOpenBmpMessage) => return Ok(None), + Err(e) => return Err(e.to_string()), }; - let bmp_msg = parse_bmp_msg(&mut bytes).map_err(|e| JsError::new(&e.to_string()))?; + let bmp_msg = parse_bmp_msg(&mut bytes).map_err(|e| e.to_string())?; let result = build_bmp_result( bmp_msg, @@ -280,7 +274,66 @@ pub fn parse_openbmp_message(data: &[u8]) -> Result { header.timestamp, ); - serde_json::to_string(&result).map_err(|e| JsError::new(&e.to_string())) + serde_json::to_string(&result) + .map(Some) + .map_err(|e| e.to_string()) +} + +/// Parse a raw BMP message, returning a JSON string. +fn parse_bmp_message_core(data: &[u8], timestamp: f64) -> Result { + let mut bytes = Bytes::from(data.to_vec()); + + let bmp_msg = parse_bmp_msg(&mut bytes).map_err(|e| e.to_string())?; + + let result = build_bmp_result(bmp_msg, None, timestamp); + + serde_json::to_string(&result).map_err(|e| e.to_string()) +} + +/// Parse a decompressed MRT file, returning a JSON array string. +fn parse_mrt_file_core(data: &[u8]) -> Result { + let mut cursor = Cursor::new(data); + let mut elementor = Elementor::default(); + let mut all_elems: Vec = Vec::new(); + + while let Ok(record) = parse_mrt_record(&mut cursor) { + let elems = elementor.record_to_elems(record); + all_elems.extend(elems); + } + + serde_json::to_string(&all_elems).map_err(|e| e.to_string()) +} + +/// Parse a single BGP UPDATE message, returning a JSON array string. +fn parse_bgp_update_core(data: &[u8]) -> Result { + let mut bytes = Bytes::from(data.to_vec()); + let msg = + parse_bgp_message(&mut bytes, false, &AsnLength::Bits32).map_err(|e| e.to_string())?; + let elems = Elementor::bgp_to_elems( + msg, + 0.0, + &IpAddr::V4(Ipv4Addr::UNSPECIFIED), + &Asn::default(), + ); + serde_json::to_string(&elems).map_err(|e| e.to_string()) +} + +// ── Exported WASM functions (thin wrappers) ────────────────────────────────── + +/// Parse an OpenBMP-wrapped BMP message as received from the RouteViews Kafka stream. +/// +/// Returns a JSON string representing a `BmpParsedMessage` discriminated on `"type"`. +/// Returns an empty string for non-router OpenBMP frames (the JS wrapper converts +/// this to `null`). +/// +/// Throws a JavaScript `Error` on malformed data. +#[wasm_bindgen(js_name = "parseOpenBmpMessage")] +pub fn parse_openbmp_message(data: &[u8]) -> Result { + match parse_openbmp_message_core(data) { + Ok(Some(json)) => Ok(json), + Ok(None) => Ok(String::new()), + Err(e) => Err(JsError::new(&e)), + } } /// Parse a raw BMP message (no OpenBMP wrapper). @@ -292,13 +345,7 @@ pub fn parse_openbmp_message(data: &[u8]) -> Result { /// Throws a JavaScript `Error` on malformed data. #[wasm_bindgen(js_name = "parseBmpMessage")] pub fn parse_bmp_message(data: &[u8], timestamp: f64) -> Result { - let mut bytes = Bytes::from(data.to_vec()); - - let bmp_msg = parse_bmp_msg(&mut bytes).map_err(|e| JsError::new(&e.to_string()))?; - - let result = build_bmp_result(bmp_msg, None, timestamp); - - serde_json::to_string(&result).map_err(|e| JsError::new(&e.to_string())) + parse_bmp_message_core(data, timestamp).map_err(|e| JsError::new(&e)) } /// Parse a decompressed MRT file into BGP elements. @@ -314,16 +361,7 @@ pub fn parse_bmp_message(data: &[u8], timestamp: f64) -> Result /// Throws a JavaScript `Error` if the file cannot be parsed at all. #[wasm_bindgen(js_name = "parseMrtFile")] pub fn parse_mrt_file(data: &[u8]) -> Result { - let mut cursor = Cursor::new(data); - let mut elementor = Elementor::default(); - let mut all_elems: Vec = Vec::new(); - - while let Ok(record) = parse_mrt_record(&mut cursor) { - let elems = elementor.record_to_elems(record); - all_elems.extend(elems); - } - - serde_json::to_string(&all_elems).map_err(|e| JsError::new(&e.to_string())) + parse_mrt_file_core(data).map_err(|e| JsError::new(&e)) } /// Parse a single BGP UPDATE message into BGP elements. @@ -339,14 +377,213 @@ pub fn parse_mrt_file(data: &[u8]) -> Result { /// Throws a JavaScript `Error` if the message cannot be parsed. #[wasm_bindgen(js_name = "parseBgpUpdate")] pub fn parse_bgp_update(data: &[u8]) -> Result { - let mut bytes = Bytes::from(data.to_vec()); - let msg = parse_bgp_message(&mut bytes, false, &AsnLength::Bits32) - .map_err(|e| JsError::new(&e.to_string()))?; - let elems = Elementor::bgp_to_elems( - msg, - 0.0, - &IpAddr::V4(Ipv4Addr::UNSPECIFIED), - &Asn::default(), - ); - serde_json::to_string(&elems).map_err(|e| JsError::new(&e.to_string())) + parse_bgp_update_core(data).map_err(|e| JsError::new(&e)) +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + /// Build a minimal BMP Initiation message (type 4). + /// BMP common header: version(1) + length(4) + type(1) = 6 bytes + /// Initiation TLV: type(2) + length(2) + value + fn make_bmp_initiation(sys_name: &str) -> Vec { + let tlv_type: u16 = 2; // sysName + let tlv_len = sys_name.len() as u16; + let total_len = 6 + 4 + sys_name.len(); + let mut buf = Vec::with_capacity(total_len); + // BMP common header + buf.push(3); // version + buf.extend_from_slice(&(total_len as u32).to_be_bytes()); // length + buf.push(4); // type = Initiation + // TLV + buf.extend_from_slice(&tlv_type.to_be_bytes()); + buf.extend_from_slice(&tlv_len.to_be_bytes()); + buf.extend_from_slice(sys_name.as_bytes()); + buf + } + + /// Build a BMP Peer Up message with a per-peer header. + fn make_bmp_peer_up() -> Vec { + // Per-peer header: type(1) + flags(1) + rd(8) + addr(16) + asn(4) + bgp_id(4) + ts_s(4) + ts_us(4) = 42 + let peer_header_len = 42; + // Peer Up body: local_addr(16) + local_port(2) + remote_port(2) + sent_open + recv_open + // Minimal BGP OPEN: marker(16) + length(2) + type(1) + version(1) + my_as(2) + hold_time(2) + bgp_id(4) + opt_len(1) = 29 + let bgp_open_len = 29u16; + let body_len = 16 + 2 + 2 + (bgp_open_len as usize) * 2; + let total_len = 6 + peer_header_len + body_len; + let mut buf = Vec::with_capacity(total_len); + + // BMP common header + buf.push(3); // version + buf.extend_from_slice(&(total_len as u32).to_be_bytes()); + buf.push(3); // type = Peer Up Notification + + // Per-peer header + buf.push(0); // peer type = Global + buf.push(0); // peer flags + buf.extend_from_slice(&[0u8; 8]); // route distinguisher + buf.extend_from_slice(&[0u8; 12]); // padding for IPv4 + buf.extend_from_slice(&[10, 0, 0, 1]); // peer IPv4 + buf.extend_from_slice(&65000u32.to_be_bytes()); // peer ASN + buf.extend_from_slice(&[10, 0, 0, 1]); // peer BGP ID + buf.extend_from_slice(&1000u32.to_be_bytes()); // timestamp seconds + buf.extend_from_slice(&0u32.to_be_bytes()); // timestamp microseconds + + // Peer Up body + buf.extend_from_slice(&[0u8; 12]); // padding for IPv4 + buf.extend_from_slice(&[192, 168, 1, 1]); // local IPv4 + buf.extend_from_slice(&179u16.to_be_bytes()); // local port + buf.extend_from_slice(&12345u16.to_be_bytes()); // remote port + + // Sent OPEN message + let open = make_bgp_open(65000, [10, 0, 0, 1]); + buf.extend_from_slice(&open); + // Received OPEN message + buf.extend_from_slice(&open); + + buf + } + + /// Build a minimal BGP OPEN message. + fn make_bgp_open(my_as: u16, bgp_id: [u8; 4]) -> Vec { + let mut buf = Vec::new(); + buf.extend_from_slice(&[0xFF; 16]); // marker + buf.extend_from_slice(&29u16.to_be_bytes()); // length + buf.push(1); // type = OPEN + buf.push(4); // version + buf.extend_from_slice(&my_as.to_be_bytes()); + buf.extend_from_slice(&180u16.to_be_bytes()); // hold time + buf.extend_from_slice(&bgp_id); + buf.push(0); // opt params length + buf + } + + /// Build a minimal BGP UPDATE with a single IPv4 announcement. + fn make_bgp_update_announce(prefix: [u8; 4], prefix_len: u8, next_hop: [u8; 4]) -> Vec { + // Path attributes: ORIGIN(IGP) + AS_PATH(1 AS_SEQUENCE with AS 65001) + NEXT_HOP + let origin_attr = [0x40, 1, 1, 0]; // flags, type=1, len=1, IGP=0 + #[rustfmt::skip] + let as_path_attr = [ + 0x40, 2, 6, // flags, type=2, len=6 + 2, // segment type = AS_SEQUENCE + 1, // segment length = 1 ASN + 0, 0, 0xFD, 0xE9, // AS 65001 (4-byte) + ]; + let next_hop_attr = [ + 0x40, + 3, + 4, + next_hop[0], + next_hop[1], + next_hop[2], + next_hop[3], + ]; + + let prefix_bytes = (prefix_len as usize).div_ceil(8); + let nlri_len = 1 + prefix_bytes; // prefix_len byte + prefix bytes + let path_attr_len = origin_attr.len() + as_path_attr.len() + next_hop_attr.len(); + let body_len = 2 + 2 + path_attr_len + nlri_len; // withdrawn_len(2) + attr_len(2) + attrs + nlri + let total_len = 19 + body_len; // marker(16) + length(2) + type(1) + body + + let mut buf = Vec::new(); + buf.extend_from_slice(&[0xFF; 16]); // marker + buf.extend_from_slice(&(total_len as u16).to_be_bytes()); + buf.push(2); // type = UPDATE + buf.extend_from_slice(&0u16.to_be_bytes()); // withdrawn routes length + buf.extend_from_slice(&(path_attr_len as u16).to_be_bytes()); + buf.extend_from_slice(&origin_attr); + buf.extend_from_slice(&as_path_attr); + buf.extend_from_slice(&next_hop_attr); + buf.push(prefix_len); + buf.extend_from_slice(&prefix[..prefix_bytes]); + buf + } + + #[test] + fn test_parse_bmp_initiation() { + let data = make_bmp_initiation("test-router"); + let json = parse_bmp_message_core(&data, 1000.0).unwrap(); + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(v["type"], "InitiationMessage"); + assert_eq!(v["timestamp"], 1000.0); + assert!(v["openBmpHeader"].is_null()); + assert_eq!(v["tlvs"][0]["type"], "SysName"); + assert_eq!(v["tlvs"][0]["value"], "test-router"); + } + + #[test] + fn test_parse_bmp_peer_up() { + let data = make_bmp_peer_up(); + let json = parse_bmp_message_core(&data, 1000.0).unwrap(); + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(v["type"], "PeerUpNotification"); + assert_eq!(v["peerHeader"]["peerIp"], "10.0.0.1"); + assert_eq!(v["peerHeader"]["peerAsn"], 65000); + assert_eq!(v["localIp"], "192.168.1.1"); + assert_eq!(v["localPort"], 179); + assert_eq!(v["remotePort"], 12345); + } + + #[test] + fn test_parse_bmp_invalid_data() { + let result = parse_bmp_message_core(&[0, 1, 2], 0.0); + assert!(result.is_err()); + } + + #[test] + fn test_parse_bgp_update() { + let data = make_bgp_update_announce([10, 0, 0, 0], 24, [192, 168, 1, 1]); + let json = parse_bgp_update_core(&data).unwrap(); + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert!(v.is_array()); + let elems = v.as_array().unwrap(); + assert_eq!(elems.len(), 1); + assert_eq!(elems[0]["type"], "ANNOUNCE"); + assert_eq!(elems[0]["prefix"], "10.0.0.0/24"); + assert_eq!(elems[0]["next_hop"], "192.168.1.1"); + } + + #[test] + fn test_parse_bgp_update_invalid() { + let result = parse_bgp_update_core(&[0xFF; 16]); + assert!(result.is_err()); + } + + #[test] + fn test_parse_mrt_empty() { + let json = parse_mrt_file_core(&[]).unwrap(); + assert_eq!(json, "[]"); + } + + #[test] + fn test_parse_openbmp_unsupported() { + // OpenBMP header with non-router message type (version=3, type != 12) + let mut data = Vec::new(); + data.extend_from_slice(b"OBMP"); // magic + data.extend_from_slice(&1u16.to_be_bytes()); // major version + data.extend_from_slice(&[0; 6]); // header length (set below), msg_len, flags + data.push(100); // object type (not a router message) + // The exact format doesn't matter — we just need parse_openbmp_header + // to return UnsupportedOpenBmpMessage for a non-router type. + let result = parse_openbmp_message_core(&data); + // Either returns None (unsupported) or an error (malformed) — both are fine + match result { + Ok(None) => {} // unsupported, as expected + Err(_) => {} // malformed header is also acceptable + Ok(Some(_)) => panic!("expected None or Err for non-router OpenBMP message"), + } + } + + #[test] + fn test_make_peer_header_default() { + let pph = BmpPerPeerHeader::default(); + let header = make_peer_header(&pph); + assert_eq!(header.peer_asn, 0); + assert_eq!(header.peer_ip, "0.0.0.0"); + assert!(!header.is_post_policy); + assert!(!header.is_adj_rib_out); + } } From b11ebb2ac4553c242d147fddb9c4a64c4bced313 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 22 Mar 2026 15:05:27 -0700 Subject: [PATCH 04/10] Add streaming MRT record parsing for large file support Add parseMrtRecord and parseMrtRecords functions that parse one MRT record at a time, enabling processing of multi-GB RIB dump files without exceeding WASM's 4GB memory limit. JS-side MRT header framing ensures only one record's bytes are passed to WASM per call. Use thread-local Elementor to persist PeerIndexTable across calls, required for TABLE_DUMP_V2 (RIB dump) files. Add resetMrtParser to clear state between files. Tested: 3.9GB RIB dump (55.9M elements) parses in ~190s via streaming. --- examples/wasm/parse-mrt-file/parse-mrt.js | 113 +++++++++++++++++----- src/wasm.rs | 80 +++++++++++++++ src/wasm/js/index.d.ts | 28 ++++++ src/wasm/js/index.js | 94 +++++++++++++++++- 4 files changed, 290 insertions(+), 25 deletions(-) diff --git a/examples/wasm/parse-mrt-file/parse-mrt.js b/examples/wasm/parse-mrt-file/parse-mrt.js index c817ab2..1df552c 100644 --- a/examples/wasm/parse-mrt-file/parse-mrt.js +++ b/examples/wasm/parse-mrt-file/parse-mrt.js @@ -3,41 +3,94 @@ /** * Parse an MRT file and output BGP elements as JSON, one per line. * + * Supports two modes: + * --batch Load entire file, parse all at once (parseMrtFile) + * --stream Parse records one at a time using parseMrtRecords (default) + * * Usage: - * node parse-mrt.js + * node parse-mrt.js [--batch|--stream] * node parse-mrt.js https://data.ris.ripe.net/rrc06/2026.03/updates.20260322.2105.gz - * node parse-mrt.js updates.20260322.2105.gz + * node parse-mrt.js updates.20260322.2105.gz --batch * * Supports .gz (gzip) compressed and uncompressed MRT files. - * URLs are fetched and decompressed in memory (no download needed). + * URLs are streamed via HTTP and decompressed incrementally. */ const fs = require('fs'); +const http = require('http'); +const https = require('https'); const zlib = require('zlib'); -const path = require('path'); -const { parseMrtFile } = require('@bgpkit/parser'); +const { parseMrtFile, parseMrtRecords } = require('@bgpkit/parser'); + +const args = process.argv.slice(2).filter((a) => !a.startsWith('--')); +const flags = new Set(process.argv.slice(2).filter((a) => a.startsWith('--'))); +const input = args[0]; +const batchMode = flags.has('--batch'); -const input = process.argv[2]; if (!input) { - console.error('Usage: node parse-mrt.js '); + console.error('Usage: node parse-mrt.js [--batch|--stream]'); console.error(' node parse-mrt.js https://data.ris.ripe.net/rrc06/2026.03/updates.20260322.2105.gz'); - console.error(' node parse-mrt.js updates.20260322.2105.gz'); + console.error(' node parse-mrt.js updates.20260322.2105.gz --batch'); process.exit(1); } -async function fetchUrl(url) { - process.stderr.write(`Fetching ${url}...\n`); - const res = await fetch(url); - if (!res.ok) throw new Error(`HTTP ${res.status}: ${res.statusText}`); - return Buffer.from(await res.arrayBuffer()); +/** + * Open a gzip stream from a URL, following redirects. + * Returns a readable stream of decompressed bytes. + */ +function openGzipStream(url) { + return new Promise((resolve, reject) => { + const lib = url.startsWith('https') ? https : http; + lib + .get(url, (res) => { + if (res.statusCode === 301 || res.statusCode === 302) { + openGzipStream(res.headers.location).then(resolve, reject); + return; + } + if (res.statusCode !== 200) { + reject(new Error(`HTTP ${res.statusCode} for ${url}`)); + return; + } + if (url.endsWith('.gz')) { + resolve(res.pipe(zlib.createGunzip())); + } else { + resolve(res); + } + }) + .on('error', reject); + }); +} + +/** + * Collect a readable stream into a single Buffer. + */ +function collectStream(stream) { + return new Promise((resolve, reject) => { + const chunks = []; + stream.on('data', (chunk) => chunks.push(chunk)); + stream.on('end', () => resolve(Buffer.concat(chunks))); + stream.on('error', reject); + }); } -function decompress(buf, name) { - if (name.endsWith('.gz')) { +/** + * Get decompressed MRT bytes from a URL or local file. + */ +async function loadMrtData(input) { + const isUrl = input.startsWith('http://') || input.startsWith('https://'); + + if (isUrl) { + process.stderr.write(`Streaming ${input}...\n`); + const stream = await openGzipStream(input); + return collectStream(stream); + } + + const buf = fs.readFileSync(input); + if (input.endsWith('.gz')) { process.stderr.write('Decompressing gzip...\n'); return zlib.gunzipSync(buf); } - if (name.endsWith('.bz2')) { + if (input.endsWith('.bz2')) { console.error('bzip2 is not supported by Node.js zlib. Decompress the file first.'); process.exit(1); } @@ -45,17 +98,29 @@ function decompress(buf, name) { } async function main() { - const isUrl = input.startsWith('http://') || input.startsWith('https://'); - const buf = isUrl ? await fetchUrl(input) : fs.readFileSync(input); - const raw = decompress(buf, input); + const raw = await loadMrtData(input); + process.stderr.write(`Loaded ${raw.length} bytes of MRT data\n`); - process.stderr.write(`Parsing ${raw.length} bytes of MRT data...\n`); - const elems = parseMrtFile(raw); - process.stderr.write(`Parsed ${elems.length} BGP elements\n`); + let count = 0; - for (const elem of elems) { - console.log(JSON.stringify(elem)); + if (batchMode) { + // Parse all records at once + const elems = parseMrtFile(raw); + for (const elem of elems) { + console.log(JSON.stringify(elem)); + count++; + } + } else { + // Stream: parse one MRT record at a time + for (const { elems } of parseMrtRecords(raw)) { + for (const elem of elems) { + console.log(JSON.stringify(elem)); + count++; + } + } } + + process.stderr.write(`Output ${count} BGP elements\n`); } main().catch((err) => { diff --git a/src/wasm.rs b/src/wasm.rs index 1d5f8d3..8c77144 100644 --- a/src/wasm.rs +++ b/src/wasm.rs @@ -17,6 +17,8 @@ //! messages (e.g. from the RouteViews Kafka stream) //! - [`parseBmpMessage`](parse_bmp_message) — parse raw BMP messages //! - [`parseMrtFile`](parse_mrt_file) — parse a decompressed MRT file into BGP elements +//! - [`parseMrtRecord`](parse_mrt_record_wasm) — parse a single MRT record at a +//! given offset (for incremental/streaming parsing) //! - [`parseBgpUpdate`](parse_bgp_update) — parse a single BGP UPDATE message use crate::models::*; @@ -304,6 +306,47 @@ fn parse_mrt_file_core(data: &[u8]) -> Result { serde_json::to_string(&all_elems).map_err(|e| e.to_string()) } +/// Result of parsing a single MRT record: the elems + number of bytes consumed. +#[derive(serde::Serialize)] +struct MrtRecordResult { + elems: Vec, + #[serde(rename = "bytesRead")] + bytes_read: u32, +} + +use std::cell::RefCell; + +thread_local! { + /// Persistent Elementor for streaming MRT parsing. Retains the + /// PeerIndexTable across calls so TABLE_DUMP_V2 RIB entries resolve correctly. + static MRT_ELEMENTOR: RefCell = RefCell::new(Elementor::default()); +} + +/// Parse one MRT record from the start of `data`. +/// Returns a JSON `{elems, bytesRead}` string, or empty string when no more records. +/// +/// Uses a thread-local `Elementor` to retain the PeerIndexTable across calls, +/// which is required for TABLE_DUMP_V2 (RIB dump) files. Call +/// `resetMrtParser()` before starting a new file. +fn parse_mrt_record_core(data: &[u8]) -> Result { + if data.is_empty() { + return Ok(String::new()); + } + + let mut cursor = Cursor::new(data); + + let record = match parse_mrt_record(&mut cursor) { + Ok(r) => r, + Err(_) => return Ok(String::new()), // no more valid records + }; + + let bytes_read = cursor.position() as u32; + let elems = MRT_ELEMENTOR.with(|e| e.borrow_mut().record_to_elems(record)); + + let result = MrtRecordResult { elems, bytes_read }; + serde_json::to_string(&result).map_err(|e| e.to_string()) +} + /// Parse a single BGP UPDATE message, returning a JSON array string. fn parse_bgp_update_core(data: &[u8]) -> Result { let mut bytes = Bytes::from(data.to_vec()); @@ -364,6 +407,31 @@ pub fn parse_mrt_file(data: &[u8]) -> Result { parse_mrt_file_core(data).map_err(|e| JsError::new(&e)) } +/// Reset the internal MRT parser state. +/// +/// Call this before parsing a new MRT file with `parseMrtRecord` to clear +/// the PeerIndexTable from a previous file. +#[wasm_bindgen(js_name = "resetMrtParser")] +pub fn reset_mrt_parser() { + MRT_ELEMENTOR.with(|e| *e.borrow_mut() = Elementor::default()); +} + +/// Parse a single MRT record from the start of the given buffer. +/// +/// Returns a JSON object `{ elems: BgpElem[], bytesRead: number }` on success. +/// Returns an empty string when there are no more records (the JS wrapper +/// converts this to `null`). +/// +/// The caller should slice off `bytesRead` bytes from the front of the buffer +/// before the next call. This enables streaming/incremental MRT parsing without +/// passing the full buffer each time. +/// +/// Throws a JavaScript `Error` on malformed data. +#[wasm_bindgen(js_name = "parseMrtRecord")] +pub fn parse_mrt_record_wasm(data: &[u8]) -> Result { + parse_mrt_record_core(data).map_err(|e| JsError::new(&e)) +} + /// Parse a single BGP UPDATE message into BGP elements. /// /// Expects a full BGP message including the 16-byte marker, 2-byte length, @@ -558,6 +626,18 @@ mod tests { assert_eq!(json, "[]"); } + #[test] + fn test_parse_mrt_record_empty() { + let result = parse_mrt_record_core(&[]).unwrap(); + assert_eq!(result, ""); // no records → empty string + } + + #[test] + fn test_parse_mrt_record_invalid() { + let result = parse_mrt_record_core(&[0, 1, 2]).unwrap(); + assert_eq!(result, ""); // invalid data → empty string + } + #[test] fn test_parse_openbmp_unsupported() { // OpenBMP header with non-router message type (version=3, type != 12) diff --git a/src/wasm/js/index.d.ts b/src/wasm/js/index.d.ts index cd6450f..cc0bb35 100644 --- a/src/wasm/js/index.d.ts +++ b/src/wasm/js/index.d.ts @@ -31,6 +31,34 @@ export function parseMrtFile(data: Uint8Array): BgpElem[]; */ export function parseBgpUpdate(data: Uint8Array): BgpElem[]; +/** + * Parse a single MRT record from the start of a buffer. + * Returns null when there are no more records. + * + * Slice off `bytesRead` bytes before the next call to advance. + */ +export function parseMrtRecord(data: Uint8Array): MrtRecordResult | null; + +/** + * Reset the internal MRT parser state. Call before parsing a new file + * with `parseMrtRecord` to clear the PeerIndexTable from a previous file. + * (Called automatically by `parseMrtRecords`.) + */ +export function resetMrtParser(): void; + +/** + * Generator that yields MRT records one at a time, automatically slicing + * the buffer as it advances. Resets parser state before starting. + */ +export function parseMrtRecords( + data: Uint8Array +): Generator; + +export interface MrtRecordResult { + elems: BgpElem[]; + bytesRead: number; +} + // ── BMP message types (discriminated union on `type`) ──────────────── export type BmpParsedMessage = diff --git a/src/wasm/js/index.js b/src/wasm/js/index.js index 0d0c040..2f22d66 100644 --- a/src/wasm/js/index.js +++ b/src/wasm/js/index.js @@ -56,4 +56,96 @@ function parseBgpUpdate(data) { return JSON.parse(wasm.parseBgpUpdate(data)); } -module.exports = { parseOpenBmpMessage, parseBmpMessage, parseMrtFile, parseBgpUpdate }; +// MRT common header: timestamp(4) + type(2) + subtype(2) + length(4) = 12 bytes. +// Extended timestamp records add 4 more bytes to the header but the length +// field still measures only the message body, so total = 12 + length (or 16 + length +// for extended). We read the basic 12-byte framing to extract one record at a time. +const MRT_HEADER_LEN = 12; + +/** + * Read the MRT record length from a 12-byte header. + * Returns the total record size (header + body) or -1 if not enough data. + * @param {Uint8Array} data + * @param {number} offset + * @returns {number} + */ +function mrtRecordSize(data, offset) { + if (offset + MRT_HEADER_LEN > data.length) return -1; + // length is a big-endian u32 at offset+8 + const bodyLen = + (data[offset + 8] << 24) | + (data[offset + 9] << 16) | + (data[offset + 10] << 8) | + data[offset + 11]; + return MRT_HEADER_LEN + (bodyLen >>> 0); // >>> 0 to treat as unsigned +} + +/** + * Parse a single MRT record from the start of a buffer. + * + * Only sends the bytes of that one record to WASM, so even multi-GB files + * work without exceeding WASM's 4 GB memory limit. + * + * Returns `{ elems: BgpElem[], bytesRead: number }` on success, + * or `null` when there are no more records. + * + * @param {Uint8Array} data - Remaining decompressed MRT bytes + * @returns {{ elems: object[], bytesRead: number } | null} + */ +function parseMrtRecord(data) { + const size = mrtRecordSize(data, 0); + if (size < 0 || size > data.length) return null; + // Pass only this record's bytes to WASM + const recordBytes = data.subarray(0, size); + const json = wasm.parseMrtRecord(recordBytes); + if (!json) return null; + const result = JSON.parse(json); + // Override bytesRead with the framed size (the WASM side always reports + // bytesRead == recordBytes.length, but we set it here for clarity) + result.bytesRead = size; + return result; +} + +/** + * Reset the internal MRT parser state. Call before parsing a new file + * with `parseMrtRecord` to clear the PeerIndexTable from a previous file. + */ +function resetMrtParser() { + wasm.resetMrtParser(); +} + +/** + * Generator that yields parsed MRT records one at a time. + * + * Automatically resets parser state before starting, so the PeerIndexTable + * from a previous file does not leak. Each iteration sends only one record's + * bytes to WASM, keeping memory usage constant regardless of total file size. + * + * @param {Uint8Array} data - Decompressed MRT file bytes + * @yields {{ elems: object[], bytesRead: number }} + */ +function* parseMrtRecords(data) { + wasm.resetMrtParser(); + let offset = 0; + while (offset < data.length) { + const size = mrtRecordSize(data, offset); + if (size < 0 || offset + size > data.length) break; + const recordBytes = data.subarray(offset, offset + size); + const json = wasm.parseMrtRecord(recordBytes); + if (!json) break; + const result = JSON.parse(json); + result.bytesRead = size; + yield result; + offset += size; + } +} + +module.exports = { + parseOpenBmpMessage, + parseBmpMessage, + parseMrtFile, + parseMrtRecord, + parseMrtRecords, + resetMrtParser, + parseBgpUpdate, +}; From 5ffc56772d0f9335be2ddc4bbcf24b9c22999ec9 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 22 Mar 2026 17:18:32 -0700 Subject: [PATCH 05/10] Replace batch MRT parsing with streaming API and update README - Remove parseMrtFile (batch) in favor of parseMrtRecords (streaming generator) and streamMrtFrom (Node.js async generator with I/O) - Remove detectCompression from public exports (internal utility) - Add openMrt, streamMrtFrom Node.js I/O helpers with gz/bz2 support - Add parseMrtRecord/resetMrtParser low-level API for custom iteration - Add web.mjs/web.d.ts entry point for browser and Cloudflare Workers - Rewrite README with use-case-driven sections, code snippets, memory considerations, and platform support matrix - Simplify parse-mrt-file example to use streamMrtFrom - Add browser MRT explorer examples (HTML and JSX) - Bump npm package version to 0.15.2 - Update CHANGELOG with streaming API details --- CHANGELOG.md | 20 +- examples/wasm/mrt-explorer.html | 634 +++++++++++++++ examples/wasm/parse-mrt-file/README.md | 2 + examples/wasm/parse-mrt-file/mrt-explorer.jsx | 730 ++++++++++++++++++ examples/wasm/parse-mrt-file/package.json | 3 +- examples/wasm/parse-mrt-file/parse-mrt.js | 106 +-- src/wasm.rs | 42 +- src/wasm/README.md | 304 +++++--- src/wasm/build.sh | 3 + src/wasm/js/index.d.ts | 22 +- src/wasm/js/index.js | 233 +++++- src/wasm/js/index.mjs | 49 +- src/wasm/js/package.json | 5 +- src/wasm/js/web.d.ts | 10 +- src/wasm/js/web.mjs | 49 +- src/wasm/test/test-published.js | 20 +- 16 files changed, 1917 insertions(+), 315 deletions(-) create mode 100644 examples/wasm/mrt-explorer.html create mode 100644 examples/wasm/parse-mrt-file/mrt-explorer.jsx diff --git a/CHANGELOG.md b/CHANGELOG.md index 407de6b..ac0b80b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,13 +11,19 @@ All notable changes to this project will be documented in this file. ### New features * **WebAssembly (WASM) support (experimental)**: New `wasm` feature flag compiles the BMP/BGP/MRT parsing core to WebAssembly for use in JavaScript environments. Published as [`@bgpkit/parser`](https://www.npmjs.com/package/@bgpkit/parser) on npm with support for Node.js (CommonJS), bundlers (ES modules), and browsers/workers (ES modules with manual init). This feature is experimental and the API may change in future releases. - - `parseOpenBmpMessage(data)`: parses OpenBMP-wrapped BMP frames (e.g. RouteViews Kafka stream). Returns a discriminated union on the `type` field covering all 7 BMP message types. - - `parseBmpMessage(data, timestamp)`: parses raw BMP frames (without an OpenBMP header). - - `parseMrtFile(data)`: parses decompressed MRT files (TABLE_DUMP, TABLE_DUMP_V2, BGP4MP) into `BgpElem[]`. - - `parseBgpUpdate(data)`: parses a single BGP UPDATE message into `BgpElem[]`. - - Multi-target build script (`src/wasm/build.sh`) produces nodejs, bundler, and web targets in a single npm package. - - JS wrapper handles JSON deserialization; TypeScript types included. - - Node.js examples in `examples/wasm/`: Kafka OpenBMP stream consumer and MRT file parser. + - Core parsing functions (all platforms): + - `parseOpenBmpMessage(data)`: parses OpenBMP-wrapped BMP frames (e.g. RouteViews Kafka stream) + - `parseBmpMessage(data, timestamp)`: parses raw BMP frames (without an OpenBMP header) + - `parseBgpUpdate(data)`: parses a single BGP UPDATE message into `BgpElem[]` + - `parseMrtRecords(data)`: streaming generator that yields MRT records one at a time from a decompressed buffer + - `parseMrtRecord(data)` / `resetMrtParser()`: low-level MRT record parsing + - Node.js I/O helpers: + - `streamMrtFrom(pathOrUrl)`: fetch, decompress (gz/bz2), and stream-parse MRT records from a URL or local file + - `openMrt(pathOrUrl)`: fetch and decompress MRT data into a `Buffer` + - Supports gzip (RIPE RIS) and bzip2 (RouteViews, requires optional `seek-bzip` dependency) compression + - Multi-target build script (`src/wasm/build.sh`) produces nodejs, bundler, and web targets in a single npm package + - JS wrapper handles JSON deserialization; TypeScript types included + - Node.js examples in `examples/wasm/`: Kafka OpenBMP stream consumer and MRT file parser * **`BgpElem::peer_bgp_id` field**: `BgpElem` now exposes an optional `peer_bgp_id: Option` containing the peer's BGP Identifier (Router ID) when available. Populated from the PEER_INDEX_TABLE in TableDumpV2/RIB records; `None` for BGP4MP records. diff --git a/examples/wasm/mrt-explorer.html b/examples/wasm/mrt-explorer.html new file mode 100644 index 0000000..00aee43 --- /dev/null +++ b/examples/wasm/mrt-explorer.html @@ -0,0 +1,634 @@ + + + + + +MRT Explorer — @bgpkit/parser + + + + + + +
+ + + + + + + diff --git a/examples/wasm/parse-mrt-file/README.md b/examples/wasm/parse-mrt-file/README.md index ce1d4f6..ff43bc1 100644 --- a/examples/wasm/parse-mrt-file/README.md +++ b/examples/wasm/parse-mrt-file/README.md @@ -10,12 +10,14 @@ one JSON object per line. It accepts both URLs and local files. ## Prerequisites - [Node.js](https://nodejs.org/) >= 18 +- For bz2-compressed files (RouteViews): `npm install seek-bzip` ## Run ```sh cd examples/wasm/parse-mrt-file npm install +npm install seek-bzip # optional, for .bz2 files # Parse directly from a URL (fetches and decompresses in memory) node parse-mrt.js https://data.ris.ripe.net/rrc06/2026.03/updates.20260322.2105.gz diff --git a/examples/wasm/parse-mrt-file/mrt-explorer.jsx b/examples/wasm/parse-mrt-file/mrt-explorer.jsx new file mode 100644 index 0000000..cd8af89 --- /dev/null +++ b/examples/wasm/parse-mrt-file/mrt-explorer.jsx @@ -0,0 +1,730 @@ +import { useState, useEffect, useRef, useCallback } from "react"; + +// ── Example MRT files from RIPE RIS (gzipped updates) ───────────────────── +const PRESETS = [ + { + label: "RIPE RIS rrc00 — 2025-01-01 00:00", + url: "https://data.ris.ripe.net/rrc00/2025.01/updates.20250101.0000.gz", + }, + { + label: "RIPE RIS rrc06 — 2025-06-01 00:00", + url: "https://data.ris.ripe.net/rrc06/2025.06/updates.20250601.0000.gz", + }, + { + label: "RIPE RIS rrc21 — 2025-01-01 00:00", + url: "https://data.ris.ripe.net/rrc21/2025.01/updates.20250101.0000.gz", + }, +]; + +// ── Utility ──────────────────────────────────────────────────────────────── +function fmtTs(ts) { + if (!ts) return "—"; + return new Date(ts * 1000).toISOString().replace("T", " ").slice(0, 19); +} + +function truncate(str, n) { + if (!str) return "—"; + return str.length > n ? str.slice(0, n) + "…" : str; +} + +async function decompressGzip(bytes) { + if (bytes[0] !== 0x1f || bytes[1] !== 0x8b) return bytes; // not gzip + const stream = new Response(bytes).body.pipeThrough( + new DecompressionStream("gzip") + ); + const buf = await new Response(stream).arrayBuffer(); + return new Uint8Array(buf); +} + +// ── Component ────────────────────────────────────────────────────────────── +export default function MrtExplorer() { + const [wasmStatus, setWasmStatus] = useState("loading"); // loading | ready | error + const [wasmError, setWasmError] = useState(""); + const parseFn = useRef(null); + + const [url, setUrl] = useState(PRESETS[0].url); + const [isDragging, setIsDragging] = useState(false); + + const [stage, setStage] = useState(""); // fetch | decompress | parse | done + const [progress, setProgress] = useState(0); + const [error, setError] = useState(""); + const [elems, setElems] = useState([]); + const [stats, setStats] = useState(null); + const [fileName, setFileName] = useState(""); + + // Table state + const [typeFilter, setTypeFilter] = useState("ALL"); + const [search, setSearch] = useState(""); + const [sortCol, setSortCol] = useState("timestamp"); + const [sortDir, setSortDir] = useState("asc"); + const [page, setPage] = useState(0); + const PAGE_SIZE = 50; + + // ── Load WASM ────────────────────────────────────────────────────────── + useEffect(() => { + (async () => { + try { + // Use new Function to bypass the artifact bundler's static import analysis. + // The bundler would throw "Module not found" on a bare URL string literal + // inside a normal import(), but can't analyze through new Function. + const dynamicImport = new Function("url", "return import(url)"); + + // esm.sh serves wasm-bindgen packages with the wasm binary colocated. + // The web target exports a default init() that accepts an optional wasm URL. + const CDN = "https://esm.sh/@bgpkit/parser@0.15.0"; + const mod = await dynamicImport(CDN); + + // web target: default export is init(); bundler target: no init needed. + if (typeof mod.default === "function") { + // Point init() at the wasm binary esm.sh serves alongside the JS + try { + await mod.default(`${CDN}/bgpkit_parser_bg.wasm`); + } catch { + // Some builds self-fetch the wasm; try without explicit URL + await mod.default(); + } + } + + const recFn = mod.parseMrtRecords ?? mod.parse_mrt_records; + if (!recFn) throw new Error("parseMrtRecords not exported — check package version"); + // Wrap the generator into a batch function for the UI + parseFn.current = (data) => { + const allElems = []; + for (const { elems } of recFn(data)) allElems.push(...elems); + return allElems; + }; + setWasmStatus("ready"); + } catch (e) { + setWasmStatus("error"); + setWasmError(e.message); + } + })(); + }, []); + + // ── Parse ────────────────────────────────────────────────────────────── + async function runParse(rawBytes, name) { + setError(""); + setElems([]); + setStats(null); + setPage(0); + setFileName(name); + + try { + setStage("decompress"); + setProgress(20); + const decompressed = await decompressGzip(rawBytes); + + setStage("parse"); + setProgress(50); + const result = parseFn.current(decompressed); + + // result is BgpElem[] (array of JS objects from wasm-bindgen) + const data = Array.isArray(result) + ? result + : typeof result === "string" + ? JSON.parse(result) + : []; + + setProgress(100); + setElems(data); + setStage("done"); + + const announces = data.filter( + (e) => (e.type || e.elem_type) === "ANNOUNCE" || e.elem_type === "A" + ).length; + const peerAsns = new Set(data.map((e) => e.peer_asn)).size; + const prefixes = new Set(data.map((e) => e.prefix)).size; + const originAsns = new Set( + data.flatMap((e) => (e.origin_asns || [])) + ).size; + setStats({ + total: data.length, + announces, + withdraws: data.length - announces, + peerAsns, + prefixes, + originAsns, + }); + } catch (e) { + setError(e.message); + setStage(""); + } + } + + async function handleFetchUrl() { + if (!parseFn.current) return; + setStage("fetch"); + setProgress(0); + try { + const resp = await fetch(url); + if (!resp.ok) throw new Error(`HTTP ${resp.status} ${resp.statusText}`); + + setStage("download"); + setProgress(10); + const raw = new Uint8Array(await resp.arrayBuffer()); + const name = url.split("/").pop(); + await runParse(raw, name); + } catch (e) { + setError(e.message); + setStage(""); + } + } + + const handleDrop = useCallback( + async (e) => { + e.preventDefault(); + setIsDragging(false); + const file = e.dataTransfer.files[0]; + if (!file || !parseFn.current) return; + setStage("read"); + setProgress(5); + const raw = new Uint8Array(await file.arrayBuffer()); + await runParse(raw, file.name); + }, + [] + ); + + const handleFileInput = useCallback(async (e) => { + const file = e.target.files[0]; + if (!file || !parseFn.current) return; + setStage("read"); + setProgress(5); + const raw = new Uint8Array(await file.arrayBuffer()); + await runParse(raw, file.name); + }, []); + + // ── Filtering & sorting ──────────────────────────────────────────────── + const filtered = elems + .filter((e) => { + const t = e.type || (e.elem_type === "W" ? "WITHDRAW" : "ANNOUNCE"); + if (typeFilter !== "ALL" && t !== typeFilter) return false; + if (search) { + const q = search.toLowerCase(); + return ( + (e.prefix || "").toLowerCase().includes(q) || + String(e.peer_asn || "").includes(q) || + (e.as_path || "").toLowerCase().includes(q) || + (e.peer_ip || "").toLowerCase().includes(q) + ); + } + return true; + }) + .sort((a, b) => { + let av = a[sortCol] ?? ""; + let bv = b[sortCol] ?? ""; + if (sortCol === "peer_asn") { av = +av; bv = +bv; } + if (typeof av === "number") return sortDir === "asc" ? av - bv : bv - av; + return sortDir === "asc" + ? String(av).localeCompare(String(bv)) + : String(bv).localeCompare(String(av)); + }); + + const totalPages = Math.ceil(filtered.length / PAGE_SIZE); + const pageData = filtered.slice(page * PAGE_SIZE, (page + 1) * PAGE_SIZE); + + function toggleSort(col) { + if (sortCol === col) setSortDir((d) => (d === "asc" ? "desc" : "asc")); + else { setSortCol(col); setSortDir("asc"); } + setPage(0); + } + + const isLoading = ["fetch", "download", "read", "decompress", "parse"].includes(stage); + + // ── Render ───────────────────────────────────────────────────────────── + return ( +
+ {/* Google Font */} + + + {/* ── Header ────────────────────────────────────────────────────── */} +
+
+ + MRT Explorer + + powered by + @bgpkit/parser@0.15.0 +
+ + {/* WASM Status */} +
+ + + WASM {wasmStatus === "ready" ? "ready" : wasmStatus === "error" ? "error" : "loading…"} + + {wasmStatus === "error" && ( + {truncate(wasmError, 60)} + )} +
+
+ +
+ + {/* ── Input Panel ───────────────────────────────────────────────── */} +
+ {/* Drop Zone */} +
{ e.preventDefault(); setIsDragging(true); }} + onDragLeave={() => setIsDragging(false)} + onDrop={handleDrop} + style={{ + padding: "20px", + borderBottom: "1px solid #1a2030", + background: isDragging ? "rgba(122, 162, 247, 0.06)" : "transparent", + borderTop: isDragging ? "2px solid #7aa2f7" : "2px solid transparent", + transition: "all 0.15s", + display: "flex", + flexDirection: "column", + alignItems: "center", + gap: 8, + cursor: "default", + }} + > + + {isDragging ? "DROP MRT FILE HERE" : "DRAG & DROP .mrt / .gz / .bz2"} + + or + +
+ + {/* URL Input */} +
+ + + setUrl(e.target.value)} + placeholder="https://data.ris.ripe.net/…/updates.*.gz" + style={{ flex: 1, minWidth: 300 }} + onKeyDown={(e) => e.key === "Enter" && !isLoading && wasmStatus === "ready" && handleFetchUrl()} + /> + + +
+ + {/* Progress */} + {isLoading && ( +
+
+ + {stage === "fetch" || stage === "download" ? "↓ fetching" + : stage === "decompress" ? "⟳ decompressing" + : stage === "parse" ? "⚙ parsing wasm" + : stage === "read" ? "↑ reading file" + : "…"} + + {progress}% +
+
+
+
+
+ )} + + {error && ( +
+ ✗ {error} +
+ )} +
+ + {/* ── Stats ─────────────────────────────────────────────────────── */} + {stats && ( +
+ {[ + { label: "Total Elems", value: stats.total.toLocaleString(), color: "#7aa2f7" }, + { label: "Announcements", value: stats.announces.toLocaleString(), color: "#73da94" }, + { label: "Withdrawals", value: stats.withdraws.toLocaleString(), color: "#f7768e" }, + { label: "Peer ASNs", value: stats.peerAsns.toLocaleString(), color: "#e0af68" }, + { label: "Unique Prefixes", value: stats.prefixes.toLocaleString(), color: "#bb9af7" }, + ].map((s) => ( +
+
+ {s.label} +
+
+ {s.value} +
+
+ ))} +
+ )} + + {/* ── Table ─────────────────────────────────────────────────────── */} + {elems.length > 0 && ( +
+ {/* Table Controls */} +
+ + {fileName && {fileName}} + {" "}· {filtered.length.toLocaleString()} rows + + +
+ {["ALL", "ANNOUNCE", "WITHDRAW"].map((t) => ( + + ))} +
+ + { setSearch(e.target.value); setPage(0); }} + placeholder="filter prefix / ASN / AS-path…" + style={{ width: 240 }} + /> +
+ + {/* Table */} +
+ + + + {[ + { key: "timestamp", label: "Timestamp" }, + { key: "type", label: "Type" }, + { key: "prefix", label: "Prefix" }, + { key: "peer_asn", label: "Peer ASN" }, + { key: "peer_ip", label: "Peer IP" }, + { key: "as_path", label: "AS Path" }, + { key: "next_hop", label: "Next Hop" }, + { key: "communities", label: "Communities" }, + ].map((col) => ( + + ))} + + + + {pageData.map((e, i) => { + const type = e.type || (e.elem_type === "W" ? "WITHDRAW" : "ANNOUNCE"); + const isAnn = type === "ANNOUNCE"; + const asPath = e.as_path + ? (Array.isArray(e.as_path) ? e.as_path.join(" ") : String(e.as_path)) + : null; + const communities = e.communities + ? (Array.isArray(e.communities) + ? e.communities.join(" ") + : String(e.communities)) + : null; + + return ( + + + + + + + + + + + ); + })} + +
+ +
+ {fmtTs(e.timestamp)} + + + {isAnn ? "ANN" : "WDR"} + + + {e.prefix || "—"} + + {e.peer_asn ? `AS${e.peer_asn}` : "—"} + + {e.peer_ip || "—"} + + + {asPath ? truncate(asPath, 50) : } + + + {e.next_hop || } + + + {communities || } + +
+
+ + {/* Pagination */} + {totalPages > 1 && ( +
+ + {page * PAGE_SIZE + 1}–{Math.min((page + 1) * PAGE_SIZE, filtered.length)} of {filtered.length.toLocaleString()} + +
+ + + + {page + 1} / {totalPages} + + + +
+
+ )} +
+ )} + + {/* ── Empty state ────────────────────────────────────────────────── */} + {!isLoading && elems.length === 0 && !error && ( +
+
+
+ {wasmStatus === "ready" + ? "select a preset or enter an MRT URL above" + : wasmStatus === "loading" + ? "loading wasm module…" + : "wasm failed to load — check console"} +
+
+ )} +
+
+ ); +} diff --git a/examples/wasm/parse-mrt-file/package.json b/examples/wasm/parse-mrt-file/package.json index 76dca3d..ad7a2a5 100644 --- a/examples/wasm/parse-mrt-file/package.json +++ b/examples/wasm/parse-mrt-file/package.json @@ -7,7 +7,8 @@ "start": "node parse-mrt.js" }, "dependencies": { - "@bgpkit/parser": "^0.15.0" + "@bgpkit/parser": "^0.15.0", + "seek-bzip": "^2.0.0" }, "engines": { "node": ">=18" diff --git a/examples/wasm/parse-mrt-file/parse-mrt.js b/examples/wasm/parse-mrt-file/parse-mrt.js index 1df552c..334b63d 100644 --- a/examples/wasm/parse-mrt-file/parse-mrt.js +++ b/examples/wasm/parse-mrt-file/parse-mrt.js @@ -3,121 +3,37 @@ /** * Parse an MRT file and output BGP elements as JSON, one per line. * - * Supports two modes: - * --batch Load entire file, parse all at once (parseMrtFile) - * --stream Parse records one at a time using parseMrtRecords (default) - * * Usage: - * node parse-mrt.js [--batch|--stream] + * node parse-mrt.js * node parse-mrt.js https://data.ris.ripe.net/rrc06/2026.03/updates.20260322.2105.gz - * node parse-mrt.js updates.20260322.2105.gz --batch + * node parse-mrt.js https://archive.routeviews.org/route-views.amsix/bgpdata/2025.01/UPDATES/updates.20250101.0000.bz2 + * node parse-mrt.js updates.20260322.2105.gz * - * Supports .gz (gzip) compressed and uncompressed MRT files. - * URLs are streamed via HTTP and decompressed incrementally. + * Supports .gz (gzip) and .bz2 (bzip2, requires: npm install seek-bzip) compressed + * and uncompressed MRT files. URLs and local paths are both supported. */ -const fs = require('fs'); -const http = require('http'); -const https = require('https'); -const zlib = require('zlib'); -const { parseMrtFile, parseMrtRecords } = require('@bgpkit/parser'); +const { streamMrtFrom } = require('@bgpkit/parser'); -const args = process.argv.slice(2).filter((a) => !a.startsWith('--')); -const flags = new Set(process.argv.slice(2).filter((a) => a.startsWith('--'))); -const input = args[0]; -const batchMode = flags.has('--batch'); +const input = process.argv[2]; if (!input) { - console.error('Usage: node parse-mrt.js [--batch|--stream]'); + console.error('Usage: node parse-mrt.js '); console.error(' node parse-mrt.js https://data.ris.ripe.net/rrc06/2026.03/updates.20260322.2105.gz'); - console.error(' node parse-mrt.js updates.20260322.2105.gz --batch'); + console.error(' node parse-mrt.js updates.20260322.2105.gz'); process.exit(1); } -/** - * Open a gzip stream from a URL, following redirects. - * Returns a readable stream of decompressed bytes. - */ -function openGzipStream(url) { - return new Promise((resolve, reject) => { - const lib = url.startsWith('https') ? https : http; - lib - .get(url, (res) => { - if (res.statusCode === 301 || res.statusCode === 302) { - openGzipStream(res.headers.location).then(resolve, reject); - return; - } - if (res.statusCode !== 200) { - reject(new Error(`HTTP ${res.statusCode} for ${url}`)); - return; - } - if (url.endsWith('.gz')) { - resolve(res.pipe(zlib.createGunzip())); - } else { - resolve(res); - } - }) - .on('error', reject); - }); -} - -/** - * Collect a readable stream into a single Buffer. - */ -function collectStream(stream) { - return new Promise((resolve, reject) => { - const chunks = []; - stream.on('data', (chunk) => chunks.push(chunk)); - stream.on('end', () => resolve(Buffer.concat(chunks))); - stream.on('error', reject); - }); -} - -/** - * Get decompressed MRT bytes from a URL or local file. - */ -async function loadMrtData(input) { - const isUrl = input.startsWith('http://') || input.startsWith('https://'); - - if (isUrl) { - process.stderr.write(`Streaming ${input}...\n`); - const stream = await openGzipStream(input); - return collectStream(stream); - } - - const buf = fs.readFileSync(input); - if (input.endsWith('.gz')) { - process.stderr.write('Decompressing gzip...\n'); - return zlib.gunzipSync(buf); - } - if (input.endsWith('.bz2')) { - console.error('bzip2 is not supported by Node.js zlib. Decompress the file first.'); - process.exit(1); - } - return buf; -} - async function main() { - const raw = await loadMrtData(input); - process.stderr.write(`Loaded ${raw.length} bytes of MRT data\n`); + process.stderr.write(`Parsing ${input}...\n`); let count = 0; - if (batchMode) { - // Parse all records at once - const elems = parseMrtFile(raw); + for await (const { elems } of streamMrtFrom(input)) { for (const elem of elems) { console.log(JSON.stringify(elem)); count++; } - } else { - // Stream: parse one MRT record at a time - for (const { elems } of parseMrtRecords(raw)) { - for (const elem of elems) { - console.log(JSON.stringify(elem)); - count++; - } - } } process.stderr.write(`Output ${count} BGP elements\n`); diff --git a/src/wasm.rs b/src/wasm.rs index 8c77144..c099c4e 100644 --- a/src/wasm.rs +++ b/src/wasm.rs @@ -16,9 +16,9 @@ //! - [`parseOpenBmpMessage`](parse_openbmp_message) — parse OpenBMP-wrapped BMP //! messages (e.g. from the RouteViews Kafka stream) //! - [`parseBmpMessage`](parse_bmp_message) — parse raw BMP messages -//! - [`parseMrtFile`](parse_mrt_file) — parse a decompressed MRT file into BGP elements -//! - [`parseMrtRecord`](parse_mrt_record_wasm) — parse a single MRT record at a -//! given offset (for incremental/streaming parsing) +//! - [`parseMrtRecord`](parse_mrt_record_wasm) — parse a single MRT record +//! (for incremental/streaming parsing) +//! - [`resetMrtParser`](reset_mrt_parser) — reset MRT parser state between files //! - [`parseBgpUpdate`](parse_bgp_update) — parse a single BGP UPDATE message use crate::models::*; @@ -292,20 +292,6 @@ fn parse_bmp_message_core(data: &[u8], timestamp: f64) -> Result serde_json::to_string(&result).map_err(|e| e.to_string()) } -/// Parse a decompressed MRT file, returning a JSON array string. -fn parse_mrt_file_core(data: &[u8]) -> Result { - let mut cursor = Cursor::new(data); - let mut elementor = Elementor::default(); - let mut all_elems: Vec = Vec::new(); - - while let Ok(record) = parse_mrt_record(&mut cursor) { - let elems = elementor.record_to_elems(record); - all_elems.extend(elems); - } - - serde_json::to_string(&all_elems).map_err(|e| e.to_string()) -} - /// Result of parsing a single MRT record: the elems + number of bytes consumed. #[derive(serde::Serialize)] struct MrtRecordResult { @@ -391,22 +377,6 @@ pub fn parse_bmp_message(data: &[u8], timestamp: f64) -> Result parse_bmp_message_core(data, timestamp).map_err(|e| JsError::new(&e)) } -/// Parse a decompressed MRT file into BGP elements. -/// -/// Accepts the raw bytes of a fully decompressed MRT file (the caller is -/// responsible for bzip2/gzip decompression). Returns a JSON array of -/// `BgpElem` objects. -/// -/// Handles TABLE_DUMP, TABLE_DUMP_V2, and BGP4MP record types. For -/// TABLE_DUMP_V2, the PeerIndexTable record is consumed internally to -/// resolve peer information. -/// -/// Throws a JavaScript `Error` if the file cannot be parsed at all. -#[wasm_bindgen(js_name = "parseMrtFile")] -pub fn parse_mrt_file(data: &[u8]) -> Result { - parse_mrt_file_core(data).map_err(|e| JsError::new(&e)) -} - /// Reset the internal MRT parser state. /// /// Call this before parsing a new MRT file with `parseMrtRecord` to clear @@ -620,12 +590,6 @@ mod tests { assert!(result.is_err()); } - #[test] - fn test_parse_mrt_empty() { - let json = parse_mrt_file_core(&[]).unwrap(); - assert_eq!(json, "[]"); - } - #[test] fn test_parse_mrt_record_empty() { let result = parse_mrt_record_core(&[]).unwrap(); diff --git a/src/wasm/README.md b/src/wasm/README.md index 0489d4d..e2e6a22 100644 --- a/src/wasm/README.md +++ b/src/wasm/README.md @@ -4,155 +4,194 @@ > format, and build process may change in future releases. This module compiles bgpkit-parser's BGP/BMP/MRT parsing code to WebAssembly -for use in JavaScript and TypeScript environments (Node.js, bundlers, Cloudflare -Workers). +for use in JavaScript and TypeScript environments. -## Quick Start +## Install -### Prerequisites +```sh +npm install @bgpkit/parser +``` -- [Rust](https://rustup.rs/) (stable toolchain) -- [`wasm-pack`](https://rustwasm.github.io/wasm-pack/installer/): - ```sh - cargo install wasm-pack - ``` -- The `wasm32-unknown-unknown` target: - ```sh - rustup target add wasm32-unknown-unknown - ``` +## Use Cases and Examples -### Building and Publishing +### 1. Real-time BMP stream processing (Node.js) -The build script compiles all three targets (Node.js, bundler, web) and -assembles a single npm package in `pkg/`: +Parse OpenBMP messages from the RouteViews Kafka stream. Each Kafka message +is a small binary frame — no memory concerns. -```sh -# From the repository root -bash src/wasm/build.sh +**Requires Node.js** — Kafka clients need raw TCP sockets, which are not +available in browsers or Cloudflare Workers. -# Publish to npm -cd pkg && npm publish -``` - -The resulting `pkg/` directory contains: +```js +const { Kafka } = require('kafkajs'); +const { parseOpenBmpMessage } = require('@bgpkit/parser'); +const kafka = new Kafka({ + brokers: ['stream.routeviews.org:9092'], +}); + +const consumer = kafka.consumer({ groupId: 'my-app' }); +await consumer.connect(); +await consumer.subscribe({ topic: /^routeviews\.amsix\..+\.bmp_raw$/ }); + +await consumer.run({ + eachMessage: async ({ message }) => { + const msg = parseOpenBmpMessage(message.value); + if (!msg) return; // non-router frame (e.g. collector heartbeat) + + switch (msg.type) { + case 'RouteMonitoring': + for (const elem of msg.elems) { + console.log(elem.type, elem.prefix, elem.as_path); + } + break; + case 'PeerUpNotification': + console.log(`Peer up: ${msg.peerHeader.peerIp} AS${msg.peerHeader.peerAsn}`); + break; + case 'PeerDownNotification': + console.log(`Peer down: ${msg.peerHeader.peerIp} (${msg.reason})`); + break; + } + }, +}); ``` -pkg/ -├── nodejs/ # CommonJS, sync WASM loading (Node.js) -├── bundler/ # ES modules (webpack, vite, rollup) -├── web/ # ES modules + init() (browsers, Cloudflare Workers) -├── index.js # CJS entry point -├── index.mjs # ESM entry point -├── index.d.ts # TypeScript types -├── web.mjs # Web entry point (requires init()) -├── web.d.ts # Web TypeScript types -└── package.json # npm package manifest with conditional exports -``` - -The `package.json` uses [conditional exports](https://nodejs.org/api/packages.html#conditional-exports) -so that `require('@bgpkit/parser')` loads the Node.js target and -`import '@bgpkit/parser'` loads the bundler target automatically. The web -target is available as a separate subpath import (`@bgpkit/parser/web`). -### Building a single target +If you have raw BMP messages without the OpenBMP wrapper (e.g. from your own +BMP collector), use `parseBmpMessage` instead: -If you only need one target (e.g. for the Kafka example), you can build it -directly: +```js +const { parseBmpMessage } = require('@bgpkit/parser'); -```sh -wasm-pack build --target nodejs --no-default-features --features wasm +const msg = parseBmpMessage(bmpBytes, Date.now() / 1000); ``` -## API +### 2. MRT updates file analysis (Node.js) -Four parsing functions are exported, all accepting `Uint8Array` input and -returning parsed JavaScript objects: +Parse MRT updates files from RouteViews or RIPE RIS archives. Updates files +are typically 5–50 MB compressed (20–200 MB decompressed) and fit comfortably +in memory. -### `parseOpenBmpMessage(data: Uint8Array): BmpParsedMessage | null` +Supports gzip (`.gz`, RIPE RIS) and bzip2 (`.bz2`, RouteViews) compression. +For bz2, install an optional dependency: `npm install seek-bzip`. -Parse an OpenBMP-wrapped BMP message as received from the -[RouteViews Kafka stream](http://www.routeviews.org/routeviews/). -Returns `null` for non-router OpenBMP frames (e.g. collector heartbeats). +**Using `streamMrtFrom`** (handles fetch + decompression): ```js -const { parseOpenBmpMessage } = require('@bgpkit/parser'); +const { streamMrtFrom } = require('@bgpkit/parser'); -// `raw` is a Buffer/Uint8Array from a Kafka message value -const msg = parseOpenBmpMessage(raw); -if (msg && msg.type === 'RouteMonitoring') { - for (const elem of msg.elems) { +// RIPE RIS (gzip) +for await (const { elems } of streamMrtFrom('https://data.ris.ripe.net/rrc00/2025.01/updates.20250101.0000.gz')) { + for (const elem of elems) { console.log(elem.type, elem.prefix, elem.as_path); } } -``` -### `parseBmpMessage(data: Uint8Array, timestamp: number): BmpParsedMessage` +// RouteViews (bzip2 — requires: npm install seek-bzip) +for await (const { elems } of streamMrtFrom('https://archive.routeviews.org/route-views.amsix/bgpdata/2025.01/UPDATES/updates.20250101.0000.bz2')) { + for (const elem of elems) { + console.log(elem.type, elem.prefix, elem.as_path); + } +} +``` -Parse a raw BMP message without an OpenBMP wrapper. The `timestamp` parameter -provides the collection time in seconds since Unix epoch. +**Using `parseMrtRecords`** with manual I/O: ```js -const { parseBmpMessage } = require('@bgpkit/parser'); +const fs = require('fs'); +const zlib = require('zlib'); +const { parseMrtRecords } = require('@bgpkit/parser'); -const msg = parseBmpMessage(bmpBytes, Date.now() / 1000); -switch (msg.type) { - case 'RouteMonitoring': - console.log(`${msg.elems.length} BGP elements`); - break; - case 'PeerUpNotification': - console.log(`Peer up: ${msg.peerHeader.peerIp}`); - break; - case 'PeerDownNotification': - console.log(`Peer down: ${msg.peerHeader.peerIp} (${msg.reason})`); - break; +const raw = zlib.gunzipSync(fs.readFileSync('updates.20250101.0000.gz')); + +for (const { elems } of parseMrtRecords(raw)) { + for (const elem of elems) { + if (elem.type === 'ANNOUNCE') { + console.log(elem.prefix, elem.next_hop, elem.as_path); + } + } } ``` -### `parseMrtFile(data: Uint8Array): BgpElem[]` +### 3. MRT file analysis (browser) -Parse a fully decompressed MRT file into an array of BGP elements. Handles -TABLE_DUMP, TABLE_DUMP_V2, and BGP4MP record types. The caller is responsible -for bzip2/gzip decompression before passing the raw bytes. +Parse MRT files dropped or fetched in the browser. Uses the web entry point +which requires calling `init()` before any parsing. ```js -const fs = require('fs'); -const zlib = require('zlib'); -const { parseMrtFile } = require('@bgpkit/parser'); +import { init, parseMrtRecords } from '@bgpkit/parser/web'; + +await init(); -const compressed = fs.readFileSync('rib.20260101.0000.bz2'); -const raw = zlib.bunzip2Sync(compressed); // or use a bzip2 library -const elems = parseMrtFile(raw); +// Fetch and decompress a gzip-compressed MRT file +const res = await fetch('https://data.ris.ripe.net/rrc00/2025.01/updates.20250101.0000.gz'); +const stream = res.body.pipeThrough(new DecompressionStream('gzip')); +const raw = new Uint8Array(await new Response(stream).arrayBuffer()); -console.log(`Parsed ${elems.length} BGP elements`); -for (const elem of elems.slice(0, 10)) { - console.log(elem.type, elem.prefix, elem.as_path); +for (const { elems } of parseMrtRecords(raw)) { + for (const elem of elems) { + console.log(elem.type, elem.prefix, elem.as_path); + } } ``` -### `parseBgpUpdate(data: Uint8Array): BgpElem[]` +### 4. Individual BGP UPDATE parsing (all platforms) -Parse a single BGP UPDATE message into BGP elements. Expects the full BGP -message including the 16-byte marker, 2-byte length, and 1-byte type header. -Assumes 4-byte ASN encoding. - -The returned elements have `timestamp: 0` and unspecified peer IP/ASN since -those are not part of the BGP message itself. +Parse a single BGP UPDATE message extracted from a pcap capture or received +via an API. The message must include the 16-byte marker, 2-byte length, and +1-byte type header. ```js const { parseBgpUpdate } = require('@bgpkit/parser'); -// bgpBytes includes the 16-byte marker + length + type header -const elems = parseBgpUpdate(bgpBytes); +const elems = parseBgpUpdate(bgpMessageBytes); for (const elem of elems) { - console.log(elem.prefix, elem.next_hop); + console.log(elem.type, elem.prefix, elem.next_hop, elem.as_path); } ``` -## BMP Message Types +## Memory Considerations + +MRT parsing requires the **entire decompressed file** in memory as a +`Uint8Array` before parsing begins. `parseMrtRecords` then iterates +record-by-record, so parsed output stays small — but the raw bytes remain in +memory throughout. + +| File type | Typical decompressed size | Practical? | +|---|---|---| +| MRT updates (5-min) | 20–200 MB | Yes, all platforms | +| MRT updates (15-min) | 50–500 MB | Yes, Node.js; may exceed browser/Worker limits | +| Full RIB dump | 500 MB – 2+ GB | Not recommended — use the native Rust crate | + +BMP and BGP UPDATE messages are small (KB-sized) and have no memory concerns. + +## API Reference + +### Core parsing functions (all platforms) + +| Function | Input | Output | Use case | +|---|---|---|---| +| `parseOpenBmpMessage(data)` | `Uint8Array` | `BmpParsedMessage \| null` | Real-time BMP streams | +| `parseBmpMessage(data, timestamp)` | `Uint8Array`, `number` | `BmpParsedMessage` | Real-time BMP streams | +| `parseBgpUpdate(data)` | `Uint8Array` | `BgpElem[]` | Individual BGP messages | +| `parseMrtRecords(data)` | `Uint8Array` | `Generator` | MRT file analysis | +| `parseMrtRecord(data)` | `Uint8Array` | `MrtRecordResult \| null` | MRT file analysis (low-level) | +| `resetMrtParser()` | — | `void` | Clear state between MRT files | + +### Node.js I/O helpers -The BMP parsing functions return a discriminated union on the `type` field. -All message types include a `timestamp` and an optional `openBmpHeader` -(present only when parsed via `parseOpenBmpMessage`). +| Function | Input | Output | Description | +|---|---|---|---| +| `streamMrtFrom(pathOrUrl)` | `string` | `AsyncGenerator` | Fetch + decompress + stream-parse | +| `openMrt(pathOrUrl)` | `string` | `Promise` | Fetch + decompress only | + +These use Node.js `fs`, `http`, `https`, and `zlib` modules. They are **not +available** in bundler, browser, or Cloudflare Worker environments. + +### BMP message types + +BMP parsing functions return a discriminated union on the `type` field. All +types include `timestamp` and `openBmpHeader` (present only via +`parseOpenBmpMessage`). | `type` | Additional fields | |---|---| @@ -166,28 +205,26 @@ All message types include a `timestamp` and an optional `openBmpHeader` ## Platform Support -| Platform | Import | Notes | -|---|---|---| -| Node.js (CJS) | `require('@bgpkit/parser')` | Uses `nodejs` target, sync WASM loading | -| Node.js (ESM) | `import { ... } from '@bgpkit/parser'` | Uses `bundler` target | -| Bundler | `import { ... } from '@bgpkit/parser'` | webpack, vite, rollup | -| Browser / CF Workers | `import { init, ... } from '@bgpkit/parser/web'` | Must call `await init()` first | +| Platform | Import | Parsing | I/O helpers | Kafka | +|---|---|---|---|---| +| Node.js (CJS) | `require('@bgpkit/parser')` | Yes | Yes | Yes (via kafkajs) | +| Node.js (ESM) | `import from '@bgpkit/parser'` | Yes | No | Yes (via kafkajs) | +| Bundler (webpack, vite) | `import from '@bgpkit/parser'` | Yes | No | No | +| Browser | `import from '@bgpkit/parser/web'` | Yes (after `init()`) | No | No | +| Cloudflare Workers | `import from '@bgpkit/parser/web'` | Yes (after `init()`) | No | No (no TCP sockets) | -### Web target usage +### Web target -The web target requires explicit initialization before calling any parsing -functions: +The web target requires calling `init()` before any parsing functions: ```js import { init, parseOpenBmpMessage } from '@bgpkit/parser/web'; -await init(); // load and compile the WASM module - +await init(); const msg = parseOpenBmpMessage(data); ``` -You can optionally pass a URL or `ArrayBuffer` of the `.wasm` file to `init()` -if the default path doesn't work in your environment: +You can pass a custom URL to the `.wasm` file if the default path doesn't work: ```js await init(new URL('./bgpkit_parser_bg.wasm', import.meta.url)); @@ -195,6 +232,41 @@ await init(new URL('./bgpkit_parser_bg.wasm', import.meta.url)); ### Cloudflare Workers -Use the `@bgpkit/parser/web` entry point. Note that Workers cannot connect to -Kafka directly (no raw TCP sockets), so BMP message bytes must arrive via HTTP -(e.g. from a proxy service). +Use the `@bgpkit/parser/web` entry point. Workers cannot connect to Kafka +(no raw TCP sockets), so BMP/BGP data must arrive via HTTP requests. + +Workers have a 128 MB memory limit on the free plan (up to 256 MB on paid), +which is sufficient for MRT updates files and individual message parsing, but +not for full RIB dumps. + +## Versioning + +The npm package version tracks the Rust crate's minor version. For Rust crate +version `0.X.Y`, the npm package is published as `0.X.Z` where `Z` increments +independently for JS-specific changes. + +## Building from Source + +### Prerequisites + +- [Rust](https://rustup.rs/) (stable toolchain) +- [`wasm-pack`](https://rustwasm.github.io/wasm-pack/installer/): + `cargo install wasm-pack` +- The `wasm32-unknown-unknown` target: + `rustup target add wasm32-unknown-unknown` + +### Build + +```sh +# From the repository root — builds all targets (nodejs, bundler, web) +bash src/wasm/build.sh + +# Output is in pkg/ +cd pkg && npm publish +``` + +To build a single target: + +```sh +wasm-pack build --target nodejs --no-default-features --features wasm +``` diff --git a/src/wasm/build.sh b/src/wasm/build.sh index 7e78439..fb01448 100755 --- a/src/wasm/build.sh +++ b/src/wasm/build.sh @@ -61,6 +61,9 @@ cp "$WASM_DIR/web.mjs" "$OUT_DIR/" cp "$WASM_DIR/web.d.ts" "$OUT_DIR/" cp "$WASM_DIR/package.json" "$OUT_DIR/" +# README for npm (displayed on npmjs.com) +cp "$REPO_ROOT/src/wasm/README.md" "$OUT_DIR/" + # Cleanup temp directories rm -rf pkg-nodejs pkg-bundler pkg-web diff --git a/src/wasm/js/index.d.ts b/src/wasm/js/index.d.ts index cc0bb35..5137cfd 100644 --- a/src/wasm/js/index.d.ts +++ b/src/wasm/js/index.d.ts @@ -19,12 +19,6 @@ export function parseBmpMessage( timestamp: number ): BmpParsedMessage; -/** - * Parse a decompressed MRT file into BGP elements. - * Handles TABLE_DUMP, TABLE_DUMP_V2, and BGP4MP record types. - */ -export function parseMrtFile(data: Uint8Array): BgpElem[]; - /** * Parse a single BGP UPDATE message (with 16-byte marker + 2-byte length * + type header) into BGP elements. Assumes 4-byte ASN encoding. @@ -59,6 +53,22 @@ export interface MrtRecordResult { bytesRead: number; } +// ── High-level I/O helpers (Node.js only) ──────────────────────────── + +/** + * Open an MRT file from a local path or URL, automatically decompressing + * based on the file extension (.gz, .bz2). + */ +export function openMrt(pathOrUrl: string): Promise; + +/** + * Async generator that streams MRT records from a local path or URL. + * Handles fetching, decompression, and incremental parsing. + */ +export function streamMrtFrom( + pathOrUrl: string +): AsyncGenerator; + // ── BMP message types (discriminated union on `type`) ──────────────── export type BmpParsedMessage = diff --git a/src/wasm/js/index.js b/src/wasm/js/index.js index 2f22d66..6a7230a 100644 --- a/src/wasm/js/index.js +++ b/src/wasm/js/index.js @@ -1,7 +1,13 @@ 'use strict'; +const fs = require('fs'); +const http = require('http'); +const https = require('https'); +const zlib = require('zlib'); const wasm = require('./nodejs/bgpkit_parser.js'); +// ── Low-level parsing functions ────────────────────────────────────── + /** * Parse an OpenBMP-wrapped BMP message (e.g. from the RouteViews Kafka stream). * @@ -26,20 +32,6 @@ function parseBmpMessage(data, timestamp) { return JSON.parse(wasm.parseBmpMessage(data, timestamp)); } -/** - * Parse a decompressed MRT file into BGP elements. - * - * Handles TABLE_DUMP, TABLE_DUMP_V2, and BGP4MP record types. - * The caller is responsible for decompressing the file (bzip2/gzip) before - * passing the raw bytes. - * - * @param {Uint8Array} data - Decompressed MRT file bytes - * @returns {object[]} Array of BgpElem objects - */ -function parseMrtFile(data) { - return JSON.parse(wasm.parseMrtFile(data)); -} - /** * Parse a single BGP UPDATE message into BGP elements. * @@ -56,28 +48,23 @@ function parseBgpUpdate(data) { return JSON.parse(wasm.parseBgpUpdate(data)); } +// ── Streaming MRT record parsing ───────────────────────────────────── + // MRT common header: timestamp(4) + type(2) + subtype(2) + length(4) = 12 bytes. -// Extended timestamp records add 4 more bytes to the header but the length -// field still measures only the message body, so total = 12 + length (or 16 + length -// for extended). We read the basic 12-byte framing to extract one record at a time. const MRT_HEADER_LEN = 12; /** * Read the MRT record length from a 12-byte header. * Returns the total record size (header + body) or -1 if not enough data. - * @param {Uint8Array} data - * @param {number} offset - * @returns {number} */ function mrtRecordSize(data, offset) { if (offset + MRT_HEADER_LEN > data.length) return -1; - // length is a big-endian u32 at offset+8 const bodyLen = (data[offset + 8] << 24) | (data[offset + 9] << 16) | (data[offset + 10] << 8) | data[offset + 11]; - return MRT_HEADER_LEN + (bodyLen >>> 0); // >>> 0 to treat as unsigned + return MRT_HEADER_LEN + (bodyLen >>> 0); } /** @@ -86,22 +73,16 @@ function mrtRecordSize(data, offset) { * Only sends the bytes of that one record to WASM, so even multi-GB files * work without exceeding WASM's 4 GB memory limit. * - * Returns `{ elems: BgpElem[], bytesRead: number }` on success, - * or `null` when there are no more records. - * * @param {Uint8Array} data - Remaining decompressed MRT bytes * @returns {{ elems: object[], bytesRead: number } | null} */ function parseMrtRecord(data) { const size = mrtRecordSize(data, 0); if (size < 0 || size > data.length) return null; - // Pass only this record's bytes to WASM const recordBytes = data.subarray(0, size); const json = wasm.parseMrtRecord(recordBytes); if (!json) return null; const result = JSON.parse(json); - // Override bytesRead with the framed size (the WASM side always reports - // bytesRead == recordBytes.length, but we set it here for clarity) result.bytesRead = size; return result; } @@ -140,12 +121,202 @@ function* parseMrtRecords(data) { } } +// ── I/O helpers (oneio-style transparent source + compression) ─────── + +/** + * Try to load an optional bzip2 decompressor. + * Supports: 'unbzip2-stream' (streaming), 'seek-bzip' (sync), 'bz2' (sync). + */ +let _bz2Module = undefined; // undefined = not checked, null = not available +function getBz2() { + if (_bz2Module !== undefined) return _bz2Module; + for (const name of ['unbzip2-stream', 'seek-bzip', 'bz2']) { + try { + _bz2Module = { name, mod: require(name) }; + return _bz2Module; + } catch {} + } + _bz2Module = null; + return null; +} + +/** + * Detect compression type from a file path or URL. + * @param {string} pathOrUrl + * @returns {'gz' | 'bz2' | 'xz' | 'none'} + */ +function detectCompression(pathOrUrl) { + const name = pathOrUrl.split('?')[0].split('#')[0]; // strip query/fragment + if (name.endsWith('.gz')) return 'gz'; + if (name.endsWith('.bz2')) return 'bz2'; + if (name.endsWith('.xz')) return 'xz'; + return 'none'; +} + +/** + * Open an HTTP(S) URL and return a readable stream, following redirects. + * @param {string} url + * @returns {Promise} + */ +function httpGet(url) { + return new Promise((resolve, reject) => { + const lib = url.startsWith('https') ? https : http; + lib + .get(url, (res) => { + if (res.statusCode === 301 || res.statusCode === 302) { + httpGet(res.headers.location).then(resolve, reject); + return; + } + if (res.statusCode !== 200) { + reject(new Error(`HTTP ${res.statusCode} for ${url}`)); + return; + } + resolve(res); + }) + .on('error', reject); + }); +} + +/** + * Collect a readable stream into a single Buffer. + * @param {import('stream').Readable} stream + * @returns {Promise} + */ +function collectStream(stream) { + return new Promise((resolve, reject) => { + const chunks = []; + stream.on('data', (chunk) => chunks.push(chunk)); + stream.on('end', () => resolve(Buffer.concat(chunks))); + stream.on('error', reject); + }); +} + +/** + * Decompress a buffer based on the detected compression type. + * @param {Buffer} buf + * @param {'gz' | 'bz2' | 'xz' | 'none'} compression + * @returns {Buffer} + */ +function decompressSync(buf, compression) { + if (compression === 'gz') { + return zlib.gunzipSync(buf); + } + if (compression === 'bz2') { + const bz2 = getBz2(); + if (!bz2) { + throw new Error( + 'bzip2 decompression requires an optional dependency. ' + + 'Install one of: npm install unbzip2-stream, npm install seek-bzip, npm install bz2' + ); + } + if (bz2.name === 'seek-bzip') { + return Buffer.from(bz2.mod.decode(buf)); + } + if (bz2.name === 'bz2') { + const decompress = bz2.mod.decompress || bz2.mod; + return Buffer.from(decompress(buf)); + } + // unbzip2-stream: streaming, need to pipe through + throw new Error( + 'unbzip2-stream does not support sync decompression. ' + + 'Use streamMrtFrom() instead, or install seek-bzip or bz2.' + ); + } + if (compression === 'xz') { + throw new Error('xz decompression is not yet supported in the WASM package'); + } + return buf; +} + +/** + * Create a decompression stream for the given compression type. + * @param {'gz' | 'bz2' | 'xz' | 'none'} compression + * @returns {import('stream').Transform | null} + */ +function decompressStream(compression) { + if (compression === 'gz') { + return zlib.createGunzip(); + } + if (compression === 'bz2') { + const bz2 = getBz2(); + if (!bz2) { + throw new Error( + 'bzip2 decompression requires an optional dependency. ' + + 'Install one of: npm install unbzip2-stream, npm install seek-bzip, npm install bz2' + ); + } + if (bz2.name === 'unbzip2-stream') { + return bz2.mod(); + } + // seek-bzip and bz2 don't have streaming APIs; collect and decompress + return null; + } + return null; +} + +/** + * Open an MRT file from a local path or URL, automatically decompressing + * based on the file extension (.gz, .bz2). + * + * This is the JS equivalent of oneio's `get_reader(path)` — it makes the + * source (local file vs HTTP) and compression format transparent. + * + * @param {string} pathOrUrl - Local file path or HTTP(S) URL + * @returns {Promise} Decompressed MRT file bytes + */ +async function openMrt(pathOrUrl) { + const compression = detectCompression(pathOrUrl); + const isUrl = + pathOrUrl.startsWith('http://') || pathOrUrl.startsWith('https://'); + + if (isUrl) { + const rawStream = await httpGet(pathOrUrl); + const decomp = decompressStream(compression); + if (decomp) { + return collectStream(rawStream.pipe(decomp)); + } + // No streaming decompressor available — collect then decompress sync + const raw = await collectStream(rawStream); + return decompressSync(raw, compression); + } + + // Local file + const raw = fs.readFileSync(pathOrUrl); + return decompressSync(raw, compression); +} + +/** + * Async generator that streams MRT records from a local path or URL. + * + * Handles fetching, decompression (gz/bz2), and incremental parsing. + * Each yield returns one MRT record's elements, keeping WASM memory + * usage constant regardless of file size. + * + * @param {string} pathOrUrl - Local file path or HTTP(S) URL + * @yields {{ elems: object[], bytesRead: number }} + * + * @example + * for await (const { elems } of streamMrtFrom("https://archive.routeviews.org/.../rib.20250101.0000.bz2")) { + * for (const elem of elems) { + * console.log(elem.prefix, elem.as_path); + * } + * } + */ +async function* streamMrtFrom(pathOrUrl) { + const raw = await openMrt(pathOrUrl); + yield* parseMrtRecords(raw); +} + module.exports = { + // Low-level byte parsers (all platforms) parseOpenBmpMessage, parseBmpMessage, - parseMrtFile, - parseMrtRecord, + parseBgpUpdate, parseMrtRecords, + parseMrtRecord, resetMrtParser, - parseBgpUpdate, + + // Node.js I/O helpers + openMrt, + streamMrtFrom, }; diff --git a/src/wasm/js/index.mjs b/src/wasm/js/index.mjs index a43eba4..cfba09b 100644 --- a/src/wasm/js/index.mjs +++ b/src/wasm/js/index.mjs @@ -9,10 +9,51 @@ export function parseBmpMessage(data, timestamp) { return JSON.parse(wasm.parseBmpMessage(data, timestamp)); } -export function parseMrtFile(data) { - return JSON.parse(wasm.parseMrtFile(data)); -} - export function parseBgpUpdate(data) { return JSON.parse(wasm.parseBgpUpdate(data)); } + +// ── Streaming MRT record parsing ───────────────────────────────────── + +const MRT_HEADER_LEN = 12; + +function mrtRecordSize(data, offset) { + if (offset + MRT_HEADER_LEN > data.length) return -1; + const bodyLen = + (data[offset + 8] << 24) | + (data[offset + 9] << 16) | + (data[offset + 10] << 8) | + data[offset + 11]; + return MRT_HEADER_LEN + (bodyLen >>> 0); +} + +export function parseMrtRecord(data) { + const size = mrtRecordSize(data, 0); + if (size < 0 || size > data.length) return null; + const recordBytes = data.subarray(0, size); + const json = wasm.parseMrtRecord(recordBytes); + if (!json) return null; + const result = JSON.parse(json); + result.bytesRead = size; + return result; +} + +export function resetMrtParser() { + wasm.resetMrtParser(); +} + +export function* parseMrtRecords(data) { + wasm.resetMrtParser(); + let offset = 0; + while (offset < data.length) { + const size = mrtRecordSize(data, offset); + if (size < 0 || offset + size > data.length) break; + const recordBytes = data.subarray(offset, offset + size); + const json = wasm.parseMrtRecord(recordBytes); + if (!json) break; + const result = JSON.parse(json); + result.bytesRead = size; + yield result; + offset += size; + } +} diff --git a/src/wasm/js/package.json b/src/wasm/js/package.json index c7afc19..54618bb 100644 --- a/src/wasm/js/package.json +++ b/src/wasm/js/package.json @@ -1,6 +1,6 @@ { "name": "@bgpkit/parser", - "version": "0.15.0", + "version": "0.15.2", "description": "BGP/BMP/MRT message parser compiled to WebAssembly", "license": "MIT", "repository": { @@ -40,7 +40,8 @@ "index.mjs", "index.d.ts", "web.mjs", - "web.d.ts" + "web.d.ts", + "README.md" ], "keywords": [ "bgp", diff --git a/src/wasm/js/web.d.ts b/src/wasm/js/web.d.ts index e7dd7c5..5489252 100644 --- a/src/wasm/js/web.d.ts +++ b/src/wasm/js/web.d.ts @@ -13,9 +13,17 @@ export { OpenBmpHeader, BmpPeerHeader, BgpElem, + MrtRecordResult, } from './index'; -export { parseOpenBmpMessage, parseBmpMessage, parseMrtFile, parseBgpUpdate } from './index'; +export { + parseOpenBmpMessage, + parseBmpMessage, + parseBgpUpdate, + parseMrtRecord, + resetMrtParser, + parseMrtRecords, +} from './index'; /** * Initialize the WASM module. Must be called (and awaited) before using diff --git a/src/wasm/js/web.mjs b/src/wasm/js/web.mjs index 3a8bb0b..e832df9 100644 --- a/src/wasm/js/web.mjs +++ b/src/wasm/js/web.mjs @@ -21,10 +21,51 @@ export function parseBmpMessage(data, timestamp) { return JSON.parse(wasm.parseBmpMessage(data, timestamp)); } -export function parseMrtFile(data) { - return JSON.parse(wasm.parseMrtFile(data)); -} - export function parseBgpUpdate(data) { return JSON.parse(wasm.parseBgpUpdate(data)); } + +// ── Streaming MRT record parsing ───────────────────────────────────── + +const MRT_HEADER_LEN = 12; + +function mrtRecordSize(data, offset) { + if (offset + MRT_HEADER_LEN > data.length) return -1; + const bodyLen = + (data[offset + 8] << 24) | + (data[offset + 9] << 16) | + (data[offset + 10] << 8) | + data[offset + 11]; + return MRT_HEADER_LEN + (bodyLen >>> 0); +} + +export function parseMrtRecord(data) { + const size = mrtRecordSize(data, 0); + if (size < 0 || size > data.length) return null; + const recordBytes = data.subarray(0, size); + const json = wasm.parseMrtRecord(recordBytes); + if (!json) return null; + const result = JSON.parse(json); + result.bytesRead = size; + return result; +} + +export function resetMrtParser() { + wasm.resetMrtParser(); +} + +export function* parseMrtRecords(data) { + wasm.resetMrtParser(); + let offset = 0; + while (offset < data.length) { + const size = mrtRecordSize(data, offset); + if (size < 0 || offset + size > data.length) break; + const recordBytes = data.subarray(offset, offset + size); + const json = wasm.parseMrtRecord(recordBytes); + if (!json) break; + const result = JSON.parse(json); + result.bytesRead = size; + yield result; + offset += size; + } +} diff --git a/src/wasm/test/test-published.js b/src/wasm/test/test-published.js index cd72aa1..e88b52a 100644 --- a/src/wasm/test/test-published.js +++ b/src/wasm/test/test-published.js @@ -11,7 +11,7 @@ */ const https = require('https'); -const { parseOpenBmpMessage, parseBmpMessage, parseBgpUpdate, parseMrtFile } = require('@bgpkit/parser'); +const { parseOpenBmpMessage, parseBmpMessage, parseBgpUpdate, parseMrtRecords } = require('@bgpkit/parser'); let passed = 0; let failed = 0; @@ -134,11 +134,11 @@ function testBgpUpdate() { assert(elem.next_hop === '192.0.2.1', `next_hop is ${elem.next_hop}`); } -// ── Test 5: parseMrtFile ───────────────────────────────────────────────────── +// ── Test 5: parseMrtRecords (streaming) ────────────────────────────────────── -function testMrtFile() { +function testMrtRecords() { return new Promise((resolve) => { - console.log('\n[5/5] parseMrtFile'); + console.log('\n[5/5] parseMrtRecords'); console.log(' ↓ downloading test MRT file...'); const url = 'https://spaces.bgpkit.org/parser/update-example'; @@ -163,12 +163,14 @@ function handleResponse(res, resolve) { const raw = Buffer.concat(chunks); console.log(` ↓ downloaded ${(raw.length / 1024).toFixed(0)} KB`); - const elems = parseMrtFile(raw); + const allElems = []; + for (const { elems } of parseMrtRecords(raw)) { + allElems.push(...elems); + } - assert(Array.isArray(elems), 'returns an array'); - assert(elems.length > 0, `parsed ${elems.length} elements`); + assert(allElems.length > 0, `parsed ${allElems.length} elements`); - const announce = elems.find((e) => e.type === 'ANNOUNCE'); + const announce = allElems.find((e) => e.type === 'ANNOUNCE'); if (announce) { assert(typeof announce.prefix === 'string', `sample prefix: ${announce.prefix}`); assert(typeof announce.peer_ip === 'string', `sample peer_ip: ${announce.peer_ip}`); @@ -192,7 +194,7 @@ async function main() { testOpenBmpNull(); testBmpMessage(); testBgpUpdate(); - await testMrtFile(); + await testMrtRecords(); console.log(`\n${passed + failed} assertions: ${passed} passed, ${failed} failed`); process.exit(failed > 0 ? 1 : 0); From f4db1c29b159c2f21f151ac8a1cdc9bf5b416323 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 22 Mar 2026 17:24:10 -0700 Subject: [PATCH 06/10] Fix WASM loading in browser MRT explorer Switch from esm.sh to jsdelivr CDN for loading @bgpkit/parser WASM. esm.sh rewrites JS modules, breaking import.meta.url and WASM path resolution. jsdelivr serves raw files with correct Content-Type and CORS headers. Also use the /web entry point (with async init()) instead of the bundler target which requires bundler-specific WASM handling. --- examples/wasm/mrt-explorer.html | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/examples/wasm/mrt-explorer.html b/examples/wasm/mrt-explorer.html index 00aee43..b81d5b1 100644 --- a/examples/wasm/mrt-explorer.html +++ b/examples/wasm/mrt-explorer.html @@ -95,28 +95,35 @@
- + - - - - - -
- - - - - - - diff --git a/src/wasm/README.md b/src/wasm/README.md index ddc7af3..7118faf 100644 --- a/src/wasm/README.md +++ b/src/wasm/README.md @@ -118,7 +118,6 @@ Parse MRT files dropped or fetched in the browser. Uses the web entry point which requires calling `init()` before any parsing. **Live demo**: [mrt-explorer.labs.bgpkit.com](https://mrt-explorer.labs.bgpkit.com/) -([source](https://github.com/bgpkit/labs/tree/main/mrt-explorer)) ```js import { init, parseMrtRecords } from '@bgpkit/parser/web';