2 changes: 1 addition & 1 deletion .github/workflows/docs_deploy.yml
@@ -46,6 +46,6 @@ jobs:
- name: Deploy to GitHub Pages
uses: peaceiris/actions-gh-pages@v4
with:
-        github_token: ${{ secrets.DOCS_DEPLOY_TOKEN }}
+        github_token: ${{ secrets.PAT_DOCS }}
publish_dir: ./docs/.vitepress/dist
cname: ev.xyz
9 changes: 9 additions & 0 deletions apps/evm/server/force_inclusion_test.go
@@ -10,6 +10,7 @@ import (
"time"

"github.com/evstack/ev-node/pkg/config"
blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
da "github.com/evstack/ev-node/pkg/da/types"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/rs/zerolog"
@@ -73,6 +74,14 @@ func (m *mockDA) HasForcedInclusionNamespace() bool {
return true
}

func (m *mockDA) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
return nil, nil
}

func (m *mockDA) LocalHead(ctx context.Context) (uint64, error) {
return 0, nil
}

func TestForceInclusionServer_handleSendRawTransaction_Success(t *testing.T) {
testHeight := uint64(100)

32 changes: 32 additions & 0 deletions block/internal/common/metrics.go
@@ -69,6 +69,11 @@ type Metrics struct {
// Forced inclusion metrics
ForcedInclusionTxsInGracePeriod metrics.Gauge // Number of forced inclusion txs currently in grace period
ForcedInclusionTxsMalicious metrics.Counter // Total number of forced inclusion txs marked as malicious

// Sync mode metrics
SyncMode metrics.Gauge // Current sync mode: 0=catchup, 1=follow
SubscribeErrors metrics.Counter // Number of subscription failures
ModeSwitches metrics.Counter // Number of catchup<->follow mode transitions
}

// PrometheusMetrics returns Metrics built using Prometheus client library
@@ -201,6 +206,28 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Help: "Total number of forced inclusion transactions marked as malicious (past grace boundary)",
}, labels).With(labelsAndValues...)

// Sync mode metrics
m.SyncMode = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "sync_mode",
Help: "Current sync mode: 0=catchup (polling), 1=follow (subscription)",
}, labels).With(labelsAndValues...)

m.SubscribeErrors = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "subscribe_errors_total",
Help: "Total number of DA subscription failures",
}, labels).With(labelsAndValues...)

m.ModeSwitches = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "mode_switches_total",
Help: "Total number of sync mode transitions between catchup and follow",
}, labels).With(labelsAndValues...)

// DA Submitter metrics
m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
@@ -269,6 +296,11 @@ func NopMetrics() *Metrics {
// Forced inclusion metrics
ForcedInclusionTxsInGracePeriod: discard.NewGauge(),
ForcedInclusionTxsMalicious: discard.NewCounter(),

// Sync mode metrics
SyncMode: discard.NewGauge(),
SubscribeErrors: discard.NewCounter(),
ModeSwitches: discard.NewCounter(),
}

// Initialize maps with no-op metrics
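For context, here is a minimal sketch (not part of this diff) of how a sync loop might drive the three new metrics. The helpers switchToFollow and recordSubscribeFailure are invented for illustration; only the Metrics fields come from this PR.

package main

import "github.com/evstack/ev-node/block/internal/common"

// Illustrative helpers encoding the documented semantics:
// SyncMode 0 = catchup (polling), 1 = follow (subscription).
func switchToFollow(m *common.Metrics) {
	m.SyncMode.Set(1)     // now following via subscription
	m.ModeSwitches.Add(1) // count the catchup -> follow transition
}

func recordSubscribeFailure(m *common.Metrics) {
	m.SubscribeErrors.Add(1) // one more failed DA subscription
	m.SyncMode.Set(0)        // fall back to catchup (polling)
	m.ModeSwitches.Add(1)    // count the follow -> catchup transition
}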
24 changes: 24 additions & 0 deletions block/internal/common/subscription.go
@@ -0,0 +1,24 @@
package common

import blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"

// BlobsFromSubscription returns non-empty blob data from a subscription response.
func BlobsFromSubscription(resp *blobrpc.SubscriptionResponse) [][]byte {
if resp == nil || len(resp.Blobs) == 0 {
return nil
}

blobs := make([][]byte, 0, len(resp.Blobs))
for _, blob := range resp.Blobs {
if blob == nil {
continue
}
data := blob.Data()
if len(data) == 0 {
continue
}
blobs = append(blobs, data)
}

return blobs
}
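A hedged usage sketch (not part of the diff): draining a subscription channel through the new helper. Only BlobsFromSubscription comes from this PR; drain and handle are invented for illustration.

package main

import (
	"github.com/evstack/ev-node/block/internal/common"
	blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
)

// drain forwards non-empty blob payloads to handle, skipping responses
// that are nil or contain only nil/empty blobs.
func drain(ch <-chan *blobrpc.SubscriptionResponse, handle func([][]byte)) {
	for resp := range ch {
		if blobs := common.BlobsFromSubscription(resp); len(blobs) > 0 {
			handle(blobs)
		}
	}
}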
156 changes: 155 additions & 1 deletion block/internal/da/async_block_retriever.go
@@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/evstack/ev-node/pkg/config"
blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
datypes "github.com/evstack/ev-node/pkg/da/types"
pb "github.com/evstack/ev-node/types/pb/evnode/v1"
)
@@ -25,6 +26,7 @@ type AsyncBlockRetriever interface {
Stop()
GetCachedBlock(ctx context.Context, daHeight uint64) (*BlockData, error)
UpdateCurrentHeight(height uint64)
StartSubscription()
}

// BlockData contains data retrieved from a single DA height
@@ -35,7 +37,8 @@ type BlockData struct {
}

// asyncBlockRetriever handles background prefetching of individual DA blocks
-// from a specific namespace.
+// from a specific namespace. It can optionally subscribe to the namespace for
+// real-time updates instead of relying solely on polling.
type asyncBlockRetriever struct {
client Client
logger zerolog.Logger
@@ -58,6 +61,9 @@ type asyncBlockRetriever struct {

// Polling interval for checking new DA heights
pollInterval time.Duration

// Subscription support
subscriptionStarted atomic.Bool
}

// NewAsyncBlockRetriever creates a new async block retriever with in-memory cache.
@@ -94,6 +100,7 @@ func NewAsyncBlockRetriever(
func (f *asyncBlockRetriever) Start() {
f.wg.Add(1)
go f.backgroundFetchLoop()

f.logger.Debug().
Uint64("da_start_height", f.daStartHeight).
Uint64("prefetch_window", f.prefetchWindow).
@@ -125,6 +132,77 @@ func (f *asyncBlockRetriever) UpdateCurrentHeight(height uint64) {
}
}

// StartSubscription starts the subscription loop once; it is safe to call repeatedly.
func (f *asyncBlockRetriever) StartSubscription() {
if len(f.namespace) == 0 {
return
}
if !f.subscriptionStarted.CompareAndSwap(false, true) {
return
}
f.wg.Add(1)
go f.subscriptionLoop()
}

// storeBlock caches a block's blobs, favoring existing data to avoid churn.
// This is used internally by the subscription loop.
func (f *asyncBlockRetriever) storeBlock(ctx context.Context, height uint64, blobs [][]byte, timestamp time.Time) {
if len(f.namespace) == 0 {
return
}
if height < f.daStartHeight {
return
}
if len(blobs) == 0 {
return
}

filtered := make([][]byte, 0, len(blobs))
for _, blob := range blobs {
if len(blob) > 0 {
filtered = append(filtered, blob)
}
}
if len(filtered) == 0 {
return
}

key := newBlockDataKey(height)
if existing, err := f.cache.Get(ctx, key); err == nil {
var pbBlock pb.BlockData
if err := proto.Unmarshal(existing, &pbBlock); err == nil && len(pbBlock.Blobs) > 0 {
return
}
}

pbBlock := &pb.BlockData{
Height: height,
Timestamp: timestamp.Unix(),
Blobs: filtered,
}
data, err := proto.Marshal(pbBlock)
if err != nil {
f.logger.Error().
Err(err).
Uint64("height", height).
Msg("failed to marshal block for caching")
return
}

if err := f.cache.Put(ctx, key, data); err != nil {
f.logger.Error().
Err(err).
Uint64("height", height).
Msg("failed to cache block")
return
}

f.logger.Debug().
Uint64("height", height).
Int("blob_count", len(filtered)).
Msg("cached block from subscription")
}

func newBlockDataKey(height uint64) ds.Key {
return ds.NewKey(fmt.Sprintf("/block/%d", height))
}
@@ -186,6 +264,82 @@ func (f *asyncBlockRetriever) backgroundFetchLoop() {
}
}

// subscriptionLoop subscribes to the namespace and caches incoming blobs.
func (f *asyncBlockRetriever) subscriptionLoop() {
defer f.wg.Done()

for {
select {
case <-f.ctx.Done():
return
default:
}

if err := f.runSubscription(); err != nil {
if errors.Is(err, context.Canceled) {
return
}
f.logger.Warn().Err(err).Msg("subscription error, will retry")
// Backoff before retry
select {
case <-f.ctx.Done():
return
case <-time.After(f.pollInterval):
}
}
}
}

// runSubscription runs a single subscription session.
func (f *asyncBlockRetriever) runSubscription() error {
ch, err := f.client.Subscribe(f.ctx, f.namespace)
if err != nil {
return fmt.Errorf("failed to subscribe: %w", err)
}

f.logger.Debug().Msg("subscribed to namespace for real-time updates")

for {
select {
case <-f.ctx.Done():
return f.ctx.Err()
case resp, ok := <-ch:
if !ok {
return errors.New("subscription channel closed")
}
f.handleSubscriptionResponse(resp)
}
}
}

// handleSubscriptionResponse processes a subscription response and caches the blobs.
func (f *asyncBlockRetriever) handleSubscriptionResponse(resp *blobrpc.SubscriptionResponse) {
if resp == nil {
return
}

f.UpdateCurrentHeight(resp.Height)

if len(resp.Blobs) == 0 {
return
}

// Extract raw blob data
blobs := make([][]byte, 0, len(resp.Blobs))
for _, b := range resp.Blobs {
if b != nil && len(b.Data()) > 0 {
blobs = append(blobs, b.Data())
}
}

if len(blobs) > 0 {
// TODO: Use Celestia subscription timestamps once available:
// https://github.com/celestiaorg/celestia-node/pull/4752
// Subscription responses do not carry DA timestamps; keep zero to avoid lying.
f.storeBlock(f.ctx, resp.Height, blobs, time.Time{})
}
}

// prefetchBlocks prefetches blocks within the prefetch window.
func (f *asyncBlockRetriever) prefetchBlocks() {
if len(f.namespace) == 0 {
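Putting the retriever pieces together, a plausible call site (an assumption, not code from this diff) would start in polling mode and enable the subscription only once the node approaches the DA head; StartSubscription is idempotent thanks to its CompareAndSwap guard.

package main

import "github.com/evstack/ev-node/block/internal/da"

// startRetriever is an invented helper showing the intended lifecycle.
func startRetriever(r da.AsyncBlockRetriever, processed, daHead uint64) {
	r.Start() // always begin with the polling/prefetch loop
	// Enable real-time updates once close to the DA head; the threshold
	// and this helper are assumptions for illustration.
	if processed+10 >= daHead {
		r.StartSubscription() // safe to call repeatedly; only the first call wins
	}
}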
24 changes: 24 additions & 0 deletions block/internal/da/client.go
@@ -442,3 +442,27 @@ func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datype

return results, nil
}

// Subscribe subscribes to blobs in the specified namespace.
// Returns a channel that receives subscription responses as new blobs are included.
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
ns, err := share.NewNamespaceFromBytes(namespace)
if err != nil {
return nil, fmt.Errorf("invalid namespace: %w", err)
}

return c.blobAPI.Subscribe(ctx, ns)
}

// LocalHead returns the height of the locally synced DA head.
func (c *client) LocalHead(ctx context.Context) (uint64, error) {
[Review thread on LocalHead]
Contributor: While reading, LocalHead seems a little confusing in this context. Is it querying the local node? Is it the last header the DA layer has of my chain, or the last header the DA layer itself has synced? This impression may change once the full review is done.
Contributor Author: I'll amend the comment to make this clearer.
headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
defer cancel()

header, err := c.headerAPI.LocalHead(headCtx)
if err != nil {
return 0, fmt.Errorf("failed to get local head: %w", err)
}

return header.Height, nil
}
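To illustrate how the two new client methods could combine, here is a sketch under stated assumptions: the DAClient interface and chooseMode are not from this diff, only the Subscribe and LocalHead signatures are.

package main

import (
	"context"

	blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
)

// DAClient narrows the client to the two methods added in this PR.
type DAClient interface {
	Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error)
	LocalHead(ctx context.Context) (uint64, error)
}

// chooseMode returns a live subscription once the processed height has
// reached the locally synced DA head; a nil channel means "keep polling".
func chooseMode(ctx context.Context, c DAClient, ns []byte, processed uint64) (<-chan *blobrpc.SubscriptionResponse, error) {
	head, err := c.LocalHead(ctx)
	if err != nil {
		return nil, err
	}
	if processed < head {
		return nil, nil // still catching up: stay in polling mode
	}
	return c.Subscribe(ctx, ns) // caught up: switch to follow mode
}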