Skip to content

clusterd-test-driver: headless frontend to clusterd for scripted compute tests#37008

Open
antiguru wants to merge 47 commits into
MaterializeInc:mainfrom
antiguru:headless-compute-driver
Open

clusterd-test-driver: headless frontend to clusterd for scripted compute tests#37008
antiguru wants to merge 47 commits into
MaterializeInc:mainfrom
antiguru:headless-compute-driver

Conversation

@antiguru

@antiguru antiguru commented Jun 12, 2026

Copy link
Copy Markdown
Member

Motivation

environmentd couples the compute protocol to the full SQL and catalog stack, which makes targeted compute experiments slow to set up and hard to control.
This adds a generic headless frontend to clusterd that drives the compute protocol directly from a script, so a test controls the exact persist state, the exact commands the replica receives, and the exact timestamps.
Design doc: doc/developer/design/20260612_headless_clusterd_test_driver.md.

Description

New crate mz-clusterd-test-driver, split into a generic mechanism, a dataflow builder, and a text scripting layer.

The mechanism hosts the persist PubSub server, connects over CTP (sending only Hello), and exposes a Driver that sends any ComputeCommand, submits dataflows without auto-scheduling, observes merged frontiers and the raw response stream, and peeks.
DataflowBuilder assembles dataflows from generic parts (import_persist, import_index, build over MIR, export_index), owning the MIR-to-LIR lowering, the RenderPlan conversion, the CollectionMetadata attachment, and the SqlRelationType-versus-ReprRelationType bookkeeping; a RenderPlan cannot be hand-built outside mz-compute-types, so it uses the real lowering pipeline.

The driver runs a hand-writable, datadriven-style text script: each stanza is a command (a directive line plus an optional indentation-structured body) followed by a ---- expected-output block that is the assertion, regenerable with REWRITE=1. JSON was awkward to author by hand, especially the MIR spec embedded in a string.
The controller handshake is explicit: a script opens with create-instance, update-configuration (toggles the peek-response stash, off by default so peeks return inline), and initialization-complete. It then declares relations with define-schema, writes synthetic or explicit rows, submits and schedules dataflows, drives allow-compaction, awaits frontiers, and reads results with count/peek — whose output the ---- block asserts. A command that fails renders as error: <message>, so an expected failure is asserted by its golden block rather than a special command.
create-dataflow builds a dataflow from generic MIR — the abstraction behind index / materialized-view / subscribe / copy-to, with define-index as sugar over it (only the index export kind is implemented; the sink kinds are scaffolded). Its build expressions are authored as mz-expr-parser specs (the readable MIR-from-text syntax used by the transform .spec tests, MzReflect-free) rather than a bespoke vocabulary, since MirRelationExpr's serde is not hand-authorable (Row literals are opaque bytes, Get carries a full ReprRelationType). Index import lets a computation read an arrangement the replica already holds, which the lowering picks up automatically with no optimizer.
Assertions run through compute: peek peeks an index and emits its rows, and count is sugar that index-imports the target, computes a count(*) Reduce over it, and peeks the single-row result, so a count runs through a real reduce operator rather than being tallied in the driver.
Because assertions are level-triggered waits on monotonic frontiers, a single sequential script is deterministic regardless of how the dataflows interleave; await-frontier --allow-timeout emits a fixed awaited token so the multi_dataflow repro, whose hydration is nondeterministic, still has stable golden output.
reconnect re-sends only Hello; the script then re-issues create-instance, replays the reconciliation set, and sends initialization-complete to close the window. The builder returns errors instead of panicking on malformed input, so bad script input is asserted as an error: line.
Explicit row values are written as plain tokens, typed against the schema and packed via cell_from_json (JSON value + ColumnType -> Datum).
The scenarios — index, deep-history, side-effects, multi-dataflow, reconciliation, error-behavior, and reduce — are text scripts under test/clusterd-test-driver/scripts/, not compiled Rust.

The handshake version comes from mz_persist_client::BUILD_INFO so it matches the connected clusterd. In mzcompose, persist consensus lives in the environment-selected metadata store (metadata_store_companions(), default Postgres) rather than a hardcoded Cockroach.

Verification

