Skip to content

storage: refactor upsert source to use simplified control flow#35407

Draft
DAlperin wants to merge 3 commits intoMaterializeInc:mainfrom
DAlperin:dov/refactor-upsert-source
Draft

storage: refactor upsert source to use simplified control flow#35407
DAlperin wants to merge 3 commits intoMaterializeInc:mainfrom
DAlperin:dov/refactor-upsert-source

Conversation

@DAlperin
Copy link
Member

@DAlperin DAlperin commented Mar 10, 2026

Instead of doing partial timestamp omission we now rely on a rocksdb
cache to consolidate as we go, only omitting full data. The I/O load
should be about the same since we had to go to rocksb anyway for each
key during partial emission.

We could potentially optimize further by having an in memory stash if
data is small and only spilling to rocks after some threshold but in
absence of evidence I'm avoiding the complication.

Further we could conceivably update our Rocks wrapper to support
multiple CF, but again, I don't think it's super worth it for now.

Instead of doing partial timestamp omission we now rely on a rocksdb
cache to consolidate as we go, only ommitting full data. The I/O load
should be about the same since we had to go to rocksb anyway for each
key during partial emission.

We could potentially optimize further by having an in memory stash if
data is small and only spilling to rocks after some threshold but in
absense of evidence I'm avoiding the complication.

Further we could conceivably update our Rocks wrapper to support
multiple CF, but again, I don't think it's super worth it for now.
@github-actions
Copy link

Thanks for opening this PR! Here are a few tips to help make the review process smooth for everyone.

PR title guidelines

  • Use imperative mood: "Fix X" not "Fixed X" or "Fixes X"
  • Be specific: "Fix panic in catalog sync when controller restarts" not "Fix bug" or "Update catalog code"
  • Prefix with area if helpful: compute: , storage: , adapter: , sql:

Pre-merge checklist

  • The PR title is descriptive and will make sense in the git log.
  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).

@DAlperin
Copy link
Member Author

bugbot run

@cursor
Copy link

cursor bot commented Mar 10, 2026

PR Summary

Medium Risk
Touches core UPSERT ingestion/state-management paths and changes buffering/dedup semantics and metrics; main risks are performance regressions or subtle ordering/frontier bugs under load.

Overview
Refactors the storage UPSERT operator to simplify control flow by removing the legacy upsert_classic path, dropping partial timestamp emission/provisional state writes, and always using continual-feedback processing.

Adds a RocksDB-backed on-disk UpsertStash (plus new stash-stage/drain metrics) to buffer source updates until they’re eligible, and extends the RocksDB async wrapper with prefix_scan (with tests) to efficiently drain stash entries by timestamp prefix.

Cleans up related configuration/metrics/test plumbing by removing the storage_upsert_*snapshot_buffering* dyncfgs, deleting upsert multi-put insert/update/delete counters, and updating failpoint/CI scenarios accordingly.

Written by Cursor Bugbot for commit a2a9cf0. This will update automatically on new commits. Configure here.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is ON, but it could not run because the branch was deleted or merged before autofix could start.

self.rocksdb.multi_update(batch).await?;
}

Ok(())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stash bookkeeping diverges from RocksDB on write failure

Medium Severity

In stage_batch, the in-memory time_info map (count and prefix) is updated eagerly inside the for loop before the fallible self.rocksdb.multi_update(batch) call. If the RocksDB write fails and returns Err, the ? propagates the error but time_info already reflects the staged entries. This causes is_empty(), len(), and min_time() to return incorrect results. In the caller (upsert_continual_feedback), this leads to holding a stash_cap capability for phantom entries that don't exist in RocksDB, which can temporarily block frontier advancement. The inconsistency self-heals on the next drain (the prefix scan finds nothing and removes the phantom time entry), but the window of incorrect state could delay processing.

Fix in Cursor Fix in Web

@DAlperin DAlperin force-pushed the dov/refactor-upsert-source branch from 3ef9d6a to 6792b26 Compare March 11, 2026 08:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant