diff --git a/package-lock.json b/package-lock.json index e6e774f06..9f946198b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -44,7 +44,7 @@ "dotenv": "^16.3.1", "eciesjs": "^0.4.5", "eth-crypto": "^2.6.0", - "ethers": "^6.8.1", + "ethers": "^6.16.0", "express": "^4.21.1", "humanhash": "^1.0.4", "hyperdiff": "^2.0.16", @@ -9923,9 +9923,9 @@ "license": "MIT" }, "node_modules/ethers": { - "version": "6.15.0", - "resolved": "https://registry.npmjs.org/ethers/-/ethers-6.15.0.tgz", - "integrity": "sha512-Kf/3ZW54L4UT0pZtsY/rf+EkBU7Qi5nnhonjUb8yTXcxH3cdcWrV2cRyk0Xk/4jK6OoHhxxZHriyhje20If2hQ==", + "version": "6.16.0", + "resolved": "https://registry.npmjs.org/ethers/-/ethers-6.16.0.tgz", + "integrity": "sha512-U1wulmetNymijEhpSEQ7Ct/P/Jw9/e7R1j5XIbPRydgV2DjLVMsULDlNksq3RQnFgKoLlZf88ijYtWEXcPa07A==", "funding": [ { "type": "individual", diff --git a/package.json b/package.json index 29b8ce2a7..9300287d3 100644 --- a/package.json +++ b/package.json @@ -83,7 +83,7 @@ "dotenv": "^16.3.1", "eciesjs": "^0.4.5", "eth-crypto": "^2.6.0", - "ethers": "^6.8.1", + "ethers": "^6.16.0", "express": "^4.21.1", "humanhash": "^1.0.4", "hyperdiff": "^2.0.16", diff --git a/src/components/Indexer/index.ts b/src/components/Indexer/index.ts index e261bb453..fd9e94476 100644 --- a/src/components/Indexer/index.ts +++ b/src/components/Indexer/index.ts @@ -35,9 +35,10 @@ import { BlockchainRegistry } from '../BlockchainRegistry/index.js' import { CommandStatus, JobStatus } from '../../@types/commands.js' import { buildJobIdentifier, getDeployedContractBlock } from './utils.js' import { create256Hash } from '../../utils/crypt.js' -import { isReachableConnection } from '../../utils/database.js' +import { getDatabase, isReachableConnection } from '../../utils/database.js' import { sleep } from '../../utils/util.js' import { isReindexingNeeded } from './version.js' +import { DB_EVENTS, ES_CONNECTION_EVENTS } from '../database/ElasticsearchConfigHelper.js' /** * Event emitter for DDO (Data Descriptor Object) events @@ -82,6 +83,8 @@ export class OceanIndexer { private supportedChains: string[] private indexers: Map = new Map() private MIN_REQUIRED_VERSION = '0.2.2' + private isDbConnected: boolean = true + private reconnectTimer: NodeJS.Timeout | null = null constructor( db: Database, @@ -93,9 +96,64 @@ export class OceanIndexer { this.blockchainRegistry = blockchainRegistry this.supportedChains = Object.keys(supportedNetworks) INDEXING_QUEUE = [] + this.setupDbConnectionListeners() this.startAllChainIndexers() } + /** + * Listen for Elasticsearch connection events. + * + * CONNECTION_LOST → cancel any pending restart, stop all indexers once. + * CONNECTION_RESTORED → debounce restart by 5 s so rapid LOST/RESTORED cycles are a single restart. 
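+   * A LOST event that arrives during the 5 s window clears the pending timer,
+   * so a flapping connection stops the indexers once and only restarts them
+   * after the connection has stayed up for the full window.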
+ */ + private setupDbConnectionListeners(): void { + ES_CONNECTION_EVENTS.on(DB_EVENTS.CONNECTION_LOST, async () => { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer) + this.reconnectTimer = null + } + + if (!this.isDbConnected) { + return + } + + this.isDbConnected = false + INDEXER_LOGGER.error( + 'Database connection lost - stopping all chain indexers until DB is back' + ) + await this.stopAllChainIndexers() + }) + + ES_CONNECTION_EVENTS.on(DB_EVENTS.CONNECTION_RESTORED, () => { + if (this.isDbConnected) { + return + } + + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer) + } + + this.reconnectTimer = setTimeout(async () => { + this.reconnectTimer = null + if (this.isDbConnected) { + return + } + + this.isDbConnected = true + numCrawlAttempts = 0 + INDEXER_LOGGER.info( + 'Database connection stable - reinitialising DB and restarting all chain indexers' + ) + const freshDb = await getDatabase(true) + if (freshDb) { + this.db = freshDb + } + + await this.startAllChainIndexers() + }, 5000) + }) + } + public getSupportedNetworks(): RPCS { return this.networks } @@ -157,11 +215,10 @@ export class OceanIndexer { async retryCrawlerWithDelay( blockchain: Blockchain, - interval: number = 5000 // in milliseconds, default 5 secs + interval: number = 5000 // in milliseconds, default 2 secs ): Promise { try { const retryInterval = Math.max(blockchain.getKnownRPCs().length * 3000, interval) // give 2 secs per each one - // try const result = await this.startCrawler(blockchain) const dbActive = this.getDatabase() if (!dbActive || !(await isReachableConnection(dbActive.getConfig().url))) { diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 2f20300c6..5d6222721 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -135,39 +135,41 @@ export class C2DEngineDocker extends C2DEngine { supportedChains.push(parseInt(chain)) } } - for (const feeChain of Object.keys(envConfig.fees)) { - // for (const feeConfig of envConfig.fees) { - if (supportedChains.includes(parseInt(feeChain))) { - if (fees === null) fees = {} - if (!(feeChain in fees)) fees[feeChain] = [] - const tmpFees: ComputeEnvFees[] = [] - for (let i = 0; i < envConfig.fees[feeChain].length; i++) { - if ( - envConfig.fees[feeChain][i].prices && - envConfig.fees[feeChain][i].prices.length > 0 - ) { - if (!envConfig.fees[feeChain][i].feeToken) { - const tokenAddress = getOceanTokenAddressForChain(parseInt(feeChain)) - if (tokenAddress) { - envConfig.fees[feeChain][i].feeToken = tokenAddress - tmpFees.push(envConfig.fees[feeChain][i]) + if (envConfig.fees && Object.keys(envConfig.fees).length > 0) { + for (const feeChain of Object.keys(envConfig.fees)) { + // for (const feeConfig of envConfig.fees) { + if (supportedChains.includes(parseInt(feeChain))) { + if (fees === null) fees = {} + if (!(feeChain in fees)) fees[feeChain] = [] + const tmpFees: ComputeEnvFees[] = [] + for (let i = 0; i < envConfig.fees[feeChain].length; i++) { + if ( + envConfig.fees[feeChain][i].prices && + envConfig.fees[feeChain][i].prices.length > 0 + ) { + if (!envConfig.fees[feeChain][i].feeToken) { + const tokenAddress = getOceanTokenAddressForChain(parseInt(feeChain)) + if (tokenAddress) { + envConfig.fees[feeChain][i].feeToken = tokenAddress + tmpFees.push(envConfig.fees[feeChain][i]) + } else { + CORE_LOGGER.error( + `Unable to find Ocean token address for chain ${feeChain} and no custom token provided` + ) + } } else { - 
CORE_LOGGER.error( - `Unable to find Ocean token address for chain ${feeChain} and no custom token provided` - ) + tmpFees.push(envConfig.fees[feeChain][i]) } } else { - tmpFees.push(envConfig.fees[feeChain][i]) + CORE_LOGGER.error( + `Unable to find prices for fee ${JSON.stringify( + envConfig.fees[feeChain][i] + )} on chain ${feeChain}` + ) } - } else { - CORE_LOGGER.error( - `Unable to find prices for fee ${JSON.stringify( - envConfig.fees[feeChain][i] - )} on chain ${feeChain}` - ) } + fees[feeChain] = tmpFees } - fees[feeChain] = tmpFees } /* for (const chain of Object.keys(config.supportedNetworks)) { @@ -436,7 +438,7 @@ export class C2DEngineDocker extends C2DEngine { if (minDuration > 0) { // We need to claim payment - const fee = env.fees[job.payment.chainId]?.find( + const fee = env.fees?.[job.payment.chainId]?.find( (fee) => fee.feeToken === job.payment.token ) diff --git a/src/components/database/ElasticsearchConfigHelper.ts b/src/components/database/ElasticsearchConfigHelper.ts index 9838e4ff1..072ccfc2a 100644 --- a/src/components/database/ElasticsearchConfigHelper.ts +++ b/src/components/database/ElasticsearchConfigHelper.ts @@ -1,9 +1,16 @@ +import EventEmitter from 'node:events' import { Client } from '@elastic/elasticsearch' import { OceanNodeDBConfig } from '../../@types' import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js' import { DB_TYPES } from '../../utils/constants.js' +export const DB_EVENTS = { + CONNECTION_LOST: 'db:connection:lost', + CONNECTION_RESTORED: 'db:connection:restored' +} as const +export const ES_CONNECTION_EVENTS = new EventEmitter() + export interface ElasticsearchRetryConfig { requestTimeout?: number pingTimeout?: number @@ -16,20 +23,20 @@ export interface ElasticsearchRetryConfig { } export const DEFAULT_ELASTICSEARCH_CONFIG: Required = { - requestTimeout: parseInt(process.env.ELASTICSEARCH_REQUEST_TIMEOUT || '60000'), - pingTimeout: parseInt(process.env.ELASTICSEARCH_PING_TIMEOUT || '5000'), + requestTimeout: parseInt(process.env.ELASTICSEARCH_REQUEST_TIMEOUT || '30000'), + pingTimeout: parseInt(process.env.ELASTICSEARCH_PING_TIMEOUT || '3000'), resurrectStrategy: (process.env.ELASTICSEARCH_RESURRECT_STRATEGY as 'ping' | 'optimistic' | 'none') || 'ping', - maxRetries: parseInt(process.env.ELASTICSEARCH_MAX_RETRIES || '5'), + maxRetries: parseInt(process.env.ELASTICSEARCH_MAX_RETRIES || '3'), sniffOnStart: process.env.ELASTICSEARCH_SNIFF_ON_START !== 'false', sniffInterval: process.env.ELASTICSEARCH_SNIFF_INTERVAL === 'false' ? false - : parseInt(process.env.ELASTICSEARCH_SNIFF_INTERVAL || '30000'), + : parseInt(process.env.ELASTICSEARCH_SNIFF_INTERVAL || '30000', 10) || 30000, sniffOnConnectionFault: process.env.ELASTICSEARCH_SNIFF_ON_CONNECTION_FAULT !== 'false', healthCheckInterval: parseInt( - process.env.ELASTICSEARCH_HEALTH_CHECK_INTERVAL || '60000' + process.env.ELASTICSEARCH_HEALTH_CHECK_INTERVAL || '15000' ) } @@ -42,6 +49,7 @@ class ElasticsearchClientSingleton { private isRetrying: boolean = false private healthCheckTimer: NodeJS.Timeout | null = null private isMonitoring: boolean = false + private connectionLostEmitted: boolean = false private constructor() {} @@ -73,21 +81,27 @@ class ElasticsearchClientSingleton { } if (this.client && this.config) { + // Skip the extra ping here: 5 DB-class constructors all call getClient() + // during reconnect reinit, and concurrent pings cause false errors that trigger another LOST/RESTORED cycle. 
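+      // While monitoring is active the periodic health check owns failure
+      // detection and reconnection, so trust it and return the cached client.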
+ if (this.isMonitoring) { + return this.client + } + const isHealthy = await this.checkConnectionHealth() if (isHealthy) { this.startHealthMonitoring(config, customConfig) return this.client } else { DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch connection interrupted or failed to ${this.maskUrl( + `Elasticsearch connection unhealthy at ${this.maskUrl( this.config.url - )} - starting retry phase`, + )} - health monitoring will handle reconnection`, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_WARN ) - this.closeConnectionSync() - return this.startRetryConnection(config, customConfig) + this.startHealthMonitoring(config, customConfig) + throw new Error('Elasticsearch connection is not healthy') } } @@ -109,33 +123,69 @@ class ElasticsearchClientSingleton { this.isMonitoring = true DATABASE_LOGGER.logMessageWithEmoji( - `Starting Elasticsearch connection monitoring (health check every ${finalConfig.healthCheckInterval}ms)`, + `Starting Elasticsearch health monitoring (interval: ${finalConfig.healthCheckInterval}ms)`, true, GENERIC_EMOJIS.EMOJI_OCEAN_WAVE, LOG_LEVELS_STR.LEVEL_DEBUG ) this.healthCheckTimer = setInterval(async () => { - if (this.client && !this.isRetrying) { - const isHealthy = await this.checkConnectionHealth() - if (!isHealthy) { - DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch connection lost during monitoring - triggering automatic reconnection`, - true, - GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_WARN - ) - this.closeConnectionSync() + if (this.isRetrying) { + return + } + + const isHealthy = await this.checkConnectionHealth() + if (!isHealthy) { + if (this.client) { try { - await this.startRetryConnection(config, customConfig) - } catch (error) { + this.client.close() + } catch (err) { DATABASE_LOGGER.logMessageWithEmoji( - `Automatic reconnection failed: ${error.message}`, + `Error closing Elasticsearch client during health check: ${err instanceof Error ? 
err.message : String(err)}`, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_ERROR + LOG_LEVELS_STR.LEVEL_DEBUG ) } + this.client = null + this.config = null + } + + // Emit CONNECTION_LOST + if (!this.connectionLostEmitted) { + this.connectionLostEmitted = true + DATABASE_LOGGER.logMessageWithEmoji( + `Elasticsearch connection lost to ${this.maskUrl( + config.url + )} - starting reconnection attempts every ${finalConfig.healthCheckInterval}ms`, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_WARN + ) + ES_CONNECTION_EVENTS.emit(DB_EVENTS.CONNECTION_LOST) + } + + // Single reconnection attempt + this.isRetrying = true + try { + DATABASE_LOGGER.logMessageWithEmoji( + `Attempting Elasticsearch reconnection to ${this.maskUrl(config.url)}`, + true, + GENERIC_EMOJIS.EMOJI_OCEAN_WAVE, + LOG_LEVELS_STR.LEVEL_INFO + ) + await this.createNewConnection(config, customConfig) + this.isRetrying = false + this.connectionLostEmitted = false + DATABASE_LOGGER.logMessageWithEmoji( + `Elasticsearch connection restored to ${this.maskUrl(config.url)}`, + true, + GENERIC_EMOJIS.EMOJI_CHECK_MARK, + LOG_LEVELS_STR.LEVEL_INFO + ) + ES_CONNECTION_EVENTS.emit(DB_EVENTS.CONNECTION_RESTORED) + } catch { + this.isRetrying = false } } }, finalConfig.healthCheckInterval) @@ -155,71 +205,6 @@ class ElasticsearchClientSingleton { } } - private async startRetryConnection( - config: OceanNodeDBConfig, - customConfig: Partial = {} - ): Promise { - if (!this.isElasticsearchDatabase(config)) { - throw new Error(`Database type '${config.dbType}' is not Elasticsearch`) - } - - this.isRetrying = true - const finalConfig = { - ...DEFAULT_ELASTICSEARCH_CONFIG, - ...customConfig - } - - DATABASE_LOGGER.logMessageWithEmoji( - `Starting Elasticsearch retry connection phase to ${this.maskUrl( - config.url - )} (max retries: ${finalConfig.maxRetries})`, - true, - GENERIC_EMOJIS.EMOJI_OCEAN_WAVE, - LOG_LEVELS_STR.LEVEL_INFO - ) - - for (let attempt = 1; attempt <= finalConfig.maxRetries; attempt++) { - try { - DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch reconnection attempt ${attempt}/${ - finalConfig.maxRetries - } to ${this.maskUrl(config.url)}`, - true, - GENERIC_EMOJIS.EMOJI_OCEAN_WAVE, - LOG_LEVELS_STR.LEVEL_INFO - ) - - const client = await this.createNewConnection(config, customConfig) - this.isRetrying = false - return client - } catch (error) { - if (attempt === finalConfig.maxRetries) { - this.isRetrying = false - DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch retry connection failed after ${ - finalConfig.maxRetries - } attempts to ${this.maskUrl(config.url)}: ${error.message}`, - true, - GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_ERROR - ) - throw error - } - - const delay = Math.min(1000 * Math.pow(2, attempt - 1), 30000) - DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch retry attempt ${attempt}/${finalConfig.maxRetries} failed, waiting ${delay}ms before next attempt: ${error.message}`, - true, - GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_WARN - ) - await new Promise((resolve) => setTimeout(resolve, delay)) - } - } - - throw new Error('Maximum retry attempts reached') - } - private async checkConnectionHealth(): Promise { if (!this.client) return false @@ -228,7 +213,7 @@ class ElasticsearchClientSingleton { return true } catch (error) { DATABASE_LOGGER.logMessageWithEmoji( - `Elasticsearch connection health check failed: ${error.message}`, + `Elasticsearch health check failed: ${error.message}`, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, 
LOG_LEVELS_STR.LEVEL_DEBUG @@ -277,9 +262,7 @@ class ElasticsearchClientSingleton { DATABASE_LOGGER.logMessageWithEmoji( `Elasticsearch connection established successfully to ${this.maskUrl( config.url - )} (attempt ${this.connectionAttempts}/${ - finalConfig.maxRetries - }) last successful connection ${this.lastConnectionTime}`, + )} (attempt ${this.connectionAttempts}) last successful connection ${this.lastConnectionTime}`, true, GENERIC_EMOJIS.EMOJI_CHECK_MARK, LOG_LEVELS_STR.LEVEL_INFO @@ -290,9 +273,7 @@ class ElasticsearchClientSingleton { DATABASE_LOGGER.logMessageWithEmoji( `Failed to connect to Elasticsearch at ${this.maskUrl(config.url)} (attempt ${ this.connectionAttempts - }/${finalConfig.maxRetries}) last successful connection ${ - this.lastConnectionTime - }: ${error.message}`, + }) last successful connection ${this.lastConnectionTime}: ${error.message}`, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR @@ -323,6 +304,7 @@ class ElasticsearchClientSingleton { this.client = null this.config = null } + this.connectionLostEmitted = false } public getConnectionStats(): { diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index 4e905871a..87b62d683 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -163,14 +163,20 @@ export const C2DDockerConfigSchema = z.array( accessLists: z.array(z.string()) }) .optional(), - fees: z.record(z.string(), z.array(ComputeEnvFeesSchema)), + fees: z.record(z.string(), z.array(ComputeEnvFeesSchema)).optional(), free: ComputeEnvironmentFreeOptionsSchema.optional(), imageRetentionDays: z.number().int().min(1).optional().default(7), imageCleanupInterval: z.number().int().min(3600).optional().default(86400) // min 1 hour, default 24 hours }) - .refine((data) => data.fees !== undefined && Object.keys(data.fees).length > 0, { - message: 'There is no fees configuration!' 
- }) + .refine( + (data) => + (data.fees !== undefined && Object.keys(data.fees).length > 0) || + (data.free !== undefined && data.free !== null), + { + message: + 'Each docker compute environment must have either a non-empty "fees" configuration or a "free" configuration' + } + ) .refine((data) => data.storageExpiry >= data.maxJobDuration, { message: '"storageExpiry" should be greater than "maxJobDuration"' }) diff --git a/src/utils/cronjobs/p2pAnnounceC2D.ts b/src/utils/cronjobs/p2pAnnounceC2D.ts index b1142b6b0..156a2ae80 100644 --- a/src/utils/cronjobs/p2pAnnounceC2D.ts +++ b/src/utils/cronjobs/p2pAnnounceC2D.ts @@ -41,56 +41,57 @@ export async function p2pAnnounceC2D(node: OceanNode) { break } } - for (const resource of env.free.resources) { - let min = 0 - let kind = null - let type = null - // we need to get the min from resources - for (const res of env.resources) { - if (res.id === resource.id) { - ;({ min } = res) - ;({ kind } = res) - ;({ type } = res) + if (env.free?.resources) { + for (const resource of env.free.resources) { + let min = 0 + let kind = null + let type = null + // we need to get the min from resources + for (const res of env.resources) { + if (res.id === resource.id) { + ;({ min } = res) + ;({ kind } = res) + ;({ type } = res) + } } - } - switch (type) { - case 'cpu': - case 'gpu': - // For CPU and GPU, we assume the min and max are in terms of cores - // and we generate announcements for each core count in the range - // if min is not defined, we assume it is 1 - for (let i = min || 1; i <= resource.max; i++) { - const obj: Record = {} - obj.free = true - obj[type] = i - if (!announce.includes(obj)) { - announce.push(obj) - } - if (type === 'gpu' && kind) { - obj.kind = kind // add kind if available + switch (type) { + case 'cpu': + case 'gpu': + // For CPU and GPU, we assume the min and max are in terms of cores + // and we generate announcements for each core count in the range + // if min is not defined, we assume it is 1 + for (let i = min || 1; i <= resource.max; i++) { + const obj: Record = {} + obj.free = true + obj[type] = i if (!announce.includes(obj)) { announce.push(obj) } + if (type === 'gpu' && kind) { + obj.kind = kind // add kind if available + if (!announce.includes(obj)) { + announce.push(obj) + } + } } - } - break + break - case 'ram': - case 'disk': - for (let i = min; i <= resource.max; i += GB) { - const obj: Record = {} - obj.free = true - obj[type] = Math.round(i / GB) - if (!announce.includes(obj) && obj[type] > 0) { - announce.push(obj) + case 'ram': + case 'disk': + for (let i = min; i <= resource.max; i += GB) { + const obj: Record = {} + obj.free = true + obj[type] = Math.round(i / GB) + if (!announce.includes(obj) && obj[type] > 0) { + announce.push(obj) + } } - } - break + break + } } } } - // now announce all resources to p2p network for (const obj of announce) { const res = { c2d: obj