Skip to content
60 changes: 59 additions & 1 deletion src/components/Indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ import { BlockchainRegistry } from '../BlockchainRegistry/index.js'
import { CommandStatus, JobStatus } from '../../@types/commands.js'
import { buildJobIdentifier, getDeployedContractBlock } from './utils.js'
import { create256Hash } from '../../utils/crypt.js'
import { isReachableConnection } from '../../utils/database.js'
import { getDatabase, isReachableConnection } from '../../utils/database.js'
import { sleep } from '../../utils/util.js'
import { isReindexingNeeded } from './version.js'
import { DB_EVENTS, ES_CONNECTION_EVENTS } from '../database/ElasticsearchConfigHelper.js'

/**
* Event emitter for DDO (Data Descriptor Object) events
Expand Down Expand Up @@ -82,6 +83,8 @@ export class OceanIndexer {
private supportedChains: string[]
private indexers: Map<number, ChainIndexer> = new Map()
private MIN_REQUIRED_VERSION = '0.2.2'
private isDbConnected: boolean = true
private reconnectTimer: NodeJS.Timeout | null = null

constructor(
db: Database,
Expand All @@ -93,9 +96,64 @@ export class OceanIndexer {
this.blockchainRegistry = blockchainRegistry
this.supportedChains = Object.keys(supportedNetworks)
INDEXING_QUEUE = []
this.setupDbConnectionListeners()
this.startAllChainIndexers()
}

/**
* Listen for Elasticsearch connection events.
*
* CONNECTION_LOST → cancel any pending restart, stop all indexers once.
* CONNECTION_RESTORED → debounce restart by 5 s so rapid LOST/RESTORED cycles are a single restart.
*/
private setupDbConnectionListeners(): void {
ES_CONNECTION_EVENTS.on(DB_EVENTS.CONNECTION_LOST, async () => {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}

if (!this.isDbConnected) {
return
}

this.isDbConnected = false
INDEXER_LOGGER.error(
'Database connection lost - stopping all chain indexers until DB is back'
)
await this.stopAllChainIndexers()
})

ES_CONNECTION_EVENTS.on(DB_EVENTS.CONNECTION_RESTORED, () => {
if (this.isDbConnected) {
return
}

if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
}

this.reconnectTimer = setTimeout(async () => {
this.reconnectTimer = null
if (this.isDbConnected) {
return
}

this.isDbConnected = true
numCrawlAttempts = 0
INDEXER_LOGGER.info(
'Database connection stable - reinitialising DB and restarting all chain indexers'
)
const freshDb = await getDatabase(true)
if (freshDb) {
this.db = freshDb
}

await this.startAllChainIndexers()
}, 5000)
})
}

public getSupportedNetworks(): RPCS {
return this.networks
}
Expand Down
14 changes: 11 additions & 3 deletions src/components/Indexer/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,17 @@ export const getDeployedContractBlock = (network: number) => {
}

export const getNetworkHeight = async (provider: FallbackProvider) => {
const networkHeight = await provider.getBlockNumber()

return networkHeight
try {
const result = await withRetrial(() => provider.getBlockNumber(), 3, 2000)
console.log(`----------> RPC NETWORK HEIGHT: ${result}`)
return result
} catch (error: unknown) {
const msg = (error as Error)?.message ?? String(error)
if (msg.includes('invalid numeric value') || msg.includes('timeout')) {
throw new Error(`RPC timeout fetching block number: ${msg}`)
}
throw error
}
}

export const retrieveChunkEvents = async (
Expand Down
6 changes: 4 additions & 2 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,18 @@
supportedChains.push(parseInt(chain))
}
}
for (const feeChain of Object.keys(envConfig.fees)) {
if (envConfig.fees && Object.keys(envConfig.fees).length > 0) {
for (const feeChain of Object.keys(envConfig.fees)) {
// for (const feeConfig of envConfig.fees) {

Check failure on line 140 in src/components/c2d/compute_engine_docker.ts

View workflow job for this annotation

GitHub Actions / lint

Insert `··`
if (supportedChains.includes(parseInt(feeChain))) {

Check failure on line 141 in src/components/c2d/compute_engine_docker.ts

View workflow job for this annotation

GitHub Actions / lint

Insert `··`
if (fees === null) fees = {}

Check failure on line 142 in src/components/c2d/compute_engine_docker.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `········` with `··········`
if (!(feeChain in fees)) fees[feeChain] = []

Check failure on line 143 in src/components/c2d/compute_engine_docker.ts

View workflow job for this annotation

GitHub Actions / lint

Insert `··`
const tmpFees: ComputeEnvFees[] = []

Check failure on line 144 in src/components/c2d/compute_engine_docker.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `········` with `··········`
for (let i = 0; i < envConfig.fees[feeChain].length; i++) {

Check failure on line 145 in src/components/c2d/compute_engine_docker.ts

View workflow job for this annotation

GitHub Actions / lint

Insert `··`
if (

Check failure on line 146 in src/components/c2d/compute_engine_docker.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `··········` with `············`
envConfig.fees[feeChain][i].prices &&

Check failure on line 147 in src/components/c2d/compute_engine_docker.ts

View workflow job for this annotation

GitHub Actions / lint

Insert `··`
envConfig.fees[feeChain][i].prices.length > 0

Check failure on line 148 in src/components/c2d/compute_engine_docker.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `············` with `··············`
) {

Check failure on line 149 in src/components/c2d/compute_engine_docker.ts

View workflow job for this annotation

GitHub Actions / lint

Insert `··`
if (!envConfig.fees[feeChain][i].feeToken) {
const tokenAddress = getOceanTokenAddressForChain(parseInt(feeChain))
if (tokenAddress) {
Expand All @@ -169,6 +170,7 @@
}
fees[feeChain] = tmpFees
}
}

/* for (const chain of Object.keys(config.supportedNetworks)) {
const chainId = parseInt(chain)
Expand Down Expand Up @@ -436,7 +438,7 @@

if (minDuration > 0) {
// We need to claim payment
const fee = env.fees[job.payment.chainId]?.find(
const fee = env.fees?.[job.payment.chainId]?.find(
(fee) => fee.feeToken === job.payment.token
)

Expand Down
Loading
Loading