Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies = [
"numpy>=2.0.0,<3",
"aioquic>=1.2.0,<2",
"pyyaml>=6.0.0,<7",
"prometheus-client>=0.21.0,<1",
]

[project.license]
Expand Down
24 changes: 24 additions & 0 deletions src/lean_spec/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
--validator-keys Path to validator keys directory
--node-id Node identifier for validator assignment (default: lean_spec_0)
--is-aggregator Enable aggregator mode for attestation aggregation (default: false)
--api-port Port for API server and Prometheus /metrics (default: 5052, 0 to disable)
"""

from __future__ import annotations
Expand All @@ -32,13 +33,15 @@
from pathlib import Path
from typing import Final

from lean_spec.subspecs.api import ApiServerConfig
from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT
from lean_spec.subspecs.containers import Block, BlockBody, Checkpoint, State
from lean_spec.subspecs.containers.block.types import AggregatedAttestations
from lean_spec.subspecs.containers.slot import Slot
from lean_spec.subspecs.containers.validator import SubnetId
from lean_spec.subspecs.forkchoice import Store
from lean_spec.subspecs.genesis import GenesisConfig
from lean_spec.subspecs.metrics import registry as metrics
from lean_spec.subspecs.networking.client import LiveNetworkEventSource
from lean_spec.subspecs.networking.enr import ENR
from lean_spec.subspecs.networking.gossipsub import GossipTopic
Expand Down Expand Up @@ -161,6 +164,7 @@ def _init_from_genesis(
event_source: LiveNetworkEventSource,
validator_registry: ValidatorRegistry | None = None,
is_aggregator: bool = False,
api_port: int | None = None,
) -> Node:
"""
Initialize a node from genesis configuration.
Expand All @@ -170,6 +174,7 @@ def _init_from_genesis(
event_source: Network transport for the node.
validator_registry: Optional registry with validator secret keys.
is_aggregator: Enable aggregator mode for attestation aggregation.
api_port: Port for API server and /metrics. None disables the API.

Returns:
A fully initialized Node starting from genesis.
Expand All @@ -192,6 +197,7 @@ def _init_from_genesis(
validator_registry=validator_registry,
fork_digest=GOSSIP_FORK_DIGEST,
is_aggregator=is_aggregator,
api_config=ApiServerConfig(port=api_port) if api_port is not None else None,
)

# Create and return the node.
Expand All @@ -204,6 +210,7 @@ async def _init_from_checkpoint(
event_source: LiveNetworkEventSource,
validator_registry: ValidatorRegistry | None = None,
is_aggregator: bool = False,
api_port: int | None = None,
) -> Node | None:
"""
Initialize a node from a checkpoint state fetched from a remote node.
Expand Down Expand Up @@ -231,6 +238,7 @@ async def _init_from_checkpoint(
event_source: Network transport for the node.
validator_registry: Optional registry with validator secret keys.
is_aggregator: Enable aggregator mode for attestation aggregation.
api_port: Port for API server and /metrics. None disables the API.

Returns:
A fully initialized Node if successful, None if checkpoint sync failed.
Expand Down Expand Up @@ -298,6 +306,7 @@ async def _init_from_checkpoint(
validator_registry=validator_registry,
fork_digest=GOSSIP_FORK_DIGEST,
is_aggregator=is_aggregator,
api_config=ApiServerConfig(port=api_port) if api_port is not None else None,
)

# Create node and inject checkpoint store.
Expand Down Expand Up @@ -390,6 +399,7 @@ async def run_node(
node_id: str = "lean_spec_0",
genesis_time_now: bool = False,
is_aggregator: bool = False,
api_port: int | None = 5052,
) -> None:
"""
Run the lean consensus node.
Expand All @@ -403,7 +413,9 @@ async def run_node(
node_id: Node identifier for validator assignment.
genesis_time_now: Override genesis time to current time for testing.
is_aggregator: Enable aggregator mode for attestation aggregation.
api_port: Port for API server (health, fork_choice, /metrics). None or 0 disables.
"""
metrics.init(name="leanspec-node", version="0.0.1")
logger.info("Loading genesis from %s", genesis_path)
genesis = GenesisConfig.from_yaml_file(genesis_path)

Expand Down Expand Up @@ -506,6 +518,8 @@ async def run_node(
# - Starts from block 0 with initial validator set
# - Must process every block to reach current head
# - Only practical for new or small networks
api_port_int: int | None = api_port if api_port and api_port > 0 else None

node: Node | None
if checkpoint_sync_url is not None:
node = await _init_from_checkpoint(
Expand All @@ -514,6 +528,7 @@ async def run_node(
event_source=event_source,
validator_registry=validator_registry,
is_aggregator=is_aggregator,
api_port=api_port_int,
)
if node is None:
# Checkpoint sync failed. Exit rather than falling back.
Expand All @@ -527,6 +542,7 @@ async def run_node(
event_source=event_source,
validator_registry=validator_registry,
is_aggregator=is_aggregator,
api_port=api_port_int,
)

logger.info("Node initialized, peer_id=%s", event_source.connection_manager.peer_id)
Expand Down Expand Up @@ -662,6 +678,13 @@ def main() -> None:
action="store_true",
help="Enable aggregator mode (node performs attestation aggregation)",
)
parser.add_argument(
"--api-port",
type=int,
default=5052,
metavar="PORT",
help="Port for API server and /metrics (default: 5052). Set 0 to disable.",
)

args = parser.parse_args()

Expand All @@ -680,6 +703,7 @@ def main() -> None:
node_id=args.node_id,
genesis_time_now=args.genesis_time_now,
is_aggregator=args.is_aggregator,
api_port=args.api_port,
)
)
except KeyboardInterrupt:
Expand Down
3 changes: 2 additions & 1 deletion src/lean_spec/subspecs/api/endpoints/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""API endpoint specifications."""

from . import checkpoints, fork_choice, health, states
from . import checkpoints, fork_choice, health, metrics, states

__all__ = [
"checkpoints",
"fork_choice",
"health",
"metrics",
"states",
]
19 changes: 19 additions & 0 deletions src/lean_spec/subspecs/api/endpoints/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Metrics endpoint (Prometheus exposition)."""

from __future__ import annotations

from aiohttp import web

from lean_spec.subspecs.metrics.registry import get_metrics_output

# Prometheus text exposition format, version 0.0.4.
#
# The charset is part of this string, so it must be sent as a raw header:
# aiohttp's ``web.Response(content_type=...)`` rejects values that embed a
# charset (``ValueError: charset must not be in content_type argument``).
CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"


async def handle(_request: web.Request) -> web.Response:
    """
    Handle a Prometheus metrics scrape request.

    Returns the node's current metrics in the Prometheus text exposition
    format. The Content-Type is set via an explicit header rather than the
    ``content_type=`` keyword — see the note on ``CONTENT_TYPE`` above.
    """
    body = get_metrics_output()
    return web.Response(body=body, headers={"Content-Type": CONTENT_TYPE})
3 changes: 2 additions & 1 deletion src/lean_spec/subspecs/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

from aiohttp import web

from .endpoints import checkpoints, fork_choice, health, states
from .endpoints import checkpoints, fork_choice, health, metrics, states

# Route table consumed when registering handlers on the aiohttp application.
ROUTES: dict[str, Callable[[web.Request], Awaitable[web.Response]]] = {
    # Versioned lean API endpoints.
    "/lean/v0/health": health.handle,
    "/lean/v0/states/finalized": states.handle_finalized,
    "/lean/v0/checkpoints/justified": checkpoints.handle_justified,
    "/lean/v0/fork_choice": fork_choice.handle,
    # NOTE(review): unversioned, presumably to match the conventional path
    # Prometheus scrapers expect — confirm before moving under /lean/v0.
    "/metrics": metrics.handle,
}
"""All API routes mapped to their handlers."""
116 changes: 112 additions & 4 deletions src/lean_spec/subspecs/containers/block/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,127 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any

from pydantic import GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema

from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof
from lean_spec.types import Bytes32, SSZList
from lean_spec.types import ZERO_HASH, Bytes32, SSZList

from ...chain.config import VALIDATOR_REGISTRY_LIMIT
from ..attestation import AggregatedAttestation

if TYPE_CHECKING:
from .block import Block

BlockLookup = dict[Bytes32, "Block"]
"""Mapping from block root to Block objects."""

class BlockLookup(dict[Bytes32, "Block"]):
    """
    Root-keyed index of every validated block known to the node.

    Two valid blocks may reference the same parent, so the known blocks
    form a tree rather than a single chain. Fork choice walks this
    structure to:

    - test ancestor relationships between blocks,
    - measure how deeply a head switch (reorg) cuts, and
    - decide which branch is canonical.

    A plain ``dict`` subclass with Pydantic support, so it can appear as a
    field on store models and be validated from raw dicts.
    """

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> CoreSchema:
        """Pydantic hook: coerce incoming plain dicts, serialize as dict."""
        return core_schema.no_info_plain_validator_function(
            cls._validate,
            serialization=core_schema.plain_serializer_function_ser_schema(dict),
        )

    @classmethod
    def _validate(cls, value: Any) -> BlockLookup:
        """Pass instances through unchanged, wrap plain dicts, reject the rest."""
        if isinstance(value, cls):
            return value
        if not isinstance(value, dict):
            raise ValueError(f"expected dict or BlockLookup, got {type(value)}")
        return cls(value)

    def __or__(self, other: dict[Bytes32, Block]) -> BlockLookup:
        """Merge exactly as ``dict | dict`` does, but keep the BlockLookup type."""
        merged = super().__or__(other)
        return BlockLookup(merged)

    def ancestors(self, root: Bytes32) -> Iterator[Bytes32]:
        """
        Yield block roots from ``root`` backward toward genesis.

        Every block links to its parent through ``parent_root``; the walk
        follows those links one step at a time and terminates at either:

        - a block whose parent is ``ZERO_HASH`` (the genesis boundary), or
        - a parent missing from this lookup (pruned history).

        Fork choice compares chains through ancestor walks: two blocks are
        on the same branch exactly when one appears in the other's
        ancestry, and the point where two ancestries diverge is a fork.

        Args:
            root: Block root to start the walk from.

        Yields:
            Each root on the path, the starting block and the oldest
            reachable ancestor both included.
        """
        current = root
        while current in self:
            yield current
            link = self[current].parent_root
            # A zero-hash parent marks the genesis boundary; nothing older
            # exists, so the walk ends here.
            if link == ZERO_HASH:
                return
            current = link

    def reorg_depth(self, old_head: Bytes32, new_head: Bytes32) -> int:
        """
        Count the old-chain blocks abandoned when switching heads.

        A reorganization (reorg) happens when fork choice moves to a
        different branch. Its depth is the number of blocks between
        ``old_head`` and the first ancestor shared with ``new_head``; the
        common ancestor itself is not counted. Deeper reorgs revert more
        attestations and history, so they are more disruptive.

        Args:
            old_head: Root of the previous canonical head.
            new_head: Root of the new canonical head.

        Returns:
            Number of abandoned old-chain blocks; 0 when the heads match.
        """
        # Snapshot the new head's full ancestry so each membership test
        # below is O(1) instead of re-walking the chain.
        on_new_chain = frozenset(self.ancestors(new_head))

        # Each old-chain root absent from the new chain's ancestry is one
        # reverted block.
        depth = 0
        for candidate in self.ancestors(old_head):
            if candidate not in on_new_chain:
                depth += 1
        return depth


class AggregatedAttestations(SSZList[AggregatedAttestation]):
Expand Down
Loading
Loading