diff --git a/cmd/p2p/sensor/api.go b/cmd/p2p/sensor/api.go index fb86cfb02..2187f6a77 100644 --- a/cmd/p2p/sensor/api.go +++ b/cmd/p2p/sensor/api.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "slices" + "strings" "time" "github.com/0xPolygon/polygon-cli/p2p" @@ -165,30 +166,22 @@ func removePeerMessages(counter *prometheus.CounterVec, urls []string) error { return err } - var family *dto.MetricFamily - for _, f := range families { - if f.GetName() == "sensor_messages" { - family = f - break + // Find all matching metric families + for _, family := range families { + // Check for any sensor_messages metric (received, sent, etc.) + if !strings.Contains(family.GetName(), "sensor_messages") { + continue } - } - // During DNS-discovery or when the server is taking a while to discover - // peers and has yet to receive a message, the sensor_messages prometheus - // metric may not exist yet. - if family == nil { - log.Trace().Msg("Could not find sensor_messages metric family") - return nil - } + for _, metric := range family.GetMetric() { + for _, label := range metric.GetLabel() { + url := label.GetValue() + if label.GetName() != "url" || slices.Contains(urls, url) { + continue + } - for _, metric := range family.GetMetric() { - for _, label := range metric.GetLabel() { - url := label.GetValue() - if label.GetName() != "url" || slices.Contains(urls, url) { - continue + counter.DeletePartialMatch(prometheus.Labels{"url": url}) } - - counter.DeletePartialMatch(prometheus.Labels{"url": url}) } } diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 685018eb3..34dd0b3ab 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -52,6 +52,10 @@ type ( ShouldWriteTransactions bool ShouldWriteTransactionEvents bool ShouldWritePeers bool + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool ShouldRunPprof bool PprofPort uint ShouldRunPrometheus bool @@ -74,6 +78,9 @@ type ( RequestsCache p2p.CacheOptions ParentsCache p2p.CacheOptions BlocksCache p2p.CacheOptions + TxsCache p2p.CacheOptions + KnownTxsCache p2p.CacheOptions + KnownBlocksCache p2p.CacheOptions bootnodes []*enode.Node staticNodes []*enode.Node @@ -201,22 +208,33 @@ var SensorCmd = &cobra.Command{ // Create peer connection manager for broadcasting transactions // and managing the global blocks cache conns := p2p.NewConns(p2p.ConnsOptions{ - BlocksCache: inputSensorParams.BlocksCache, - Head: head, + BlocksCache: inputSensorParams.BlocksCache, + TxsCache: inputSensorParams.TxsCache, + KnownTxsCache: inputSensorParams.KnownTxsCache, + KnownBlocksCache: inputSensorParams.KnownBlocksCache, + Head: head, + ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx, + ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes, + ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks, + ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes, }) opts := p2p.EthProtocolOptions{ - Context: cmd.Context(), - Database: db, - GenesisHash: common.HexToHash(inputSensorParams.GenesisHash), - RPC: inputSensorParams.RPC, - SensorID: inputSensorParams.SensorID, - NetworkID: inputSensorParams.NetworkID, - Conns: conns, - ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)}, - MsgCounter: msgCounter, - RequestsCache: inputSensorParams.RequestsCache, - ParentsCache: inputSensorParams.ParentsCache, + Context: cmd.Context(), + Database: db, + GenesisHash: common.HexToHash(inputSensorParams.GenesisHash), + RPC: inputSensorParams.RPC, + SensorID: inputSensorParams.SensorID, + NetworkID: inputSensorParams.NetworkID, + Conns: conns, + ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)}, + MsgCounter: msgCounter, + RequestsCache: inputSensorParams.RequestsCache, + ParentsCache: inputSensorParams.ParentsCache, + ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx, + ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes, + ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks, + ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes, } config := ethp2p.Config{ @@ -460,6 +478,10 @@ will result in less chance of missing data but can significantly increase memory f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true, `write transaction events to database (this option can significantly increase CPU and memory usage)`) f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database") + f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-tx", false, "broadcast full transactions to peers") + f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers") + f.BoolVar(&inputSensorParams.ShouldBroadcastBlocks, "broadcast-blocks", false, "broadcast full blocks to peers") + f.BoolVar(&inputSensorParams.ShouldBroadcastBlockHashes, "broadcast-block-hashes", false, "broadcast block hashes to peers") f.BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "run pprof server") f.UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "port pprof runs on") f.BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "run Prometheus server") @@ -493,4 +515,10 @@ will result in less chance of missing data but can significantly increase memory f.DurationVar(&inputSensorParams.ParentsCache.TTL, "parents-cache-ttl", 5*time.Minute, "time to live for parent hash cache entries (0 for no expiration)") f.IntVar(&inputSensorParams.BlocksCache.MaxSize, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)") f.DurationVar(&inputSensorParams.BlocksCache.TTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)") + f.IntVar(&inputSensorParams.TxsCache.MaxSize, "max-txs", 8192, "maximum transactions to cache for serving to peers (0 for no limit)") + f.DurationVar(&inputSensorParams.TxsCache.TTL, "txs-cache-ttl", 10*time.Minute, "time to live for transaction cache entries (0 for no expiration)") + f.IntVar(&inputSensorParams.KnownTxsCache.MaxSize, "max-known-txs", 8192, "maximum transaction hashes to track per peer (0 for no limit)") + f.DurationVar(&inputSensorParams.KnownTxsCache.TTL, "known-txs-cache-ttl", 5*time.Minute, "time to live for known transaction cache entries (0 for no expiration)") + f.IntVar(&inputSensorParams.KnownBlocksCache.MaxSize, "max-known-blocks", 1024, "maximum block hashes to track per peer (0 for no limit)") + f.DurationVar(&inputSensorParams.KnownBlocksCache.TTL, "known-blocks-cache-ttl", 5*time.Minute, "time to live for known block cache entries (0 for no expiration)") } diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index bed39f197..d6cf51399 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -91,50 +91,60 @@ polycli p2p sensor amoy-nodes.json \ ## Flags ```bash - --api-port uint port API server will listen on (default 8080) - --blocks-cache-ttl duration time to live for block cache entries (0 for no expiration) (default 10m0s) - -b, --bootnodes string comma separated nodes used for bootstrapping - --database string which database to persist data to, options are: - - datastore (GCP Datastore) - - json (output to stdout) - - none (no persistence) (default "none") - -d, --database-id string datastore database ID - --dial-ratio int ratio of inbound to dialed connections (dial ratio of 2 allows 1/2 of connections to be dialed, setting to 0 defaults to 3) - --discovery-dns string DNS discovery ENR tree URL - --discovery-port int UDP P2P discovery port (default 30303) - --fork-id bytesHex hex encoded fork ID (omit 0x) (default F097BC13) - --genesis-hash string genesis block hash (default "0xa9c28ce2141b56c474f1dc504bee9b01eb1bd7d1a507580d5519d4437a97de1b") - -h, --help help for sensor - --key string hex-encoded private key (cannot be set with --key-file) - -k, --key-file string private key file (cannot be set with --key) - --max-blocks int maximum blocks to track across all peers (0 for no limit) (default 1024) - -D, --max-db-concurrency int maximum number of concurrent database operations to perform (increasing this - will result in less chance of missing data but can significantly increase memory usage) (default 10000) - --max-parents int maximum parent block hashes to track per peer (0 for no limit) (default 1024) - -m, --max-peers int maximum number of peers to connect to (default 2000) - --max-requests int maximum request IDs to track per peer (0 for no limit) (default 2048) - --nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:|extip:) (default "any") - -n, --network-id uint filter discovered nodes by this network ID - --no-discovery disable P2P peer discovery - --parents-cache-ttl duration time to live for parent hash cache entries (0 for no expiration) (default 5m0s) - --port int TCP network listening port (default 30303) - --pprof run pprof server - --pprof-port uint port pprof runs on (default 6060) - -p, --project-id string GCP project ID - --prom run Prometheus server (default true) - --prom-port uint port Prometheus runs on (default 2112) - --requests-cache-ttl duration time to live for requests cache entries (0 for no expiration) (default 5m0s) - --rpc string RPC endpoint used to fetch latest block (default "https://polygon-rpc.com") - --rpc-port uint port for JSON-RPC server to receive transactions (default 8545) - -s, --sensor-id string sensor ID when writing block/tx events - --static-nodes string static nodes file - --trusted-nodes string trusted nodes file - --ttl duration time to live (default 336h0m0s) - --write-block-events write block events to database (default true) - -B, --write-blocks write blocks to database (default true) - --write-peers write peers to database (default true) - --write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true) - -t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true) + --api-port uint port API server will listen on (default 8080) + --blocks-cache-ttl duration time to live for block cache entries (0 for no expiration) (default 10m0s) + -b, --bootnodes string comma separated nodes used for bootstrapping + --broadcast-block-hashes broadcast block hashes to peers + --broadcast-blocks broadcast full blocks to peers + --broadcast-tx broadcast full transactions to peers + --broadcast-tx-hashes broadcast transaction hashes to peers + --database string which database to persist data to, options are: + - datastore (GCP Datastore) + - json (output to stdout) + - none (no persistence) (default "none") + -d, --database-id string datastore database ID + --dial-ratio int ratio of inbound to dialed connections (dial ratio of 2 allows 1/2 of connections to be dialed, setting to 0 defaults to 3) + --discovery-dns string DNS discovery ENR tree URL + --discovery-port int UDP P2P discovery port (default 30303) + --fork-id bytesHex hex encoded fork ID (omit 0x) (default F097BC13) + --genesis-hash string genesis block hash (default "0xa9c28ce2141b56c474f1dc504bee9b01eb1bd7d1a507580d5519d4437a97de1b") + -h, --help help for sensor + --key string hex-encoded private key (cannot be set with --key-file) + -k, --key-file string private key file (cannot be set with --key) + --known-blocks-cache-ttl duration time to live for known block cache entries (0 for no expiration) (default 5m0s) + --known-txs-cache-ttl duration time to live for known transaction cache entries (0 for no expiration) (default 5m0s) + --max-blocks int maximum blocks to track across all peers (0 for no limit) (default 1024) + -D, --max-db-concurrency int maximum number of concurrent database operations to perform (increasing this + will result in less chance of missing data but can significantly increase memory usage) (default 10000) + --max-known-blocks int maximum block hashes to track per peer (0 for no limit) (default 1024) + --max-known-txs int maximum transaction hashes to track per peer (0 for no limit) (default 8192) + --max-parents int maximum parent block hashes to track per peer (0 for no limit) (default 1024) + -m, --max-peers int maximum number of peers to connect to (default 2000) + --max-requests int maximum request IDs to track per peer (0 for no limit) (default 2048) + --max-txs int maximum transactions to cache for serving to peers (0 for no limit) (default 8192) + --nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:|extip:) (default "any") + -n, --network-id uint filter discovered nodes by this network ID + --no-discovery disable P2P peer discovery + --parents-cache-ttl duration time to live for parent hash cache entries (0 for no expiration) (default 5m0s) + --port int TCP network listening port (default 30303) + --pprof run pprof server + --pprof-port uint port pprof runs on (default 6060) + -p, --project-id string GCP project ID + --prom run Prometheus server (default true) + --prom-port uint port Prometheus runs on (default 2112) + --requests-cache-ttl duration time to live for requests cache entries (0 for no expiration) (default 5m0s) + --rpc string RPC endpoint used to fetch latest block (default "https://polygon-rpc.com") + --rpc-port uint port for JSON-RPC server to receive transactions (default 8545) + -s, --sensor-id string sensor ID when writing block/tx events + --static-nodes string static nodes file + --trusted-nodes string trusted nodes file + --ttl duration time to live (default 336h0m0s) + --txs-cache-ttl duration time to live for transaction cache entries (0 for no expiration) (default 10m0s) + --write-block-events write block events to database (default true) + -B, --write-blocks write blocks to database (default true) + --write-peers write peers to database (default true) + --write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true) + -t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true) ``` The command also inherits flags from parent commands. diff --git a/p2p/conns.go b/p2p/conns.go index c0e313f08..c905a3aa5 100644 --- a/p2p/conns.go +++ b/p2p/conns.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/eth" ethp2p "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/rs/zerolog/log" ) // BlockCache stores the actual block data to avoid duplicate fetches and database queries. @@ -19,6 +20,19 @@ type BlockCache struct { TD *big.Int } +// ConnsOptions contains configuration options for creating a new Conns manager. +type ConnsOptions struct { + BlocksCache CacheOptions + TxsCache CacheOptions + KnownTxsCache CacheOptions + KnownBlocksCache CacheOptions + Head eth.NewBlockPacket + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool +} + // Conns manages a collection of active peer connections for transaction broadcasting. // It also maintains a global cache of blocks written to the database. type Conns struct { @@ -29,18 +43,25 @@ type Conns struct { // to avoid duplicate writes and requests. blocks *Cache[common.Hash, BlockCache] + // txs caches transactions for serving to peers and duplicate detection + txs *Cache[common.Hash, *types.Transaction] + + // knownTxsOpts and knownBlocksOpts store cache options for per-peer caches + knownTxsOpts CacheOptions + knownBlocksOpts CacheOptions + // oldest stores the first block the sensor has seen so when fetching // parent blocks, it does not request blocks older than this. oldest *Locked[*types.Header] // head keeps track of the current head block of the chain. head *Locked[eth.NewBlockPacket] -} -// ConnsOptions contains configuration options for creating a new Conns manager. -type ConnsOptions struct { - BlocksCache CacheOptions - Head eth.NewBlockPacket + // Broadcast flags control what gets cached and rebroadcasted + shouldBroadcastTx bool + shouldBroadcastTxHashes bool + shouldBroadcastBlocks bool + shouldBroadcastBlockHashes bool } // NewConns creates a new connection manager with a blocks cache. @@ -52,38 +73,51 @@ func NewConns(opts ConnsOptions) *Conns { oldest.Set(opts.Head.Block.Header()) return &Conns{ - conns: make(map[string]*conn), - blocks: NewCache[common.Hash, BlockCache](opts.BlocksCache), - oldest: oldest, - head: head, + conns: make(map[string]*conn), + blocks: NewCache[common.Hash, BlockCache](opts.BlocksCache), + txs: NewCache[common.Hash, *types.Transaction](opts.TxsCache), + knownTxsOpts: opts.KnownTxsCache, + knownBlocksOpts: opts.KnownBlocksCache, + oldest: oldest, + head: head, + shouldBroadcastTx: opts.ShouldBroadcastTx, + shouldBroadcastTxHashes: opts.ShouldBroadcastTxHashes, + shouldBroadcastBlocks: opts.ShouldBroadcastBlocks, + shouldBroadcastBlockHashes: opts.ShouldBroadcastBlockHashes, } } -// Add adds a connection to the manager. -func (c *Conns) Add(cn *conn) { +// AddConn adds a connection to the manager. +func (c *Conns) AddConn(cn *conn) { c.mu.Lock() defer c.mu.Unlock() c.conns[cn.node.ID().String()] = cn cn.logger.Debug().Msg("Added connection") } -// Remove removes a connection from the manager when a peer disconnects. -func (c *Conns) Remove(cn *conn) { +// RemoveConn removes a connection from the manager when a peer disconnects. +func (c *Conns) RemoveConn(cn *conn) { c.mu.Lock() defer c.mu.Unlock() delete(c.conns, cn.node.ID().String()) cn.logger.Debug().Msg("Removed connection") } -// BroadcastTx broadcasts a single transaction to all connected peers. -// Returns the number of peers the transaction was successfully sent to. +// BroadcastTx broadcasts a single transaction to all connected peers and +// returns the number of peers the transaction was successfully sent to. func (c *Conns) BroadcastTx(tx *types.Transaction) int { return c.BroadcastTxs(types.Transactions{tx}) } -// BroadcastTxs broadcasts multiple transactions to all connected peers. -// Returns the number of peers the transactions were successfully sent to. +// BroadcastTxs broadcasts multiple transactions to all connected peers, +// filtering out transactions that each peer already knows about, and returns +// the number of peers the transactions were successfully sent to. +// If broadcast flags are disabled, this is a no-op. func (c *Conns) BroadcastTxs(txs types.Transactions) int { + if !c.shouldBroadcastTx { + return 0 + } + c.mu.RLock() defer c.mu.RUnlock() @@ -93,12 +127,225 @@ func (c *Conns) BroadcastTxs(txs types.Transactions) int { count := 0 for _, cn := range c.conns { - if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, txs); err != nil { + // Filter transactions this peer doesn't know about + unknownTxs := make(types.Transactions, 0, len(txs)) + for _, tx := range txs { + if !cn.hasKnownTx(tx.Hash()) { + unknownTxs = append(unknownTxs, tx) + } + } + + if len(unknownTxs) == 0 { + continue + } + + // Send as TransactionsPacket + packet := eth.TransactionsPacket(unknownTxs) + cn.countMsgSent(packet.Name(), float64(len(unknownTxs))) + if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, packet); err != nil { + cn.logger.Debug(). + Err(err). + Msg("Failed to send transactions") + continue + } + + // Mark transactions as known for this peer + for _, tx := range unknownTxs { + cn.addKnownTx(tx.Hash()) + } + + count++ + } + + if count > 0 { + log.Debug(). + Int("peers", count). + Int("txs", len(txs)). + Msg("Broadcasted transactions") + } + + return count +} + +// BroadcastTxHashes broadcasts transaction hashes to peers that don't already +// know about them and returns the number of peers the hashes were successfully +// sent to. If broadcast flags are disabled, this is a no-op. +func (c *Conns) BroadcastTxHashes(hashes []common.Hash) int { + if !c.shouldBroadcastTxHashes { + return 0 + } + + c.mu.RLock() + defer c.mu.RUnlock() + + if len(hashes) == 0 { + return 0 + } + + count := 0 + for _, cn := range c.conns { + // Filter hashes this peer doesn't know about + unknownHashes := make([]common.Hash, 0, len(hashes)) + for _, hash := range hashes { + if !cn.hasKnownTx(hash) { + unknownHashes = append(unknownHashes, hash) + } + } + + if len(unknownHashes) == 0 { + continue + } + + // Send NewPooledTransactionHashesPacket + packet := eth.NewPooledTransactionHashesPacket{ + Types: make([]byte, len(unknownHashes)), + Sizes: make([]uint32, len(unknownHashes)), + Hashes: unknownHashes, + } + + cn.countMsgSent(packet.Name(), float64(len(unknownHashes))) + if err := ethp2p.Send(cn.rw, eth.NewPooledTransactionHashesMsg, packet); err != nil { + cn.logger.Debug(). + Err(err). + Msg("Failed to send transaction hashes") + continue + } + + // Mark hashes as known for this peer + for _, hash := range unknownHashes { + cn.addKnownTx(hash) + } + + count++ + } + + if count > 0 { + log.Debug(). + Int("peers", count). + Int("hashes", len(hashes)). + Msg("Broadcasted transaction hashes") + } + + return count +} + +// BroadcastBlock broadcasts a full block to peers that don't already know +// about it and returns the number of peers the block was successfully sent to. +// If broadcast flags are disabled, this is a no-op. +func (c *Conns) BroadcastBlock(block *types.Block, td *big.Int) int { + if !c.shouldBroadcastBlocks { + return 0 + } + + c.mu.RLock() + defer c.mu.RUnlock() + + if block == nil { + return 0 + } + + hash := block.Hash() + count := 0 + + for _, cn := range c.conns { + // Skip if peer already knows about this block + if cn.hasKnownBlock(hash) { continue } + + // Send NewBlockPacket + packet := eth.NewBlockPacket{ + Block: block, + TD: td, + } + + cn.countMsgSent(packet.Name(), 1) + if err := ethp2p.Send(cn.rw, eth.NewBlockMsg, &packet); err != nil { + cn.logger.Debug(). + Err(err). + Uint64("number", block.Number().Uint64()). + Msg("Failed to send block") + continue + } + + // Mark block as known for this peer + cn.addKnownBlock(hash) count++ } + if count > 0 { + log.Debug(). + Int("peers", count). + Uint64("number", block.NumberU64()). + Msg("Broadcasted block") + } + + return count +} + +// BroadcastBlockHashes broadcasts block hashes with their corresponding block +// numbers to peers that don't already know about them and returns the number +// of peers the hashes were successfully sent to. If broadcast flags are disabled, this is a no-op. +func (c *Conns) BroadcastBlockHashes(hashes []common.Hash, numbers []uint64) int { + if !c.shouldBroadcastBlockHashes { + return 0 + } + + c.mu.RLock() + defer c.mu.RUnlock() + + if len(hashes) == 0 || len(hashes) != len(numbers) { + return 0 + } + + count := 0 + + for _, cn := range c.conns { + // Filter hashes this peer doesn't know about + unknownHashes := make([]common.Hash, 0, len(hashes)) + unknownNumbers := make([]uint64, 0, len(numbers)) + + for i, hash := range hashes { + if !cn.hasKnownBlock(hash) { + unknownHashes = append(unknownHashes, hash) + unknownNumbers = append(unknownNumbers, numbers[i]) + } + } + + if len(unknownHashes) == 0 { + continue + } + + // Send NewBlockHashesPacket + packet := make(eth.NewBlockHashesPacket, len(unknownHashes)) + for i := range unknownHashes { + packet[i].Hash = unknownHashes[i] + packet[i].Number = unknownNumbers[i] + } + + cn.countMsgSent(packet.Name(), float64(len(unknownHashes))) + if err := ethp2p.Send(cn.rw, eth.NewBlockHashesMsg, packet); err != nil { + cn.logger.Debug(). + Err(err). + Msg("Failed to send block hashes") + continue + } + + // Mark hashes as known for this peer + for _, hash := range unknownHashes { + cn.addKnownBlock(hash) + } + + count++ + } + + if count > 0 { + log.Debug(). + Int("peers", count). + Int("hashes", len(hashes)). + Msg("Broadcasted block hashes") + } + return count } @@ -128,6 +375,16 @@ func (c *Conns) PeerConnectedAt(peerID string) time.Time { return time.Time{} } +// AddTx adds a transaction to the shared cache for duplicate detection and serving. +func (c *Conns) AddTx(hash common.Hash, tx *types.Transaction) { + c.txs.Add(hash, tx) +} + +// GetTx retrieves a transaction from the shared cache. +func (c *Conns) GetTx(hash common.Hash) (*types.Transaction, bool) { + return c.txs.Get(hash) +} + // Blocks returns the global blocks cache. func (c *Conns) Blocks() *Cache[common.Hash, BlockCache] { return c.blocks @@ -155,3 +412,33 @@ func (c *Conns) UpdateHeadBlock(packet eth.NewBlockPacket) bool { return current, false }) } + +// KnownTxsOpts returns the cache options for per-peer known tx caches. +func (c *Conns) KnownTxsOpts() CacheOptions { + return c.knownTxsOpts +} + +// KnownBlocksOpts returns the cache options for per-peer known block caches. +func (c *Conns) KnownBlocksOpts() CacheOptions { + return c.knownBlocksOpts +} + +// ShouldBroadcastTx returns whether full transaction broadcasting is enabled. +func (c *Conns) ShouldBroadcastTx() bool { + return c.shouldBroadcastTx +} + +// ShouldBroadcastTxHashes returns whether transaction hash broadcasting is enabled. +func (c *Conns) ShouldBroadcastTxHashes() bool { + return c.shouldBroadcastTxHashes +} + +// ShouldBroadcastBlocks returns whether full block broadcasting is enabled. +func (c *Conns) ShouldBroadcastBlocks() bool { + return c.shouldBroadcastBlocks +} + +// ShouldBroadcastBlockHashes returns whether block hash broadcasting is enabled. +func (c *Conns) ShouldBroadcastBlockHashes() bool { + return c.shouldBroadcastBlockHashes +} diff --git a/p2p/protocol.go b/p2p/protocol.go index d5afccf38..a230eb78f 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -53,6 +53,16 @@ type conn struct { // Cached values for prometheus labels to avoid repeated URLv4() calls peerURL string peerFullname string + + // Broadcast flags control what gets rebroadcasted to other peers + shouldBroadcastTx bool + shouldBroadcastTxHashes bool + shouldBroadcastBlocks bool + shouldBroadcastBlockHashes bool + + // Known caches track what this peer has seen to avoid redundant sends. + knownTxs *Cache[common.Hash, struct{}] + knownBlocks *Cache[common.Hash, struct{}] } // EthProtocolOptions is the options used when creating a new eth protocol. @@ -70,6 +80,12 @@ type EthProtocolOptions struct { // Cache configurations RequestsCache CacheOptions ParentsCache CacheOptions + + // Broadcast flags control what gets rebroadcasted to other peers + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool } // NewEthProtocol creates the new eth protocol. This will handle writing the @@ -82,20 +98,26 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol { Run: func(p *ethp2p.Peer, rw ethp2p.MsgReadWriter) error { peerURL := p.Node().URLv4() c := &conn{ - sensorID: opts.SensorID, - node: p.Node(), - logger: log.With().Str("peer", peerURL).Logger(), - rw: rw, - db: opts.Database, - requests: NewCache[uint64, common.Hash](opts.RequestsCache), - requestNum: 0, - parents: NewCache[common.Hash, struct{}](opts.ParentsCache), - counter: opts.MsgCounter, - peer: p, - conns: opts.Conns, - connectedAt: time.Now(), - peerURL: peerURL, - peerFullname: p.Fullname(), + sensorID: opts.SensorID, + node: p.Node(), + logger: log.With().Str("peer", peerURL).Logger(), + rw: rw, + db: opts.Database, + requests: NewCache[uint64, common.Hash](opts.RequestsCache), + requestNum: 0, + parents: NewCache[common.Hash, struct{}](opts.ParentsCache), + counter: opts.MsgCounter, + peer: p, + conns: opts.Conns, + connectedAt: time.Now(), + peerURL: peerURL, + peerFullname: p.Fullname(), + shouldBroadcastTx: opts.ShouldBroadcastTx, + shouldBroadcastTxHashes: opts.ShouldBroadcastTxHashes, + shouldBroadcastBlocks: opts.ShouldBroadcastBlocks, + shouldBroadcastBlockHashes: opts.ShouldBroadcastBlockHashes, + knownTxs: NewCache[common.Hash, struct{}](opts.Conns.KnownTxsOpts()), + knownBlocks: NewCache[common.Hash, struct{}](opts.Conns.KnownBlocksOpts()), } head := c.conns.HeadBlock() @@ -113,8 +135,8 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol { } // Send the connection object to the conns manager for RPC broadcasting - opts.Conns.Add(c) - defer opts.Conns.Remove(c) + opts.Conns.AddConn(c) + defer opts.Conns.RemoveConn(c) ctx := opts.Context @@ -222,6 +244,12 @@ func (c *conn) readStatus(packet *eth.StatusPacket68) error { return err } + defer func() { + if msgErr := msg.Discard(); msgErr != nil { + c.logger.Error().Err(msgErr).Msg("Failed to discard message") + } + }() + if msg.Code != eth.StatusMsg { return errors.New("expected status message code") } @@ -339,12 +367,16 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { c.countMsgReceived(packet.Name(), float64(len(packet))) - // Collect unique hashes for database write. + // Collect unique hashes and numbers for database write and broadcasting. uniqueHashes := make([]common.Hash, 0, len(packet)) + uniqueNumbers := make([]uint64, 0, len(packet)) for _, entry := range packet { hash := entry.Hash + // Mark as known from this peer + c.addKnownBlock(hash) + // Check what parts of the block we already have cache, ok := c.conns.Blocks().Get(hash) if ok { @@ -361,16 +393,58 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { c.conns.Blocks().Add(hash, BlockCache{}) uniqueHashes = append(uniqueHashes, hash) + uniqueNumbers = append(uniqueNumbers, entry.Number) } // Write only unique hashes to the database. - if len(uniqueHashes) > 0 { - c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs) + if len(uniqueHashes) == 0 { + return nil } + c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs) + + // Broadcast block hashes to other peers + c.conns.BroadcastBlockHashes(uniqueHashes, uniqueNumbers) + return nil } +// addKnownTx adds a transaction hash to the known tx cache. +func (c *conn) addKnownTx(hash common.Hash) { + if !c.shouldBroadcastTx && !c.shouldBroadcastTxHashes { + return + } + + c.knownTxs.Add(hash, struct{}{}) +} + +// addKnownBlock adds a block hash to the known block cache. +func (c *conn) addKnownBlock(hash common.Hash) { + if !c.shouldBroadcastBlocks && !c.shouldBroadcastBlockHashes { + return + } + + c.knownBlocks.Add(hash, struct{}{}) +} + +// hasKnownTx checks if a transaction hash is in the known tx cache. +func (c *conn) hasKnownTx(hash common.Hash) bool { + if !c.shouldBroadcastTx && !c.shouldBroadcastTxHashes { + return false + } + + return c.knownTxs.Contains(hash) +} + +// hasKnownBlock checks if a block hash is in the known block cache. +func (c *conn) hasKnownBlock(hash common.Hash) bool { + if !c.shouldBroadcastBlocks && !c.shouldBroadcastBlockHashes { + return false + } + + return c.knownBlocks.Contains(hash) +} + func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { payload, err := io.ReadAll(msg.Payload) if err != nil { @@ -388,10 +462,26 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { c.countMsgReceived((ð.TransactionsPacket{}).Name(), float64(len(txs))) + // Mark transactions as known from this peer + for _, tx := range txs { + c.addKnownTx(tx.Hash()) + } + if len(txs) > 0 { c.db.WriteTransactions(ctx, c.node, txs, tfs) } + // Cache transactions for duplicate detection and serving to peers + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + c.conns.AddTx(tx.Hash(), tx) + hashes[i] = tx.Hash() + } + + // Broadcast transactions or hashes to other peers + c.conns.BroadcastTxs(types.Transactions(txs)) + c.conns.BroadcastTxHashes(hashes) + return nil } @@ -403,8 +493,17 @@ func (c *conn) handleGetBlockHeaders(msg ethp2p.Msg) error { c.countMsgReceived(request.Name(), 1) - response := ð.BlockHeadersPacket{RequestId: request.RequestId} - c.countMsgSent(response.Name(), 0) + // Try to serve from cache if we have the block + var headers []*types.Header + if cache, ok := c.conns.Blocks().Peek(request.Origin.Hash); ok && cache.Header != nil { + headers = []*types.Header{cache.Header} + } + + response := ð.BlockHeadersPacket{ + RequestId: request.RequestId, + BlockHeadersRequest: headers, + } + c.countMsgSent(response.Name(), float64(len(headers))) return ethp2p.Send(c.rw, eth.BlockHeadersMsg, response) } @@ -454,8 +553,19 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error { c.countMsgReceived(request.Name(), float64(len(request.GetBlockBodiesRequest))) - response := ð.BlockBodiesPacket{RequestId: request.RequestId} - c.countMsgSent(response.Name(), 0) + // Try to serve from cache + var bodies []*eth.BlockBody + for _, hash := range request.GetBlockBodiesRequest { + if cache, ok := c.conns.Blocks().Peek(hash); ok && cache.Body != nil { + bodies = append(bodies, cache.Body) + } + } + + response := ð.BlockBodiesPacket{ + RequestId: request.RequestId, + BlockBodiesResponse: bodies, + } + c.countMsgSent(response.Name(), float64(len(bodies))) return ethp2p.Send(c.rw, eth.BlockBodiesMsg, response) } @@ -532,6 +642,9 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { c.countMsgReceived(packet.Name(), 1) + // Mark block as known from this peer + c.addKnownBlock(hash) + // Set the head block if newer. if c.conns.UpdateHeadBlock(*packet) { c.logger.Info(). @@ -563,6 +676,13 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { TD: packet.TD, }) + // Broadcast block or block hash to other peers + c.conns.BroadcastBlock(packet.Block, packet.TD) + c.conns.BroadcastBlockHashes( + []common.Hash{hash}, + []uint64{packet.Block.Number().Uint64()}, + ) + return nil } @@ -574,8 +694,19 @@ func (c *conn) handleGetPooledTransactions(msg ethp2p.Msg) error { c.countMsgReceived(request.Name(), float64(len(request.GetPooledTransactionsRequest))) - response := ð.PooledTransactionsPacket{RequestId: request.RequestId} - c.countMsgSent(response.Name(), 0) + // Try to serve from cache + var txs []*types.Transaction + for _, hash := range request.GetPooledTransactionsRequest { + if tx, ok := c.conns.GetTx(hash); ok { + txs = append(txs, tx) + } + } + + response := ð.PooledTransactionsPacket{ + RequestId: request.RequestId, + PooledTransactionsResponse: txs, + } + c.countMsgSent(response.Name(), float64(len(txs))) return ethp2p.Send(c.rw, eth.PooledTransactionsMsg, response) } @@ -626,10 +757,26 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err c.countMsgReceived(packet.Name(), float64(len(packet.PooledTransactionsResponse))) + // Mark transactions as known from this peer + for _, tx := range packet.PooledTransactionsResponse { + c.addKnownTx(tx.Hash()) + } + if len(packet.PooledTransactionsResponse) > 0 { c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs) } + // Cache transactions for duplicate detection and serving to peers + hashes := make([]common.Hash, len(packet.PooledTransactionsResponse)) + for i, tx := range packet.PooledTransactionsResponse { + c.conns.AddTx(tx.Hash(), tx) + hashes[i] = tx.Hash() + } + + // Broadcast transactions or hashes to other peers + c.conns.BroadcastTxs(types.Transactions(packet.PooledTransactionsResponse)) + c.conns.BroadcastTxHashes(hashes) + return nil }