diff --git a/pkg/clickhouse/client.go b/pkg/clickhouse/client.go index 798a9b9..3b4c803 100644 --- a/pkg/clickhouse/client.go +++ b/pkg/clickhouse/client.go @@ -647,6 +647,7 @@ func (c *Client) collectPoolMetrics() { stat := c.pool.Stat() // Set gauge values (current state) + common.ClickHouseConnectionsActive.WithLabelValues(network, c.processor).Set(float64(stat.AcquiredResources())) common.ClickHousePoolAcquiredResources.WithLabelValues(network, c.processor).Set(float64(stat.AcquiredResources())) common.ClickHousePoolIdleResources.WithLabelValues(network, c.processor).Set(float64(stat.IdleResources())) common.ClickHousePoolConstructingResources.WithLabelValues(network, c.processor).Set(float64(stat.ConstructingResources())) diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go index 6251eb9..d9bcf84 100644 --- a/pkg/processor/manager.go +++ b/pkg/processor/manager.go @@ -583,7 +583,6 @@ func (m *Manager) processBlocks(ctx context.Context) bool { duration := time.Since(startTime) common.BlockProcessingDuration.WithLabelValues(m.network.Name, name).Observe(duration.Seconds()) - common.BlocksProcessed.WithLabelValues(m.network.Name, name).Inc() m.log.WithFields(logrus.Fields{ "processor": name, diff --git a/pkg/processor/transaction/simple/block_processing.go b/pkg/processor/transaction/simple/block_processing.go index 5d8786d..8f9e477 100644 --- a/pkg/processor/transaction/simple/block_processing.go +++ b/pkg/processor/transaction/simple/block_processing.go @@ -257,6 +257,8 @@ func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) err common.TasksEnqueued.WithLabelValues(p.network.Name, ProcessorName, queue, task.Type()).Inc() } + common.BlocksProcessed.WithLabelValues(p.network.Name, ProcessorName).Inc() + p.log.WithFields(logrus.Fields{ "block_number": blockNumber.Uint64(), "tx_count": len(block.Transactions()), diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index 22b104b..55b530f 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -263,6 +263,8 @@ func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) err }).Warn("task count mismatch - some tasks may have failed to enqueue") } + common.BlocksProcessed.WithLabelValues(p.network.Name, ProcessorName).Inc() + p.log.WithFields(logrus.Fields{ "network": p.network.Name, "block_number": blockNumber, diff --git a/pkg/processor/transaction/structlog_agg/block_processing.go b/pkg/processor/transaction/structlog_agg/block_processing.go index 8030daa..22826f4 100644 --- a/pkg/processor/transaction/structlog_agg/block_processing.go +++ b/pkg/processor/transaction/structlog_agg/block_processing.go @@ -263,6 +263,8 @@ func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) err }).Warn("task count mismatch - some tasks may have failed to enqueue") } + common.BlocksProcessed.WithLabelValues(p.network.Name, ProcessorName).Inc() + p.log.WithFields(logrus.Fields{ "network": p.network.Name, "block_number": blockNumber, diff --git a/pkg/rowbuffer/buffer.go b/pkg/rowbuffer/buffer.go index c38d195..f511589 100644 --- a/pkg/rowbuffer/buffer.go +++ b/pkg/rowbuffer/buffer.go @@ -271,6 +271,14 @@ func (b *Buffer[R]) doFlush(ctx context.Context, rows []R, waiters []waiter, tri b.config.Network, b.config.Processor, b.config.Table, trigger, status, ).Inc() + common.ClickHouseOperationDuration.WithLabelValues( + b.config.Network, b.config.Processor, "insert", b.config.Table, status, "", + ).Observe(duration.Seconds()) + + common.ClickHouseOperationTotal.WithLabelValues( + b.config.Network, b.config.Processor, "insert", b.config.Table, status, "", + ).Inc() + common.RowBufferFlushDuration.WithLabelValues( b.config.Network, b.config.Processor, b.config.Table, ).Observe(duration.Seconds())