Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type ClickHouseConnection struct {
Port int
Database string
Username string
Password string
Password string //nolint:gosec // G117: test config, not a real secret
}

// Addr returns the ClickHouse address in host:port format.
Expand Down
2 changes: 1 addition & 1 deletion pkg/clickhouse/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Config struct {
Addr string `yaml:"addr"` // Native protocol address, e.g., "localhost:9000"
Database string `yaml:"database"` // Database name, default: "default"
Username string `yaml:"username"`
Password string `yaml:"password"`
Password string `yaml:"password"` //nolint:gosec // G117: config field, not a hardcoded secret

// Pool settings
MaxConns int32 `yaml:"max_conns"` // Maximum connections in pool, default: 10
Expand Down
11 changes: 11 additions & 0 deletions pkg/ethereum/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ func (p *Pool) HasExecutionNodes() bool {
return len(p.executionNodes) > 0
}

// HasEmbeddedNodes returns true if any execution node is an EmbeddedNode.
func (p *Pool) HasEmbeddedNodes() bool {
for _, node := range p.executionNodes {
if _, ok := node.(*execution.EmbeddedNode); ok {
return true
}
}

return false
}

func (p *Pool) HasHealthyExecutionNodes() bool {
p.mu.RLock()
defer p.mu.RUnlock()
Expand Down
6 changes: 3 additions & 3 deletions pkg/processor/tracker/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (l *Limiter) IsBlockedByIncompleteBlocks(
return false, nil, nil
}

maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated above
maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange)

if mode == BACKWARDS_MODE {
// Backwards mode: check distance from newest incomplete block
Expand Down Expand Up @@ -145,7 +145,7 @@ func (l *Limiter) GetAvailableCapacity(ctx context.Context, nextBlock uint64, mo
return l.config.MaxPendingBlockRange, nil
}

maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated above
maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange)

if mode == BACKWARDS_MODE {
// Backwards mode: check distance from newest incomplete block
Expand Down Expand Up @@ -279,7 +279,7 @@ func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRang
maxBlock := referenceBlock

if l.config.MaxPendingBlockRange > 0 {
exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated in config
exclusionWindow := uint64(l.config.MaxPendingBlockRange)

if referenceBlock > exclusionWindow {
maxBlock = referenceBlock - exclusionWindow - 1
Expand Down
2 changes: 1 addition & 1 deletion pkg/processor/transaction/simple/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err
}

// Build transaction row
txRow, buildErr := p.buildTransactionRow(block, tx, receipt, uint64(index)) //nolint:gosec // index is bounded
txRow, buildErr := p.buildTransactionRow(block, tx, receipt, uint64(index))
if buildErr != nil {
return fmt.Errorf("failed to build transaction row: %w", buildErr)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/processor/transaction/structlog/block_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution
payload := &ProcessPayload{
BlockNumber: *block.Number(),
TransactionHash: tx.Hash().String(),
TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length
TransactionIndex: uint32(index),
NetworkName: p.network.Name,
Network: p.network.Name,
}
Expand Down Expand Up @@ -459,7 +459,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error {
payload := &ProcessPayload{
BlockNumber: *block.Number(),
TransactionHash: tx.Hash().String(),
TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length
TransactionIndex: uint32(index),
NetworkName: p.network.Name,
Network: p.network.Name,
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/processor/transaction/structlog/gas_cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func ComputeGasUsed(structlogs []execution.StructLog) []uint64 {
depthU64 = math.MaxInt
}

depth := int(depthU64) //nolint:gosec // overflow checked above
depth := int(depthU64)

// Ensure slice has enough space for this depth
for len(pendingIdx) <= depth {
Expand All @@ -132,7 +132,7 @@ func ComputeGasUsed(structlogs []execution.StructLog) []uint64 {
}

// Update gasUsed for pending log at current depth
if prevIdx := pendingIdx[depth]; prevIdx >= 0 && prevIdx < len(structlogs) {
if prevIdx := pendingIdx[depth]; prevIdx >= 0 && prevIdx < len(structlogs) && i < len(structlogs) {
// Guard against underflow: if gas values are corrupted or out of order,
// fall back to the pre-calculated GasCost instead of underflowing
if structlogs[prevIdx].Gas >= structlogs[i].Gas {
Expand Down
2 changes: 1 addition & 1 deletion pkg/processor/transaction/structlog/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func ComputeMemoryWords(structlogs []execution.StructLog) (wordsBefore, wordsAft
depthU64 = math.MaxInt
}

depth := int(depthU64) //nolint:gosec // overflow checked above
depth := int(depthU64)

// Ensure slice has enough space for this depth.
for len(pendingIdx) <= depth {
Expand Down
12 changes: 6 additions & 6 deletions pkg/processor/transaction/structlog/transaction_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Bloc
TransactionGas: trace.Gas,
TransactionFailed: trace.Failed,
TransactionReturnValue: trace.ReturnValue,
Index: uint32(i), //nolint:gosec // index is bounded by structlogs length
Index: uint32(i),
Operation: sl.Op,
Gas: sl.Gas,
GasCost: sl.GasCost,
Expand Down Expand Up @@ -254,8 +254,8 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Bloc
TransactionGas: trace.Gas,
TransactionFailed: trace.Failed,
TransactionReturnValue: trace.ReturnValue,
Index: uint32(i), //nolint:gosec // Same index as parent CALL
Operation: "", // Empty = synthetic EOA frame
Index: uint32(i),
Operation: "", // Empty = synthetic EOA frame
Gas: 0,
GasCost: 0,
GasUsed: 0,
Expand Down Expand Up @@ -537,7 +537,7 @@ func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block
TransactionGas: trace.Gas,
TransactionFailed: trace.Failed,
TransactionReturnValue: trace.ReturnValue,
Index: uint32(i), //nolint:gosec // index is bounded by structlogs length
Index: uint32(i),
Operation: structLog.Op,
Gas: structLog.Gas,
GasCost: structLog.GasCost,
Expand Down Expand Up @@ -586,8 +586,8 @@ func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block
TransactionGas: trace.Gas,
TransactionFailed: trace.Failed,
TransactionReturnValue: trace.ReturnValue,
Index: uint32(i), //nolint:gosec // Same index as parent CALL
Operation: "", // Empty = synthetic EOA frame
Index: uint32(i),
Operation: "", // Empty = synthetic EOA frame
Gas: 0,
GasCost: 0,
GasUsed: 0,
Expand Down
4 changes: 2 additions & 2 deletions pkg/processor/transaction/structlog_agg/block_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution
payload := &ProcessPayload{
BlockNumber: *block.Number(),
TransactionHash: tx.Hash().String(),
TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length
TransactionIndex: uint32(index),
NetworkName: p.network.Name,
Network: p.network.Name,
}
Expand Down Expand Up @@ -459,7 +459,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error {
payload := &ProcessPayload{
BlockNumber: *block.Number(),
TransactionHash: tx.Hash().String(),
TransactionIndex: uint32(index), //nolint:gosec // index is bounded by block.Transactions() length
TransactionIndex: uint32(index),
NetworkName: p.network.Name,
Network: p.network.Name,
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/processor/transaction/structlog_agg/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {

// Start starts the processor.
func (p *Processor) Start(ctx context.Context) error {
if !p.pool.HasEmbeddedNodes() {
panic("structlog_agg processor requires embedded mode (e.g. embedded in erigon); " +
"it cannot run against a standard JSON-RPC execution node")
}

// Start the ClickHouse client
if err := p.clickhouse.Start(); err != nil {
return fmt.Errorf("failed to start ClickHouse client: %w", err)
Expand Down