mzcompose runs each scenario script against a real clusterd, with the composition directory mounted at /workdir so the scripts are readable in the container; each command's output is compared to its golden ---- block and a mismatch fails the run.
Count assertions go through a count(*) Reduce, reduce asserts a reduce's output via peek, error-behavior asserts a set of error messages, and multi-dataflow reproduces a current limitation while staying deterministic via allow-timeout.
Crate unit tests cover the direct-write round trip, the frontier merge, the lowered dataflow structure (including count-over-index), schema and datum parsing, and the text script parser.
The local runner run-local.py drives the same scripts on the host (with REWRITE=1 to regenerate goldens) for profiling.

antiguru and others added 16 commits June 12, 2026 10:31
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…r/peek

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…unverified plan)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Use mz_persist_client::BUILD_INFO (release-versioned) for the CTP handshake
and persist config so the driver matches the clusterd it connects to; our
crate is 0.0.0 and failed the version check. Disable the peek response stash
so peeks return rows inline. Drop the redundant explicit [[bin]] (the bin is
auto-discovered, and the duplicate broke mzbuild).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Builds cockroach + minio + clusterd + the headless-driver image and runs the
index-over-persist-shard scenario end to end. The driver hosts persist PubSub;
clusterd is pointed at it via mz_service.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…notes

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…entry

Add the BSL copyright header to source files that were missing it, ignore the
mzbuild-copied binary in ci/, and register the composition in the test
pipeline so it runs in CI.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Log send/recv failures and disconnects in the response pump so an e2e hang has
a breadcrumb instead of surfacing only as a misleading frontier timeout. Remove
unused async-trait, differential-dataflow, and semver dependencies.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@antiguru antiguru left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we're missing is a specific language to script the headless driver. Encoding the interactions in Rust is a good first step, but ultimately we want something that is easier to iterate on. I could even imagine Python scripts that we load through pyo3 or so.

Comment thread src/compute-test-driver/src/lib.rs Outdated
Remove the placeholder link test, fix a rustdoc invalid-html-tag error in the
target module example, and note a scripting language as future work.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@antiguru

Copy link
Copy Markdown
Member Author

Agreed on a scripting language being the goal. I captured it under "Future work" in the design doc: the mechanism is already a thin scriptable surface (send, submit_dataflow, schedule, expect_frontier, peek, subscribe_raw), so a declarative script or Python-via-pyo3 layer would bind to it rather than replace it. Left it out of this PR to keep the first step focused.

antiguru and others added 11 commits June 12, 2026 13:22
The driver controls clusterd and is intended to grow storage scenarios, so
"compute" was too narrow. Renames the crate, image, composition, and design
doc to clusterd-test-driver.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add run-local.sh to run clusterd + the driver entirely on the host (no mzbuild
images), and document how to launch clusterd under a profiler.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…w scenarios

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
run-local.sh accepts WRAPPER (e.g. heaptrack, perf) prepended to the clusterd
command. Cleanup terminates the inner clusterd, not the wrapper, so the
profiler flushes its output and exits cleanly; PID resolution excludes the
wrapper pid since its argv also contains "target/debug/clusterd".

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…file

Build clusterd and the driver with --profile optimized (release-like with
debug symbols) by default for representative profiling; override with
PROFILE=dev. Paths resolve under target/<profile>/.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Default TARGET_BYTES to 2 GiB so local runs do measurable work. Build clusterd
with --no-default-features when WRAPPER is heaptrack (or
CLUSTERD_NO_DEFAULT_FEATURES=1), since the default mz-alloc-default feature
pulls in jemalloc, which bypasses the allocator heaptrack hooks.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
workflow_default runs index, deep-history, side-effects, and multi-dataflow,
restarting clusterd between each for a clean compute state. The first three
assert; multi-dataflow exits 0 by design (documents a current limitation).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
write_rows_spread did one compare_and_append per timestamp (N consensus
round-trips); with large N_TIMESTAMPS this dominated startup. Persist accepts
updates at any timestamp within a sealed range, so write them all in a single
append regardless of N_TIMESTAMPS.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
docker compose run omits the service's network aliases by default, so other
services can't reach the run container by name. Add an opt-in use_aliases flag
and set it for the headless-driver so clusterd can reach the persist PubSub
server the driver hosts (otherwise it falls back to consensus polling).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
run-local.sh built the two binaries in separate cargo invocations; build them
in a single invocation (clusterd only when spawning it).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The index scenario now has two phases: load (bulk write + hydrate) and tick
(append at advancing timestamps, waiting for the index output frontier to step
forward each time), exercising steady-state incremental maintenance. Tunable
via TICKS/TICK_ROWS; sample_rows_from keeps each batch's ids disjoint so the
final count is exact.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
antiguru and others added 5 commits June 12, 2026 16:21
…hared build

