feat(cluster): wire multi-shard cross-shard communication for server-ng #3269
hubcio wants to merge 1 commit into
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##           master    #3269      +/-   ##
============================================
- Coverage     74.18%   71.27%   -2.91%
  Complexity      943      943
============================================
  Files          1200     1201       +1
  Lines        109645   105567    -4078
  Branches      86535    82472    -4063
============================================
- Hits          81340    75246    -6094
- Misses        25567    27371    +1804
- Partials       2738     2950     +212
Commit, StartViewChange, DoViewChange, StartView (and Register requests) all carry Operation::Reserved / Operation::Register. Neither operation is matched by is_metadata() or is_partition(), so route_typed falls through to route_consensus_control and hashes header.namespace. Metadata's consensus group is initialized with namespace = 0 (bootstrap.rs:1032), and Murmur3(0u64) >> 16 = 25477, so for every shard_count > 1, the frame is dispatched to a non-zero shard whose IggyMetadata.consensus is None. The receiver hits the fall-through arm, logs a tracing::warn!, and drops the frame. VSR retransmit re-routes via the same hash, so the cluster wedges permanently.
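
The fall-through described in this comment can be reproduced in isolation. The sketch below is illustrative only: the `Operation` variants `MetadataOp` and `PartitionOp`, the `Route` enum, the `deliver` helper, the injected `hash` parameter, and the modulo shard mapping are all assumptions made for the example, not the actual server-ng definitions.

```rust
/// Hypothetical operation tag; only Reserved and Register come from the comment above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Operation {
    Reserved,
    Register,
    MetadataOp,  // placeholder for any operation accepted by is_metadata()
    PartitionOp, // placeholder for any operation accepted by is_partition()
}

impl Operation {
    pub fn is_metadata(self) -> bool {
        self == Operation::MetadataOp
    }
    pub fn is_partition(self) -> bool {
        self == Operation::PartitionOp
    }
}

/// Hypothetical routing decision.
#[derive(Debug, PartialEq, Eq)]
pub enum Route {
    Metadata,
    Partition,
    ConsensusControl { shard: u16 },
}

/// Anything that is neither metadata nor partition falls through to the
/// namespace hash. The hash is injected so the example does not depend on a
/// particular Murmur3 implementation; `hash % shard_count` is an assumed mapping.
/// `shard_count` must be non-zero.
pub fn route_typed(
    op: Operation,
    namespace: u64,
    shard_count: u16,
    hash: impl Fn(u64) -> u64,
) -> Route {
    if op.is_metadata() {
        Route::Metadata
    } else if op.is_partition() {
        Route::Partition
    } else {
        let shard = (hash(namespace) % u64::from(shard_count)) as u16;
        Route::ConsensusControl { shard }
    }
}

/// Receiver side: only the shard that owns the metadata consensus can accept
/// the frame; every other shard has nothing to deliver it to and drops it.
pub fn deliver(shard: u16, metadata_owner: u16) -> Result<(), &'static str> {
    if shard == metadata_owner {
        Ok(())
    } else {
        Err("consensus misroute: no metadata consensus on this shard")
    }
}
```

Passing `|_| 25_477` as the hash (the value quoted in the comment) with `shard_count = 4` yields shard 1 under the assumed modulo mapping, and `deliver(1, 0)` fails the same way on every retransmit because the hash input never changes.
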
core/server-ng bootstrapped exactly one shard with SHARD_ID=0 and senders=vec![sender] hardcoded; the multi-shard path was dead code. Cross-shard primitives copied from legacy core/server also did not fit core/shard's crossfire model (bounded MTx + try_send-or-drop, fd-transfer coordinator instead of SO_REUSEPORT).

bootstrap() now spawns N OS threads from sharding.cpu_allocation, each pinned via nix::sched + hwlocality and running its own compio runtime + IggyMessageBus + IggyShard for the partitions hashed to that shard. Cross-thread shutdown rides an Arc<AtomicBool> polled by a per-shard watchdog, since the bus' Shutdown is !Send and cannot be triggered from the main thread directly. Partial shard-spawn failure and shard-thread panic now signal cluster-wide shutdown instead of hanging.

ShardFrame becomes a concrete enum (Consensus + Lifecycle); the R generic is lifted off IggyShard. Named routers (route_metadata / route_partition / route_consensus_control) replace the duplicated MessageBag match blocks, and a debug_assert at pump entry catches receiver-thread mis-binding that the ctor's assert_sender_ordering cannot see.

ShardMetrics records frame_drops_total (counter, variant+reason labels), bumped at every inter-shard try_send rejection and at the receiver-side delivery failure paths (partition-namespace miss, ForwardClientSend failure, consensus misroute). Without it, drop-and-recover under VSR retransmit is operationally indistinguishable from a livelock. The counter is atomic, so it is safe to bump from !Send compio reactor contexts. shard_id is NOT a label; each shard owns its own Family and the exporter must attach shard scope at the registry layer.

The legacy shard-mapping broadcast subsystem (periodic snapshot refresh task, three-state MappingSlot table, ReplicaMappingUpdate / ReplicaMappingClear frames) is retired entirely.
Cross-shard replica routing now flows through the cluster-shared ReplicaOwnerTable: the owning shard's installer stamps its slot on a successful registry insert and CAS-clears it on disconnect, so every bus' send_to_replica slow path reads authoritative state with no broadcast or reconcile loop. Builder accepts the coordinator at ctor; IggyShard stays immutable post-construction.

message_bus forward-fn types widen to carry replica/client id, and send_to_replica routes via the shared owner table so non-zero shards reconcile against shared state rather than shard 0's private bus.

Metadata recovery: shard 0 owns the WAL writer (via PrepareJournal::open) and the metadata consensus replica. Peer shards never open the WAL; they receive a MetadataHandoff::Waiter factory bundle broadcast by shard 0 over a single-shot crossfire channel and reconstruct their mux_stm from the in-memory snapshot it carries. broadcast_metadata_bundle / await_metadata_bundle poll a shutdown flag so a shard 0 that panics before broadcasting cannot strand peers.

PrepareJournal::drain advances snapshot_op only AFTER the WAL rewrite sequence (tmp create -> write -> fsync -> rename -> fsync parent -> reopen) is durable. Advancing earlier would leave snapshot_op past entries still present on disk on any ? failure, letting a future append() pass the slot collision check and silently evict a live entry from the index.

server-ng's main installs Ctrl-C with abort-on-failure: without a working SIGINT handler the shutdown flag would never flip and shard threads would park indefinitely.

InvalidShardsCount split into ShardsCountZero (allocator produced zero shards) and ShardsCountOverflow (count past OWNER_NONE - 1) so the operator-facing message names the actual failure mode.

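
The ordering constraint on PrepareJournal::drain can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: the `Journal` struct, its fields, and `rewrite_then_advance` are hypothetical names, and the directory fsync relies on Unix behaviour (opening a directory with `File::open` is not expected to work on Windows).

```rust
use std::fs::{self, File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical stand-in for the journal state touched by drain().
pub struct Journal {
    pub file: File,
    pub snapshot_op: u64,
}

/// Rewrites the WAL to `retained` and only then advances `snapshot_op`.
/// Any `?` failure returns early with `snapshot_op` untouched, so the
/// in-memory watermark never runs ahead of what is on disk.
pub fn rewrite_then_advance(
    journal: &mut Journal,
    wal_path: &Path,
    retained: &[u8],
    new_snapshot_op: u64,
) -> io::Result<()> {
    let tmp_path = wal_path.with_extension("tmp");

    let mut tmp = File::create(&tmp_path)?; // tmp create
    tmp.write_all(retained)?;               // write
    tmp.sync_all()?;                        // fsync the new contents
    drop(tmp);

    fs::rename(&tmp_path, wal_path)?;       // rename over the old WAL

    // fsync the parent directory so the rename itself is durable (Unix).
    let parent = match wal_path.parent() {
        Some(p) if !p.as_os_str().is_empty() => p,
        _ => Path::new("."),
    };
    File::open(parent)?.sync_all()?;

    // Reopen the rewritten file for further appends.
    journal.file = OpenOptions::new().read(true).append(true).open(wal_path)?;

    // Only now is it safe to move the watermark.
    journal.snapshot_op = new_snapshot_op;
    Ok(())
}
```

Placing the assignment last means the error path needs no rollback: an early return simply leaves the previous watermark in place.
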
Bootstrap namespace materialization stream-filters inside read(): heavy (Arc<TopicStats>, Partition) clones are kept only for partitions owned by this shard; non-owning entries are inserted straight into the shards_table from the closure.

Partial-spawn survivor join fans out joiner threads so total wait is bounded by the slowest shard, not N * SHARD_SHUTDOWN_TIMEOUT.

TODOs left in place for follow-ups:
- shutdown watchdog is still .detach()'d; tracking it on the bus self-deadlocks because the watchdog drives bus.shutdown(). Needs a shared core/task_registry crate ported from core/server.
- primary IggyMetadata::on_prepare pre-advances sequencer + checksum in push_prepare_entry before the journal append; an append failure leaves state pointing at a phantom op. Needs a rollback path in push_prepare_entry.

Storage milestones (durable PartitionJournal, durable (view, commit_op) watermark) and SDK (client_id, request_id) durability across reconnect remain out of scope; tracked separately.
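
The joiner fan-out mentioned above for the partial-spawn survivor join might look roughly like the following. It is a sketch under assumptions: `join_all_bounded` is a hypothetical name, and plain `std::thread` handles stand in for the shard threads.

```rust
use std::sync::mpsc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Joins every handle on its own helper thread and waits against one shared
/// deadline. Returns, per handle, whether it finished before the deadline.
/// Total wait is at most `timeout`, not `handles.len() * timeout`.
pub fn join_all_bounded(handles: Vec<JoinHandle<()>>, timeout: Duration) -> Vec<bool> {
    let count = handles.len();
    let (tx, rx) = mpsc::channel::<usize>();

    for (idx, handle) in handles.into_iter().enumerate() {
        let tx = tx.clone();
        // One joiner per shard thread; a hung shard only blocks its own joiner.
        thread::spawn(move || {
            let _ = handle.join(); // a panicked shard still counts as finished
            let _ = tx.send(idx);
        });
    }
    drop(tx); // the receiver disconnects once every joiner has reported

    let deadline = Instant::now() + timeout;
    let mut joined = vec![false; count];
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(idx) => joined[idx] = true,
            Err(_) => break, // deadline hit, or all joiners are done
        }
    }
    joined
}
```

A joiner whose shard never exits stays blocked in `join()`; in this sketch it is simply abandoned when the process exits.
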
Good catch @krishvishal, should be fixed now. I added a sentinel: the router got an is_vsr_reserved arm in route_typed that short-circuits any frame with I think it's good enough for now.
core/server-ng bootstrapped exactly one shard with SHARD_ID=0 and
senders=vec![sender] hardcoded; the multi-shard path was dead code.
Cross-shard primitives copied from legacy core/server also did not
fit core/shard's crossfire model (bounded MTx + try_send-or-drop,
fd-transfer coordinator instead of SO_REUSEPORT).
bootstrap() now spawns N OS threads from sharding.cpu_allocation,
each pinned via nix::sched + hwlocality and running its own compio
runtime + IggyMessageBus + IggyShard for the partitions hashed to
that shard. Cross-thread shutdown rides an Arc<AtomicBool> polled by
a per-shard watchdog, since the bus' Shutdown is !Send and cannot be
triggered from the main thread directly. Partial shard-spawn failure
and shard-thread panic now signal cluster-wide shutdown instead of
hanging; the shutdown watchdog is detached from the bus drain.
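
The shutdown path in this paragraph can be sketched with plain OS threads. This is an assumption-laden illustration: `spawn_shard` and `run_cluster` are hypothetical names, the 5 ms poll interval is arbitrary, and the compio runtime, core pinning, and the bus are replaced by comments.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawns one shard thread. Everything that is !Send would be constructed
/// inside the closure, so it never has to cross a thread boundary.
pub fn spawn_shard(shard_id: u16, shutdown: Arc<AtomicBool>) -> JoinHandle<u16> {
    thread::spawn(move || {
        // A real shard would pin itself to a core and build its runtime + bus here.
        // Watchdog: poll the shared flag instead of receiving a !Send handle.
        while !shutdown.load(Ordering::Acquire) {
            thread::sleep(Duration::from_millis(5)); // assumed poll interval
        }
        // The thread-local shutdown would be triggered here, on the owning thread.
        shard_id
    })
}

/// Starts `shard_count` shards, flips the flag once, and joins them all.
pub fn run_cluster(shard_count: u16) -> Vec<u16> {
    let shutdown = Arc::new(AtomicBool::new(false));
    let handles: Vec<JoinHandle<u16>> = (0..shard_count)
        .map(|id| spawn_shard(id, Arc::clone(&shutdown)))
        .collect();

    // The main thread only touches the Send + Sync flag.
    shutdown.store(true, Ordering::Release);

    handles
        .into_iter()
        .map(|h| h.join().expect("shard thread panicked"))
        .collect()
}
```

The same flag can be set from a spawn-failure or panic path, which is what lets a partial failure stop the surviving shards instead of leaving them parked.
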
ShardFrame becomes a concrete enum (Consensus + Lifecycle); the R
generic is lifted off IggyShard. Named routers (route_metadata /
route_partition / route_consensus_control) replace the duplicated
MessageBag match blocks, and a debug_assert at pump entry catches
receiver-thread mis-binding that the ctor's assert_sender_ordering
cannot see.
ShardMetrics records frame_drops_total (counter, variant+reason
labels), bumped at every inter-shard try_send rejection; without it,
drop-and-recover under VSR retransmit is operationally
indistinguishable from a livelock. The counter is atomic, so it is
safe to bump from !Send compio reactor contexts.
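
A try_send-or-drop path with a drop counter can be sketched as follows. The stand-ins are assumptions: `std::sync::mpsc::SyncSender` replaces the crossfire bounded channel, and `FrameDrops` with two plain atomics replaces the labelled frame_drops_total metric.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical per-shard drop counters, one per rejection reason.
#[derive(Default)]
pub struct FrameDrops {
    pub full: AtomicU64,
    pub disconnected: AtomicU64,
}

/// Never blocks: the frame is either enqueued or dropped, and every drop is
/// counted so it shows up in metrics instead of looking like a stall.
/// Returns true when the frame was enqueued.
pub fn try_send_or_drop<T>(tx: &SyncSender<T>, frame: T, drops: &FrameDrops) -> bool {
    match tx.try_send(frame) {
        Ok(()) => true,
        Err(TrySendError::Full(_dropped)) => {
            drops.full.fetch_add(1, Ordering::Relaxed);
            false
        }
        Err(TrySendError::Disconnected(_dropped)) => {
            drops.disconnected.fetch_add(1, Ordering::Relaxed);
            false
        }
    }
}
```

`fetch_add` takes `&self`, so the counters can be bumped through a shared reference from whichever thread observes the rejection.
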
The legacy shard-mapping broadcast subsystem (periodic snapshot
refresh task, three-state MappingSlot table, ReplicaMappingUpdate /
ReplicaMappingClear frames) is retired entirely. Cross-shard replica
routing now flows through the cluster-shared ReplicaOwnerTable: the
owning shard's installer stamps its slot on a successful registry
insert and CAS-clears it on disconnect, so every bus' send_to_replica
slow path reads authoritative state with no broadcast or reconcile
loop. Builder accepts the coordinator at ctor; IggyShard stays
immutable post-construction.
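
The stamp / CAS-clear protocol on the owner table can be illustrated with one atomic slot per replica. This is a sketch, not the PR's type: the `u16` slot width, the `OWNER_NONE` value, and the method names `stamp`, `clear_if_owner`, and `owner` are assumptions.

```rust
use std::sync::atomic::{AtomicU16, Ordering};

/// Assumed sentinel for "no shard owns this replica connection".
pub const OWNER_NONE: u16 = u16::MAX;

/// Hypothetical cluster-shared table: one atomic slot per replica id.
pub struct ReplicaOwnerTable {
    slots: Vec<AtomicU16>,
}

impl ReplicaOwnerTable {
    pub fn new(replica_count: usize) -> Self {
        Self {
            slots: (0..replica_count).map(|_| AtomicU16::new(OWNER_NONE)).collect(),
        }
    }

    /// Called by the owning shard after a successful registry insert.
    pub fn stamp(&self, replica: usize, shard: u16) {
        self.slots[replica].store(shard, Ordering::Release);
    }

    /// Called on disconnect. The compare-and-swap clears the slot only if this
    /// shard is still the recorded owner, so a stale disconnect cannot erase a
    /// newer owner's stamp. Returns true when the slot was cleared.
    pub fn clear_if_owner(&self, replica: usize, shard: u16) -> bool {
        self.slots[replica]
            .compare_exchange(shard, OWNER_NONE, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Slow-path lookup used when routing a frame to a replica.
    pub fn owner(&self, replica: usize) -> Option<u16> {
        match self.slots[replica].load(Ordering::Acquire) {
            OWNER_NONE => None,
            shard => Some(shard),
        }
    }
}
```

Because readers load the slot directly, there is nothing to broadcast or reconcile: a lookup sees either the current owner or `None`.
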
message_bus forward-fn types widen to carry replica/client id, and
send_to_replica routes via the shared owner table so non-zero shards
reconcile against shared state rather than shard 0's private bus.
WAL recovery is serialized across shards: non-zero shards open the
WAL read-only, reject mutating ops (drain, set_snapshot_op) on a
read-only storage, route an invalid WAL header to truncate_or_fail,
and close the read-only fd once recovery completes.
Storage milestones (durable PartitionJournal, durable
(view, commit_op) watermark) and SDK (client_id, request_id)
durability across reconnect remain out of scope; tracked separately.