Skip to content

Commit 623b257

Browse files
mattevansSavidclaude
authored
release: embed mode + structlog agg (#54)
* feat(structlog): add call frame tracking to identify EVM call contexts Introduce CallTracker to assign sequential frame IDs and maintain call paths during opcode traversal. This enables accurate identification of which contract call each opcode belongs to, even when the same contract is called multiple times. Extend Structlog with CallFrameID and CallFramePath fields to persist the tracking information alongside each opcode record. Update extractCallAddress to handle all CALL-type opcodes (CALL, CALLCODE, DELEGATECALL, STATICCALL) for complete call target extraction. * test: add comprehensive unit tests for extractCallAddress function * test: add unit tests for CREATE/CREATE2 address extraction feat: detect CREATE/CREATE2 opcodes and fetch contract address from receipt refactor: replace extractCallAddress with extractCallAddressWithCreate to handle contract creation addresses * style(transaction_processing.go): move comment to line above log to match Go style * refactor(processor): replace receipt-based CREATE address lookup with trace-based computation - Remove fetchCreateAddress and hasCreateOpcode helpers - Introduce ComputeCreateAddresses to extract addresses directly from trace - Update extractCallAddressWithCreate signature to accept index and map - Rename queue name from "transaction-structlog" to "transaction_structlog" - Expand test coverage for nested and failed CREATE scenarios * fix(call_tracker): align root frame depth with EVM traces (depth 1) The root frame now starts at depth 1 instead of 0 to match the actual EVM structlog output, where execution begins at depth 1. All tests updated to reflect the new depth semantics. * feat(structlog): add GasSelf field to isolate CALL/CREATE overhead from child gas Introduce GasSelf to represent the gas consumed by an opcode *excluding* any gas spent in child frames. For CALL/CREATE opcodes this yields the pure call overhead (warm/cold access, memory expansion, value transfer); for all other opcodes GasSelf equals GasUsed. This allows accurate aggregation of total execution gas without double counting. - Add ComputeGasSelf() helper that subtracts the sum of *direct* child GasUsed values from each CALL/CREATE opcode’s GasUsed. - Extend Structlog struct and ClickHouse schema with new GasSelf column. - Update both ProcessTransaction() and ExtractStructlogs() to populate the new field. - Provide extensive unit tests covering nested, sibling and edge cases. * test(structlog): fix and expand address extraction tests for CALL opcodes test(structlog): add comprehensive format_address tests for 20-byte padding refactor(structlog): introduce formatAddress to normalize stack values to 42-char addresses * WIP: current changes * fix(structlog): correct EOA call detection to prevent phantom synthetic frames Replace heuristic precompile check with go-ethereum's canonical list to avoid mis-classifying early EOAs/contracts as precompiles. Tighten the EOA detection logic to only create synthetic frames when call depth remains unchanged (depth == nextDepth) instead of the previous nextDepth <= depth, eliminating phantom frames for failed calls and out-of-gas scenarios. * chore: accidental commit * refactor: remove unused ParityTrace types and helper from execution package * feat: add Node interface and EmbeddedNode for library embedding Introduce abstraction layer for execution nodes to support both RPC-based and embedded usage patterns. - Add Node interface defining the contract for execution data providers - Add DataSource interface for hosts to provide data directly - Add EmbeddedNode implementation that delegates to a DataSource - Rename existing implementation to RPCNode - Add NewPoolWithNodes() for creating pools with pre-created nodes - Update pool to work with Node interface instead of concrete types * refactor: abstract execution types to remove CGO dependency Replace go-ethereum types (Block, Transaction, Receipt) with abstract interfaces to enable library embedding without CGO. Move geth-specific RPC implementation to pkg/ethereum/execution/geth/ subpackage with build tags, allowing clean separation between embedded and RPC modes. * refactor(structlog): add embedded-mode support to eliminate 99% of stack allocations Introduce dual-mode processing (RPC vs embedded) in StructLog to reduce memory pressure. Embedded mode pre-computes GasUsed and CallToAddress in the tracer, avoiding post-processing passes and stack allocations. - Add GasUsed and CallToAddress fields to StructLog - Document operation modes and field semantics - Extract call address via new helper, supporting both modes - Add opcode constants and isCallOpcode utility * perf(embedded_node): force DisableStack=true in DebugTraceTransaction for embedded mode Reduces memory pressure by skipping full stack capture; the tracer already extracts CallToAddress directly for CALL-family opcodes. * feat: add support for pre-computed GasUsed values from embedded tracer Introduce hasPrecomputedGasUsed() to detect when the tracer already populates GasUsed (embedded mode). Skip expensive post-processing computation in that case, falling back to RPC mode calculation only when needed. This maintains backward compatibility while optimizing performance for embedded mode traces. * style(transaction_processing.go): add blank line after batch slice creation for readability * refactor: change ChainID return type from int32 to int64 docs: remove trailing whitespace in README * test(embedded_node_test.go): remove obsolete TestEmbeddedNode_DelegatesToDataSource_ChainID test * fix(pool): register OnReady callbacks before spawning goroutines Eliminates race condition where MarkReady() could fire before callbacks were registered, causing missed ready events. * refactor(embedded_node): add debug logging for OnReady callback execution refactor(pool): add debug logging for OnReady callback registration and execution * fix(transaction_processing.go): extract pre-computed GasUsed values from structlogs in embedded mode instead of skipping post-processing * test(gas_cost_test.go): add unit tests for hasPrecomputedGasUsed function * feat: detect pre-computed CREATE/CREATE2 addresses to skip expensive scan Introduce hasPrecomputedCreateAddresses() to check whether the tracer already populated CallToAddress for CREATE/CREATE2 opcodes. When true (embedded mode), we skip the costly ComputeCreateAddresses() pass, improving performance for large traces. * style(embedded_node.go): add blank line before callback execution for readability * lint * refactor(structlog): remove ProgramCounter field from Structlog struct The field is no longer needed for downstream processing and simplifies the data model by eliminating redundant information. * feat: introduce transaction call_frame processor for EVM execution analysis Add a new processor that aggregates EVM structlog traces into per-call-frame metrics. It extracts gas usage, opcode counts, error counts, and call hierarchy for every CALL/CREATE frame, including synthetic EOA frames. - Add Config, Processor, and all supporting components - Integrate into Manager: config validation, initialization, task enqueue - Provide forwards & backwards processing modes via Asynq tasks - Emit ClickHouse rows with frame-level gas, depth, type, and parent links - Include comprehensive unit tests for aggregation logic * refactor(call_frame): unify root frame CallType to empty string and drop "EOA" label Replace "ROOT" and "EOA" magic strings with an empty string for the root frame and rely on the initiating CALL opcode for synthetic EOA frames. This removes special-case values and keeps the field strictly tied to actual CALL opcodes, simplifying downstream consumers. * fix(aggregator): skip synthetic EOA rows in opcode count fix(aggregator): compute intrinsic gas only when error_count = 0 feat(aggregator): add SetRootTargetAddress to set root frame target fix(transaction_processing): set root frame target_address from tx.To() fix(transaction_processing): emit consistent root frame for simple transfers * refactor(processor): replace call_frame processor with structlog_agg processor The call_frame processor is superseded by a new structlog_agg implementation that aggregates structlog data into call frame rows with per-opcode statistics. This provides richer analytics while maintaining the same downstream table structure. - Remove all call_frame processor code - Introduce structlog_agg with frame-aware aggregation logic - Update config, manager and imports to use the new processor - Keep the same ClickHouse table name for compatibility * fix: replace leftover "call_frame" references with "structlog_agg" docs: update comments and error messages to match actual processor name * fix(aggregator): skip gas refund and intrinsic gas for failed transactions test(aggregator): add unit tests for failed vs successful tx gas handling Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: prevent uint64 underflow in gas calculations and correct intrinsic gas for failed txs - Add underflow guards in ComputeGasUsed and computeGasUsed when structlog gas values are corrupted or out of order, falling back to GasCost. - Reorder arithmetic in computeIntrinsicGas to avoid underflow when receiptGas < gasCumulative. - Always compute intrinsic gas for root frame, even on failed transactions, since it is charged before EVM execution starts. - Add StartBlock/EndBlock config options to allow reprocessing ranges. - Extend test coverage for all underflow scenarios and failed-tx behavior. * fix(geth): sanitize corrupted gasCost values from Erigon underflow bug Erigon's debug_traceTransaction can return huge gasCost values when an unsigned integer underflow occurs in callGas(). Detect and clamp any gasCost that exceeds the available gas to the actual gas left, matching Reth's behavior. Prevents downstream consumers from seeing implausibly large gas costs. * fix(aggregator): count REVERT as error even when opcode has no error field REVERT executes successfully (no opcode error) but still causes the transaction to fail. The failure is indicated by trace.Failed=true. Update Finalize to set ErrorCount=1 in this case and use the corrected value when deciding whether to apply gas refunds. Add test TestFrameAggregator_RevertWithoutOpcodeError to ensure REVERT transactions are correctly flagged as failed and receive no refund. * fix(state): handle nullable UInt32 column in getLimiterMaxBlock Use toUInt64OrZero to cast nullable UInt32 to UInt64, returning 0 for NULL. Treat 0 as "no data" and return genesis block, preventing nil pointer panic. * fix(manager): start state manager in embedded mode to ensure ClickHouse connections * fix(state): replace toUInt64OrZero with ifNull+toUInt64 for nullable UInt32 The new expression avoids potential type-casting issues in ch-go by explicitly handling NULL values before casting to UInt64. * fix(leaderelection): add callback-based notification for guaranteed delivery The channel-based leadership notification was dropping events when the buffer (size 10) was full, causing instances to get stuck thinking they were still leader when they had lost leadership. This adds OnLeadershipChange(callback) to the Elector interface for guaranteed delivery via synchronous callback invocation. The channel API is preserved for backward compatibility but marked as deprecated. Changes: - Add LeadershipCallback type and OnLeadershipChange method to interface - Implement callback storage and notification in RedisElector - Update handlers to call callbacks first, then channel (best-effort) - Update Manager to use callbacks instead of monitorLeadership goroutine - Add tests for callback invocation, multiple callbacks, guaranteed delivery, and slow callback handling * fix: reorder block processing steps to prevent race condition Move block registration (ClickHouse & Redis) before enqueueing tasks to guarantee the block record exists before any task can complete. Calculate expected task count upfront and warn if enqueueing fails. * refactor(leaderelection): replace hand-rolled Redis locking with redsync (#60) * refactor(leaderelection): replace hand-rolled Redis locking with redsync Replace manual SetNX/Lua script implementation with github.com/go-redsync/redsync/v4 for distributed mutex handling. Changes: - Add redsync v4.15.0 dependency - Simplify Elector interface: remove LeadershipChannel() and GetLeaderID() - Rewrite RedisElector using redsync.Mutex with: - WithSetNXOnExtend() for Redis restart resilience - WithDriftFactor(0.01) for clock drift safety - WithTries(1) for non-blocking acquisition attempts - Update tests to use callback-based notification only - Reduce implementation from ~374 to ~260 lines Breaking changes: - LeadershipChannel() removed (use OnLeadershipChange callback instead) - GetLeaderID() removed (redsync uses opaque internal values) * fix logging * perf: move expensive metrics updates to background workers Replace synchronous, per-block metrics collection with dedicated background goroutines that refresh heavy metrics every 15 s. This removes costly ClickHouse queries from the hot path and reduces block-processing latency. - Remove updateBlockMetrics() from both structlog & structlog_agg - Add start/stopMetricsWorker() helpers and runMetricsWorker() loop - Keep only lightweight BlockHeight update in ProcessNextBlock - Introduce limiter cache in state.Manager to avoid repeated MAX() queries; refresh every 6 s via background goroutine - Update limiter query to use ORDER BY … LIMIT 1 for index usage - Add comprehensive tests for cache hit/miss, concurrent access, single-start guarantees and refresh behaviour * perf(manager.go): remove FINAL keyword from queryLimiterMaxBlock query to avoid full table scan and improve performance * style(manager.go): remove "(optimized)" from debug log message * fix(pending.go): use SetNX with 30min TTL to prevent duplicate block processing (#64) feat(pending.go): add ErrBlockAlreadyBeingProcessed sentinel error refactor(block_processing): acquire Redis lock before ClickHouse mark to prevent races fix(state): include milliseconds in ClickHouse timestamp format * feat(processor): zero-interval processing mode as default (#65) Change default processing behavior to run as fast as possible with no delay between cycles. When no work is available, a 10ms backoff prevents CPU spin. - Remove DefaultInterval (was 10s), add DefaultNoWorkBackoff (10ms) - Remove automatic interval assignment in config validation - Change processBlocks to return bool indicating if work was done - Rewrite runBlockProcessing to use default case for continuous processing - Fixed interval mode (interval > 0) still supported for rate limiting * Add async insert settings to processors (#67) - Add configurable asyncInsert and waitForAsyncInsert settings to each processor's ClickHouse writes to reduce part creation pressure - Use *bool pointers to distinguish omitted config (defaults to true) from explicit false values - Remove unused chunkSize and progressLogThreshold config options - Simplify structlog ProcessTransaction by removing OnInput streaming pattern in favor of simple batch insert - Remove DefaultChunkSize and DefaultProgressLogThreshold from tracker - Update tests to remove chunk-related assertions * feat(rowbuffer): add row batching system for ClickHouse inserts (#68) * feat(rowbuffer): add row batching system for ClickHouse inserts Implements a generic row buffer that pools rows in memory across concurrent tasks and flushes when hitting a row limit (100k default) or timer (1s default). - Add pkg/rowbuffer with Buffer[R any] generic type using Go 1.25 features - Add comprehensive tests using testing/synctest for deterministic concurrency - Add Prometheus metrics for flush operations and pending state - Update structlog, structlog_agg, and simple processors to use row buffer - Replace asyncInsert/waitForAsyncInsert config with bufferMaxRows/bufferFlushInterval This reduces ClickHouse distributed pending inserts by batching many small transaction inserts into fewer large batches. * refactor(logging): improve rowbuffer and pending tracker logs - Change "Decremented pending task count" from debug to trace level - Add processor and table fields to ClickHouse flush logs - Rename flush log messages for clarity ("ClickHouse flush completed/failed") * feat(processor): add batch block fetching for improved throughput (#69) Implement batch block fetching to request multiple blocks at once using BatchCallContext RPC, with proper backpressure handling and contiguous-only batching. Changes: - Add BlocksByNumbers to Node/DataSource interfaces for batch RPC - Implement batch RPC using go-ethereum's BatchCallContext - Add GetAvailableCapacity and ValidateBatchWithinLeash to Limiter - Add NextBlocks method to State Manager for batch number generation - Add InitBlocks batch method to PendingTracker using Redis pipeline - Refactor ProcessNextBlock in all processors to support batch fetching - Add exponential backoff with jitter for backpressure handling The batch size is limited by min(maxPendingBlockRange, availableCapacity) and stops at the first missing/not-found block to maintain contiguity. * feat(tracker): replace counter-based tracking with SET-based BlockCompletionTracker (#66) * feat(tracker): replace counter-based tracking with SET-based BlockCompletionTracker - Replace PendingTracker (counter-based) with BlockCompletionTracker (SET-based) - Use deterministic TaskID for deduplication via asynq.TaskID() - Track completed taskIDs in Redis SETs (idempotent SADD) - Remove asynq.Retention since deduplication now handled by Redis SETs - Fix Redis key prefix to be a proper prefix (prefix:key instead of key:prefix) - Add 30min TTL to all block tracking keys (completed, expected, block_meta) - Support orphaned block detection and reprocessing - Change default processing interval to zero (immediate processing) * docs(config): add staleBlockDetection to example config Document the new stale block detection configuration options introduced with the SET-based block completion tracker. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> * feat(processor): add high-priority reprocess queue for orphaned blocks (#72) Add a dedicated reprocess queue with highest priority (20) that ensures orphaned and stale blocks are processed before regular forwards (10) or backwards (5) processing tasks. Changes: - Add ProcessReprocessQueue and PrefixedProcessReprocessQueue helpers - Enable StrictPriority in asynq server for strict queue ordering - Update all processors to return 3 queues with reprocess as highest priority - Update ReprocessBlock methods to enqueue directly to reprocess queue - Handle non-existent queue gracefully in monitoring (normal for reprocess) - Add comprehensive tests for new queue helpers * feat(rowbuffer): add circuit breaker and concurrent flush limiting (#71) - Add circuit breaker using sony/gobreaker to protect against cascading failures when ClickHouse is unavailable. Configurable via bufferCircuitBreakerMaxFailures and bufferCircuitBreakerTimeout. - Add semaphore to limit concurrent flush operations, configurable via bufferMaxConcurrentFlushes (default: 10). - Add metrics: inflight_flushes, circuit_open, circuit_rejections_total - Fix RPC batch fetching to automatically chunk requests exceeding the 100 block limit imposed by most RPC nodes. - Fix BlocksProcessed metric to correctly count all blocks in a batch by moving increment to processors. - Fix block not found diff calculation: use diff > 0 && diff <= 5 to correctly distinguish blocks that might appear soon from blocks that should already exist. - Add comprehensive tests for circuit breaker, concurrent flushes, semaphore limiting, and block diff calculation. * Revert "feat(rowbuffer): add circuit breaker and concurrent flush limiting (#71)" (#73) This reverts commit fec76c6. --------- Co-authored-by: Andrew Davis <1709934+Savid@users.noreply.github.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent a0c657b commit 623b257

75 files changed

Lines changed: 12770 additions & 1624 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ A distributed system for processing Ethereum execution layer data with support f
3737
### Core Components
3838

3939
- **Ethereum Nodes**: Configure execution node endpoints
40-
- **Redis**: Task queue and leader election coordination
40+
- **Redis**: Task queue and leader election coordination
4141
- **State Manager**: Track processing progress in ClickHouse
4242
- **Processors**: Configure structlog extraction settings
4343

@@ -57,6 +57,93 @@ A distributed system for processing Ethereum execution layer data with support f
5757
└─────────────────────────────────────────┘
5858
```
5959

60+
## Embedded Mode (Library Usage)
61+
62+
The execution-processor can be embedded as a library within an execution client, providing direct data access without JSON-RPC overhead.
63+
64+
### Implementing DataSource
65+
66+
```go
67+
import (
68+
"context"
69+
"math/big"
70+
71+
"github.com/ethereum/go-ethereum/core/types"
72+
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
73+
)
74+
75+
type MyDataSource struct {
76+
client *MyExecutionClient
77+
}
78+
79+
func (ds *MyDataSource) BlockNumber(ctx context.Context) (*uint64, error) {
80+
num := ds.client.CurrentBlock()
81+
return &num, nil
82+
}
83+
84+
func (ds *MyDataSource) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
85+
return ds.client.GetBlock(number), nil
86+
}
87+
88+
func (ds *MyDataSource) BlockReceipts(ctx context.Context, number *big.Int) ([]*types.Receipt, error) {
89+
return ds.client.GetBlockReceipts(number), nil
90+
}
91+
92+
func (ds *MyDataSource) TransactionReceipt(ctx context.Context, hash string) (*types.Receipt, error) {
93+
return ds.client.GetReceipt(hash), nil
94+
}
95+
96+
func (ds *MyDataSource) DebugTraceTransaction(
97+
ctx context.Context,
98+
hash string,
99+
blockNumber *big.Int,
100+
opts execution.TraceOptions,
101+
) (*execution.TraceTransaction, error) {
102+
return ds.client.TraceTransaction(hash, opts), nil
103+
}
104+
105+
func (ds *MyDataSource) ChainID() int64 {
106+
return ds.client.ChainID()
107+
}
108+
109+
func (ds *MyDataSource) ClientType() string {
110+
return "my-client/1.0.0"
111+
}
112+
113+
func (ds *MyDataSource) IsSynced() bool {
114+
return ds.client.IsSynced()
115+
}
116+
```
117+
118+
### Creating an Embedded Pool
119+
120+
```go
121+
import (
122+
"github.com/ethpandaops/execution-processor/pkg/ethereum"
123+
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
124+
)
125+
126+
// Create embedded node with your data source
127+
dataSource := &MyDataSource{client: myClient}
128+
node := execution.NewEmbeddedNode(log, "embedded", dataSource)
129+
130+
// Create pool with the embedded node
131+
pool := ethereum.NewPoolWithNodes(log, "processor", []execution.Node{node}, nil)
132+
pool.Start(ctx)
133+
134+
// Mark ready when your client is synced and ready to serve data
135+
node.MarkReady(ctx)
136+
```
137+
138+
### Embedded vs RPC Mode
139+
140+
| Aspect | RPC Mode | Embedded Mode |
141+
|--------|----------|---------------|
142+
| Data Access | JSON-RPC over HTTP | Direct function calls |
143+
| Readiness | Auto-detected via RPC health checks | Host calls MarkReady() |
144+
| Performance | Network + serialization overhead | Zero serialization overhead |
145+
| Use Case | External execution clients | Library integration |
146+
60147
## Manual Block Queue API
61148

62149
The execution processor provides an HTTP API for manually queuing blocks for reprocessing. This is useful for fixing data issues or reprocessing specific blocks.
@@ -80,7 +167,7 @@ curl -X POST http://localhost:8080/api/v1/queue/block/transaction_structlog/1234
80167
"status": "queued",
81168
"block_number": 12345,
82169
"processor": "transaction_structlog",
83-
"queue": "process:forwards",
170+
"queue": "process:forwards",
84171
"transaction_count": 150,
85172
"tasks_created": 150
86173
}
@@ -158,7 +245,7 @@ curl -X POST http://localhost:8080/api/v1/queue/blocks/transaction_structlog \
158245
# Run tests
159246
go test ./...
160247

161-
# Run with race detector
248+
# Run with race detector
162249
go test ./... --race
163250

164251
# Build
@@ -167,4 +254,4 @@ go build .
167254

168255
## License
169256

170-
See LICENSE file.
257+
See LICENSE file.

example_config.yaml

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ processors:
3737
maxProcessQueueSize: 10000 # Stop processing new blocks if process queue exceeds this size
3838
backpressureHysteresis: 0.8 # Clear backpressure when queue drops below this fraction of max (8000 in this case)
3939

40+
# Stale block detection (optional, enabled by default)
41+
staleBlockDetection:
42+
enabled: true
43+
staleThreshold: "5m" # Time after which a block is considered stale
44+
checkInterval: "1m" # How often to check for stale blocks
45+
4046
# Leader election configuration (optional, enabled by default)
4147
leaderElection:
4248
enabled: true
@@ -50,27 +56,32 @@ processors:
5056
addr: "localhost:9000"
5157
database: "default"
5258
table: "canonical_execution_transaction_structlog"
53-
# debug: false # Enable debug logging for ClickHouse queries
54-
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
55-
# Channel-based batching configuration for memory-efficient processing
56-
# bigTransactionThreshold: 500000 # Transactions with more structlogs are considered "big" (default: 500000)
57-
# chunkSize: 10000 # Number of structlogs per batch (default: 10000)
58-
# channelBufferSize: 2 # Number of chunks to buffer in channel (default: 2)
59-
# progressLogThreshold: 100000 # Log progress every N structlogs for large transactions (default: 100000)
59+
# debug: false # Enable debug logging for ClickHouse queries
60+
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
61+
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
62+
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
6063

61-
# Small transaction batching configuration
62-
# batchInsertThreshold: 50000 # Transactions with fewer structlogs than this will be batched (default: 50000)
63-
# batchFlushInterval: 5s # Maximum time to wait before flushing a batch (default: 5s)
64-
# batchMaxSize: 100000 # Maximum number of structlogs to accumulate in a batch (default: 100000)
64+
# Aggregated structlog processor (call frame level aggregation)
65+
transactionStructlogAgg:
66+
enabled: false
67+
addr: "localhost:9000"
68+
database: "default"
69+
table: "canonical_execution_transaction_structlog_agg"
70+
# debug: false # Enable debug logging for ClickHouse queries
71+
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
72+
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
73+
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
6574

6675
# Simple transaction processor (lightweight - no debug traces)
6776
transactionSimple:
6877
enabled: false
6978
addr: "localhost:9000"
7079
database: "default"
7180
table: "execution_transaction"
72-
# debug: false # Enable debug logging for ClickHouse queries
73-
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
81+
# debug: false # Enable debug logging for ClickHouse queries
82+
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
83+
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
84+
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
7485

7586
# Application settings
7687
shutdownTimeout: 6m

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ require (
99
github.com/creasty/defaults v1.8.0
1010
github.com/ethereum/go-ethereum v1.16.7
1111
github.com/go-co-op/gocron v1.37.0
12+
github.com/go-redsync/redsync/v4 v4.15.0
1213
github.com/hibiken/asynq v0.25.1
1314
github.com/lib/pq v1.10.9
1415
github.com/prometheus/client_golang v1.20.5
15-
github.com/redis/go-redis/v9 v9.14.0
16+
github.com/redis/go-redis/v9 v9.17.2
1617
github.com/sirupsen/logrus v1.9.3
1718
github.com/spf13/cobra v1.10.1
1819
github.com/stretchr/testify v1.11.1
@@ -60,6 +61,8 @@ require (
6061
github.com/google/uuid v1.6.0 // indirect
6162
github.com/gorilla/websocket v1.5.3 // indirect
6263
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.5 // indirect
64+
github.com/hashicorp/errwrap v1.1.0 // indirect
65+
github.com/hashicorp/go-multierror v1.1.1 // indirect
6366
github.com/hashicorp/go-version v1.7.0 // indirect
6467
github.com/holiman/uint256 v1.3.2 // indirect
6568
github.com/inconshreveable/mousetrap v1.1.0 // indirect

go.sum

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre
124124
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
125125
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
126126
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
127+
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
128+
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
129+
github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI=
130+
github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
131+
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
132+
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
133+
github.com/go-redsync/redsync/v4 v4.15.0 h1:KH/XymuxSV7vyKs6z1Cxxj+N+N18JlPxgXeP6x4JY54=
134+
github.com/go-redsync/redsync/v4 v4.15.0/go.mod h1:qNp+lLs3vkfZbtA/aM/OjlZHfEr5YTAYhRktFPKHC7s=
127135
github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E=
128136
github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0=
129137
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
@@ -132,6 +140,8 @@ github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXe
132140
github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
133141
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
134142
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
143+
github.com/gomodule/redigo v1.9.3 h1:dNPSXeXv6HCq2jdyWfjgmhBdqnR6PRO3m/G05nvpPC8=
144+
github.com/gomodule/redigo v1.9.3/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw=
135145
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
136146
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
137147
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -141,8 +151,13 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
141151
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
142152
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.5 h1:jP1RStw811EvUDzsUQ9oESqw2e4RqCjSAD9qIL8eMns=
143153
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.5/go.mod h1:WXNBZ64q3+ZUemCMXD9kYnr56H7CgZxDBHCVwstfl3s=
154+
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
155+
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
156+
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
144157
github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE=
145158
github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0=
159+
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
160+
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
146161
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
147162
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
148163
github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw=
@@ -255,8 +270,12 @@ github.com/prometheus/common v0.64.0 h1:pdZeA+g617P7oGv1CzdTzyeShxAGrTBsolKNOLQP
255270
github.com/prometheus/common v0.64.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
256271
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
257272
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
258-
github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE=
259-
github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
273+
github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI=
274+
github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
275+
github.com/redis/rueidis v1.0.69 h1:WlUefRhuDekji5LsD387ys3UCJtSFeBVf0e5yI0B8b4=
276+
github.com/redis/rueidis v1.0.69/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA=
277+
github.com/redis/rueidis/rueidiscompat v1.0.69 h1:IWVYY9lXdjNO3do2VpJT7aDFi8zbCUuQxZB6E2Grahs=
278+
github.com/redis/rueidis/rueidiscompat v1.0.69/go.mod h1:iC4Y8DoN0Uth0Uezg9e2trvNRC7QAgGeuP2OPLb5ccI=
260279
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
261280
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
262281
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
@@ -299,6 +318,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
299318
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
300319
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
301320
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
321+
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
322+
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
302323
github.com/supranational/blst v0.3.16 h1:bTDadT+3fK497EvLdWRQEjiGnUtzJ7jjIUMF0jqwYhE=
303324
github.com/supranational/blst v0.3.16/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
304325
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=

pkg/common/metrics.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,4 +205,32 @@ var (
205205
Name: "execution_processor_clickhouse_pool_empty_acquire_wait_duration_seconds",
206206
Help: "Cumulative time spent waiting for a resource when pool was empty",
207207
}, []string{"network", "processor"})
208+
209+
// Row buffer metrics for batched ClickHouse inserts.
210+
RowBufferFlushTotal = promauto.NewCounterVec(prometheus.CounterOpts{
211+
Name: "execution_processor_row_buffer_flush_total",
212+
Help: "Total number of row buffer flushes",
213+
}, []string{"network", "processor", "table", "trigger", "status"})
214+
215+
RowBufferFlushDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
216+
Name: "execution_processor_row_buffer_flush_duration_seconds",
217+
Help: "Duration of row buffer flushes",
218+
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
219+
}, []string{"network", "processor", "table"})
220+
221+
RowBufferFlushSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
222+
Name: "execution_processor_row_buffer_flush_size_rows",
223+
Help: "Number of rows per flush",
224+
Buckets: prometheus.ExponentialBuckets(100, 2, 12),
225+
}, []string{"network", "processor", "table"})
226+
227+
RowBufferPendingRows = promauto.NewGaugeVec(prometheus.GaugeOpts{
228+
Name: "execution_processor_row_buffer_pending_rows",
229+
Help: "Current number of rows waiting in the buffer",
230+
}, []string{"network", "processor", "table"})
231+
232+
RowBufferPendingTasks = promauto.NewGaugeVec(prometheus.GaugeOpts{
233+
Name: "execution_processor_row_buffer_pending_tasks",
234+
Help: "Current number of tasks waiting for their rows to be flushed",
235+
}, []string{"network", "processor", "table"})
208236
)

pkg/config/config.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Package config provides configuration types for execution-processor.
2+
// This package is designed to be imported without pulling in go-ethereum dependencies,
3+
// making it suitable for embedded mode integrations.
4+
package config
5+
6+
import (
7+
"fmt"
8+
"time"
9+
10+
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
11+
"github.com/ethpandaops/execution-processor/pkg/processor"
12+
"github.com/ethpandaops/execution-processor/pkg/redis"
13+
"github.com/ethpandaops/execution-processor/pkg/state"
14+
)
15+
16+
// EthereumConfig is the ethereum network configuration.
17+
// This is a copy of ethereum.Config to avoid importing pkg/ethereum
18+
// which would pull in go-ethereum dependencies.
19+
type EthereumConfig struct {
20+
// Execution configuration
21+
Execution []*execution.Config `yaml:"execution"`
22+
// Override network name for custom networks (bypasses networkMap)
23+
OverrideNetworkName *string `yaml:"overrideNetworkName"`
24+
}
25+
26+
// Validate validates the ethereum configuration.
27+
func (c *EthereumConfig) Validate() error {
28+
for i, exec := range c.Execution {
29+
if err := exec.Validate(); err != nil {
30+
return fmt.Errorf("invalid execution configuration at index %d: %w", i, err)
31+
}
32+
}
33+
34+
return nil
35+
}
36+
37+
// Config is the main configuration for execution-processor.
38+
type Config struct {
39+
// MetricsAddr is the address to listen on for metrics.
40+
MetricsAddr string `yaml:"metricsAddr" default:":9090"`
41+
// HealthCheckAddr is the address to listen on for healthcheck.
42+
HealthCheckAddr *string `yaml:"healthCheckAddr"`
43+
// PProfAddr is the address to listen on for pprof.
44+
PProfAddr *string `yaml:"pprofAddr"`
45+
// APIAddr is the address to listen on for the API server.
46+
APIAddr *string `yaml:"apiAddr"`
47+
// LoggingLevel is the logging level to use.
48+
LoggingLevel string `yaml:"logging" default:"info"`
49+
// Ethereum is the ethereum network configuration.
50+
Ethereum EthereumConfig `yaml:"ethereum"`
51+
// Redis is the redis configuration.
52+
Redis *redis.Config `yaml:"redis"`
53+
// StateManager is the state manager configuration.
54+
StateManager state.Config `yaml:"stateManager"`
55+
// Processors is the processor configuration.
56+
Processors processor.Config `yaml:"processors"`
57+
// ShutdownTimeout is the timeout for shutting down the server.
58+
ShutdownTimeout time.Duration `yaml:"shutdownTimeout" default:"10s"`
59+
}
60+
61+
// Validate validates the configuration.
62+
func (c *Config) Validate() error {
63+
if c.Redis == nil {
64+
return fmt.Errorf("redis configuration is required")
65+
}
66+
67+
if err := c.Redis.Validate(); err != nil {
68+
return fmt.Errorf("invalid redis configuration: %w", err)
69+
}
70+
71+
if err := c.Ethereum.Validate(); err != nil {
72+
return fmt.Errorf("invalid ethereum configuration: %w", err)
73+
}
74+
75+
if err := c.StateManager.Validate(); err != nil {
76+
return fmt.Errorf("invalid state manager configuration: %w", err)
77+
}
78+
79+
if err := c.Processors.Validate(); err != nil {
80+
return fmt.Errorf("invalid processor configuration: %w", err)
81+
}
82+
83+
return nil
84+
}

0 commit comments

Comments
 (0)