mzbuild builds several images' bins in one cargo invocation. The driver pulls
mz-ore/tokio-console transitively (via mz-compute-client), but did not enable
mz-orchestrator-tracing's tokio-console feature, so when co-built with
orchestratord (which builds orchestrator-tracing with that feature off)
orchestrator-tracing failed to compile (E0063 on TracingConfig::tokio_console).
Configure tracing in the driver via mz-orchestrator-tracing like the other
binaries do, which both gives the driver structured logging and keeps the
feature consistent. Also drop mz-ore default features and move the test feature
to dev-dependencies so the production binary doesn't carry tokio-console/test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…t effort

Rewrite the local runner as a Python script (run via bin/pyactivate) so it can
reuse Materialize's mzcompose helpers. It builds the timely config via the same
timely_config and DEFAULT_*_EXERT_PROPORTIONALITY constants as the Clusterd
service, keeping the arrangement merge effort in sync with CI defaults (compute
16, storage 1337) instead of hardcoding 0. Extract those constants in
clusterd.py. Behavior (WRAPPER profiling, single build, --no-default-features,
scenarios) is preserved.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
It was committed unformatted because bin/fmt only touches git-tracked files and
the file was still untracked when fmt ran.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Replace the opinionated index_dataflow constructor with a DataflowBuilder
that owns the lowering/augment mechanism and exposes the five axes a test
should control (imports, computation, exports, temporal bounds, ids) as
verbs: import_persist, build, export_index, as_of, until, finish.

index_dataflow becomes thin sugar over the builder. The augment step now
attaches per-source persist metadata looked up by id, so multi-source
dataflows fall out for free. Document the JSON-scripting direction
(Claude-authored MIR, with the Row-literal shim caveat) in the design doc.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add a persistent JSON-line command reader so a test (or agent) can script
the driver without recompiling. `SCENARIO=script` reads commands from stdin,
executes each against clusterd, and writes one JSON response per line,
exiting non-zero if any command failed.

Orchestration verbs map directly to Driver calls: write_single_ts,
write_spread, schedule, allow_compaction, await_frontier, peek_count. Shards
are named by string alias (allocated on first use); object ids are raw u64.
define_index covers the common shape via the index_dataflow sugar; a
full-MIR define with the literal shim is the remaining step.

run-local.py gains SCRIPT=<path> to pipe a command file into the driver;
sample at test/clusterd-test-driver/scripts/index.jsonl. Verified end to end
locally: write -> define -> schedule -> await -> peek returned 5000 rows.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@antiguru antiguru changed the title compute-test-driver: headless frontend to clusterd for scripted compute tests clusterd-test-driver: headless frontend to clusterd for scripted compute tests Jun 12, 2026
antiguru and others added 8 commits June 12, 2026 17:39
Scripts can now declare relation schemas and supply row values, instead of
the hardcoded (bigint, text) sample schema.

* define_schema {name, columns:[{name,type,nullable}]} builds a RelationDesc
  stored under a name; write/define_index/peek reference it via `schema`
  (default = built-in sample relation). Type vocabulary: int16/int32/int64,
  bool, string, bytes (+ SQL aliases).
* Synthetic generation is now schema-driven: data.rs gains an owned Cell
  enum, synth_cell/synth_rows that fill any supported column by type;
  sample_rows_from delegates to synth_rows for DRY.
* write_rows takes explicit JSON values, parsed by cell_from_json
  (JSON value + ColumnType -> Datum) -- the literal shim the future full-MIR
  define will reuse, so the Row-packed-bytes caveat is resolved early.

