diff --git a/cmd/p2p/sensor/api.go b/cmd/p2p/sensor/api.go index c9802742..696c83de 100644 --- a/cmd/p2p/sensor/api.go +++ b/cmd/p2p/sensor/api.go @@ -18,6 +18,8 @@ import ( type peerData struct { Name string `json:"name"` ProtocolVersion uint `json:"protocol_version"` + BlockHash string `json:"block_hash,omitempty"` + BlockNumber uint64 `json:"block_number,omitempty"` Received p2p.MessageCount `json:"received"` Sent p2p.MessageCount `json:"sent"` PacketsReceived p2p.MessageCount `json:"packets_received"` @@ -85,9 +87,18 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) { continue } + // Get latest block info for this peer + blockHash, blockNumber := conns.GetPeerLatestBlock(peerID) + var blockHashStr string + if blockNumber > 0 { + blockHashStr = blockHash.Hex() + } + peers[url] = peerData{ Name: conns.GetPeerName(peerID), ProtocolVersion: conns.GetPeerVersion(peerID), + BlockHash: blockHashStr, + BlockNumber: blockNumber, Received: messages.Received, Sent: messages.Sent, PacketsReceived: messages.PacketsReceived, diff --git a/cmd/p2p/sensor/rpc.go b/cmd/p2p/sensor/rpc.go index 1263dea1..47394934 100644 --- a/cmd/p2p/sensor/rpc.go +++ b/cmd/p2p/sensor/rpc.go @@ -1,16 +1,20 @@ package sensor import ( + "bytes" "encoding/json" "fmt" "io" "math/big" "net/http" - "strings" "github.com/0xPolygon/polygon-cli/p2p" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/rs/zerolog/log" ) @@ -37,13 +41,60 @@ type rpcError struct { Data any `json:"data,omitempty"` } +// rpcProxy holds configuration for proxying RPC requests to an upstream server. +type rpcProxy struct { + rpcURL string + httpClient *http.Client +} + +// rpcParams holds shared parameters for processing JSON-RPC requests. +type rpcParams struct { + conns *p2p.Conns + chainID *big.Int + gpo *p2p.GasPriceOracle + proxy *rpcProxy + counter *prometheus.CounterVec +} + // handleRPC sets up the JSON-RPC server for receiving and broadcasting transactions. // It handles eth_sendRawTransaction requests, validates transaction signatures, // and broadcasts valid transactions to all connected peers. // Supports both single requests and batch requests per JSON-RPC 2.0 specification. +// If proxyRPC is enabled, unsupported methods are forwarded to the upstream rpcURL. func handleRPC(conns *p2p.Conns, networkID uint64) { // Use network ID as chain ID for signature validation chainID := new(big.Int).SetUint64(networkID) + gpo := p2p.NewGasPriceOracle(conns) + + counter := promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "sensor", + Subsystem: "rpc", + Name: "calls", + Help: "The number of RPC calls made", + }, + []string{"method", "proxied"}, + ) + + params := &rpcParams{ + conns: conns, + chainID: chainID, + gpo: gpo, + counter: counter, + } + + if inputSensorParams.ProxyRPC { + params.proxy = &rpcProxy{ + rpcURL: inputSensorParams.RPC, + httpClient: &http.Client{ + Timeout: inputSensorParams.ProxyRPCTimeout, + }, + } + log.Info(). + Str("rpc", inputSensorParams.RPC). + Dur("timeout", inputSensorParams.ProxyRPCTimeout). + Msg("RPC proxy enabled") + } mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -57,17 +108,12 @@ func handleRPC(conns *p2p.Conns, networkID uint64) { writeError(w, -32700, "Parse error", nil) return } - defer func() { - if err := r.Body.Close(); err != nil { - log.Debug().Err(err).Msg("Failed to close request body") - } - }() // Check if this is a batch request (starts with '[') or single request - trimmed := strings.TrimSpace(string(body)) + trimmed := bytes.TrimSpace(body) if len(trimmed) > 0 && trimmed[0] == '[' { // Handle batch request - handleBatchRequest(w, body, conns, chainID) + handleBatchRequest(w, r, body, params) return } @@ -78,14 +124,31 @@ func handleRPC(conns *p2p.Conns, networkID uint64) { return } - // Handle eth_sendRawTransaction - if req.Method == "eth_sendRawTransaction" { - handleSendRawTransaction(w, req, conns, chainID) + // Process request + var txs types.Transactions + resp := processRequest(req, params, &txs) + + // If method not found and proxy is enabled, forward to upstream + if isMethodNotFound(resp) && params.proxy != nil { + params.counter.WithLabelValues(req.Method, "true").Inc() + proxyRPCRequest(w, r, body, params.proxy) return } - // Method not found - writeError(w, -32601, "Method not found", req.ID) + params.counter.WithLabelValues(req.Method, "false").Inc() + + // Broadcast any transactions (always broadcast RPC-submitted transactions) + if len(txs) > 0 { + log.Info().Str("hash", txs[0].Hash().Hex()).Msg("Broadcasting transaction") + count := params.conns.BroadcastTxsAlways(txs) + log.Info().Str("hash", txs[0].Hash().Hex()).Int("peers", count).Msg("Transaction broadcast complete") + } + + // Write response + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(&resp); err != nil { + log.Error().Err(err).Msg("Failed to encode response") + } }) addr := fmt.Sprintf(":%d", inputSensorParams.RPCPort) @@ -111,23 +174,49 @@ func writeError(w http.ResponseWriter, code int, message string, id any) { } } -// writeResult writes a JSON-RPC 2.0 success response with the specified result and request ID. -func writeResult(w http.ResponseWriter, result any, id any) { - w.Header().Set("Content-Type", "application/json") - response := rpcResponse{ - JSONRPC: "2.0", - Result: result, - ID: id, +// proxyRPCRequest forwards a JSON-RPC request to the upstream RPC server and streams +// the response back to the client. Used for methods not handled locally. +func proxyRPCRequest(w http.ResponseWriter, r *http.Request, body []byte, config *rpcProxy) { + req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, config.rpcURL, bytes.NewReader(body)) + if err != nil { + log.Error().Err(err).Msg("Failed to create proxy request") + writeError(w, -32603, "Internal error: failed to create proxy request", nil) + return } - if err := json.NewEncoder(w).Encode(response); err != nil { - log.Error().Err(err).Msg("Failed to encode result response") + req.Header.Set("Content-Type", "application/json") + + resp, err := config.httpClient.Do(req) + if err != nil { + log.Error().Err(err).Str("rpc", config.rpcURL).Msg("Proxy request failed") + if r.Context().Err() != nil { + writeError(w, -32603, "Request cancelled or timed out", nil) + return + } + writeError(w, -32603, fmt.Sprintf("Upstream RPC error: %v", err), nil) + return + } + defer func() { _ = resp.Body.Close() }() + + // Copy response headers and status + for key, values := range resp.Header { + for _, value := range values { + w.Header().Add(key, value) + } + } + w.WriteHeader(resp.StatusCode) + + // Stream response body + if _, err := io.Copy(w, resp.Body); err != nil { + log.Error().Err(err).Msg("Failed to copy proxy response") } } -// handleBatchRequest processes JSON-RPC 2.0 batch requests, validates all transactions, -// and broadcasts valid transactions to connected peers. Returns a batch response with -// results or errors for each request in the batch. -func handleBatchRequest(w http.ResponseWriter, body []byte, conns *p2p.Conns, chainID *big.Int) { +// handleBatchRequest processes JSON-RPC 2.0 batch requests. +// For eth_sendRawTransaction requests, it collects valid transactions for batch broadcasting. +// Returns a batch response with results or errors for each request. +// If proxyConfig is provided and the batch contains unsupported methods, +// the entire batch is forwarded to the upstream RPC server for simplicity. +func handleBatchRequest(w http.ResponseWriter, r *http.Request, body []byte, params *rpcParams) { // Parse batch of requests var requests []rpcRequest if err := json.Unmarshal(body, &requests); err != nil { @@ -141,45 +230,35 @@ func handleBatchRequest(w http.ResponseWriter, body []byte, conns *p2p.Conns, ch return } - // Process all requests and collect valid transactions for batch broadcasting + // Process all requests responses := make([]rpcResponse, 0, len(requests)) txs := make(types.Transactions, 0) for _, req := range requests { - if req.Method != "eth_sendRawTransaction" { - responses = append(responses, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32601, - Message: "Method not found", - }, - ID: req.ID, - }) - continue - } + resp := processRequest(req, params, &txs) - tx, response := validateTx(req, chainID) - if tx == nil { - responses = append(responses, response) - continue + // If any method not found and proxy is enabled, forward entire batch + if isMethodNotFound(resp) && params.proxy != nil { + for _, rpcReq := range requests { + params.counter.WithLabelValues(rpcReq.Method, "true").Inc() + } + proxyRPCRequest(w, r, body, params.proxy) + return } - txs = append(txs, tx) - responses = append(responses, rpcResponse{ - JSONRPC: "2.0", - Result: tx.Hash().Hex(), - ID: req.ID, - }) + params.counter.WithLabelValues(req.Method, "false").Inc() + responses = append(responses, resp) } // Broadcast all valid transactions in a single batch if there are any + // (always broadcast RPC-submitted transactions) if len(txs) > 0 { log.Info(). Int("txs", len(txs)). Int("requests", len(requests)). Msg("Broadcasting batch of transactions") - count := conns.BroadcastTxs(txs) + count := params.conns.BroadcastTxsAlways(txs) log.Info(). Int("txs", len(txs)). @@ -194,76 +273,137 @@ func handleBatchRequest(w http.ResponseWriter, body []byte, conns *p2p.Conns, ch } } +// newResultResponse creates a success response. +func newResultResponse(result, id any) rpcResponse { + return rpcResponse{JSONRPC: "2.0", Result: result, ID: id} +} + +// newErrorResponse creates an error response. +func newErrorResponse(err *rpcError, id any) rpcResponse { + return rpcResponse{JSONRPC: "2.0", Error: err, ID: id} +} + +const rpcMethodNotFoundCode = -32601 + +// newMethodNotFoundResponse creates a method not found error response. +func newMethodNotFoundResponse(id any) rpcResponse { + return rpcResponse{ + JSONRPC: "2.0", + Error: &rpcError{Code: rpcMethodNotFoundCode, Message: "Method not found"}, + ID: id, + } +} + +// isMethodNotFound returns true if the response is a method not found error. +func isMethodNotFound(resp rpcResponse) bool { + return resp.Error != nil && resp.Error.Code == rpcMethodNotFoundCode +} + +// processRequest handles a single RPC request and returns a response. +// For eth_sendRawTransaction, valid transactions are appended to txs for batch broadcasting. +// Returns a method not found response if the method is not handled locally. +func processRequest(req rpcRequest, params *rpcParams, txs *types.Transactions) rpcResponse { + switch req.Method { + case "eth_sendRawTransaction": + tx, errResp := validateTx(req, params.chainID) + if tx == nil { + return errResp + } + if txs != nil { + *txs = append(*txs, tx) + } + return newResultResponse(tx.Hash().Hex(), req.ID) + + case "eth_chainId": + return newResultResponse(hexutil.EncodeBig(params.chainID), req.ID) + + case "eth_blockNumber": + head := params.conns.HeadBlock() + if head.Block == nil { + return newResultResponse(nil, req.ID) + } + return newResultResponse(hexutil.EncodeUint64(head.Block.NumberU64()), req.ID) + + case "eth_gasPrice": + return newResultResponse(hexutil.EncodeBig(params.gpo.SuggestGasPrice()), req.ID) + + case "eth_maxPriorityFeePerGas": + tip := params.gpo.SuggestGasTipCap() + if tip == nil { + tip = big.NewInt(1e9) // Default to 1 gwei + } + return newResultResponse(hexutil.EncodeBig(tip), req.ID) + + case "eth_getBlockByHash": + result, err := getBlockByHash(req, params.conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getBlockByNumber": + result, err := getBlockByNumber(req, params.conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getTransactionByHash": + result, err := getTransactionByHash(req, params.conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getTransactionByBlockHashAndIndex": + result, err := getTransactionByBlockHashAndIndex(req, params.conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getBlockTransactionCountByHash": + result, err := getBlockTransactionCountByHash(req, params.conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getUncleCountByBlockHash": + result, err := getUncleCountByBlockHash(req, params.conns) + return handleMethodResult(result, err, req.ID) + + default: + return newMethodNotFoundResponse(req.ID) + } +} + +// handleMethodResult converts a method's result and error into an rpcResponse. +func handleMethodResult(result any, err *rpcError, id any) rpcResponse { + if err != nil { + return newErrorResponse(err, id) + } + return newResultResponse(result, id) +} + // validateTx validates a transaction from a JSON-RPC request by decoding the raw // transaction hex, unmarshaling it, and verifying the signature. Returns the transaction if valid // (with an empty response), or nil transaction with an error response if validation fails. func validateTx(req rpcRequest, chainID *big.Int) (*types.Transaction, rpcResponse) { - // Check params + invalidParams := func(msg string) rpcResponse { + return newErrorResponse(&rpcError{Code: -32602, Message: msg}, req.ID) + } + if len(req.Params) == 0 { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: "Invalid params: missing raw transaction", - }, - ID: req.ID, - } + return nil, invalidParams("Invalid params: missing raw transaction") } - // Extract raw transaction hex string hex, ok := req.Params[0].(string) if !ok { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: "Invalid params: raw transaction must be a hex string", - }, - ID: req.ID, - } + return nil, invalidParams("Invalid params: raw transaction must be a hex string") } - // Decode hex string to bytes bytes, err := hexutil.Decode(hex) if err != nil { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: fmt.Sprintf("Invalid transaction hex: %v", err), - }, - ID: req.ID, - } + return nil, invalidParams(fmt.Sprintf("Invalid transaction hex: %v", err)) } - // Unmarshal transaction tx := new(types.Transaction) if err = tx.UnmarshalBinary(bytes); err != nil { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: fmt.Sprintf("Invalid transaction encoding: %v", err), - }, - ID: req.ID, - } + return nil, invalidParams(fmt.Sprintf("Invalid transaction encoding: %v", err)) } - // Validate transaction signature signer := types.LatestSignerForChainID(chainID) sender, err := types.Sender(signer, tx) if err != nil { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: fmt.Sprintf("Invalid transaction signature: %v", err), - }, - ID: req.ID, - } + return nil, invalidParams(fmt.Sprintf("Invalid transaction signature: %v", err)) } - // Log the transaction to := "nil" if tx.To() != nil { to = tx.To().Hex() @@ -280,26 +420,366 @@ func validateTx(req rpcRequest, chainID *big.Int) (*types.Transaction, rpcRespon return tx, rpcResponse{} } -// handleSendRawTransaction processes eth_sendRawTransaction requests, validates the -// transaction, broadcasts it to all connected peers, and writes the transaction hash -// as a JSON-RPC response. -func handleSendRawTransaction(w http.ResponseWriter, req rpcRequest, conns *p2p.Conns, chainID *big.Int) { - tx, response := validateTx(req, chainID) - if tx == nil { - writeError(w, response.Error.Code, response.Error.Message, response.ID) - return +// parseFullTxParam extracts the fullTx boolean from params[1], defaulting to false. +func parseFullTxParam(params []any) bool { + if len(params) >= 2 { + if fullTx, ok := params[1].(bool); ok { + return fullTx + } + } + return false +} + +// getBlockByHash retrieves a block by its hash from the cache. +func getBlockByHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 1 { + return nil, &rpcError{Code: -32602, Message: "missing block hash parameter"} } - log.Info(). - Str("hash", tx.Hash().Hex()). - Msg("Broadcasting transaction") + hashStr, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid block hash parameter"} + } - count := conns.BroadcastTx(tx) + hash := common.HexToHash(hashStr) + cache, ok := conns.Blocks().Get(hash) + if !ok { + return nil, nil // Return null for not found (per spec) + } - log.Info(). - Str("hash", tx.Hash().Hex()). - Int("peers", count). - Msg("Transaction broadcast complete") + return formatBlockResponse(hash, cache, parseFullTxParam(req.Params)), nil +} + +// getBlockByNumber retrieves a block by its number from the cache. +func getBlockByNumber(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 1 { + return nil, &rpcError{Code: -32602, Message: "missing block number parameter"} + } + + blockNumParam, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid block number parameter"} + } + + var hash common.Hash + var cache p2p.BlockCache + var found bool + + switch blockNumParam { + case "latest", "pending": + head := conns.HeadBlock() + if head.Block == nil { + return nil, nil + } + hash = head.Block.Hash() + cache, found = conns.Blocks().Get(hash) + if !found { + // Construct cache from head block + cache = p2p.BlockCache{ + Header: head.Block.Header(), + Body: ð.BlockBody{ + Transactions: head.Block.Transactions(), + Uncles: head.Block.Uncles(), + }, + TD: head.TD, + } + found = true + } + case "earliest": + hash, cache, found = conns.GetBlockByNumber(0) + default: + num, err := hexutil.DecodeUint64(blockNumParam) + if err != nil { + return nil, &rpcError{Code: -32602, Message: "invalid block number: " + err.Error()} + } + hash, cache, found = conns.GetBlockByNumber(num) + } + + if !found { + return nil, nil + } + + return formatBlockResponse(hash, cache, parseFullTxParam(req.Params)), nil +} + +// getTransactionByHash retrieves a transaction by its hash from the cache. +func getTransactionByHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 1 { + return nil, &rpcError{Code: -32602, Message: "missing transaction hash parameter"} + } + + hashStr, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid transaction hash parameter"} + } + + hash := common.HexToHash(hashStr) + + // First check the transactions cache + tx, ok := conns.GetTx(hash) + if ok { + return formatTransactionResponse(tx, common.Hash{}, nil, 0), nil + } + + // Search in blocks for the transaction + for _, blockHash := range conns.Blocks().Keys() { + cache, ok := conns.Blocks().Peek(blockHash) + if !ok || cache.Body == nil { + continue + } + for i, tx := range cache.Body.Transactions { + if tx.Hash() == hash { + return formatTransactionResponse(tx, blockHash, cache.Header, uint64(i)), nil + } + } + } + + return nil, nil +} + +// getTransactionByBlockHashAndIndex retrieves a transaction by block hash and index. +func getTransactionByBlockHashAndIndex(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 2 { + return nil, &rpcError{Code: -32602, Message: "missing block hash or index parameter"} + } + + hashStr, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid block hash parameter"} + } + + indexStr, ok := req.Params[1].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid index parameter"} + } + + index, err := hexutil.DecodeUint64(indexStr) + if err != nil { + return nil, &rpcError{Code: -32602, Message: "invalid index: " + err.Error()} + } + + blockHash := common.HexToHash(hashStr) + cache, ok := conns.Blocks().Get(blockHash) + if !ok || cache.Body == nil { + return nil, nil + } + + if int(index) >= len(cache.Body.Transactions) { + return nil, nil + } + + tx := cache.Body.Transactions[index] + return formatTransactionResponse(tx, blockHash, cache.Header, index), nil +} + +// getBlockCacheByHashParam parses a block hash from params[0] and returns the block cache. +// Returns the cache and nil error on success, or nil cache and error on parse failure. +// If the block is not found, returns nil cache with nil error (per JSON-RPC spec). +func getBlockCacheByHashParam(req rpcRequest, conns *p2p.Conns) (p2p.BlockCache, *rpcError) { + if len(req.Params) < 1 { + return p2p.BlockCache{}, &rpcError{Code: -32602, Message: "missing block hash parameter"} + } + + hashStr, ok := req.Params[0].(string) + if !ok { + return p2p.BlockCache{}, &rpcError{Code: -32602, Message: "invalid block hash parameter"} + } + + hash := common.HexToHash(hashStr) + cache, ok := conns.Blocks().Get(hash) + if !ok || cache.Body == nil { + return p2p.BlockCache{}, nil + } + + return cache, nil +} + +// getBlockTransactionCountByHash returns the transaction count in a block. +func getBlockTransactionCountByHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + cache, err := getBlockCacheByHashParam(req, conns) + if err != nil || cache.Body == nil { + return nil, err + } + return hexutil.EncodeUint64(uint64(len(cache.Body.Transactions))), nil +} + +// getUncleCountByBlockHash returns the uncle count in a block. +func getUncleCountByBlockHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + cache, err := getBlockCacheByHashParam(req, conns) + if err != nil || cache.Body == nil { + return nil, err + } + return hexutil.EncodeUint64(uint64(len(cache.Body.Uncles))), nil +} + +// formatBlockResponse formats a block cache into the Ethereum JSON-RPC block format. +func formatBlockResponse(hash common.Hash, cache p2p.BlockCache, fullTx bool) map[string]any { + header := cache.Header + if header == nil { + return nil + } + + result := map[string]any{ + "hash": hash.Hex(), + "number": hexutil.EncodeUint64(header.Number.Uint64()), + "parentHash": header.ParentHash.Hex(), + "nonce": hexutil.Encode(header.Nonce[:]), + "sha3Uncles": header.UncleHash.Hex(), + "logsBloom": hexutil.Encode(header.Bloom.Bytes()), + "transactionsRoot": header.TxHash.Hex(), + "stateRoot": header.Root.Hex(), + "receiptsRoot": header.ReceiptHash.Hex(), + "miner": header.Coinbase.Hex(), + "difficulty": hexutil.EncodeBig(header.Difficulty), + "extraData": hexutil.Encode(header.Extra), + "gasLimit": hexutil.EncodeUint64(header.GasLimit), + "gasUsed": hexutil.EncodeUint64(header.GasUsed), + "timestamp": hexutil.EncodeUint64(header.Time), + "mixHash": header.MixDigest.Hex(), + } + + if header.BaseFee != nil { + result["baseFeePerGas"] = hexutil.EncodeBig(header.BaseFee) + } + + if header.WithdrawalsHash != nil { + result["withdrawalsRoot"] = header.WithdrawalsHash.Hex() + } + + if header.BlobGasUsed != nil { + result["blobGasUsed"] = hexutil.EncodeUint64(*header.BlobGasUsed) + } + + if header.ExcessBlobGas != nil { + result["excessBlobGas"] = hexutil.EncodeUint64(*header.ExcessBlobGas) + } + + if header.ParentBeaconRoot != nil { + result["parentBeaconBlockRoot"] = header.ParentBeaconRoot.Hex() + } + + // Add total difficulty (default to 0 if not available) + if cache.TD != nil { + result["totalDifficulty"] = hexutil.EncodeBig(cache.TD) + } else { + result["totalDifficulty"] = "0x0" + } + + // Add transactions + if cache.Body != nil && cache.Body.Transactions != nil { + if fullTx { + txs := make([]map[string]any, len(cache.Body.Transactions)) + for i, tx := range cache.Body.Transactions { + txs[i] = formatTransactionResponse(tx, hash, header, uint64(i)) + } + result["transactions"] = txs + } else { + txHashes := make([]string, len(cache.Body.Transactions)) + for i, tx := range cache.Body.Transactions { + txHashes[i] = tx.Hash().Hex() + } + result["transactions"] = txHashes + } + } else { + result["transactions"] = []string{} + } + + // Add uncles + if cache.Body != nil && cache.Body.Uncles != nil { + uncleHashes := make([]string, len(cache.Body.Uncles)) + for i, uncle := range cache.Body.Uncles { + uncleHashes[i] = uncle.Hash().Hex() + } + result["uncles"] = uncleHashes + } else { + result["uncles"] = []string{} + } + + // Add size (approximate based on header + body) + result["size"] = hexutil.EncodeUint64(0) // We don't have exact size; use 0 + + return result +} + +// formatTransactionResponse formats a transaction into the Ethereum JSON-RPC format. +// If blockHash is empty, the transaction is considered pending. +func formatTransactionResponse(tx *types.Transaction, blockHash common.Hash, header *types.Header, index uint64) map[string]any { + v, r, s := tx.RawSignatureValues() + + result := map[string]any{ + "hash": tx.Hash().Hex(), + "nonce": hexutil.EncodeUint64(tx.Nonce()), + "gas": hexutil.EncodeUint64(tx.Gas()), + "value": hexutil.EncodeBig(tx.Value()), + "input": hexutil.Encode(tx.Data()), + "v": hexutil.EncodeBig(v), + "r": hexutil.EncodeBig(r), + "s": hexutil.EncodeBig(s), + "type": hexutil.EncodeUint64(uint64(tx.Type())), + } + + if tx.To() != nil { + result["to"] = tx.To().Hex() + } else { + result["to"] = nil + } + + // Add from address if we can derive it + signer := types.LatestSignerForChainID(tx.ChainId()) + if from, err := types.Sender(signer, tx); err == nil { + result["from"] = from.Hex() + } + + // Set gas price fields based on transaction type + switch tx.Type() { + case types.LegacyTxType, types.AccessListTxType: + result["gasPrice"] = hexutil.EncodeBig(tx.GasPrice()) + case types.DynamicFeeTxType, types.BlobTxType: + result["maxFeePerGas"] = hexutil.EncodeBig(tx.GasFeeCap()) + result["maxPriorityFeePerGas"] = hexutil.EncodeBig(tx.GasTipCap()) + // For EIP-1559 txs, also set gasPrice to effective gas price if in a block + if header != nil && header.BaseFee != nil { + effectiveGasPrice := new(big.Int).Add(header.BaseFee, tx.GasTipCap()) + if effectiveGasPrice.Cmp(tx.GasFeeCap()) > 0 { + effectiveGasPrice = tx.GasFeeCap() + } + result["gasPrice"] = hexutil.EncodeBig(effectiveGasPrice) + } else { + result["gasPrice"] = hexutil.EncodeBig(tx.GasFeeCap()) + } + } + + // Add chain ID if present + if tx.ChainId() != nil { + result["chainId"] = hexutil.EncodeBig(tx.ChainId()) + } + + // Add yParity for typed transactions (EIP-2930+) + if tx.Type() != types.LegacyTxType { + result["yParity"] = hexutil.EncodeBig(v) + } + + // Add access list if present + if tx.AccessList() != nil { + result["accessList"] = tx.AccessList() + } + + // Add blob-specific fields + if tx.Type() == types.BlobTxType { + result["maxFeePerBlobGas"] = hexutil.EncodeBig(tx.BlobGasFeeCap()) + result["blobVersionedHashes"] = tx.BlobHashes() + } + + // Add block info if transaction is in a block + if blockHash != (common.Hash{}) && header != nil { + result["blockHash"] = blockHash.Hex() + result["blockNumber"] = hexutil.EncodeUint64(header.Number.Uint64()) + result["transactionIndex"] = hexutil.EncodeUint64(index) + } else { + result["blockHash"] = nil + result["blockNumber"] = nil + result["transactionIndex"] = nil + } - writeResult(w, tx.Hash().Hex(), req.ID) + return result } diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index f329e7b3..3c9fc32d 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -77,6 +77,8 @@ type ( DiscoveryDNS string Database string NoDiscovery bool + ProxyRPC bool + ProxyRPCTimeout time.Duration RequestsCache ds.LRUOptions ParentsCache ds.LRUOptions BlocksCache ds.LRUOptions @@ -501,6 +503,8 @@ will result in less chance of missing data but can significantly increase memory f.IntVar(&inputSensorParams.Port, "port", 30303, "TCP network listening port") f.IntVar(&inputSensorParams.DiscoveryPort, "discovery-port", 30303, "UDP P2P discovery port") f.StringVar(&inputSensorParams.RPC, "rpc", "https://polygon-rpc.com", "RPC endpoint used to fetch latest block") + f.BoolVar(&inputSensorParams.ProxyRPC, "proxy-rpc", false, "proxy unsupported RPC methods to the --rpc endpoint") + f.DurationVar(&inputSensorParams.ProxyRPCTimeout, "proxy-rpc-timeout", 30*time.Second, "timeout for proxied RPC requests") f.StringVar(&inputSensorParams.GenesisHash, "genesis-hash", "0xa9c28ce2141b56c474f1dc504bee9b01eb1bd7d1a507580d5519d4437a97de1b", "genesis block hash") f.BytesHexVar(&inputSensorParams.ForkID, "fork-id", []byte{34, 213, 35, 178}, "hex encoded fork ID (omit 0x)") f.IntVar(&inputSensorParams.DialRatio, "dial-ratio", 0, diff --git a/cmd/p2p/sensor/usage.md b/cmd/p2p/sensor/usage.md index c45a77c5..37109552 100644 --- a/cmd/p2p/sensor/usage.md +++ b/cmd/p2p/sensor/usage.md @@ -9,6 +9,34 @@ created automatically. The bootnodes may change, so refer to the [Polygon Knowledge Layer][bootnodes] if the sensor is not discovering peers. +## JSON-RPC Server + +The sensor runs a JSON-RPC server on port 8545 (configurable via `--rpc-port`) +that supports a subset of Ethereum JSON-RPC methods using cached data. + +### Supported Methods + +| Method | Description | +|--------|-------------| +| `eth_chainId` | Returns the chain ID | +| `eth_blockNumber` | Returns the current head block number | +| `eth_gasPrice` | Returns suggested gas price based on recent blocks | +| `eth_getBlockByHash` | Returns block by hash | +| `eth_getBlockByNumber` | Returns block by number (if cached) | +| `eth_getTransactionByHash` | Returns transaction by hash | +| `eth_getTransactionByBlockHashAndIndex` | Returns transaction at index in block | +| `eth_getBlockTransactionCountByHash` | Returns transaction count in block | +| `eth_getUncleCountByBlockHash` | Returns uncle count in block | +| `eth_sendRawTransaction` | Broadcasts signed transaction to peers | + +### Limitations + +Methods requiring state or receipts are not supported: +- `eth_getBalance`, `eth_getCode`, `eth_call`, `eth_estimateGas` +- `eth_getTransactionReceipt`, `eth_getLogs` + +Data is served from an LRU cache, so older blocks/transactions may not be available. + ## Metrics The sensor exposes Prometheus metrics at `http://localhost:2112/metrics` diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index 57b64fbf..f408f218 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -30,6 +30,34 @@ created automatically. The bootnodes may change, so refer to the [Polygon Knowledge Layer][bootnodes] if the sensor is not discovering peers. +## JSON-RPC Server + +The sensor runs a JSON-RPC server on port 8545 (configurable via `--rpc-port`) +that supports a subset of Ethereum JSON-RPC methods using cached data. + +### Supported Methods + +| Method | Description | +|--------|-------------| +| `eth_chainId` | Returns the chain ID | +| `eth_blockNumber` | Returns the current head block number | +| `eth_gasPrice` | Returns suggested gas price based on recent blocks | +| `eth_getBlockByHash` | Returns block by hash | +| `eth_getBlockByNumber` | Returns block by number (if cached) | +| `eth_getTransactionByHash` | Returns transaction by hash | +| `eth_getTransactionByBlockHashAndIndex` | Returns transaction at index in block | +| `eth_getBlockTransactionCountByHash` | Returns transaction count in block | +| `eth_getUncleCountByBlockHash` | Returns uncle count in block | +| `eth_sendRawTransaction` | Broadcasts signed transaction to peers | + +### Limitations + +Methods requiring state or receipts are not supported: +- `eth_getBalance`, `eth_getCode`, `eth_call`, `eth_estimateGas` +- `eth_getTransactionReceipt`, `eth_getLogs` + +Data is served from an LRU cache, so older blocks/transactions may not be available. + ## Metrics The sensor exposes Prometheus metrics at `http://localhost:2112/metrics` @@ -132,6 +160,8 @@ polycli p2p sensor amoy-nodes.json \ -p, --project-id string GCP project ID --prom run Prometheus server (default true) --prom-port uint port Prometheus runs on (default 2112) + --proxy-rpc proxy unsupported RPC methods to the --rpc endpoint + --proxy-rpc-timeout duration timeout for proxied RPC requests (default 30s) --requests-cache-ttl duration time to live for requests cache entries (0 for no expiration) (default 5m0s) --rpc string RPC endpoint used to fetch latest block (default "https://polygon-rpc.com") --rpc-port uint port for JSON-RPC server to receive transactions (default 8545) diff --git a/p2p/conns.go b/p2p/conns.go index 7690cd62..96b066f7 100644 --- a/p2p/conns.go +++ b/p2p/conns.go @@ -3,6 +3,7 @@ package p2p import ( "math/big" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -106,6 +107,19 @@ func (c *Conns) Remove(cn *conn) { cn.logger.Debug().Msg("Removed connection") } +// snapshotPeers returns a copy of current peer connections. +// The caller can safely iterate without holding the lock. +func (c *Conns) snapshotPeers() []*conn { + c.mu.RLock() + defer c.mu.RUnlock() + + peers := make([]*conn, 0, len(c.conns)) + for _, cn := range c.conns { + peers = append(peers, cn) + } + return peers +} + // BroadcastTx broadcasts a single transaction to all connected peers. // Returns the number of peers the transaction was successfully sent to. func (c *Conns) BroadcastTx(tx *types.Transaction) int { @@ -117,57 +131,137 @@ func (c *Conns) BroadcastTx(tx *types.Transaction) int { // Returns the number of peers the transactions were successfully sent to. // If broadcast flags are disabled, this is a no-op. func (c *Conns) BroadcastTxs(txs types.Transactions) int { - if !c.shouldBroadcastTx { + if !c.shouldBroadcastTx || len(txs) == 0 { return 0 } - c.mu.RLock() - defer c.mu.RUnlock() + // Pre-compute transaction hashes once to avoid redundant Keccak256 computations + hashes := make([]common.Hash, len(txs)) + txByHash := make(map[common.Hash]*types.Transaction, len(txs)) + for i, tx := range txs { + h := tx.Hash() + hashes[i] = h + txByHash[h] = tx + } - if len(txs) == 0 { + peers := c.snapshotPeers() + if len(peers) == 0 { return 0 } - count := 0 - for _, cn := range c.conns { - // Filter transactions this peer doesn't know about - unknownTxs := make(types.Transactions, 0, len(txs)) - for _, tx := range txs { - if !cn.hasKnownTx(tx.Hash()) { - unknownTxs = append(unknownTxs, tx) + // Broadcast concurrently to all peers + var count atomic.Int32 + var wg sync.WaitGroup + + for _, cn := range peers { + wg.Add(1) + go func(cn *conn) { + defer wg.Done() + + // Filter transactions this peer doesn't know about using batch bloom operation + unknownHashes := cn.filterUnknownTxHashes(hashes) + if len(unknownHashes) == 0 { + return } - } - if len(unknownTxs) == 0 { - continue - } + // Build transaction list from pre-computed map + unknownTxs := make(types.Transactions, 0, len(unknownHashes)) + for _, h := range unknownHashes { + if tx, ok := txByHash[h]; ok { + unknownTxs = append(unknownTxs, tx) + } + } - // Send as TransactionsPacket - packet := eth.TransactionsPacket(unknownTxs) - cn.countMsgSent(packet.Name(), float64(len(unknownTxs))) - if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, packet); err != nil { - cn.logger.Debug(). - Err(err). - Msg("Failed to send transactions") - continue - } + if len(unknownTxs) == 0 { + return + } - // Mark transactions as known for this peer - for _, tx := range unknownTxs { - cn.addKnownTx(tx.Hash()) - } + // Send as TransactionsPacket + packet := eth.TransactionsPacket(unknownTxs) + cn.countMsgSent(packet.Name(), float64(len(unknownTxs))) + if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, packet); err != nil { + cn.logger.Debug(). + Err(err). + Msg("Failed to send transactions") + return + } - count++ + // Mark transactions as known for this peer using batch bloom operation + cn.addKnownTxHashes(unknownHashes) + count.Add(1) + }(cn) } - if count > 0 { + wg.Wait() + + finalCount := int(count.Load()) + if finalCount > 0 { log.Debug(). - Int("peers", count). + Int("peers", finalCount). Int("txs", len(txs)). Msg("Broadcasted transactions") } - return count + return finalCount +} + +// BroadcastTxsAlways broadcasts transactions to all connected peers regardless +// of the ShouldBroadcastTx configuration. This is used for transactions +// submitted via RPC that must always be broadcast to the network. +// Unlike BroadcastTxs, this does not filter by known transactions since +// RPC-submitted transactions are always new. +func (c *Conns) BroadcastTxsAlways(txs types.Transactions) int { + if len(txs) == 0 { + return 0 + } + + // Pre-compute transaction hashes once to avoid redundant Keccak256 computations + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + } + + peers := c.snapshotPeers() + if len(peers) == 0 { + return 0 + } + + // Broadcast concurrently to all peers + var count atomic.Int32 + var wg sync.WaitGroup + + for _, cn := range peers { + wg.Add(1) + go func(cn *conn) { + defer wg.Done() + + // Send as TransactionsPacket + packet := eth.TransactionsPacket(txs) + cn.countMsgSent(packet.Name(), float64(len(txs))) + if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, packet); err != nil { + cn.logger.Debug(). + Err(err). + Msg("Failed to send transactions") + return + } + + // Mark transactions as known for this peer using batch bloom operation + cn.addKnownTxHashes(hashes) + count.Add(1) + }(cn) + } + + wg.Wait() + + finalCount := int(count.Load()) + if finalCount > 0 { + log.Debug(). + Int("peers", finalCount). + Int("txs", len(txs)). + Msg("Broadcasted transactions (always)") + } + + return finalCount } // BroadcastTxHashes enqueues transaction hashes to per-peer broadcast queues. @@ -209,54 +303,63 @@ func (c *Conns) BroadcastTxHashes(hashes []common.Hash) int { // about it and returns the number of peers the block was successfully sent to. // If broadcast flags are disabled, this is a no-op. func (c *Conns) BroadcastBlock(block *types.Block, td *big.Int) int { - if !c.shouldBroadcastBlocks { + if !c.shouldBroadcastBlocks || block == nil { return 0 } - c.mu.RLock() - defer c.mu.RUnlock() + hash := block.Hash() - if block == nil { + peers := c.snapshotPeers() + if len(peers) == 0 { return 0 } - hash := block.Hash() - count := 0 + // Broadcast concurrently to all peers + var count atomic.Int32 + var wg sync.WaitGroup - for _, cn := range c.conns { - // Skip if peer already knows about this block - if cn.hasKnownBlock(hash) { - continue - } + for _, cn := range peers { + wg.Add(1) + go func(cn *conn) { + defer wg.Done() - // Send NewBlockPacket - packet := eth.NewBlockPacket{ - Block: block, - TD: td, - } + // Skip if peer already knows about this block + if cn.hasKnownBlock(hash) { + return + } - cn.countMsgSent(packet.Name(), 1) - if err := ethp2p.Send(cn.rw, eth.NewBlockMsg, &packet); err != nil { - cn.logger.Debug(). - Err(err). - Uint64("number", block.Number().Uint64()). - Msg("Failed to send block") - continue - } + // Send NewBlockPacket + packet := eth.NewBlockPacket{ + Block: block, + TD: td, + } + + cn.countMsgSent(packet.Name(), 1) + if err := ethp2p.Send(cn.rw, eth.NewBlockMsg, &packet); err != nil { + cn.logger.Debug(). + Err(err). + Uint64("number", block.Number().Uint64()). + Msg("Failed to send block") + return + } - // Mark block as known for this peer - cn.addKnownBlock(hash) - count++ + // Mark block as known for this peer + cn.addKnownBlock(hash) + count.Add(1) + }(cn) } - if count > 0 { + wg.Wait() + + finalCount := int(count.Load()) + if finalCount > 0 { log.Debug(). - Int("peers", count). + Int("peers", finalCount). Uint64("number", block.NumberU64()). Msg("Broadcasted block") } - return count + return finalCount } // BroadcastBlockHashes enqueues block hashes to per-peer broadcast queues. @@ -334,6 +437,11 @@ func (c *Conns) AddTxs(txs []*types.Transaction) []common.Hash { return hashes } +// GetTx retrieves a transaction from the shared cache and updates LRU ordering. +func (c *Conns) GetTx(hash common.Hash) (*types.Transaction, bool) { + return c.txs.Get(hash) +} + // PeekTxs retrieves multiple transactions from the shared cache without updating LRU ordering. // Uses a single read lock for better concurrency when LRU ordering is not needed. func (c *Conns) PeekTxs(hashes []common.Hash) []*types.Transaction { @@ -367,7 +475,7 @@ func (c *Conns) HeadBlock() eth.NewBlockPacket { // Returns true if the head block was updated, false otherwise. func (c *Conns) UpdateHeadBlock(packet eth.NewBlockPacket) bool { return c.head.Update(func(current eth.NewBlockPacket) (eth.NewBlockPacket, bool) { - if current.Block == nil || (packet.Block.NumberU64() > current.Block.NumberU64() && packet.TD.Cmp(current.TD) == 1) { + if current.Block == nil || packet.Block.NumberU64() > current.Block.NumberU64() { return packet, true } return current, false @@ -431,6 +539,19 @@ func (c *Conns) GetPeerName(peerID string) string { return "" } +// GetBlockByNumber iterates through the cache to find a block by its number. +// Returns the hash, block cache, and true if found; empty values and false otherwise. +func (c *Conns) GetBlockByNumber(number uint64) (common.Hash, BlockCache, bool) { + for _, hash := range c.blocks.Keys() { + if cache, ok := c.blocks.Peek(hash); ok && cache.Header != nil { + if cache.Header.Number.Uint64() == number { + return hash, cache, true + } + } + } + return common.Hash{}, BlockCache{}, false +} + // GetPeerVersion returns the negotiated eth protocol version for a specific peer. // Returns 0 if the peer is not found. func (c *Conns) GetPeerVersion(peerID string) uint { @@ -443,3 +564,16 @@ func (c *Conns) GetPeerVersion(peerID string) uint { return 0 } + +// GetPeerLatestBlock returns the latest block hash and number for a peer. +// Returns zero hash and 0 if the peer is not found or no block has been received. +func (c *Conns) GetPeerLatestBlock(peerID string) (common.Hash, uint64) { + c.mu.RLock() + defer c.mu.RUnlock() + + if cn, ok := c.conns[peerID]; ok { + return cn.latestBlockHash.Get(), cn.latestBlockNumber.Get() + } + + return common.Hash{}, 0 +} diff --git a/p2p/datastructures/lru.go b/p2p/datastructures/lru.go index 2fd2e0ee..c38ce031 100644 --- a/p2p/datastructures/lru.go +++ b/p2p/datastructures/lru.go @@ -260,6 +260,19 @@ func (c *LRU[K, V]) Remove(key K) (V, bool) { return zero, false } +// Keys returns all keys in the cache in LRU order (most recent first). +func (c *LRU[K, V]) Keys() []K { + c.mu.RLock() + defer c.mu.RUnlock() + + keys := make([]K, 0, c.list.Len()) + for elem := c.list.Front(); elem != nil; elem = elem.Next() { + e := elem.Value.(*entry[K, V]) + keys = append(keys, e.key) + } + return keys +} + // AddBatch adds multiple key-value pairs to the cache. // Uses a single write lock for all additions, reducing lock contention // compared to calling Add in a loop. Keys and values must have the same length. diff --git a/p2p/gasprice.go b/p2p/gasprice.go new file mode 100644 index 00000000..b7c34906 --- /dev/null +++ b/p2p/gasprice.go @@ -0,0 +1,256 @@ +package p2p + +import ( + "math/big" + "sort" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// Gas price oracle constants (matching Bor/geth defaults) +const ( + // gpoSampleNumber is the number of transactions to sample per block + gpoSampleNumber = 3 + // gpoCheckBlocks is the number of blocks to check for gas price estimation + gpoCheckBlocks = 20 + // gpoPercentile is the percentile to use for gas price estimation + gpoPercentile = 60 +) + +var ( + // gpoMaxPrice is the maximum gas price to suggest (500 gwei) + gpoMaxPrice = big.NewInt(500_000_000_000) + // gpoIgnorePrice is the minimum tip to consider (2 gwei, lower than Bor's 25 gwei for broader network compatibility) + gpoIgnorePrice = big.NewInt(2_000_000_000) + // gpoDefaultPrice is the default gas price when no data is available (1 gwei) + gpoDefaultPrice = big.NewInt(1_000_000_000) +) + +// GasPriceOracle estimates gas prices based on recent block data. +// It follows Bor/geth's gas price oracle approach. +type GasPriceOracle struct { + conns *Conns + + mu sync.RWMutex + lastHead common.Hash + lastTip *big.Int +} + +// NewGasPriceOracle creates a new gas price oracle that uses the given Conns for block data. +func NewGasPriceOracle(conns *Conns) *GasPriceOracle { + return &GasPriceOracle{ + conns: conns, + } +} + +// SuggestGasPrice estimates the gas price based on recent blocks. +// For EIP-1559 networks, this returns baseFee + suggestedTip. +// For legacy networks, this returns the 60th percentile of gas prices. +func (o *GasPriceOracle) SuggestGasPrice() *big.Int { + head := o.conns.HeadBlock() + if head.Block == nil { + return gpoDefaultPrice + } + + // For EIP-1559: return baseFee + suggested tip + if baseFee := head.Block.BaseFee(); baseFee != nil { + tip := o.SuggestGasTipCap() + if tip == nil { + tip = gpoDefaultPrice + } + return new(big.Int).Add(baseFee, tip) + } + + // Legacy: return percentile of gas prices + return o.suggestLegacyGasPrice() +} + +// suggestLegacyGasPrice estimates gas price for pre-EIP-1559 networks. +func (o *GasPriceOracle) suggestLegacyGasPrice() *big.Int { + keys := o.conns.blocks.Keys() + if len(keys) == 0 { + return gpoDefaultPrice + } + + if len(keys) > gpoCheckBlocks { + keys = keys[:gpoCheckBlocks] + } + + var prices []*big.Int + for _, hash := range keys { + cache, ok := o.conns.blocks.Peek(hash) + if !ok || cache.Body == nil { + continue + } + + for _, tx := range cache.Body.Transactions { + if price := tx.GasPrice(); price != nil && price.Sign() > 0 { + prices = append(prices, new(big.Int).Set(price)) + } + } + } + + if len(prices) == 0 { + return gpoDefaultPrice + } + + sort.Slice(prices, func(i, j int) bool { + return prices[i].Cmp(prices[j]) < 0 + }) + + price := prices[(len(prices)-1)*gpoPercentile/100] + if price.Cmp(gpoMaxPrice) > 0 { + return new(big.Int).Set(gpoMaxPrice) + } + return price +} + +// SuggestGasTipCap estimates a gas tip cap (priority fee) based on recent blocks. +// This implementation follows Bor/geth's gas price oracle approach: +// - Samples the lowest N tips from each of the last M blocks +// - Ignores tips below a threshold +// - Returns the configured percentile of collected tips +// - Caches results until head changes +func (o *GasPriceOracle) SuggestGasTipCap() *big.Int { + head := o.conns.HeadBlock() + if head.Block == nil { + return nil + } + headHash := head.Block.Hash() + + // Check cache first + o.mu.RLock() + if headHash == o.lastHead && o.lastTip != nil { + tip := new(big.Int).Set(o.lastTip) + o.mu.RUnlock() + return tip + } + lastTip := o.lastTip + o.mu.RUnlock() + + // Collect tips from recent blocks + keys := o.conns.blocks.Keys() + if len(keys) == 0 { + return lastTip + } + + // Limit to checkBlocks most recent + if len(keys) > gpoCheckBlocks { + keys = keys[:gpoCheckBlocks] + } + + var results []*big.Int + for _, hash := range keys { + tips := o.getBlockTips(hash, gpoSampleNumber, gpoIgnorePrice) + if len(tips) == 0 && lastTip != nil { + // Empty block or all tips below threshold, use last tip + tips = []*big.Int{lastTip} + } + results = append(results, tips...) + } + + if len(results) == 0 { + return lastTip + } + + // Sort and get percentile + sort.Slice(results, func(i, j int) bool { + return results[i].Cmp(results[j]) < 0 + }) + tip := results[(len(results)-1)*gpoPercentile/100] + + // Apply max price cap + if tip.Cmp(gpoMaxPrice) > 0 { + tip = new(big.Int).Set(gpoMaxPrice) + } + + // Cache result + o.mu.Lock() + o.lastHead = headHash + o.lastTip = tip + o.mu.Unlock() + + return new(big.Int).Set(tip) +} + +// getBlockTips returns the lowest N tips from a block that are above the ignore threshold. +// Transactions are sorted by effective tip ascending, and the first N valid tips are returned. +func (o *GasPriceOracle) getBlockTips(hash common.Hash, limit int, ignoreUnder *big.Int) []*big.Int { + cache, ok := o.conns.blocks.Peek(hash) + if !ok || cache.Body == nil || cache.Header == nil { + return nil + } + + baseFee := cache.Header.BaseFee + if baseFee == nil { + return nil // Pre-EIP-1559 block + } + + // Calculate tips for all transactions + var allTips []*big.Int + for _, tx := range cache.Body.Transactions { + tip := effectiveGasTip(tx, baseFee) + if tip != nil && tip.Sign() > 0 { + allTips = append(allTips, tip) + } + } + + if len(allTips) == 0 { + return nil + } + + // Sort by tip ascending (lowest first, like Bor) + sort.Slice(allTips, func(i, j int) bool { + return allTips[i].Cmp(allTips[j]) < 0 + }) + + // Collect tips above threshold, up to limit + var tips []*big.Int + for _, tip := range allTips { + if ignoreUnder != nil && tip.Cmp(ignoreUnder) < 0 { + continue + } + tips = append(tips, tip) + if len(tips) >= limit { + break + } + } + + return tips +} + +// effectiveGasTip returns the effective tip (priority fee) for a transaction. +// For EIP-1559 transactions: min(maxPriorityFeePerGas, maxFeePerGas - baseFee) +// For legacy transactions: gasPrice - baseFee (the implicit tip) +// Returns nil if the tip cannot be determined or is negative. +func effectiveGasTip(tx *types.Transaction, baseFee *big.Int) *big.Int { + switch tx.Type() { + case types.DynamicFeeTxType, types.BlobTxType: + tip := tx.GasTipCap() + if tip == nil { + return nil + } + // Effective tip is min(maxPriorityFeePerGas, maxFeePerGas - baseFee) + if tx.GasFeeCap() != nil { + effectiveTip := new(big.Int).Sub(tx.GasFeeCap(), baseFee) + if effectiveTip.Cmp(tip) < 0 { + tip = effectiveTip + } + } + if tip.Sign() <= 0 { + return nil + } + return new(big.Int).Set(tip) + default: + // Legacy/AccessList transactions: tip is gasPrice - baseFee + if price := tx.GasPrice(); price != nil { + tip := new(big.Int).Sub(price, baseFee) + if tip.Sign() > 0 { + return tip + } + } + return nil + } +} diff --git a/p2p/log.go b/p2p/log.go index bfb26b8e..4ba2caba 100644 --- a/p2p/log.go +++ b/p2p/log.go @@ -89,6 +89,7 @@ func (c *MessageCount) IsEmpty() bool { return sum( c.BlockHeaders, c.BlockBodies, + c.Blocks, c.BlockHashes, c.BlockHeaderRequests, c.BlockBodiesRequests, @@ -98,16 +99,19 @@ func (c *MessageCount) IsEmpty() bool { c.Pings, c.Errors, c.Disconnects, + c.NewWitness, + c.NewWitnessHashes, + c.GetWitnessRequest, + c.Witness, ) == 0 } func sum(ints ...int64) int64 { - var sum int64 = 0 + var total int64 for _, i := range ints { - sum += i + total += i } - - return sum + return total } // IncrementByName increments the appropriate field based on message name. diff --git a/p2p/nodeset.go b/p2p/nodeset.go index 9572f853..aa25870f 100644 --- a/p2p/nodeset.go +++ b/p2p/nodeset.go @@ -97,7 +97,7 @@ func WriteURLs(file string, ns NodeSet) error { } } - urls := []string{} + var urls []string for url := range m { urls = append(urls, url) } @@ -130,7 +130,7 @@ func WritePeers(file string, urls []string) error { } func WriteDNSTreeNodes(file string, tree *dnsdisc.Tree) error { - urls := []string{} + var urls []string for _, node := range tree.Nodes() { urls = append(urls, node.URLv4()) } diff --git a/p2p/protocol.go b/p2p/protocol.go index d8fab248..c6718829 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -99,6 +99,12 @@ type conn struct { // version stores the negotiated eth protocol version (e.g., 68 or 69). version uint + + // latestBlockHash and latestBlockNumber track the most recent block + // information received from this peer via NewBlock, NewBlockHashes, + // or BlockRangeUpdate messages. Uses ds.Locked for thread-safe API access. + latestBlockHash *ds.Locked[common.Hash] + latestBlockNumber *ds.Locked[uint64] } // EthProtocolOptions is the options used when creating a new eth protocol. @@ -156,6 +162,8 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol { blockAnnounce: make(chan eth.NewBlockHashesPacket, maxQueuedBlockAnns), closeCh: make(chan struct{}), version: version, + latestBlockHash: &ds.Locked[common.Hash]{}, + latestBlockNumber: &ds.Locked[uint64]{}, } // Ensure cleanup happens on any exit path (including statusExchange failure) @@ -432,6 +440,12 @@ func (c *conn) handleBlockRangeUpdate(msg ethp2p.Msg) error { Hex("hash", packet.LatestBlockHash[:]). Msg("Received BlockRangeUpdate") + // Update latest block info from the range update (thread-safe for API access) + if packet.LatestBlock > c.latestBlockNumber.Get() { + c.latestBlockHash.Set(packet.LatestBlockHash) + c.latestBlockNumber.Set(packet.LatestBlock) + } + return nil } @@ -529,6 +543,12 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { for _, entry := range packet { hash := entry.Hash + // Update latest block info if this block is newer (thread-safe for API access) + if entry.Number > c.latestBlockNumber.Get() { + c.latestBlockHash.Set(hash) + c.latestBlockNumber.Set(entry.Number) + } + // Mark as known from this peer c.addKnownBlock(hash) @@ -588,13 +608,24 @@ func (c *conn) addKnownBlock(hash common.Hash) { c.knownBlocks.Add(hash) } -// hasKnownTx checks if a transaction hash is in the known tx cache. -func (c *conn) hasKnownTx(hash common.Hash) bool { +// filterUnknownTxHashes returns transaction hashes this peer doesn't know about. +// Uses batch bloom filter operations for efficiency (single lock acquisition). +func (c *conn) filterUnknownTxHashes(hashes []common.Hash) []common.Hash { if !c.shouldBroadcastTx && !c.shouldBroadcastTxHashes { - return false + return nil } - return c.knownTxs.Contains(hash) + return c.knownTxs.FilterNotContained(hashes) +} + +// addKnownTxHashes marks multiple transaction hashes as known by this peer. +// Uses batch bloom filter operations for efficiency (single lock acquisition). +func (c *conn) addKnownTxHashes(hashes []common.Hash) { + if !c.shouldBroadcastTx && !c.shouldBroadcastTxHashes { + return + } + + c.knownTxs.AddMany(hashes) } // hasKnownBlock checks if a block hash is in the known block cache. @@ -891,12 +922,23 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error { c.db.WriteBlockHeaders(ctx, headers, tfs, isParent) // Update cache to store headers + var head *types.Header for _, header := range headers { - hash := header.Hash() - c.conns.Blocks().Update(hash, func(cache BlockCache) BlockCache { + c.conns.Blocks().Update(header.Hash(), func(cache BlockCache) BlockCache { cache.Header = header return cache }) + + if head == nil || header.Number.Cmp(head.Number) > 0 { + head = header + } + } + + if head != nil { + c.conns.UpdateHeadBlock(eth.NewBlockPacket{ + Block: types.NewBlockWithHeader(head), + TD: c.conns.HeadBlock().TD, + }) } return nil @@ -966,12 +1008,26 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error { c.db.WriteBlockBody(ctx, body, hash, tfs) - // Update cache to store body + // Update cache and try to update head block if we now have complete block data. + var header *types.Header c.conns.Blocks().Update(hash, func(cache BlockCache) BlockCache { cache.Body = body + header = cache.Header return cache }) + if header != nil { + block := types.NewBlockWithHeader(header).WithBody(types.Body{ + Transactions: body.Transactions, + Uncles: body.Uncles, + Withdrawals: body.Withdrawals, + }) + c.conns.UpdateHeadBlock(eth.NewBlockPacket{ + Block: block, + TD: c.conns.HeadBlock().TD, + }) + } + return nil } @@ -999,6 +1055,10 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { c.countMsgReceived(packet.Name(), 1) + // Update latest block info for this peer (thread-safe for API access) + c.latestBlockHash.Set(hash) + c.latestBlockNumber.Set(packet.Block.Number().Uint64()) + // Mark block as known from this peer c.addKnownBlock(hash)