From 353c46a85aa224c860ab72bb4ff184e36ed3cfe5 Mon Sep 17 00:00:00 2001 From: Matty Evans Date: Wed, 18 Feb 2026 12:51:03 +1000 Subject: [PATCH 1/3] feat(pool.go): add HasEmbeddedNodes method to check for embedded execution nodes fix(processor.go): add validation to ensure processor runs only with embedded nodes --- pkg/ethereum/pool.go | 11 +++++++++++ pkg/processor/transaction/structlog_agg/processor.go | 5 +++++ 2 files changed, 16 insertions(+) diff --git a/pkg/ethereum/pool.go b/pkg/ethereum/pool.go index d04297c..3e9c58c 100644 --- a/pkg/ethereum/pool.go +++ b/pkg/ethereum/pool.go @@ -70,6 +70,17 @@ func (p *Pool) HasExecutionNodes() bool { return len(p.executionNodes) > 0 } +// HasEmbeddedNodes returns true if any execution node is an EmbeddedNode. +func (p *Pool) HasEmbeddedNodes() bool { + for _, node := range p.executionNodes { + if _, ok := node.(*execution.EmbeddedNode); ok { + return true + } + } + + return false +} + func (p *Pool) HasHealthyExecutionNodes() bool { p.mu.RLock() defer p.mu.RUnlock() diff --git a/pkg/processor/transaction/structlog_agg/processor.go b/pkg/processor/transaction/structlog_agg/processor.go index a39d044..c54956f 100644 --- a/pkg/processor/transaction/structlog_agg/processor.go +++ b/pkg/processor/transaction/structlog_agg/processor.go @@ -173,6 +173,11 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { // Start starts the processor. func (p *Processor) Start(ctx context.Context) error { + if !p.pool.HasEmbeddedNodes() { + panic("structlog_agg processor requires embedded mode (e.g. embedded in erigon); " + + "it cannot run against a standard JSON-RPC execution node") + } + // Start the ClickHouse client if err := p.clickhouse.Start(); err != nil { return fmt.Errorf("failed to start ClickHouse client: %w", err) From 781be96c055e59f8e5cf9425bf7a41a460dffa25 Mon Sep 17 00:00:00 2001 From: Matty Evans Date: Wed, 18 Feb 2026 13:01:07 +1000 Subject: [PATCH 2/3] fix: update linter comments to use G115 for inline validations and improve code clarity This commit replaces the generic 'validated' comments in the code with specific G115 comments from the gosec linter, enhancing readability and providing clearer information about the validation checks for various variables. This helps developers understand the context of the validations at a glance, improving code maintainability. --- pkg/processor/tracker/limiter.go | 6 +++--- pkg/processor/transaction/simple/handlers.go | 2 +- pkg/processor/transaction/structlog/block_processing.go | 4 ++-- pkg/processor/transaction/structlog/gas_cost.go | 4 ++-- pkg/processor/transaction/structlog/memory.go | 2 +- .../transaction/structlog/transaction_processing.go | 8 ++++---- .../transaction/structlog_agg/block_processing.go | 4 ++-- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/processor/tracker/limiter.go b/pkg/processor/tracker/limiter.go index 2342b75..cc559c0 100644 --- a/pkg/processor/tracker/limiter.go +++ b/pkg/processor/tracker/limiter.go @@ -79,7 +79,7 @@ func (l *Limiter) IsBlockedByIncompleteBlocks( return false, nil, nil } - maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated above + maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // G115: validated positive above if mode == BACKWARDS_MODE { // Backwards mode: check distance from newest incomplete block @@ -145,7 +145,7 @@ func (l *Limiter) GetAvailableCapacity(ctx context.Context, nextBlock uint64, mo return l.config.MaxPendingBlockRange, nil } - maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated above + maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // G115: validated positive above if mode == BACKWARDS_MODE { // Backwards mode: check distance from newest incomplete block @@ -279,7 +279,7 @@ func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRang maxBlock := referenceBlock if l.config.MaxPendingBlockRange > 0 { - exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated in config + exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // G115: validated positive above if referenceBlock > exclusionWindow { maxBlock = referenceBlock - exclusionWindow - 1 diff --git a/pkg/processor/transaction/simple/handlers.go b/pkg/processor/transaction/simple/handlers.go index 84c8131..630df0d 100644 --- a/pkg/processor/transaction/simple/handlers.go +++ b/pkg/processor/transaction/simple/handlers.go @@ -135,7 +135,7 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err } // Build transaction row - txRow, buildErr := p.buildTransactionRow(block, tx, receipt, uint64(index)) //nolint:gosec // index is bounded + txRow, buildErr := p.buildTransactionRow(block, tx, receipt, uint64(index)) //nolint:gosec // G115: index bounded by slice length if buildErr != nil { return fmt.Errorf("failed to build transaction row: %w", buildErr) } diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index 22b104b..330298b 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -287,7 +287,7 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length + TransactionIndex: uint32(index), //nolint:gosec // G115: index bounded by block transactions NetworkName: p.network.Name, Network: p.network.Name, } @@ -459,7 +459,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length + TransactionIndex: uint32(index), //nolint:gosec // G115: index bounded by block transactions NetworkName: p.network.Name, Network: p.network.Name, } diff --git a/pkg/processor/transaction/structlog/gas_cost.go b/pkg/processor/transaction/structlog/gas_cost.go index bcf5968..a069427 100644 --- a/pkg/processor/transaction/structlog/gas_cost.go +++ b/pkg/processor/transaction/structlog/gas_cost.go @@ -116,7 +116,7 @@ func ComputeGasUsed(structlogs []execution.StructLog) []uint64 { depthU64 = math.MaxInt } - depth := int(depthU64) //nolint:gosec // overflow checked above + depth := int(depthU64) //nolint:gosec // G115: overflow guarded above // Ensure slice has enough space for this depth for len(pendingIdx) <= depth { @@ -132,7 +132,7 @@ func ComputeGasUsed(structlogs []execution.StructLog) []uint64 { } // Update gasUsed for pending log at current depth - if prevIdx := pendingIdx[depth]; prevIdx >= 0 && prevIdx < len(structlogs) { + if prevIdx := pendingIdx[depth]; prevIdx >= 0 && prevIdx < len(structlogs) && i < len(structlogs) { // Guard against underflow: if gas values are corrupted or out of order, // fall back to the pre-calculated GasCost instead of underflowing if structlogs[prevIdx].Gas >= structlogs[i].Gas { diff --git a/pkg/processor/transaction/structlog/memory.go b/pkg/processor/transaction/structlog/memory.go index 9299786..cbfb0f4 100644 --- a/pkg/processor/transaction/structlog/memory.go +++ b/pkg/processor/transaction/structlog/memory.go @@ -57,7 +57,7 @@ func ComputeMemoryWords(structlogs []execution.StructLog) (wordsBefore, wordsAft depthU64 = math.MaxInt } - depth := int(depthU64) //nolint:gosec // overflow checked above + depth := int(depthU64) //nolint:gosec // G115: overflow guarded above // Ensure slice has enough space for this depth. for len(pendingIdx) <= depth { diff --git a/pkg/processor/transaction/structlog/transaction_processing.go b/pkg/processor/transaction/structlog/transaction_processing.go index 07d2512..5dd2af9 100644 --- a/pkg/processor/transaction/structlog/transaction_processing.go +++ b/pkg/processor/transaction/structlog/transaction_processing.go @@ -213,7 +213,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Bloc TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // index is bounded by structlogs length + Index: uint32(i), //nolint:gosec // G115: index bounded by structlogs length Operation: sl.Op, Gas: sl.Gas, GasCost: sl.GasCost, @@ -254,7 +254,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Bloc TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // Same index as parent CALL + Index: uint32(i), //nolint:gosec // G115: same index as parent CALL Operation: "", // Empty = synthetic EOA frame Gas: 0, GasCost: 0, @@ -537,7 +537,7 @@ func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // index is bounded by structlogs length + Index: uint32(i), //nolint:gosec // G115: index bounded by structlogs length Operation: structLog.Op, Gas: structLog.Gas, GasCost: structLog.GasCost, @@ -586,7 +586,7 @@ func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // Same index as parent CALL + Index: uint32(i), //nolint:gosec // G115: same index as parent CALL Operation: "", // Empty = synthetic EOA frame Gas: 0, GasCost: 0, diff --git a/pkg/processor/transaction/structlog_agg/block_processing.go b/pkg/processor/transaction/structlog_agg/block_processing.go index 8030daa..ba64b37 100644 --- a/pkg/processor/transaction/structlog_agg/block_processing.go +++ b/pkg/processor/transaction/structlog_agg/block_processing.go @@ -287,7 +287,7 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length + TransactionIndex: uint32(index), //nolint:gosec // G115: index bounded by block transactions NetworkName: p.network.Name, Network: p.network.Name, } @@ -459,7 +459,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length + TransactionIndex: uint32(index), //nolint:gosec // G115: index bounded by block transactions NetworkName: p.network.Name, Network: p.network.Name, } From ab1d52832d4925c7719ee0010a759becc2b08a34 Mon Sep 17 00:00:00 2001 From: Matty Evans Date: Wed, 18 Feb 2026 13:29:09 +1000 Subject: [PATCH 3/3] chore: remove unnecessary nolint comments from code to improve readability and maintain cleanliness --- internal/testutil/testutil.go | 2 +- pkg/clickhouse/config.go | 2 +- pkg/processor/tracker/limiter.go | 6 +++--- pkg/processor/transaction/simple/handlers.go | 2 +- .../transaction/structlog/block_processing.go | 4 ++-- pkg/processor/transaction/structlog/gas_cost.go | 2 +- pkg/processor/transaction/structlog/memory.go | 2 +- .../transaction/structlog/transaction_processing.go | 12 ++++++------ .../transaction/structlog_agg/block_processing.go | 4 ++-- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 696c9ea..ce69206 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -79,7 +79,7 @@ type ClickHouseConnection struct { Port int Database string Username string - Password string + Password string //nolint:gosec // G117: test config, not a real secret } // Addr returns the ClickHouse address in host:port format. diff --git a/pkg/clickhouse/config.go b/pkg/clickhouse/config.go index 62410e0..0e2f674 100644 --- a/pkg/clickhouse/config.go +++ b/pkg/clickhouse/config.go @@ -13,7 +13,7 @@ type Config struct { Addr string `yaml:"addr"` // Native protocol address, e.g., "localhost:9000" Database string `yaml:"database"` // Database name, default: "default" Username string `yaml:"username"` - Password string `yaml:"password"` + Password string `yaml:"password"` //nolint:gosec // G117: config field, not a hardcoded secret // Pool settings MaxConns int32 `yaml:"max_conns"` // Maximum connections in pool, default: 10 diff --git a/pkg/processor/tracker/limiter.go b/pkg/processor/tracker/limiter.go index cc559c0..0a90777 100644 --- a/pkg/processor/tracker/limiter.go +++ b/pkg/processor/tracker/limiter.go @@ -79,7 +79,7 @@ func (l *Limiter) IsBlockedByIncompleteBlocks( return false, nil, nil } - maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // G115: validated positive above + maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) if mode == BACKWARDS_MODE { // Backwards mode: check distance from newest incomplete block @@ -145,7 +145,7 @@ func (l *Limiter) GetAvailableCapacity(ctx context.Context, nextBlock uint64, mo return l.config.MaxPendingBlockRange, nil } - maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // G115: validated positive above + maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) if mode == BACKWARDS_MODE { // Backwards mode: check distance from newest incomplete block @@ -279,7 +279,7 @@ func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRang maxBlock := referenceBlock if l.config.MaxPendingBlockRange > 0 { - exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // G115: validated positive above + exclusionWindow := uint64(l.config.MaxPendingBlockRange) if referenceBlock > exclusionWindow { maxBlock = referenceBlock - exclusionWindow - 1 diff --git a/pkg/processor/transaction/simple/handlers.go b/pkg/processor/transaction/simple/handlers.go index 630df0d..28c5a5c 100644 --- a/pkg/processor/transaction/simple/handlers.go +++ b/pkg/processor/transaction/simple/handlers.go @@ -135,7 +135,7 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err } // Build transaction row - txRow, buildErr := p.buildTransactionRow(block, tx, receipt, uint64(index)) //nolint:gosec // G115: index bounded by slice length + txRow, buildErr := p.buildTransactionRow(block, tx, receipt, uint64(index)) if buildErr != nil { return fmt.Errorf("failed to build transaction row: %w", buildErr) } diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index 330298b..d4256f7 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -287,7 +287,7 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // G115: index bounded by block transactions + TransactionIndex: uint32(index), NetworkName: p.network.Name, Network: p.network.Name, } @@ -459,7 +459,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // G115: index bounded by block transactions + TransactionIndex: uint32(index), NetworkName: p.network.Name, Network: p.network.Name, } diff --git a/pkg/processor/transaction/structlog/gas_cost.go b/pkg/processor/transaction/structlog/gas_cost.go index a069427..91e436a 100644 --- a/pkg/processor/transaction/structlog/gas_cost.go +++ b/pkg/processor/transaction/structlog/gas_cost.go @@ -116,7 +116,7 @@ func ComputeGasUsed(structlogs []execution.StructLog) []uint64 { depthU64 = math.MaxInt } - depth := int(depthU64) //nolint:gosec // G115: overflow guarded above + depth := int(depthU64) // Ensure slice has enough space for this depth for len(pendingIdx) <= depth { diff --git a/pkg/processor/transaction/structlog/memory.go b/pkg/processor/transaction/structlog/memory.go index cbfb0f4..1a1801b 100644 --- a/pkg/processor/transaction/structlog/memory.go +++ b/pkg/processor/transaction/structlog/memory.go @@ -57,7 +57,7 @@ func ComputeMemoryWords(structlogs []execution.StructLog) (wordsBefore, wordsAft depthU64 = math.MaxInt } - depth := int(depthU64) //nolint:gosec // G115: overflow guarded above + depth := int(depthU64) // Ensure slice has enough space for this depth. for len(pendingIdx) <= depth { diff --git a/pkg/processor/transaction/structlog/transaction_processing.go b/pkg/processor/transaction/structlog/transaction_processing.go index 5dd2af9..2327b76 100644 --- a/pkg/processor/transaction/structlog/transaction_processing.go +++ b/pkg/processor/transaction/structlog/transaction_processing.go @@ -213,7 +213,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Bloc TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // G115: index bounded by structlogs length + Index: uint32(i), Operation: sl.Op, Gas: sl.Gas, GasCost: sl.GasCost, @@ -254,8 +254,8 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Bloc TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // G115: same index as parent CALL - Operation: "", // Empty = synthetic EOA frame + Index: uint32(i), + Operation: "", // Empty = synthetic EOA frame Gas: 0, GasCost: 0, GasUsed: 0, @@ -537,7 +537,7 @@ func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // G115: index bounded by structlogs length + Index: uint32(i), Operation: structLog.Op, Gas: structLog.Gas, GasCost: structLog.GasCost, @@ -586,8 +586,8 @@ func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block TransactionGas: trace.Gas, TransactionFailed: trace.Failed, TransactionReturnValue: trace.ReturnValue, - Index: uint32(i), //nolint:gosec // G115: same index as parent CALL - Operation: "", // Empty = synthetic EOA frame + Index: uint32(i), + Operation: "", // Empty = synthetic EOA frame Gas: 0, GasCost: 0, GasUsed: 0, diff --git a/pkg/processor/transaction/structlog_agg/block_processing.go b/pkg/processor/transaction/structlog_agg/block_processing.go index ba64b37..b587b64 100644 --- a/pkg/processor/transaction/structlog_agg/block_processing.go +++ b/pkg/processor/transaction/structlog_agg/block_processing.go @@ -287,7 +287,7 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // G115: index bounded by block transactions + TransactionIndex: uint32(index), NetworkName: p.network.Name, Network: p.network.Name, } @@ -459,7 +459,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { payload := &ProcessPayload{ BlockNumber: *block.Number(), TransactionHash: tx.Hash().String(), - TransactionIndex: uint32(index), //nolint:gosec // G115: index bounded by block transactions + TransactionIndex: uint32(index), NetworkName: p.network.Name, Network: p.network.Name, }