The synthetic count field is renamed rows->count to disambiguate from
write_rows' explicit array. Verified e2e: a 3-column custom schema with
1000 synthetic + 2 explicit rows (incl. a null) indexes and peeks 1002.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The four scenarios are now JSON command scripts, not compiled Rust. The bin
is solely a script reader: it reads commands from DRIVER_SCRIPT (or stdin),
so the SCENARIO match and scenario_* functions are gone.

New primitives make the conversion faithful:
* `start` offset on write_single_ts/write_spread, so the index tick phase
  writes disjoint id ranges that accumulate instead of consolidating.
* `expect_count` asserts a peek's row count and fails the run on mismatch,
  replacing each scenario's anyhow::ensure.
* `await_frontier` gains `allow_timeout`: multi_dataflow reports a
  non-hydrating dataflow (status: timeout) without failing the run.

scripts/: index, deep_history, side_effects (self-checking via expect_count)
and multi_dataflow (tolerant, exits 0). mzcompose mounts the composition dir
at /workdir (the testdrive convention) and runs each via DRIVER_SCRIPT;
run-local.py takes SCRIPT and passes DRIVER_SCRIPT instead of piping stdin.
All four verified e2e (index=10000, deep_history=5000, side_effects=6000,
multi_dataflow one index hydrates + one times out, exit 0).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add coverage for two more protocol surfaces via three new script commands.

* `reconnect` re-handshakes but stops before `initialization_complete`,
  opening the reconciliation window; `initialization_complete` closes it.
  The ctp handshake is split into connect_and_hello + the final
  InitializationComplete, and Driver::reconnect swaps in a fresh pump
  (dropping the old sender stops the old one). reconciliation.jsonl replays
  the running dataflow in the window so the replica reconciles and keeps it.
