diff --git a/docs/client/networking.md b/docs/client/networking.md index 8160d6eb..3db1c00c 100644 --- a/docs/client/networking.md +++ b/docs/client/networking.md @@ -33,6 +33,7 @@ Each node entry contains an ENR. This is an Ethereum Node Record. It includes: - The node's public key - Network address - Port numbers +- Committee assignments (for aggregators) - Other metadata In production, dynamic discovery would replace static configuration. @@ -62,15 +63,35 @@ Messages are organized by topic. Topic names follow a pattern that includes: This structure lets clients subscribe to relevant messages and ignore others. +The payload carried in the gossipsub message is the SSZ-encoded, +Snappy-compressed message, which type is identified by the topic: + +| Topic Name | Message Type | Encoding | +|------------------------------------------------------------|-----------------------------|--------------| +| /leanconsensus/devnet3/blocks/ssz_snappy | SignedBlockWithAttestation | SSZ + Snappy | +| /leanconsensus/devnet3/attestations/ssz_snappy | SignedAttestation | SSZ + Snappy | +| /leanconsensus/devnet3/attestation\_{subnet_id}/ssz_snappy | SignedAttestation | SSZ + Snappy | +| /leanconsensus/devnet3/aggregation/ssz_snappy | SignedAggregatedAttestation | SSZ + Snappy | + ### Message Types -Two main message types exist: +Three main message types exist: + +- _Blocks_, defined by the `SignedBlockWithAttestation` type, are proposed by + validators and propagated on the block topic. Every node needs to see blocks + quickly. -Blocks are proposed by validators. They propagate on the block topic. Every -node needs to see blocks quickly. +- _Attestations_, defined by the `SignedAttestation` type, come from all + validators. They propagate on the global attestation topic. Additionally, + each committee has its own attestation topic. Validators publish to their + committee's attestation topic and global attestation topic. Non-aggregating + validators subscribe only to the global attestation topic, while aggregators + subscribe to both the global and their committee's attestation topic. -Attestations come from all validators. They propagate on the attestation topic. High volume -but small messages. +- _Committee aggregations_, defined by the `SignedAggregatedAttestation` type, + created by committee aggregators. These combine attestations from committee + members. Aggregations propagate on the aggregation topic to which every + validator subscribes. ### Encoding diff --git a/docs/client/validator.md b/docs/client/validator.md index 3284c4f2..ab68f10d 100644 --- a/docs/client/validator.md +++ b/docs/client/validator.md @@ -2,8 +2,9 @@ ## Overview -Validators participate in consensus by proposing blocks and producing attestations. This -document describes what honest validators do. +Validators participate in consensus by proposing blocks and producing attestations. +Optionally validators can opt-in to behave as aggregators in their committee. +This document describes what honest validators do. ## Validator Assignment @@ -16,6 +17,32 @@ diversity helps test interoperability. In production, validator assignment will work differently. The current approach is temporary for devnet testing. +## Attestation Committees and Subnets + +Attestation committee is a group of validators contributing to the common +aggregated attestations. Subnets are network channels dedicated to specific committees. 
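+
+For illustration, the mapping from a validator index to its committee subnet and
+gossip topic can be sketched as below (plain integers stand in for the spec's
+`Uint64`; the real derivation lives in `compute_subnet_id` and the gossip topic
+templates):
+
+```python
+def compute_subnet_id(validator_index: int, num_committees: int) -> int:
+    """Subnet id is the validator index modulo the committee count."""
+    return validator_index % num_committees
+
+
+def attestation_subnet_topic(validator_index: int, num_committees: int) -> str:
+    """Per-committee topic the validator publishes its SignedAttestation to."""
+    subnet_id = compute_subnet_id(validator_index, num_committees)
+    return f"/leanconsensus/devnet3/attestation_{subnet_id}/ssz_snappy"
+
+
+# With the devnet-3 default of a single committee, every validator lands on subnet 0:
+assert attestation_subnet_topic(validator_index=5, num_committees=1) == (
+    "/leanconsensus/devnet3/attestation_0/ssz_snappy"
+)
+```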
+ +In the devnet-3 design, however, there is one global subnet for signed +attestations propagation, in addition to publishing into per committee subnets. +This is due to 3SF-mini consensus design, that requires 2/3+ of all +attestations to be observed by any validator to compute safe target correctly. + +Note that non-aggregating validators do not need to subscribe to committee +attestation subnets. They only need to subscribe to the global attestation +subnet. + +Every validator is assigned to a single committee. Number of committees is +defined in config.yaml. Each committee maps to a subnet ID. Validator's +subnet ID is derived using their validator index modulo number of committees. +This is to simplify debugging and testing. In the future, validator's subnet ID +will be assigned randomly per epoch. + +## Aggregator assignment + +Some validators are self-assigned as aggregators. Aggregators collect and combine +attestations from other validators in their committee. To become an aggregator, +a validator sets `is_aggregator` flag to true as ENR record field. + ## Proposing Blocks Each slot has exactly one designated proposer. The proposer is determined by @@ -52,7 +79,7 @@ receive and validate it. ## Attesting -Every validator attestations in every slot. Attesting happens in the second interval, +Every validator attests in every slot. Attesting happens in the second interval, after proposals are made. ### What to Attest For @@ -78,8 +105,8 @@ compute the head. ### Broadcasting Attestations -Validators sign their attestations and broadcast them. The network uses a single topic -for all attestations. No subnets or committees in the current design. +Validators sign their attestations and broadcast them into the global +attestation topic and its corresponding subnet topic. ## Timing @@ -98,11 +125,7 @@ blocks and attestations. Attestation aggregation combines multiple attestations into one. This saves bandwidth and block space. -Devnet 0 has no aggregation. Each attestation is separate. Future devnets will add -aggregation. - -When aggregation is added, aggregators will collect attestations and combine them. -Aggregated attestations will be broadcast separately. +Devnet-3 introduces signatures aggregation. Aggregators will collect attestations and combine them. Aggregated attestations will be broadcast separately. ## Signature Handling diff --git a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py index b0ed9e21..4239c962 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py +++ b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py @@ -15,6 +15,7 @@ from lean_spec.subspecs.containers.attestation import ( Attestation, AttestationData, + SignedAttestation, ) from lean_spec.subspecs.containers.block import ( Block, @@ -50,6 +51,8 @@ ) from .base import BaseConsensusFixture +DEFAULT_VALIDATOR_ID = ValidatorIndex(0) + class ForkChoiceTest(BaseConsensusFixture): """ @@ -210,8 +213,9 @@ def make_fixture(self) -> Self: # The Store is the node's local view of the chain. # It starts from a trusted anchor (usually genesis). store = Store.get_forkchoice_store( - state=self.anchor_state, + anchor_state=self.anchor_state, anchor_block=self.anchor_block, + validator_id=DEFAULT_VALIDATOR_ID, ) # Block registry for fork creation @@ -261,7 +265,11 @@ def make_fixture(self) -> Self: # Process the block through Store. # This validates, applies state transition, and updates head. 
- store = store.on_block(signed_block, LEAN_ENV_TO_SCHEMES[self.lean_env]) + store = store.on_block( + signed_block, + current_validator=DEFAULT_VALIDATOR_ID, + scheme=LEAN_ENV_TO_SCHEMES[self.lean_env], + ) elif isinstance(step, AttestationStep): # Process a gossip attestation. @@ -356,33 +364,85 @@ def _build_block_from_spec( # # Attestations vote for blocks and influence fork choice weight. # The spec may include attestations to include in this block. - attestations, attestation_signatures = self._build_attestations_from_spec( - spec, store, block_registry, parent_root, key_manager + attestations, attestation_signatures, valid_signature_keys = ( + self._build_attestations_from_spec( + spec, store, block_registry, parent_root, key_manager + ) ) - # Merge new attestation signatures with existing gossip signatures. - # These are needed for signature aggregation later. - gossip_signatures = dict(store.gossip_signatures) - gossip_signatures.update(attestation_signatures) + # Merge per-attestation signatures into the Store's gossip signature cache. + # Required so the Store can aggregate committee signatures later when building payloads. + working_store = store + for attestation in attestations: + sig_key = SignatureKey(attestation.validator_id, attestation.data.data_root_bytes()) + if sig_key not in valid_signature_keys: + continue + signature = attestation_signatures.get(sig_key) + if signature is None: + continue + signed_attestation = SignedAttestation( + validator_id=attestation.validator_id, + message=attestation.data, + signature=signature, + ) + working_store = working_store.on_gossip_attestation( + signed_attestation, + scheme=LEAN_ENV_TO_SCHEMES[self.lean_env], + is_aggregator=True, + ) - # Collect attestations from the store if requested. + # Prepare attestations and aggregated payloads for block construction. # - # Previous proposers' attestations become available for inclusion. - # This makes test vectors more realistic. - available_attestations: list[Attestation] | None = None + # Two sources of attestations: + # 1. Explicit attestations from the spec (always included) + # 2. Store attestations (only if include_store_attestations is True) + # + # For all attestations, we need to create aggregated proofs + # so build_block can include them in the block body. + # Attestations with the same data should be merged into a single proof. + available_attestations: list[Attestation] known_block_roots: set[Bytes32] | None = None + aggregated_payloads = dict(store.aggregated_payloads) if store.aggregated_payloads else {} + + # Collect all attestations that need aggregated proofs + all_attestations_for_proofs: list[Attestation] = list(attestations) + if spec.include_store_attestations: # Gather all attestations: both active and recently received. - available_attestations = [ + store_attestations = [ Attestation(validator_id=vid, data=data) for vid, data in store.latest_known_attestations.items() ] - available_attestations.extend( + store_attestations.extend( Attestation(validator_id=vid, data=data) for vid, data in store.latest_new_attestations.items() ) + + # Add store attestations to the list for proof creation + all_attestations_for_proofs.extend(store_attestations) + + # Combine for block construction + available_attestations = store_attestations + attestations known_block_roots = set(store.blocks.keys()) + else: + # Use only explicit attestations from the spec + available_attestations = attestations + + # Build aggregated proofs via Store aggregation logic. 
+ attestation_map = { + attestation.validator_id: attestation.data + for attestation in all_attestations_for_proofs + } + aggregation_store = working_store.model_copy( + update={ + "head": parent_root, + "latest_new_attestations": attestation_map, + "aggregated_payloads": aggregated_payloads, + } + ) + aggregation_store = aggregation_store.aggregate_committee_signatures() + aggregated_payloads = aggregation_store.aggregated_payloads # Build the block using spec logic # @@ -393,11 +453,10 @@ def _build_block_from_spec( slot=spec.slot, proposer_index=proposer_index, parent_root=parent_root, - attestations=attestations, + attestations=available_attestations, available_attestations=available_attestations, known_block_roots=known_block_roots, - gossip_signatures=gossip_signatures, - aggregated_payloads=store.aggregated_payloads, + aggregated_payloads=aggregated_payloads, ) # Create proposer attestation @@ -505,7 +564,7 @@ def _build_attestations_from_spec( block_registry: dict[str, Block], parent_root: Bytes32, key_manager: XmssKeyManager, - ) -> tuple[list[Attestation], dict[SignatureKey, Signature]]: + ) -> tuple[list[Attestation], dict[SignatureKey, Signature], set[SignatureKey]]: """ Build attestations and signatures from block specification. @@ -521,15 +580,16 @@ def _build_attestations_from_spec( key_manager: Key manager for signing. Returns: - Tuple of (attestations list, signature lookup dict). + Tuple of (attestations list, signature lookup dict, valid signature keys). """ # No attestations specified means empty block body. if spec.attestations is None: - return [], {} + return [], {}, set() parent_state = store.states[parent_root] attestations = [] signature_lookup: dict[SignatureKey, Signature] = {} + valid_signature_keys: set[SignatureKey] = set() for aggregated_spec in spec.attestations: # Build attestation data once. @@ -567,8 +627,10 @@ def _build_attestations_from_spec( # This enables lookup during signature aggregation. 
sig_key = SignatureKey(validator_id, attestation_data.data_root_bytes()) signature_lookup[sig_key] = signature + if aggregated_spec.valid_signature: + valid_signature_keys.add(sig_key) - return attestations, signature_lookup + return attestations, signature_lookup, valid_signature_keys def _build_attestation_data_from_spec( self, diff --git a/packages/testing/src/consensus_testing/test_fixtures/state_transition.py b/packages/testing/src/consensus_testing/test_fixtures/state_transition.py index 04cd2a9c..f1097447 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/state_transition.py +++ b/packages/testing/src/consensus_testing/test_fixtures/state_transition.py @@ -10,10 +10,8 @@ from lean_spec.subspecs.containers.state.state import State from lean_spec.subspecs.containers.validator import ValidatorIndex from lean_spec.subspecs.ssz.hash import hash_tree_root -from lean_spec.subspecs.xmss.aggregation import SignatureKey from lean_spec.types import Bytes32 -from ..keys import get_shared_key_manager from ..test_types import BlockSpec, StateExpectation from .base import BaseConsensusFixture @@ -263,26 +261,11 @@ def _build_block_from_spec(self, spec: BlockSpec, state: State) -> tuple[Block, for vid in agg.aggregation_bits.to_validator_indices() ] - if plain_attestations: - key_manager = get_shared_key_manager(max_slot=spec.slot) - gossip_signatures = { - SignatureKey( - att.validator_id, att.data.data_root_bytes() - ): key_manager.sign_attestation_data( - att.validator_id, - att.data, - ) - for att in plain_attestations - } - else: - gossip_signatures = {} - block, post_state, _, _ = state.build_block( slot=spec.slot, proposer_index=proposer_index, parent_root=parent_root, attestations=plain_attestations, - gossip_signatures=gossip_signatures, aggregated_payloads={}, ) return block, post_state diff --git a/packages/testing/src/consensus_testing/test_fixtures/verify_signatures.py b/packages/testing/src/consensus_testing/test_fixtures/verify_signatures.py index f11aad4e..a4ec903b 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/verify_signatures.py +++ b/packages/testing/src/consensus_testing/test_fixtures/verify_signatures.py @@ -26,7 +26,7 @@ from lean_spec.subspecs.containers.validator import ValidatorIndex from lean_spec.subspecs.koalabear import Fp from lean_spec.subspecs.ssz import hash_tree_root -from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof, SignatureKey +from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof from lean_spec.subspecs.xmss.constants import TARGET_CONFIG from lean_spec.subspecs.xmss.containers import Signature from lean_spec.subspecs.xmss.types import ( @@ -233,19 +233,12 @@ def _build_block_from_spec( spec, state, key_manager ) - # Provide signatures to State.build_block for valid attestations - gossip_signatures = { - SignatureKey(att.validator_id, att.data.data_root_bytes()): sig - for att, sig in zip(valid_attestations, valid_signatures, strict=True) - } - # Use State.build_block for valid attestations (pure spec logic) final_block, _, _, aggregated_signatures = state.build_block( slot=spec.slot, proposer_index=proposer_index, parent_root=parent_root, attestations=valid_attestations, - gossip_signatures=gossip_signatures, aggregated_payloads={}, ) diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index 28390133..7e638bd2 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -34,7 +34,7 @@ from lean_spec.subspecs.networking.client import LiveNetworkEventSource 
from lean_spec.subspecs.networking.gossipsub import GossipTopic from lean_spec.subspecs.networking.reqresp.message import Status -from lean_spec.subspecs.node import Node, NodeConfig +from lean_spec.subspecs.node import Node, NodeConfig, get_local_validator_id from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.subspecs.validator import ValidatorRegistry from lean_spec.types import Bytes32, Uint64 @@ -263,7 +263,8 @@ async def _init_from_checkpoint( # # The store treats this as the new "genesis" for fork choice purposes. # All blocks before the checkpoint are effectively pruned. - store = Store.get_forkchoice_store(state, anchor_block) + validator_id = get_local_validator_id(validator_registry) + store = Store.get_forkchoice_store(state, anchor_block, validator_id) logger.info( "Initialized from checkpoint at slot %d (finalized=%s)", state.slot, diff --git a/src/lean_spec/subspecs/chain/config.py b/src/lean_spec/subspecs/chain/config.py index 98e1dbf7..5ce616b4 100644 --- a/src/lean_spec/subspecs/chain/config.py +++ b/src/lean_spec/subspecs/chain/config.py @@ -30,3 +30,6 @@ VALIDATOR_REGISTRY_LIMIT: Final = Uint64(2**12) """The maximum number of validators that can be in the registry.""" + +ATTESTATION_COMMITTEE_COUNT: Final = Uint64(1) +"""The number of attestation committees per slot.""" diff --git a/src/lean_spec/subspecs/containers/__init__.py b/src/lean_spec/subspecs/containers/__init__.py index 263e6dd7..4a269a68 100644 --- a/src/lean_spec/subspecs/containers/__init__.py +++ b/src/lean_spec/subspecs/containers/__init__.py @@ -12,6 +12,7 @@ AggregatedAttestation, Attestation, AttestationData, + SignedAggregatedAttestation, SignedAttestation, ) from .block import ( @@ -37,6 +38,7 @@ "BlockWithAttestation", "Checkpoint", "Config", + "SignedAggregatedAttestation", "SignedAttestation", "SignedBlockWithAttestation", "Slot", diff --git a/src/lean_spec/subspecs/containers/attestation/__init__.py b/src/lean_spec/subspecs/containers/attestation/__init__.py index febbf61e..8a2c4537 100644 --- a/src/lean_spec/subspecs/containers/attestation/__init__.py +++ b/src/lean_spec/subspecs/containers/attestation/__init__.py @@ -5,6 +5,7 @@ AggregatedAttestation, Attestation, AttestationData, + SignedAggregatedAttestation, SignedAttestation, ) @@ -13,5 +14,6 @@ "AggregationBits", "Attestation", "AttestationData", + "SignedAggregatedAttestation", "SignedAttestation", ] diff --git a/src/lean_spec/subspecs/containers/attestation/attestation.py b/src/lean_spec/subspecs/containers/attestation/attestation.py index be9d0613..683310f7 100644 --- a/src/lean_spec/subspecs/containers/attestation/attestation.py +++ b/src/lean_spec/subspecs/containers/attestation/attestation.py @@ -21,6 +21,7 @@ from lean_spec.subspecs.ssz import hash_tree_root from lean_spec.types import Bytes32, Container +from ...xmss.aggregation import AggregatedSignatureProof from ...xmss.containers import Signature from ..checkpoint import Checkpoint from .aggregation_bits import AggregationBits @@ -108,3 +109,17 @@ def aggregate_by_data( ) for data, validator_ids in data_to_validator_ids.items() ] + + +class SignedAggregatedAttestation(Container): + """ + A signed aggregated attestation for broadcasting. + + Contains the attestation data and the aggregated signature proof. 
+ """ + + data: AttestationData + """Combined attestation data similar to the beacon chain format.""" + + proof: AggregatedSignatureProof + """Aggregated signature proof covering all participating validators.""" diff --git a/src/lean_spec/subspecs/containers/state/state.py b/src/lean_spec/subspecs/containers/state/state.py index 021903f9..aa38f8a3 100644 --- a/src/lean_spec/subspecs/containers/state/state.py +++ b/src/lean_spec/subspecs/containers/state/state.py @@ -1,6 +1,6 @@ """State Container for the Lean Ethereum consensus specification.""" -from typing import AbstractSet, Iterable +from typing import AbstractSet, Collection, Iterable from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.subspecs.xmss.aggregation import ( @@ -656,7 +656,6 @@ def build_block( attestations: list[Attestation] | None = None, available_attestations: Iterable[Attestation] | None = None, known_block_roots: AbstractSet[Bytes32] | None = None, - gossip_signatures: dict[SignatureKey, "Signature"] | None = None, aggregated_payloads: dict[SignatureKey, list[AggregatedSignatureProof]] | None = None, ) -> tuple[Block, "State", list[AggregatedAttestation], list[AggregatedSignatureProof]]: """ @@ -736,14 +735,13 @@ def build_block( continue # We can only include an attestation if we have some way to later provide - # an aggregated proof for its group: - # - either a per validator XMSS signature from gossip, or - # - at least one aggregated proof learned from a block that references - # this validator+data. - has_gossip_sig = bool(gossip_signatures and sig_key in gossip_signatures) + # an aggregated proof for its group. + # + # We strictly rely on existing aggregated proofs learned from blocks. + # We do NOT aggregate fresh gossip signatures during block production. has_block_proof = bool(aggregated_payloads and sig_key in aggregated_payloads) - if has_gossip_sig or has_block_proof: + if has_block_proof: new_attestations.append(attestation) # Fixed point reached: no new attestations found @@ -753,15 +751,13 @@ def build_block( # Add new attestations and continue iteration attestations.extend(new_attestations) - # Compute the aggregated signatures for the attestations. - # If the attestations cannot be aggregated, split it in a greedy way. - aggregated_attestations, aggregated_signatures = self.compute_aggregated_signatures( + # Select aggregated attestations and proofs for the final block. + aggregated_attestations, aggregated_signatures = self.select_aggregated_proofs( attestations, - gossip_signatures, - aggregated_payloads, + aggregated_payloads=aggregated_payloads, ) - # Create the final block with aggregated attestations + # Create the final block with aggregated attestations and proofs final_block = Block( slot=slot, proposer_index=proposer_index, @@ -781,49 +777,37 @@ def build_block( return final_block, post_state, aggregated_attestations, aggregated_signatures - def compute_aggregated_signatures( + def aggregate_gossip_signatures( self, - attestations: list[Attestation], + attestations: Collection[Attestation], gossip_signatures: dict[SignatureKey, "Signature"] | None = None, - aggregated_payloads: dict[SignatureKey, list[AggregatedSignatureProof]] | None = None, - ) -> tuple[list[AggregatedAttestation], list[AggregatedSignatureProof]]: + ) -> list[tuple[AggregatedAttestation, AggregatedSignatureProof]]: """ - Compute aggregated signatures for a set of attestations. - - This method implements a two-phase signature collection strategy: - - 1. 
**Gossip Phase**: For each attestation group, first attempt to collect - individual XMSS signatures from the gossip network. These are fresh - signatures that validators broadcast when they attest. - - 2. **Fallback Phase**: For any validators not covered by gossip, fall back - to previously-seen aggregated proofs from blocks. This uses a greedy - set-cover approach to minimize the number of proofs needed. + Collect aggregated signatures from gossip network and aggregate them. - The result is a list of (attestation, proof) pairs ready for block inclusion. + For each attestation group, attempt to collect individual XMSS signatures + from the gossip network. These are fresh signatures that validators + broadcast when they attest. Parameters ---------- - attestations : list[Attestation] + attestations : Collection[Attestation] Individual attestations to aggregate and sign. gossip_signatures : dict[SignatureKey, Signature] | None Per-validator XMSS signatures learned from the gossip network. - aggregated_payloads : dict[SignatureKey, list[AggregatedSignatureProof]] | None - Aggregated proofs learned from previously-seen blocks. Returns: ------- - tuple[list[AggregatedAttestation], list[AggregatedSignatureProof]] - Paired attestations and their corresponding proofs. + list[tuple[AggregatedAttestation, AggregatedSignatureProof]] + - List of (attestation, proof) pairs from gossip collection. """ - # Accumulator for (attestation, proof) pairs. results: list[tuple[AggregatedAttestation, AggregatedSignatureProof]] = [] # Group individual attestations by data # # Multiple validators may attest to the same data (slot, head, target, source). # We aggregate them into groups so each group can share a single proof. - for aggregated in AggregatedAttestation.aggregate_by_data(attestations): + for aggregated in AggregatedAttestation.aggregate_by_data(list(attestations)): # Extract the common attestation data and its hash. # # All validators in this group signed the same message (the data root). @@ -833,8 +817,6 @@ def compute_aggregated_signatures( # Get the list of validators who attested to this data. validator_ids = aggregated.aggregation_bits.to_validator_indices() - # Phase 1: Gossip Collection - # # When a validator creates an attestation, it broadcasts the # individual XMSS signature over the gossip network. If we have # received these signatures, we can aggregate them ourselves. @@ -849,13 +831,11 @@ def compute_aggregated_signatures( # Track validators we couldn't find signatures for. # # These will need to be covered by Phase 2 (existing proofs). - remaining: set[ValidatorIndex] = set() # Attempt to collect each validator's signature from gossip. # # Signatures are keyed by (validator ID, data root). # - If a signature exists, we add it to our collection. - # - Otherwise, we mark that validator as "remaining" for the fallback phase. if gossip_signatures: for vid in validator_ids: key = SignatureKey(vid, data_root) @@ -864,12 +844,6 @@ def compute_aggregated_signatures( gossip_sigs.append(sig) gossip_keys.append(self.validators[vid].get_pubkey()) gossip_ids.append(vid) - else: - # No signature available: mark for fallback coverage. - remaining.add(vid) - else: - # No gossip data at all: all validators need fallback coverage. - remaining = set(validator_ids) # If we collected any gossip signatures, aggregate them into a proof. 
# @@ -884,14 +858,88 @@ def compute_aggregated_signatures( message=data_root, epoch=data.slot, ) - results.append( - ( - AggregatedAttestation(aggregation_bits=participants, data=data), - proof, - ) - ) + attestation = AggregatedAttestation(aggregation_bits=participants, data=data) + results.append((attestation, proof)) + + return results + + def compute_aggregated_signatures( + self, + attestations: list[Attestation], + gossip_signatures: dict[SignatureKey, "Signature"] | None = None, + aggregated_payloads: dict[SignatureKey, list[AggregatedSignatureProof]] | None = None, + ) -> tuple[list[AggregatedAttestation], list[AggregatedSignatureProof]]: + """ + Backwards-compatible wrapper for signature aggregation. + + Older code/tests expect a single method that returns two parallel lists: + (aggregated_attestations, aggregated_proofs). + + The current implementation separates: + - `aggregate_gossip_signatures` (fresh per-validator signatures collected via gossip) + - `select_aggregated_proofs` (reusing previously-seen aggregated proofs from blocks) + """ + results = self.aggregate_gossip_signatures( + attestations, gossip_signatures=gossip_signatures + ) + if aggregated_payloads: + # Note: This may add additional proofs for the same attestation data. + # Callers that rely on strict minimality should use the split APIs. + fallback_atts, fallback_proofs = self.select_aggregated_proofs( + attestations, aggregated_payloads=aggregated_payloads + ) + results.extend(zip(fallback_atts, fallback_proofs, strict=True)) - # Phase 2: Fallback to existing proofs + if not results: + return [], [] + + atts, proofs = zip(*results, strict=True) + return list(atts), list(proofs) + + def select_aggregated_proofs( + self, + attestations: list[Attestation], + aggregated_payloads: dict[SignatureKey, list[AggregatedSignatureProof]] | None = None, + ) -> tuple[list[AggregatedAttestation], list[AggregatedSignatureProof]]: + """ + Select aggregated proofs for a set of attestations. + + This method selects aggregated proofs from aggregated_payloads, + prioritizing proofs from the most recent blocks. + + Strategy: + 1. For each attestation group, aggregate as many signatures as possible + from the most recent block's proofs. + 2. If remaining validators exist after step 1, include proofs from + previous blocks that cover them. + + Parameters: + ---------- + attestations : list[Attestation] + Individual attestations to aggregate and sign. + aggregated_payloads : dict[SignatureKey, list[AggregatedSignatureProof]] | None + Aggregated proofs learned from previously-seen blocks. + The list for each key should be ordered with most recent proofs first. + + Returns: + ------- + tuple[list[AggregatedAttestation], list[AggregatedSignatureProof]] + Paired attestations and their corresponding proofs. + """ + results: list[tuple[AggregatedAttestation, AggregatedSignatureProof]] = [] + + # Group individual attestations by data + for aggregated in AggregatedAttestation.aggregate_by_data(attestations): + data = aggregated.data + data_root = data.data_root_bytes() + validator_ids = ( + aggregated.aggregation_bits.to_validator_indices() + ) # validators contributed to this attestation + + # Validators that are missing in the current aggregation are put into remaining. + remaining: set[Uint64] = set(validator_ids) + + # Fallback to existing proofs # # Some validators may not have broadcast their signatures over gossip, # but we might have seen proofs for them in previously-received blocks. 
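The fallback described above amounts to a greedy set cover over previously-seen proofs.
A simplified sketch, using plain integer sets in place of the `SignatureKey`-indexed
`AggregatedSignatureProof` lists the store actually works with:

```python
def greedy_cover(remaining: set[int], proofs: list[set[int]]) -> list[set[int]]:
    """Repeatedly pick the proof covering the most still-uncovered validators."""
    remaining = set(remaining)  # work on a copy so the caller's set is untouched
    selected: list[set[int]] = []
    while remaining:
        best = max(proofs, key=lambda p: len(p & remaining), default=None)
        if best is None or not (best & remaining):
            break  # no available proof covers any remaining validator
        selected.append(best)
        remaining -= best
    return selected


# Validators {1, 2, 3} still need coverage; one proof covers {1, 2}, another {3, 4}.
assert greedy_cover({1, 2, 3}, [{1, 2}, {3, 4}]) == [{1, 2}, {3, 4}]
```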
@@ -969,11 +1017,6 @@ def compute_aggregated_signatures( remaining -= covered # Final Assembly - # - # - We built a list of (attestation, proof) tuples. - # - Now we unzip them into two parallel lists for the return value. - - # Handle the empty case explicitly. if not results: return [], [] diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index c536b667..93bfb8f0 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -15,6 +15,7 @@ from collections import defaultdict from lean_spec.subspecs.chain.config import ( + ATTESTATION_COMMITTEE_COUNT, INTERVALS_PER_SLOT, JUSTIFICATION_LOOKBACK_SLOTS, SECONDS_PER_INTERVAL, @@ -31,11 +32,14 @@ State, ValidatorIndex, ) +from lean_spec.subspecs.containers.attestation.attestation import SignedAggregatedAttestation from lean_spec.subspecs.containers.block import BlockLookup from lean_spec.subspecs.containers.slot import Slot +from lean_spec.subspecs.networking import compute_subnet_id from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.subspecs.xmss.aggregation import ( AggregatedSignatureProof, + AggregationError, SignatureKey, ) from lean_spec.subspecs.xmss.containers import Signature @@ -124,6 +128,9 @@ class Store(Container): `Store`'s latest justified and latest finalized checkpoints. """ + validator_id: ValidatorIndex | None + """Index of the validator running this store instance.""" + latest_known_attestations: dict[ValidatorIndex, AttestationData] = {} """ Latest attestation data by validator that have been processed. @@ -145,7 +152,7 @@ class Store(Container): gossip_signatures: dict[SignatureKey, Signature] = {} """ - Per-validator XMSS signatures learned from gossip. + Per-validator XMSS signatures learned from committee attesters. Keyed by SignatureKey(validator_id, attestation_data_root). """ @@ -162,7 +169,12 @@ class Store(Container): """ @classmethod - def get_forkchoice_store(cls, state: State, anchor_block: Block) -> "Store": + def get_forkchoice_store( + cls, + anchor_state: State, + anchor_block: Block, + validator_id: ValidatorIndex | None, + ) -> "Store": """ Initialize forkchoice store from an anchor state and block. @@ -170,10 +182,9 @@ def get_forkchoice_store(cls, state: State, anchor_block: Block) -> "Store": We treat this anchor as both justified and finalized. Args: - state: - The trusted post-state corresponding to the anchor block. - anchor_block: - The trusted block acting as the initial chain root. + anchor_state: The state corresponding to the anchor block. + anchor_block: A trusted block (e.g. genesis or checkpoint). + validator_id: Index of the validator running this store. Returns: A new Store instance, ready to accept blocks and attestations. @@ -186,7 +197,7 @@ def get_forkchoice_store(cls, state: State, anchor_block: Block) -> "Store": # Compute the SSZ root of the given state. # # This is the canonical hash that should appear in the block's state root. - computed_state_root = hash_tree_root(state) + computed_state_root = hash_tree_root(anchor_state) # Check that the block actually points to this state. # @@ -209,17 +220,22 @@ def get_forkchoice_store(cls, state: State, anchor_block: Block) -> "Store": # Build an initial checkpoint using the anchor block. # # Both the root and the slot come directly from the anchor. - anchor_checkpoint = Checkpoint(root=anchor_root, slot=anchor_slot) + # Initialize checkpoints from the anchor state + # + # We explicitly set the root to the anchor block root. 
+ # The anchor state internally might have zero-hash checkpoints (if genesis), + # but the Store must treat the anchor block as the justified/finalized point. return cls( time=Uint64(anchor_slot * INTERVALS_PER_SLOT), - config=state.config, + config=anchor_state.config, head=anchor_root, safe_target=anchor_root, - latest_justified=anchor_checkpoint, - latest_finalized=anchor_checkpoint, - blocks={anchor_root: copy.copy(anchor_block)}, - states={anchor_root: copy.copy(state)}, + latest_justified=anchor_state.latest_justified.model_copy(update={"root": anchor_root}), + latest_finalized=anchor_state.latest_finalized.model_copy(update={"root": anchor_root}), + blocks={anchor_root: anchor_block}, + states={anchor_root: anchor_state}, + validator_id=validator_id, ) def validate_attestation(self, attestation: Attestation) -> None: @@ -270,18 +286,21 @@ def on_gossip_attestation( self, signed_attestation: SignedAttestation, scheme: GeneralizedXmssScheme = TARGET_SIGNATURE_SCHEME, + is_aggregator: bool = False, ) -> "Store": """ Process a signed attestation received via gossip network. This method: 1. Verifies the XMSS signature - 2. Stores the signature in the gossip signature map + 2. If current node is aggregator, stores the signature in the gossip + signature map if it belongs to the current validator's subnet 3. Processes the attestation data via on_attestation Args: signed_attestation: The signed attestation from gossip. scheme: XMSS signature scheme for verification. + is_aggregator: True if current validator holds aggregator role. Returns: New Store with attestation processed and signature stored. @@ -313,16 +332,27 @@ def on_gossip_attestation( public_key, attestation_data.slot, attestation_data.data_root_bytes(), scheme ), "Signature verification failed" - # Store signature for later lookup during block building - new_gossip_sigs = dict(self.gossip_signatures) - sig_key = SignatureKey(validator_id, attestation_data.data_root_bytes()) - new_gossip_sigs[sig_key] = signature + # Store signature for later aggregation if applicable. + # + new_commitee_sigs = dict(self.gossip_signatures) + if is_aggregator: + assert self.validator_id is not None, "Current validator ID must be set for aggregation" + current_validator_subnet = compute_subnet_id( + self.validator_id, ATTESTATION_COMMITTEE_COUNT + ) + attester_subnet = compute_subnet_id(validator_id, ATTESTATION_COMMITTEE_COUNT) + if current_validator_subnet != attester_subnet: + # Not part of our committee; ignore for committee aggregation. + pass + else: + sig_key = SignatureKey(validator_id, attestation_data.data_root_bytes()) + new_commitee_sigs[sig_key] = signature # Process the attestation data store = self.on_attestation(attestation=attestation, is_from_block=False) - # Return store with updated signature map - return store.model_copy(update={"gossip_signatures": new_gossip_sigs}) + # Return store with updated signature maps + return store.model_copy(update={"gossip_signatures": new_commitee_sigs}) def on_attestation( self, @@ -454,9 +484,90 @@ def on_attestation( } ) + def on_gossip_aggregated_attestation( + self, signed_attestation: SignedAggregatedAttestation + ) -> "Store": + """ + Process a signed aggregated attestation received via aggregation topic + + This method: + 1. Verifies the aggregated attestation + 2. Stores the aggregation in aggregation_payloads map + + Args: + signed_attestation: The signed aggregated attestation from committee aggregation. + + Returns: + New Store with aggregation processed and stored. 
+ + Raises: + ValueError: If validator not found in state. + AssertionError: If signature verification fails. + """ + data = signed_attestation.data + proof = signed_attestation.proof + + # Get validator IDs who participated in this aggregation + validator_ids = proof.participants.to_validator_indices() + + # Retrieve the relevant state to look up public keys for verification. + key_state = self.states.get(data.target.root) + assert key_state is not None, ( + f"No state available to verify committee aggregation for target " + f"{data.target.root.hex()}" + ) + + # Ensure all participants exist in the active set + validators = key_state.validators + for validator_id in validator_ids: + assert validator_id < ValidatorIndex(len(validators)), ( + f"Validator {validator_id} not found in state {data.target.root.hex()}" + ) + + # Prepare public keys for verification + public_keys = [validators[vid].get_pubkey() for vid in validator_ids] + + # Verify the leanVM aggregated proof + try: + proof.verify( + public_keys=public_keys, + message=data.data_root_bytes(), + epoch=data.slot, + ) + except AggregationError as exc: + raise AssertionError( + f"Committee aggregation signature verification failed: {exc}" + ) from exc + + # Copy the aggregated proof map for updates + # Must deep copy the lists to maintain immutability of previous store snapshots + new_aggregated_payloads = copy.deepcopy(self.aggregated_payloads) + data_root = data.data_root_bytes() + + store = self + for vid in validator_ids: + # Update Proof Map + # + # Store the proof so future block builders can reuse this aggregation + key = SignatureKey(vid, data_root) + new_aggregated_payloads.setdefault(key, []).append(proof) + + # Process the attestation data. Since it's from gossip, is_from_block=False. + # Note, we could have already processed individual attestations from this aggregation, + # during votes propagation into attestation topic, but it's safe to re-process here as + # on_attestation has idempotent behavior. + store = store.on_attestation( + attestation=Attestation(validator_id=vid, data=data), + is_from_block=False, + ) + + # Return store with updated aggregated payloads + return store.model_copy(update={"aggregated_payloads": new_aggregated_payloads}) + def on_block( self, signed_block_with_attestation: SignedBlockWithAttestation, + current_validator: ValidatorIndex | None = None, scheme: GeneralizedXmssScheme = TARGET_SIGNATURE_SCHEME, ) -> "Store": """ @@ -492,6 +603,7 @@ def on_block( Args: signed_block_with_attestation: Complete signed block with proposer attestation. + current_validator: Index of the current validator processing this block. scheme: XMSS signature scheme to use for signature verification. Returns: @@ -574,8 +686,6 @@ def on_block( key = SignatureKey(vid, data_root) new_block_proofs.setdefault(key, []).append(proof) - # Update Fork Choice - # # Register the vote immediately (historical/on-chain) store = store.on_attestation( attestation=Attestation(validator_id=vid, data=att.data), @@ -598,16 +708,27 @@ def on_block( # 1. NOT affect this block's fork choice position (processed as "new") # 2. Be available for inclusion in future blocks # 3. Influence fork choice only after interval 3 (end of slot) - # - # We also store the proposer's signature for potential future block building. 
- proposer_sig_key = SignatureKey( - proposer_attestation.validator_id, - proposer_attestation.data.data_root_bytes(), - ) + new_gossip_sigs = dict(store.gossip_signatures) - new_gossip_sigs[proposer_sig_key] = ( - signed_block_with_attestation.signature.proposer_signature - ) + + # Store proposer signature for future lookup if it belongs to the same committee + # as the current validator (if provided). + if current_validator is not None: + proposer_validator_id = proposer_attestation.validator_id + proposer_subnet_id = compute_subnet_id( + proposer_validator_id, ATTESTATION_COMMITTEE_COUNT + ) + current_validator_subnet_id = compute_subnet_id( + current_validator, ATTESTATION_COMMITTEE_COUNT + ) + if proposer_subnet_id == current_validator_subnet_id: + proposer_sig_key = SignatureKey( + proposer_attestation.validator_id, + proposer_attestation.data.data_root_bytes(), + ) + new_gossip_sigs[proposer_sig_key] = ( + signed_block_with_attestation.signature.proposer_signature + ) store = store.on_attestation( attestation=proposer_attestation, @@ -760,7 +881,7 @@ def accept_new_attestations(self) -> "Store": - Interval 0: Block proposal - Interval 1: Validators cast attestations (enter "new") - Interval 2: Safe target update - - Interval 3: Attestations accepted (move to "known") + - Interval 3: Process accumulated attestations This staged progression ensures proper timing and prevents premature influence on fork choice decisions. @@ -814,7 +935,51 @@ def update_safe_target(self) -> "Store": return self.model_copy(update={"safe_target": safe_target}) - def tick_interval(self, has_proposal: bool) -> "Store": + def aggregate_committee_signatures(self) -> "Store": + """ + Aggregate committee signatures for attestations in committee_signatures. + + This method aggregates signatures from the gossip_signatures map + + Returns: + New Store with updated aggregated_payloads. + """ + new_aggregated_payloads = dict(self.aggregated_payloads) + + attestations = self.latest_new_attestations + committee_signatures = self.gossip_signatures + + attestation_list = [ + Attestation(validator_id=vid, data=data) for vid, data in attestations.items() + ] + + head_state = self.states[self.head] + # Perform aggregation + aggregated_results = head_state.aggregate_gossip_signatures( + attestation_list, + committee_signatures, + ) + + # iterate to broadcast aggregated attestations + for aggregated_attestation, aggregated_signature in aggregated_results: + _ = SignedAggregatedAttestation( + data=aggregated_attestation.data, + proof=aggregated_signature, + ) + # Note: here we should broadcast the aggregated signature to committee_aggregators topic + + # Compute new aggregated payloads + for aggregated_attestation, aggregated_signature in aggregated_results: + data_root = aggregated_attestation.data.data_root_bytes() + validator_ids = aggregated_signature.participants.to_validator_indices() + for vid in validator_ids: + sig_key = SignatureKey(vid, data_root) + if sig_key not in new_aggregated_payloads: + new_aggregated_payloads[sig_key] = [] + new_aggregated_payloads[sig_key].append(aggregated_signature) + return self.model_copy(update={"aggregated_payloads": new_aggregated_payloads}) + + def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Store": """ Advance store time by one interval and perform interval-specific actions. 
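For context, what `aggregate_committee_signatures` assembles (and would publish on the
aggregation topic) can be sketched as a standalone helper. This mirrors the store logic
above and is not part of the diff itself:

```python
from lean_spec.subspecs.containers.attestation import Attestation, SignedAggregatedAttestation
from lean_spec.subspecs.forkchoice.store import Store


def build_committee_aggregations(store: Store) -> list[SignedAggregatedAttestation]:
    """Group freshly received attestations, aggregate the matching gossip signatures,
    and wrap each (data, proof) pair for publication on the aggregation topic."""
    attestations = [
        Attestation(validator_id=vid, data=data)
        for vid, data in store.latest_new_attestations.items()
    ]
    head_state = store.states[store.head]
    return [
        SignedAggregatedAttestation(data=agg.data, proof=proof)
        for agg, proof in head_state.aggregate_gossip_signatures(
            attestations, store.gossip_signatures
        )
    ]
```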
@@ -833,10 +998,6 @@ def tick_interval(self, has_proposal: bool) -> "Store": - If proposal exists, immediately accept new attestations - This ensures validators see the block before attesting - **Interval 1 (Validator Attesting)**: - - Validators create and gossip attestations - - No store action (waiting for attestations to arrive) - **Interval 2 (Safe Target Update)**: - Compute safe target with 2/3+ majority - Provides validators with a stable attestation target @@ -848,6 +1009,7 @@ def tick_interval(self, has_proposal: bool) -> "Store": Args: has_proposal: Whether a proposal exists for this interval. + is_aggregator: Whether the node is an aggregator. Returns: New Store with advanced time and interval-specific updates applied. @@ -863,13 +1025,15 @@ def tick_interval(self, has_proposal: bool) -> "Store": elif current_interval == Uint64(2): # Mid-slot - update safe target for validators store = store.update_safe_target() + if is_aggregator: + store = store.aggregate_committee_signatures() elif current_interval == Uint64(3): # End of slot - accept accumulated attestations store = store.accept_new_attestations() return store - def on_tick(self, time: Uint64, has_proposal: bool) -> "Store": + def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) -> "Store": """ Advance forkchoice store time to given timestamp. @@ -880,6 +1044,7 @@ def on_tick(self, time: Uint64, has_proposal: bool) -> "Store": Args: time: Target time as Unix timestamp in seconds. has_proposal: Whether node has proposal for current slot. + is_aggregator: Whether the node is an aggregator. Returns: New Store with time advanced and all interval actions performed. @@ -894,7 +1059,7 @@ def on_tick(self, time: Uint64, has_proposal: bool) -> "Store": should_signal_proposal = has_proposal and (store.time + Uint64(1)) == tick_interval_time # Advance by one interval with appropriate signaling - store = store.tick_interval(should_signal_proposal) + store = store.tick_interval(should_signal_proposal, is_aggregator) return store @@ -1095,7 +1260,6 @@ def produce_block_with_signatures( parent_root=head_root, available_attestations=available_attestations, known_block_roots=set(store.blocks.keys()), - gossip_signatures=store.gossip_signatures, aggregated_payloads=store.aggregated_payloads, ) diff --git a/src/lean_spec/subspecs/networking/__init__.py b/src/lean_spec/subspecs/networking/__init__.py index 086ad046..3192e919 100644 --- a/src/lean_spec/subspecs/networking/__init__.py +++ b/src/lean_spec/subspecs/networking/__init__.py @@ -32,6 +32,7 @@ PeerDisconnectedEvent, PeerStatusEvent, ) +from .subnet import compute_subnet_id from .transport import PeerId from .types import DomainType, ForkDigest, ProtocolId @@ -73,4 +74,5 @@ "ForkDigest", "PeerId", "ProtocolId", + "compute_subnet_id", ] diff --git a/src/lean_spec/subspecs/networking/client/event_source.py b/src/lean_spec/subspecs/networking/client/event_source.py index 3e30446b..ed48ddbb 100644 --- a/src/lean_spec/subspecs/networking/client/event_source.py +++ b/src/lean_spec/subspecs/networking/client/event_source.py @@ -324,7 +324,7 @@ def decode_message( self, topic_str: str, compressed_data: bytes, - ) -> SignedBlockWithAttestation | SignedAttestation: + ) -> SignedBlockWithAttestation | SignedAttestation | None: """ Decode a gossip message from topic and compressed data. 
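Since `decode_message` may now return `None`, topic-based dispatch roughly follows the
table in docs/client/networking.md. A sketch under those assumptions (`snappy` is the
python-snappy package, assumed available; SSZ deserialization itself is left to the
caller):

```python
import snappy  # python-snappy, assumed available in the client environment

from lean_spec.subspecs.containers import (
    SignedAggregatedAttestation,
    SignedAttestation,
    SignedBlockWithAttestation,
)

TOPIC_TO_TYPE = {
    "blocks": SignedBlockWithAttestation,
    "attestations": SignedAttestation,
    "aggregation": SignedAggregatedAttestation,
}


def classify_gossip(topic: str, compressed: bytes):
    """Return (message type, raw SSZ bytes) for a payload, or None for unknown topics."""
    name = topic.split("/")[3]  # /leanconsensus/<fork_digest>/<name>/ssz_snappy
    if name.startswith("attestation_"):
        name = "attestations"  # per-committee subnets also carry SignedAttestation
    msg_type = TOPIC_TO_TYPE.get(name)
    if msg_type is None:
        return None  # mirrors decode_message's widened `| None` return type
    return msg_type, snappy.decompress(compressed)
```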
diff --git a/src/lean_spec/subspecs/networking/gossipsub/topic.py b/src/lean_spec/subspecs/networking/gossipsub/topic.py index 0bb2040b..0d3d25af 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/topic.py +++ b/src/lean_spec/subspecs/networking/gossipsub/topic.py @@ -87,6 +87,19 @@ Used in the topic string to identify signed attestation messages. """ +ATTESTATION_SUBNET_TOPIC_NAME: str = "attestation_{subnet_id}" +"""Template topic name for attestation subnet messages. + +Used in the topic string to identify attestation messages for a specific subnet. +`{subnet_id}` should be replaced with the subnet identifier (0-63). +""" + +AGGREGATED_ATTESTATION_TOPIC_NAME: str = "aggregation" +"""Topic name for committee aggregation messages. + +Used in the topic string to identify committee's aggregation messages. +""" + class TopicKind(Enum): """Gossip topic types. @@ -103,6 +116,12 @@ class TopicKind(Enum): ATTESTATION = ATTESTATION_TOPIC_NAME """Signed attestation messages.""" + ATTESTATION_SUBNET = ATTESTATION_SUBNET_TOPIC_NAME + """Attestation subnet messages.""" + + AGGREGATED_ATTESTATION = AGGREGATED_ATTESTATION_TOPIC_NAME + """Committee aggregated signatures messages.""" + def __str__(self) -> str: """Return the topic name string.""" return self.value @@ -207,6 +226,18 @@ def attestation(cls, fork_digest: str) -> GossipTopic: """ return cls(kind=TopicKind.ATTESTATION, fork_digest=fork_digest) + @classmethod + def committee_aggregation(cls, fork_digest: str) -> GossipTopic: + """Create a committee aggregation topic for the given fork. + + Args: + fork_digest: Fork digest as 0x-prefixed hex string. + + Returns: + GossipTopic for committee aggregation messages. + """ + return cls(kind=TopicKind.AGGREGATED_ATTESTATION, fork_digest=fork_digest) + def format_topic_string( topic_name: str, diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 26244ea5..529f8969 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -138,10 +138,12 @@ async def _handle_event(self, event: NetworkEvent) -> None: await self.sync_service.on_gossip_block(block, peer_id) case GossipAttestationEvent(attestation=attestation, peer_id=peer_id): - # Route gossip attestations to the sync service. # # SyncService will validate signature and update forkchoice. - await self.sync_service.on_gossip_attestation(attestation, peer_id) + await self.sync_service.on_gossip_attestation( + attestation=attestation, + peer_id=peer_id, + ) case PeerStatusEvent(peer_id=peer_id, status=status): # Route peer status updates to sync service. diff --git a/src/lean_spec/subspecs/networking/subnet.py b/src/lean_spec/subspecs/networking/subnet.py new file mode 100644 index 00000000..8a3c8fd1 --- /dev/null +++ b/src/lean_spec/subspecs/networking/subnet.py @@ -0,0 +1,23 @@ +"""Subnet helpers for networking. + +Provides a small utility to compute a validator's attestation subnet id from +its validator index and number of committees. +""" + +from __future__ import annotations + +from lean_spec.types import Uint64 + + +def compute_subnet_id(validator_index: Uint64, num_committees: Uint64) -> int: + """Compute the attestation subnet id for a validator. + + Args: + validator_index: Non-negative validator index . + num_committees: Positive number of committees. + + Returns: + An integer subnet id in 0..(num_committees-1). 
+ """ + subnet_id = validator_index % num_committees + return subnet_id diff --git a/src/lean_spec/subspecs/node/__init__.py b/src/lean_spec/subspecs/node/__init__.py index a5d8bcb1..d497ebb1 100644 --- a/src/lean_spec/subspecs/node/__init__.py +++ b/src/lean_spec/subspecs/node/__init__.py @@ -1,5 +1,5 @@ """Node orchestrator for the Lean Ethereum consensus client.""" -from .node import Node, NodeConfig +from .node import Node, NodeConfig, get_local_validator_id -__all__ = ["Node", "NodeConfig"] +__all__ = ["Node", "NodeConfig", "get_local_validator_id"] diff --git a/src/lean_spec/subspecs/node/helpers.py b/src/lean_spec/subspecs/node/helpers.py new file mode 100644 index 00000000..f1cdf7f7 --- /dev/null +++ b/src/lean_spec/subspecs/node/helpers.py @@ -0,0 +1,20 @@ +"""Helper functions for node operations.""" + +from lean_spec.subspecs.containers.validator import ValidatorIndex + + +def is_aggregator(validator_id: ValidatorIndex | None) -> bool: + """ + Determine if a validator is an aggregator. + + Args: + validator_id: The index of the validator. + + Returns: + True if the validator is an aggregator, False otherwise. + """ + if validator_id is None: + return False + return ( + False # Placeholder implementation, in future should be defined by node operator settings + ) diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 61493aef..fcaf7a9f 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -92,6 +92,20 @@ class NodeConfig: """ +def get_local_validator_id(registry: ValidatorRegistry | None) -> ValidatorIndex | None: + """ + Get the validator index for this node. + + For now, returns None as a default for passive nodes or simple setups. + Future implementations will look up keys in the registry. + """ + if registry is None or len(registry) == 0: + return None + + # For simplicity, use the first validator in the registry. + return registry.indices()[0] + + @dataclass(slots=True) class Node: """ @@ -146,11 +160,11 @@ def from_genesis(cls, config: NodeConfig) -> Node: if config.database_path is not None: database = cls._create_database(config.database_path) - # Try to load existing state from database. # # If database contains valid state, resume from there. # Otherwise, fall through to genesis initialization. - store = cls._try_load_from_database(database) + validator_id = get_local_validator_id(config.validator_registry) + store = cls._try_load_from_database(database, validator_id) if store is None: # Generate genesis state from validators. @@ -173,7 +187,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: # Initialize forkchoice store. # # Genesis block is both justified and finalized. - store = Store.get_forkchoice_store(state, block) + store = Store.get_forkchoice_store(state, block, validator_id) # Persist genesis to database if available. if database is not None: @@ -262,7 +276,10 @@ def _create_database(path: Path | str) -> Database: return SQLiteDatabase(path) @staticmethod - def _try_load_from_database(database: Database | None) -> Store | None: + def _try_load_from_database( + database: Database | None, + validator_id: ValidatorIndex | None, + ) -> Store | None: """ Try to load forkchoice store from existing database state. @@ -270,6 +287,7 @@ def _try_load_from_database(database: Database | None) -> Store | None: Args: database: Database to load from. + validator_id: Validator index for the store instance. Returns: Loaded Store or None if no valid state exists. 
@@ -309,6 +327,7 @@ def _try_load_from_database(database: Database | None) -> Store | None: latest_finalized=finalized, blocks={head_root: head_block}, states={head_root: head_state}, + validator_id=validator_id, ) async def run(self, *, install_signal_handlers: bool = True) -> None: diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index f20e7376..dc591605 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -43,11 +43,14 @@ from lean_spec.subspecs import metrics from lean_spec.subspecs.chain.clock import SlotClock -from lean_spec.subspecs.containers import Block, SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation -from lean_spec.subspecs.forkchoice import Store -from lean_spec.subspecs.networking import PeerId +from lean_spec.subspecs.containers import ( + Block, + SignedAttestation, + SignedBlockWithAttestation, +) +from lean_spec.subspecs.forkchoice.store import Store from lean_spec.subspecs.networking.reqresp.message import Status +from lean_spec.subspecs.networking.transport.peer_id import PeerId from lean_spec.subspecs.ssz.hash import hash_tree_root from .backfill_sync import BackfillSync, NetworkRequester @@ -409,13 +412,21 @@ async def on_gossip_attestation( if not self._state.accepts_gossip: return + from lean_spec.subspecs.node.helpers import is_aggregator + + # Check if we are an aggregator + is_aggregator_role = is_aggregator(self.store.validator_id) + # Integrate the attestation into forkchoice state. # # The store validates the signature and updates branch weights. # Invalid attestations (bad signature, unknown target) are rejected. # Validation failures are logged but don't crash the event loop. try: - self.store = self.store.on_gossip_attestation(attestation) + self.store = self.store.on_gossip_attestation( + signed_attestation=attestation, + is_aggregator=is_aggregator_role, + ) except (AssertionError, KeyError): # Attestation validation failed. 
# diff --git a/tests/lean_spec/conftest.py b/tests/lean_spec/conftest.py index e590bae8..d1a1d025 100644 --- a/tests/lean_spec/conftest.py +++ b/tests/lean_spec/conftest.py @@ -10,6 +10,7 @@ import pytest from lean_spec.subspecs.containers import Block, State +from lean_spec.subspecs.containers.validator import ValidatorIndex from lean_spec.subspecs.forkchoice import Store from tests.lean_spec.helpers import make_genesis_block, make_genesis_state @@ -29,4 +30,8 @@ def genesis_block(genesis_state: State) -> Block: @pytest.fixture def base_store(genesis_state: State, genesis_block: Block) -> Store: """Fork choice store initialized with genesis.""" - return Store.get_forkchoice_store(genesis_state, genesis_block) + return Store.get_forkchoice_store( + genesis_state, + genesis_block, + validator_id=ValidatorIndex(0), + ) diff --git a/tests/lean_spec/helpers/__init__.py b/tests/lean_spec/helpers/__init__.py index 8a93bf8f..34d3f0a4 100644 --- a/tests/lean_spec/helpers/__init__.py +++ b/tests/lean_spec/helpers/__init__.py @@ -1,5 +1,7 @@ """Test helpers for leanSpec unit tests.""" +from lean_spec.subspecs.containers.validator import ValidatorIndex + from .builders import ( make_aggregated_attestation, make_block, @@ -16,6 +18,9 @@ ) from .mocks import MockNoiseSession +TEST_VALIDATOR_ID = ValidatorIndex(0) + + __all__ = [ # Builders "make_aggregated_attestation", @@ -32,4 +37,6 @@ "make_validators_with_keys", # Mocks "MockNoiseSession", + # Constants + "TEST_VALIDATOR_ID", ] diff --git a/tests/lean_spec/subspecs/containers/test_state_aggregation.py b/tests/lean_spec/subspecs/containers/test_state_aggregation.py index 1620adcf..fbcf32d2 100644 --- a/tests/lean_spec/subspecs/containers/test_state_aggregation.py +++ b/tests/lean_spec/subspecs/containers/test_state_aggregation.py @@ -207,11 +207,16 @@ def test_build_block_collects_valid_available_attestations() -> None: attestation = Attestation(validator_id=ValidatorIndex(0), data=att_data) data_root = att_data.data_root_bytes() - gossip_signatures = { - SignatureKey(ValidatorIndex(0), data_root): key_manager.sign_attestation_data( - ValidatorIndex(0), att_data - ) - } + # Calculate aggregated proof directly + signature = key_manager.sign_attestation_data(ValidatorIndex(0), att_data) + proof = AggregatedSignatureProof.aggregate( + participants=AggregationBits.from_validator_indices([ValidatorIndex(0)]), + public_keys=[key_manager.get_public_key(ValidatorIndex(0))], + signatures=[signature], + message=data_root, + epoch=att_data.slot, + ) + aggregated_payloads = {SignatureKey(ValidatorIndex(0), data_root): [proof]} # Proposer for slot 1 with 2 validators: slot % num_validators = 1 % 2 = 1 block, post_state, aggregated_atts, aggregated_proofs = state.build_block( @@ -221,8 +226,7 @@ def test_build_block_collects_valid_available_attestations() -> None: attestations=[], available_attestations=[attestation], known_block_roots={head_root}, - gossip_signatures=gossip_signatures, - aggregated_payloads={}, + aggregated_payloads=aggregated_payloads, ) assert post_state.latest_block_header.slot == Slot(1) @@ -270,7 +274,6 @@ def test_build_block_skips_attestations_without_signatures() -> None: attestations=[], available_attestations=[attestation], known_block_roots={head_root}, - gossip_signatures={}, aggregated_payloads={}, ) @@ -468,15 +471,15 @@ def test_build_block_state_root_valid_when_signatures_split() -> None: # Three validators attest to identical data. 
     attestations = [Attestation(validator_id=ValidatorIndex(i), data=att_data) for i in range(3)]

-    # Simulate partial gossip coverage.
-    #
-    # Only one signature arrived via the gossip network.
-    # This happens when network partitions delay some messages.
-    gossip_signatures = {
-        SignatureKey(ValidatorIndex(0), data_root): key_manager.sign_attestation_data(
-            ValidatorIndex(0), att_data
-        )
-    }
+    # Use a second aggregated proof for Validator 0 instead of gossip.
+    # This simulates receiving an aggregated signature for this validator from another source.
+    proof_0 = AggregatedSignatureProof.aggregate(
+        participants=AggregationBits.from_validator_indices([ValidatorIndex(0)]),
+        public_keys=[key_manager.get_public_key(ValidatorIndex(0))],
+        signatures=[key_manager.sign_attestation_data(ValidatorIndex(0), att_data)],
+        message=data_root,
+        epoch=att_data.slot,
+    )

     # Simulate the remaining signatures arriving via aggregated proof.
     #
@@ -496,6 +499,7 @@ def test_build_block_state_root_valid_when_signatures_split() -> None:
         epoch=att_data.slot,
     )
     aggregated_payloads = {
+        SignatureKey(ValidatorIndex(0), data_root): [proof_0],
         SignatureKey(ValidatorIndex(1), data_root): [fallback_proof],
         SignatureKey(ValidatorIndex(2), data_root): [fallback_proof],
     }
@@ -508,7 +512,6 @@ def test_build_block_state_root_valid_when_signatures_split() -> None:
         proposer_index=ValidatorIndex(1),
         parent_root=parent_root,
         attestations=attestations,
-        gossip_signatures=gossip_signatures,
         aggregated_payloads=aggregated_payloads,
     )

@@ -520,7 +523,7 @@ def test_build_block_state_root_valid_when_signatures_split() -> None:
     # Confirm each attestation covers the expected validators.
     actual_bits = [set(att.aggregation_bits.to_validator_indices()) for att in aggregated_atts]

-    assert {ValidatorIndex(0)} in actual_bits, "Gossip attestation should cover only validator 0"
+    assert {ValidatorIndex(0)} in actual_bits, "First attestation should cover only validator 0"
     assert {ValidatorIndex(1), ValidatorIndex(2)} in actual_bits, (
         "Fallback should cover validators 1,2"
     )
diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py
index d70898e4..b761db96 100644
--- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py
+++ b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py
@@ -25,6 +25,7 @@
 from lean_spec.subspecs.ssz.hash import hash_tree_root
 from lean_spec.subspecs.xmss.aggregation import SignatureKey
 from lean_spec.types import Bytes32, Bytes52, Uint64
+from tests.lean_spec.helpers import TEST_VALIDATOR_ID


 def test_on_block_processes_multi_validator_aggregations() -> None:
@@ -48,7 +49,11 @@ def test_on_block_processes_multi_validator_aggregations() -> None:
         body=BlockBody(attestations=AggregatedAttestations(data=[])),
     )

-    base_store = Store.get_forkchoice_store(genesis_state, genesis_block)
+    base_store = Store.get_forkchoice_store(
+        genesis_state,
+        genesis_block,
+        validator_id=TEST_VALIDATOR_ID,
+    )
     consumer_store = base_store

     # Producer view knows about attestations from validators 1 and 2
@@ -60,19 +65,32 @@ def test_on_block_processes_multi_validator_aggregations() -> None:
         validator_id: attestation_data
         for validator_id in (ValidatorIndex(1), ValidatorIndex(2))
     }

-    # Store signatures in gossip_signatures
+    # Aggregate signatures manually for aggregated_payloads
     data_root = attestation_data.data_root_bytes()
-    gossip_sigs = {
-        SignatureKey(validator_id, data_root): key_manager.sign_attestation_data(
-            validator_id, attestation_data
-        )
-        for validator_id in (ValidatorIndex(1), ValidatorIndex(2))
-    }
+    signatures_list = [
+        key_manager.sign_attestation_data(vid, attestation_data)
+        for vid in (ValidatorIndex(1), ValidatorIndex(2))
+    ]
+    participants = [ValidatorIndex(1), ValidatorIndex(2)]
+
+    from lean_spec.subspecs.containers.attestation import AggregationBits
+    from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof
+
+    proof = AggregatedSignatureProof.aggregate(
+        participants=AggregationBits.from_validator_indices(participants),
+        public_keys=[key_manager.get_public_key(vid) for vid in participants],
+        signatures=signatures_list,
+        message=data_root,
+        epoch=attestation_data.slot,
+    )
+
+    aggregated_payloads = {SignatureKey(vid, data_root): [proof] for vid in participants}

     producer_store = base_store.model_copy(
         update={
             "latest_known_attestations": attestation_data_map,
-            "gossip_signatures": gossip_sigs,
+            # No gossip signatures needed for block production now
+            "aggregated_payloads": aggregated_payloads,
         }
     )
@@ -145,7 +163,11 @@ def test_on_block_preserves_immutability_of_aggregated_payloads() -> None:
         body=BlockBody(attestations=AggregatedAttestations(data=[])),
     )

-    base_store = Store.get_forkchoice_store(genesis_state, genesis_block)
+    base_store = Store.get_forkchoice_store(
+        genesis_state,
+        genesis_block,
+        validator_id=TEST_VALIDATOR_ID,
+    )

     # First block: create and process a block with attestations to populate
     # `aggregated_payloads`.
diff --git a/tests/lean_spec/subspecs/forkchoice/test_time_management.py b/tests/lean_spec/subspecs/forkchoice/test_time_management.py
index 83954b8d..94622501 100644
--- a/tests/lean_spec/subspecs/forkchoice/test_time_management.py
+++ b/tests/lean_spec/subspecs/forkchoice/test_time_management.py
@@ -20,7 +20,7 @@
 from lean_spec.subspecs.forkchoice import Store
 from lean_spec.subspecs.ssz.hash import hash_tree_root
 from lean_spec.types import Bytes32, Bytes52, Uint64
-from tests.lean_spec.helpers import make_signed_attestation
+from tests.lean_spec.helpers import TEST_VALIDATOR_ID, make_signed_attestation


 @pytest.fixture
@@ -62,6 +62,7 @@ def sample_store(sample_config: Config) -> Store:
         latest_finalized=checkpoint,
         blocks={genesis_hash: genesis_block},
         states={genesis_hash: state},
+        validator_id=TEST_VALIDATOR_ID,
     )


@@ -89,7 +90,11 @@ def test_store_time_from_anchor_slot(self, anchor_slot: int) -> None:
             body=BlockBody(attestations=AggregatedAttestations(data=[])),
         )

-        store = Store.get_forkchoice_store(state=state, anchor_block=anchor_block)
+        store = Store.get_forkchoice_store(
+            anchor_state=state,
+            anchor_block=anchor_block,
+            validator_id=TEST_VALIDATOR_ID,
+        )

         assert store.time == INTERVALS_PER_SLOT * Uint64(anchor_slot)

diff --git a/tests/lean_spec/subspecs/forkchoice/test_validator.py b/tests/lean_spec/subspecs/forkchoice/test_validator.py
index 68c3d332..69a21977 100644
--- a/tests/lean_spec/subspecs/forkchoice/test_validator.py
+++ b/tests/lean_spec/subspecs/forkchoice/test_validator.py
@@ -29,6 +29,7 @@
 from lean_spec.subspecs.ssz.hash import hash_tree_root
 from lean_spec.subspecs.xmss.aggregation import SignatureKey
 from lean_spec.types import Bytes32, Bytes52, Uint64
+from tests.lean_spec.helpers import TEST_VALIDATOR_ID


 @pytest.fixture
@@ -121,6 +122,7 @@ def sample_store(config: Config, sample_state: State) -> Store:
         latest_finalized=finalized,
         blocks={genesis_hash: genesis_block},
         states={genesis_hash: consistent_state},  # States are indexed by block hash
+        validator_id=TEST_VALIDATOR_ID,
     )


@@ -490,6 +492,7 @@ def test_validator_operations_empty_store(self) -> None:
             latest_finalized=final_checkpoint,
             blocks={genesis_hash: genesis},
             states={genesis_hash: state},
+            validator_id=TEST_VALIDATOR_ID,
         )

         # Should be able to produce block and attestation
@@ -532,6 +535,7 @@ def test_produce_block_missing_parent_state(self) -> None:
             latest_finalized=checkpoint,
             blocks={},  # No blocks
             states={},  # No states
+            validator_id=TEST_VALIDATOR_ID,
         )

         with pytest.raises(KeyError):  # Missing head in get_proposal_head
diff --git a/tests/lean_spec/subspecs/networking/client/test_gossip_reception.py b/tests/lean_spec/subspecs/networking/client/test_gossip_reception.py
index e3f1b485..d1b5a559 100644
--- a/tests/lean_spec/subspecs/networking/client/test_gossip_reception.py
+++ b/tests/lean_spec/subspecs/networking/client/test_gossip_reception.py
@@ -511,7 +511,7 @@ class TestGossipReceptionIntegration:
     def test_full_block_reception_flow(self) -> None:
         """Tests complete flow: stream -> parse -> decompress -> decode."""

-        async def run() -> tuple[SignedBlockWithAttestation | SignedAttestation, bytes]:
+        async def run() -> tuple[SignedBlockWithAttestation | SignedAttestation | None, bytes]:
             handler = GossipHandler(fork_digest="0x00000000")
             original_block = make_test_signed_block()
             ssz_bytes = original_block.encode_bytes()
@@ -536,7 +536,9 @@ async def run() -> tuple[SignedBlockWithAttestation | SignedAttestation, bytes]:
     def test_full_attestation_reception_flow(self) -> None:
         """Tests complete flow for attestation messages."""

-        async def run() -> tuple[SignedBlockWithAttestation | SignedAttestation, bytes, TopicKind]:
+        async def run() -> tuple[
+            SignedBlockWithAttestation | SignedAttestation | None, bytes, TopicKind
+        ]:
             handler = GossipHandler(fork_digest="0x00000000")
             original_attestation = make_test_signed_attestation()
             ssz_bytes = original_attestation.encode_bytes()
@@ -586,6 +588,7 @@ async def run() -> tuple[bytes, bytes]:

             # Decode
             decoded = handler.decode_message(topic_str, compressed)
+            assert decoded is not None, "decode_message should not return None for valid input"
             decoded_bytes = decoded.encode_bytes()

             return decoded_bytes, original_bytes
diff --git a/tests/lean_spec/subspecs/networking/test_network_service.py b/tests/lean_spec/subspecs/networking/test_network_service.py
index a7c15f8a..4488d33c 100644
--- a/tests/lean_spec/subspecs/networking/test_network_service.py
+++ b/tests/lean_spec/subspecs/networking/test_network_service.py
@@ -36,7 +36,7 @@
 from lean_spec.subspecs.sync.service import SyncService
 from lean_spec.subspecs.sync.states import SyncState
 from lean_spec.types import Bytes32, Uint64
-from tests.lean_spec.helpers import make_mock_signature, make_signed_block
+from tests.lean_spec.helpers import TEST_VALIDATOR_ID, make_mock_signature, make_signed_block


 @dataclass
@@ -90,6 +90,7 @@ def __init__(self, head_slot: int = 0) -> None:
         """Initialize mock store with genesis block."""
         self._head_slot = head_slot
         self.head = Bytes32.zero()
+        self.validator_id: ValidatorIndex = TEST_VALIDATOR_ID
         self.blocks: dict[Bytes32, Any] = {}
         self.states: dict[Bytes32, Any] = {}
         self._attestations_received: list[SignedAttestation] = []
@@ -118,14 +119,18 @@ def on_block(self, block: SignedBlockWithAttestation) -> "MockStore":
         new_store.head = root
         return new_store

-    def on_gossip_attestation(self, attestation: SignedAttestation) -> "MockStore":
+    def on_gossip_attestation(
+        self,
+        signed_attestation: SignedAttestation,
+        is_aggregator: bool = False,
+    ) -> "MockStore":
         """Process an attestation: track it for
         verification."""
         new_store = MockStore(self._head_slot)
         new_store.blocks = dict(self.blocks)
         new_store.states = dict(self.states)
         new_store.head = self.head
         new_store._attestations_received = list(self._attestations_received)
-        new_store._attestations_received.append(attestation)
+        new_store._attestations_received.append(signed_attestation)
         return new_store

@@ -192,7 +197,10 @@ def test_block_added_to_store_blocks_dict(
             GossipBlockEvent(block=block, peer_id=peer_id, topic=block_topic),
         ]
         source = MockEventSource(events=events)
-        network_service = NetworkService(sync_service=sync_service, event_source=source)
+        network_service = NetworkService(
+            sync_service=sync_service,
+            event_source=source,
+        )

         asyncio.run(network_service.run())

@@ -224,7 +232,10 @@ def test_store_head_updated_after_block(
             GossipBlockEvent(block=block, peer_id=peer_id, topic=block_topic),
         ]
         source = MockEventSource(events=events)
-        network_service = NetworkService(sync_service=sync_service, event_source=source)
+        network_service = NetworkService(
+            sync_service=sync_service,
+            event_source=source,
+        )

         asyncio.run(network_service.run())

@@ -255,7 +266,10 @@ def test_block_ignored_in_idle_state_store_unchanged(
             GossipBlockEvent(block=block, peer_id=peer_id, topic=block_topic),
         ]
         source = MockEventSource(events=events)
-        network_service = NetworkService(sync_service=sync_service, event_source=source)
+        network_service = NetworkService(
+            sync_service=sync_service,
+            event_source=source,
+        )

         asyncio.run(network_service.run())

@@ -299,7 +313,10 @@ def test_attestation_processed_by_store(
             ),
         ]
         source = MockEventSource(events=events)
-        network_service = NetworkService(sync_service=sync_service, event_source=source)
+        network_service = NetworkService(
+            sync_service=sync_service,
+            event_source=source,
+        )

         asyncio.run(network_service.run())

@@ -339,7 +356,10 @@ def test_attestation_ignored_in_idle_state(
             ),
         ]
         source = MockEventSource(events=events)
-        network_service = NetworkService(sync_service=sync_service, event_source=source)
+        network_service = NetworkService(
+            sync_service=sync_service,
+            event_source=source,
+        )

         asyncio.run(network_service.run())

@@ -369,7 +389,10 @@ def test_peer_status_triggers_idle_to_syncing(
             PeerStatusEvent(peer_id=peer_id, status=status),
         ]
         source = MockEventSource(events=events)
-        network_service = NetworkService(sync_service=sync_service, event_source=source)
+        network_service = NetworkService(
+            sync_service=sync_service,
+            event_source=source,
+        )

         asyncio.run(network_service.run())

@@ -392,7 +415,10 @@ def test_peer_status_updates_peer_manager(
             PeerStatusEvent(peer_id=peer_id, status=status),
         ]
         source = MockEventSource(events=events)
-        network_service = NetworkService(sync_service=sync_service, event_source=source)
+        network_service = NetworkService(
+            sync_service=sync_service,
+            event_source=source,
+        )

         asyncio.run(network_service.run())

@@ -444,7 +470,10 @@ def test_full_sync_flow_status_then_block(
             GossipBlockEvent(block=block, peer_id=peer_id, topic=block_topic),
         ]
         source = MockEventSource(events=events)
-        network_service = NetworkService(sync_service=sync_service, event_source=source)
+        network_service = NetworkService(
+            sync_service=sync_service,
+            event_source=source,
+        )

         asyncio.run(network_service.run())

@@ -487,7 +516,10 @@ def test_block_before_status_is_ignored(
             PeerStatusEvent(peer_id=peer_id, status=status),
         ]
         source = MockEventSource(events=events)
-        network_service = NetworkService(sync_service=sync_service, event_source=source)
+        network_service = NetworkService(
+            sync_service=sync_service,
+            event_source=source,
+        )

         asyncio.run(network_service.run())

@@ -529,7 +561,10 @@ def test_multiple_blocks_chain_extension(
             GossipBlockEvent(block=block2, peer_id=peer_id, topic=block_topic),
         ]
         source = MockEventSource(events=events)
-        network_service = NetworkService(sync_service=sync_service, event_source=source)
+        network_service = NetworkService(
+            sync_service=sync_service,
+            event_source=source,
+        )

         asyncio.run(network_service.run())

diff --git a/tests/lean_spec/subspecs/node/test_node.py b/tests/lean_spec/subspecs/node/test_node.py
index dea0e580..8e931cee 100644
--- a/tests/lean_spec/subspecs/node/test_node.py
+++ b/tests/lean_spec/subspecs/node/test_node.py
@@ -175,7 +175,7 @@ def test_store_time_from_database_uses_intervals_not_seconds(self) -> None:
         # Patching to 8 distinguishes from the seconds per slot.
         patched_intervals = Uint64(8)
         with patch("lean_spec.subspecs.node.node.INTERVALS_PER_SLOT", patched_intervals):
-            store = Node._try_load_from_database(mock_db)
+            store = Node._try_load_from_database(mock_db, validator_id=ValidatorIndex(0))

         assert store is not None
         expected_time = Uint64(test_slot * patched_intervals)
diff --git a/tests/lean_spec/subspecs/ssz/test_state.py b/tests/lean_spec/subspecs/ssz/test_state.py
index 59c43c53..20203f93 100644
--- a/tests/lean_spec/subspecs/ssz/test_state.py
+++ b/tests/lean_spec/subspecs/ssz/test_state.py
@@ -41,13 +41,6 @@ def test_encode_decode_state_roundtrip() -> None:
     )
     encode = state.encode_bytes()
-    expected_value = (
-        "e80300000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
-        "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
-        "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
-        "000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
-        "00000000000000000000000000000000000000000000000000000000e4000000e4000000e5000000e5000000e5"
-        "0000000101"
-    )
+    expected_value = "e8030000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e4000000e4000000e5000000e5000000e50000000101"  # noqa: E501

     assert encode.hex() == expected_value
     assert State.decode_bytes(encode) == state
diff --git a/tests/lean_spec/subspecs/validator/test_service.py b/tests/lean_spec/subspecs/validator/test_service.py
index 579fdc29..896907f0 100644
--- a/tests/lean_spec/subspecs/validator/test_service.py
+++ b/tests/lean_spec/subspecs/validator/test_service.py
@@ -32,8 +32,8 @@
 from lean_spec.subspecs.validator.registry import ValidatorEntry
 from lean_spec.subspecs.xmss import TARGET_SIGNATURE_SCHEME
 from lean_spec.subspecs.xmss.aggregation import SignatureKey
-from lean_spec.subspecs.xmss.containers import Signature
 from lean_spec.types import Bytes32, Bytes52, Uint64
+from tests.lean_spec.helpers import TEST_VALIDATOR_ID


 class MockNetworkRequester(NetworkRequester):
@@ -51,7 +51,11 @@ async def request_block_by_root(
 @pytest.fixture
 def store(genesis_state: State, genesis_block: Block) -> Store:
     """Forkchoice store initialized with genesis."""
-    return Store.get_forkchoice_store(genesis_state, genesis_block)
+    return Store.get_forkchoice_store(
+        genesis_state,
+        genesis_block,
+        validator_id=TEST_VALIDATOR_ID,
+    )


 @pytest.fixture
@@ -532,7 +536,11 @@ def real_store(self, key_manager: XmssKeyManager) -> Store:
             state_root=hash_tree_root(genesis_state),
             body=BlockBody(attestations=AggregatedAttestations(data=[])),
         )
-        return Store.get_forkchoice_store(genesis_state, genesis_block)
+        return Store.get_forkchoice_store(
+            genesis_state,
+            genesis_block,
+            validator_id=TEST_VALIDATOR_ID,
+        )

     @pytest.fixture
     def real_sync_service(self, real_store: Store) -> SyncService:
@@ -772,21 +780,36 @@ def test_block_includes_pending_attestations(
         attestation_data = store.produce_attestation_data(Slot(0))
         data_root = attestation_data.data_root_bytes()

-        # Simulate gossip attestations from validators 3 and 4
+        # Simulate aggregated payloads for validators 3 and 4
+        from lean_spec.subspecs.containers.attestation import AggregationBits
+        from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof
+
         attestation_map: dict[ValidatorIndex, AttestationData] = {}
-        gossip_sigs: dict[SignatureKey, Signature] = {}
+        signatures = []
+        participants = [ValidatorIndex(3), ValidatorIndex(4)]
+        public_keys = []
+
+        for vid in participants:
+            sig = key_manager.sign_attestation_data(vid, attestation_data)
+            signatures.append(sig)
+            public_keys.append(key_manager.get_public_key(vid))
+            attestation_map[vid] = attestation_data
+
+        proof = AggregatedSignatureProof.aggregate(
+            participants=AggregationBits.from_validator_indices(participants),
+            public_keys=public_keys,
+            signatures=signatures,
+            message=data_root,
+            epoch=attestation_data.slot,
+        )

-        for validator_id in (ValidatorIndex(3), ValidatorIndex(4)):
-            attestation_map[validator_id] = attestation_data
-            gossip_sigs[SignatureKey(validator_id, data_root)] = key_manager.sign_attestation_data(
-                validator_id, attestation_data
-            )
+        aggregated_payloads = {SignatureKey(vid, data_root): [proof] for vid in participants}

-        # Update store with pending attestations
+        # Update store with pending attestations and aggregated payloads
         updated_store = store.model_copy(
             update={
                 "latest_known_attestations": attestation_map,
-                "gossip_signatures": gossip_sigs,
+                "aggregated_payloads": aggregated_payloads,
             }
         )
         real_sync_service.store = updated_store