diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 696c9ea..ce69206 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -79,7 +79,7 @@ type ClickHouseConnection struct { Port int Database string Username string - Password string + Password string //nolint:gosec // G117: test config, not a real secret } // Addr returns the ClickHouse address in host:port format. diff --git a/pkg/clickhouse/config.go b/pkg/clickhouse/config.go index 62410e0..0e2f674 100644 --- a/pkg/clickhouse/config.go +++ b/pkg/clickhouse/config.go @@ -13,7 +13,7 @@ type Config struct { Addr string `yaml:"addr"` // Native protocol address, e.g., "localhost:9000" Database string `yaml:"database"` // Database name, default: "default" Username string `yaml:"username"` - Password string `yaml:"password"` + Password string `yaml:"password"` //nolint:gosec // G117: config field, not a hardcoded secret // Pool settings MaxConns int32 `yaml:"max_conns"` // Maximum connections in pool, default: 10 diff --git a/pkg/ethereum/pool.go b/pkg/ethereum/pool.go index d04297c..3e9c58c 100644 --- a/pkg/ethereum/pool.go +++ b/pkg/ethereum/pool.go @@ -70,6 +70,17 @@ func (p *Pool) HasExecutionNodes() bool { return len(p.executionNodes) > 0 } +// HasEmbeddedNodes returns true if any execution node is an EmbeddedNode. +func (p *Pool) HasEmbeddedNodes() bool { + for _, node := range p.executionNodes { + if _, ok := node.(*execution.EmbeddedNode); ok { + return true + } + } + + return false +} + func (p *Pool) HasHealthyExecutionNodes() bool { p.mu.RLock() defer p.mu.RUnlock() diff --git a/pkg/processor/tracker/limiter.go b/pkg/processor/tracker/limiter.go index 2342b75..0a90777 100644 --- a/pkg/processor/tracker/limiter.go +++ b/pkg/processor/tracker/limiter.go @@ -79,7 +79,7 @@ func (l *Limiter) IsBlockedByIncompleteBlocks( return false, nil, nil } - maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated above + maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) if mode == BACKWARDS_MODE { // Backwards mode: check distance from newest incomplete block @@ -145,7 +145,7 @@ func (l *Limiter) GetAvailableCapacity(ctx context.Context, nextBlock uint64, mo return l.config.MaxPendingBlockRange, nil } - maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated above + maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) if mode == BACKWARDS_MODE { // Backwards mode: check distance from newest incomplete block @@ -279,7 +279,7 @@ func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRang maxBlock := referenceBlock if l.config.MaxPendingBlockRange > 0 { - exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated in config + exclusionWindow := uint64(l.config.MaxPendingBlockRange) if referenceBlock > exclusionWindow { maxBlock = referenceBlock - exclusionWindow - 1 diff --git a/pkg/processor/transaction/simple/handlers.go b/pkg/processor/transaction/simple/handlers.go index 84c8131..28c5a5c 100644 --- a/pkg/processor/transaction/simple/handlers.go +++ b/pkg/processor/transaction/simple/handlers.go @@ -135,7 +135,7 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err } // Build transaction row - txRow, buildErr := p.buildTransactionRow(block, tx, receipt, uint64(index)) //nolint:gosec // index is bounded + txRow, buildErr := p.buildTransactionRow(block, tx, receipt, uint64(index)) if buildErr != nil { return fmt.Errorf("failed to build transaction row: %w", buildErr) } diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index 22b104b..d4256f7 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -287,7 +287,7 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length + TransactionIndex: uint32(index), NetworkName: p.network.Name, Network: p.network.Name, } @@ -459,7 +459,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length + TransactionIndex: uint32(index), NetworkName: p.network.Name, Network: p.network.Name, } diff --git a/pkg/processor/transaction/structlog/gas_cost.go b/pkg/processor/transaction/structlog/gas_cost.go index bcf5968..91e436a 100644 --- a/pkg/processor/transaction/structlog/gas_cost.go +++ b/pkg/processor/transaction/structlog/gas_cost.go @@ -116,7 +116,7 @@ func ComputeGasUsed(structlogs []execution.StructLog) []uint64 { depthU64 = math.MaxInt } - depth := int(depthU64) //nolint:gosec // overflow checked above + depth := int(depthU64) // Ensure slice has enough space for this depth for len(pendingIdx) <= depth { @@ -132,7 +132,7 @@ func ComputeGasUsed(structlogs []execution.StructLog) []uint64 { } // Update gasUsed for pending log at current depth - if prevIdx := pendingIdx[depth]; prevIdx >= 0 && prevIdx < len(structlogs) { + if prevIdx := pendingIdx[depth]; prevIdx >= 0 && prevIdx < len(structlogs) && i < len(structlogs) { // Guard against underflow: if gas values are corrupted or out of order, // fall back to the pre-calculated GasCost instead of underflowing if structlogs[prevIdx].Gas >= structlogs[i].Gas { diff --git a/pkg/processor/transaction/structlog/memory.go b/pkg/processor/transaction/structlog/memory.go index 9299786..1a1801b 100644 --- a/pkg/processor/transaction/structlog/memory.go +++ b/pkg/processor/transaction/structlog/memory.go @@ -57,7 +57,7 @@ func ComputeMemoryWords(structlogs []execution.StructLog) (wordsBefore, wordsAft depthU64 = math.MaxInt } - depth := int(depthU64) //nolint:gosec // overflow checked above + depth := int(depthU64) // Ensure slice has enough space for this depth. for len(pendingIdx) <= depth { diff --git a/pkg/processor/transaction/structlog/transaction_processing.go b/pkg/processor/transaction/structlog/transaction_processing.go index 07d2512..2327b76 100644 --- a/pkg/processor/transaction/structlog/transaction_processing.go +++ b/pkg/processor/transaction/structlog/transaction_processing.go @@ -213,7 +213,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Bloc TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // index is bounded by structlogs length + Index: uint32(i), Operation: sl.Op, Gas: sl.Gas, GasCost: sl.GasCost, @@ -254,8 +254,8 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Bloc TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // Same index as parent CALL - Operation: "", // Empty = synthetic EOA frame + Index: uint32(i), + Operation: "", // Empty = synthetic EOA frame Gas: 0, GasCost: 0, GasUsed: 0, @@ -537,7 +537,7 @@ func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // index is bounded by structlogs length + Index: uint32(i), Operation: structLog.Op, Gas: structLog.Gas, GasCost: structLog.GasCost, @@ -586,8 +586,8 @@ func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // Same index as parent CALL - Operation: "", // Empty = synthetic EOA frame + Index: uint32(i), + Operation: "", // Empty = synthetic EOA frame Gas: 0, GasCost: 0, GasUsed: 0, diff --git a/pkg/processor/transaction/structlog_agg/block_processing.go b/pkg/processor/transaction/structlog_agg/block_processing.go index 8030daa..b587b64 100644 --- a/pkg/processor/transaction/structlog_agg/block_processing.go +++ b/pkg/processor/transaction/structlog_agg/block_processing.go @@ -287,7 +287,7 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length + TransactionIndex: uint32(index), NetworkName: p.network.Name, Network: p.network.Name, } @@ -459,7 +459,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length + TransactionIndex: uint32(index), NetworkName: p.network.Name, Network: p.network.Name, } diff --git a/pkg/processor/transaction/structlog_agg/processor.go b/pkg/processor/transaction/structlog_agg/processor.go index a39d044..c54956f 100644 --- a/pkg/processor/transaction/structlog_agg/processor.go +++ b/pkg/processor/transaction/structlog_agg/processor.go @@ -173,6 +173,11 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { // Start starts the processor. func (p *Processor) Start(ctx context.Context) error { + if !p.pool.HasEmbeddedNodes() { + panic("structlog_agg processor requires embedded mode (e.g. embedded in erigon); " + + "it cannot run against a standard JSON-RPC execution node") + } + // Start the ClickHouse client if err := p.clickhouse.Start(); err != nil { return fmt.Errorf("failed to start ClickHouse client: %w", err)