* `expect_error` runs a nested command and asserts it fails, so
  error_behavior.jsonl covers bad input (unknown schema, wrong arity/type)
  and replica behavior (an unscheduled dataflow's frontier never advancing)
  as passing assertions.

Both verified e2e: reconciliation keeps the index (count 5000 after
reconnect); error_behavior catches all four expected failures. Added to the
mzcompose SCRIPTS list.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
cargo doc rejects intra-doc links from public items to private ones. The
script command field docs linked the private scalar_type_from_str fn and the
private DEFAULT_ROW_BYTES/DEFAULT_TIMEOUT_SECS consts; demote them to plain
code spans. This failed the Doctests job, which fail-fast-cancelled the image
build and cascaded into the Clusterd test driver job (image not found).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The step uses its own clusterd-test-driver mzbuild image. The instrumented
no-LTO builds (coverage, sanitizer) produce the standard images but not this
custom one, so docker compose fails to resolve it ("image not found") — as
seen in the coverage build 125447, while the main pipeline (125427) passed.
Exclude the step from those pipelines, matching the convention for steps that
can't run instrumented.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* DataflowBuilder::finish/augment and index_dataflow now return
  anyhow::Result instead of panicking on a malformed plan, so script input
  that reaches the build path surfaces a clean error. define_index also
  validates key columns against the schema arity up front, so an out-of-range
  key is caught by expect_error rather than crashing the driver process.
  error_behavior.jsonl gains a key-out-of-range case.
* Drop the unused mz-cluster-client dependency (no references in the crate;
  cargo-udeps would flag it).
* Move src/clusterd-test-driver to its alphabetical spot after src/clusterd
  in the workspace member lists.
* run-local.py wait_for_port raises on timeout instead of returning silently.

Also revert the coverage/sanitizer skip from the prior commit; that was not
the right fix for the image-not-found.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Scripting via JSON promoted from future work to a main section; Future
work now lists only the remaining full-MIR define, joins/optimize, and
the storage CTP channel. Script list, fallible finish(), define_index
key validation, and the SCRIPT= local invocation reflect the code.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@antiguru antiguru marked this pull request as ready for review June 12, 2026 18:04
@antiguru antiguru requested a review from a team as a code owner June 12, 2026 18:04
@antiguru antiguru requested review from def- and frankmcsherry June 12, 2026 18:04

@def- def- left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have once built something very similar at a previous company, also using json inputs to test an execution engine directly instead of going through the entire database stack. Back then the test framework turned out to not be very useful since it was easier to write unit tests and full system tests instead of the middle-ground headless json tests.

Do we have some concrete examples of regression tests we've wanted to build and failed before, which would be possible with clusterd-test-driver?

I still think it's a worthy experiment, and I can especially imagine that building a fuzzer for clusterd-test-driver with some invariants could lead us to interesting bugs.

antiguru and others added 4 commits June 15, 2026 10:07
Add cross-dataflow index import to DataflowBuilder, a full-MIR `define`
command (get/reduce/project via MirJson), and a generic `expect_peek` row
assertion. peek_count/expect_count become sugar that index-imports the
target index, computes a count(*) Reduce over it, and peeks the result, so
counts run through real compute instead of being tallied with .len() in the
driver. New reduce.jsonl scenario spells the pattern out directly.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
MirRelationExpr derives serde but is not hand-authorable (Row literals are
opaque bytes, Get carries a full ReprRelationType), so the first cut invented
a parallel MirJson vocabulary. Reuse the repo's existing MIR-from-text parser
instead: define's build expr is now an mz-lowertest spec parsed by build_rel
against a TestCatalog seeded from the imports, with the parsed Get ids remapped
back to the script's ids by name. Full MIR vocabulary for free; no curated
sub-language to maintain.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…test

mz-lowertest is built on MzReflect, which the codebase intends to retire.
Switch define's build expressions to mz-expr-parser — the MzReflect-free
pretty-MIR parser used by the transform .spec tests (parses with syn). The
syntax is also more readable: `Reduce aggregates=[count(*)]` over `Get u1000`
instead of the lowertest sexpr. Get-id remapping against the seeded catalog is
unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Replace the JSONL command scripts with a hand-writable, datadriven-style text
format (the `text` module): each stanza is a command (directive line plus an
indentation-structured body) and a `----` expected block that is the assertion,
regenerable with REWRITE=1. JSON was awkward to hand-write, especially the MIR
spec embedded in a string.

Drop the explicit expect_* commands: `count`/`peek` just emit output and the
golden block asserts; a failing command renders as `error: <msg>`, so
error-behavior is golden with no wrapper. Assertions are level-triggered waits
on monotonic frontiers, so a sequential script is deterministic regardless of
how the dataflows interleave; `await-frontier --allow-timeout` emits a fixed
`awaited` token so multi_dataflow stays deterministic despite its (deferred)
hydration indeterminism.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread test/clusterd-test-driver/mzcompose.py Outdated


SERVICES = [
Cockroach(setup_materialize=True),

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the metadata store that automatically dispatches to the DB configured through the environment?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in d8bf972. The composition now pulls the store in via metadata_store_companions() (dispatches to the env-configured DB, default postgres-metadata) and derives the consensus URI from METADATA_STORE instead of hardcoding Cockroach. Guarded to a SQL store, since persist consensus needs a Postgres-wire backend (FoundationDB wouldn't work).

antiguru and others added 2 commits June 15, 2026 12:15
…mpanion

Make the controller handshake explicit script commands instead of hiding it in
connect: `create-instance`, `update-configuration` (toggles the peek-response
stash), and `initialization-complete`. Connecting (and reconnecting) now sends
only Hello, so a script controls the instance config and exactly when the
reconciliation window opens and closes. Every scenario opens with the handshake;
reconciliation re-issues create-instance after reconnect.

Address review: mzcompose pulls in the metadata store via
metadata_store_companions() (dispatches to the env-configured DB, default
Postgres) and derives the consensus URI from it, instead of hardcoding Cockroach.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…xport kind

Reframe `define` as `create-dataflow` — the abstraction behind index /
materialized-view / subscribe / copy-to. Each export now carries a `kind`; only
`index` is implemented, with the sink kinds (matching ComputeSinkConnection)
scaffolded as named variants that error clearly until built. `define-index`
stays as sugar. Also note in the metadata-store guard why FoundationDB is
excluded (persist's FdbConsensus is behind a cargo feature clusterd doesn't
build, and its consensus URI is a cluster file, not postgres-wire).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants