From 7159a13e0fb13661b9d6fa8cae1a0713bdbfb4eb Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Wed, 1 Apr 2026 17:29:49 +0200 Subject: [PATCH] feat: Modularize SystemStatus to allow custom backpressure mechanisms (#3529) --- .../core/src/autoscaling/autoscaled_pool.ts | 8 + .../src/autoscaling/client_load_signal.ts | 60 ++++ .../core/src/autoscaling/cpu_load_signal.ts | 45 +++ .../src/autoscaling/event_loop_load_signal.ts | 56 +++ packages/core/src/autoscaling/index.ts | 5 + packages/core/src/autoscaling/load_signal.ts | 215 +++++++++++ .../src/autoscaling/memory_load_signal.ts | 148 ++++++++ packages/core/src/autoscaling/snapshotter.ts | 335 +++++------------- .../core/src/autoscaling/system_status.ts | 150 ++++---- test/core/autoscaling/autoscaled_pool.test.ts | 68 ++++ test/core/autoscaling/system_status.test.ts | 76 ++-- 11 files changed, 811 insertions(+), 355 deletions(-) create mode 100644 packages/core/src/autoscaling/client_load_signal.ts create mode 100644 packages/core/src/autoscaling/cpu_load_signal.ts create mode 100644 packages/core/src/autoscaling/event_loop_load_signal.ts create mode 100644 packages/core/src/autoscaling/load_signal.ts create mode 100644 packages/core/src/autoscaling/memory_load_signal.ts diff --git a/packages/core/src/autoscaling/autoscaled_pool.ts b/packages/core/src/autoscaling/autoscaled_pool.ts index 7bfa33f80707..42f7e42e2e35 100644 --- a/packages/core/src/autoscaling/autoscaled_pool.ts +++ b/packages/core/src/autoscaling/autoscaled_pool.ts @@ -8,6 +8,7 @@ import { betterClearInterval, betterSetInterval } from '@apify/utilities'; import { Configuration } from '../configuration'; import { CriticalError } from '../errors'; import { log as defaultLog } from '../log'; +import type { LoadSignal } from './load_signal'; import type { SnapshotterOptions } from './snapshotter'; import { Snapshotter } from './snapshotter'; import type { SystemInfo, SystemStatusOptions } from './system_status'; @@ -203,6 +204,10 @@ export class AutoscaledPool { private resolve: ((val?: unknown) => void) | null = null; private reject: ((reason?: unknown) => void) | null = null; private snapshotter: Snapshotter; + + /** Additional SystemStatus loadSignals - tracked here for initialization and cleanup */ + private loadSignals: LoadSignal[]; + private systemStatus: SystemStatus; private autoscaleInterval!: BetterIntervalID; private maybeRunInterval!: BetterIntervalID; @@ -295,6 +300,7 @@ export class AutoscaledPool { }); ssoCopy.config ??= this.config; this.snapshotter = ssoCopy.snapshotter; + this.loadSignals = ssoCopy.loadSignals ?? []; this.systemStatus = new SystemStatus(ssoCopy); } @@ -366,6 +372,7 @@ export class AutoscaledPool { }); await this.snapshotter.start(); + await Promise.all(this.loadSignals.map((s) => s.start())); // This interval checks the system status and updates the desired concurrency accordingly. this.autoscaleInterval = betterSetInterval(this._autoscale, this.autoscaleIntervalMillis); @@ -699,6 +706,7 @@ export class AutoscaledPool { betterClearInterval(this.maybeRunInterval); if (this.tasksDonePerSecondInterval) betterClearInterval(this.tasksDonePerSecondInterval); if (this.snapshotter) await this.snapshotter.stop(); + await Promise.all(this.loadSignals.map((s) => s.stop())); } protected _incrementTasksDonePerSecond(intervalCallback: () => void) { diff --git a/packages/core/src/autoscaling/client_load_signal.ts b/packages/core/src/autoscaling/client_load_signal.ts new file mode 100644 index 000000000000..b2fd9eeeb97a --- /dev/null +++ b/packages/core/src/autoscaling/client_load_signal.ts @@ -0,0 +1,60 @@ +import type { StorageClient } from '@crawlee/types'; + +import type { LoadSnapshot } from './load_signal'; +import { SnapshotStore } from './load_signal'; + +const CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT = 2; + +export interface ClientSnapshot extends LoadSnapshot { + rateLimitErrorCount: number; +} + +export interface ClientLoadSignalOptions { + client: StorageClient; + clientSnapshotIntervalSecs?: number; + maxClientErrors?: number; + overloadedRatio?: number; + snapshotHistoryMillis?: number; +} + +/** + * Periodically checks the storage client for rate-limit errors (HTTP 429) + * and reports overload when the error delta exceeds a threshold. + */ +export function createClientLoadSignal(options: ClientLoadSignalOptions) { + const maxClientErrors = options.maxClientErrors ?? 3; + + const signal = SnapshotStore.fromInterval({ + name: 'clientInfo', + overloadedRatio: options.overloadedRatio ?? 0.3, + intervalMillis: (options.clientSnapshotIntervalSecs ?? 1) * 1000, + snapshotHistoryMillis: options.snapshotHistoryMillis, + handler(store, intervalCallback) { + const now = new Date(); + + const allErrorCounts = options.client.stats?.rateLimitErrors ?? []; + const currentErrCount = allErrorCounts[CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT] || 0; + + const snapshot: ClientSnapshot = { + createdAt: now, + isOverloaded: false, + rateLimitErrorCount: currentErrCount, + }; + const all = store.getAll(); + const previousSnapshot = all[all.length - 1]; + if (previousSnapshot) { + const { rateLimitErrorCount } = previousSnapshot; + const delta = currentErrCount - rateLimitErrorCount; + if (delta > maxClientErrors) snapshot.isOverloaded = true; + } + + store.push(snapshot, now); + intervalCallback(); + }, + }); + + return signal; +} + +/** @internal Return type for backward compat in Snapshotter facade */ +export type ClientLoadSignal = ReturnType; diff --git a/packages/core/src/autoscaling/cpu_load_signal.ts b/packages/core/src/autoscaling/cpu_load_signal.ts new file mode 100644 index 000000000000..c5fd78b5a985 --- /dev/null +++ b/packages/core/src/autoscaling/cpu_load_signal.ts @@ -0,0 +1,45 @@ +import type { Configuration } from '../configuration'; +import { EventType } from '../events/event_manager'; +import type { LoadSnapshot } from './load_signal'; +import { SnapshotStore } from './load_signal'; +import type { SystemInfo } from './system_status'; + +export interface CpuSnapshot extends LoadSnapshot { + usedRatio: number; + ticks?: { idle: number; total: number }; +} + +export interface CpuLoadSignalOptions { + overloadedRatio?: number; + snapshotHistoryMillis?: number; + config: Configuration; +} + +/** + * Tracks CPU usage via `SYSTEM_INFO` events and reports overload when + * the platform or local OS metrics indicate the CPU is overloaded. + */ +export function createCpuLoadSignal(options: CpuLoadSignalOptions) { + return SnapshotStore.fromEvent({ + name: 'cpuInfo', + overloadedRatio: options.overloadedRatio ?? 0.4, + events: options.config.getEventManager(), + event: EventType.SYSTEM_INFO, + snapshotHistoryMillis: options.snapshotHistoryMillis, + handler(store, systemInfo) { + const { cpuCurrentUsage, isCpuOverloaded } = systemInfo; + const createdAt = systemInfo.createdAt ? new Date(systemInfo.createdAt) : new Date(); + store.push( + { + createdAt, + isOverloaded: isCpuOverloaded!, + usedRatio: Math.ceil(cpuCurrentUsage! / 100), + }, + createdAt, + ); + }, + }); +} + +/** @internal Return type for backward compat in Snapshotter facade */ +export type CpuLoadSignal = ReturnType; diff --git a/packages/core/src/autoscaling/event_loop_load_signal.ts b/packages/core/src/autoscaling/event_loop_load_signal.ts new file mode 100644 index 000000000000..eabbcd88bc47 --- /dev/null +++ b/packages/core/src/autoscaling/event_loop_load_signal.ts @@ -0,0 +1,56 @@ +import type { LoadSnapshot } from './load_signal'; +import { SnapshotStore } from './load_signal'; + +export interface EventLoopSnapshot extends LoadSnapshot { + exceededMillis: number; +} + +export interface EventLoopLoadSignalOptions { + eventLoopSnapshotIntervalSecs?: number; + maxBlockedMillis?: number; + overloadedRatio?: number; + snapshotHistoryMillis?: number; +} + +/** + * Periodically measures event loop delay and reports overload when the + * delay exceeds a configured threshold. + */ +export function createEventLoopLoadSignal(options: EventLoopLoadSignalOptions = {}) { + const intervalMillis = (options.eventLoopSnapshotIntervalSecs ?? 0.5) * 1000; + const maxBlockedMillis = options.maxBlockedMillis ?? 50; + + const signal = SnapshotStore.fromInterval({ + name: 'eventLoopInfo', + overloadedRatio: options.overloadedRatio ?? 0.6, + intervalMillis, + snapshotHistoryMillis: options.snapshotHistoryMillis, + handler(store, intervalCallback) { + const now = new Date(); + + const snapshot: EventLoopSnapshot = { + createdAt: now, + isOverloaded: false, + exceededMillis: 0, + }; + + const all = store.getAll(); + const previousSnapshot = all[all.length - 1]; + if (previousSnapshot) { + const { createdAt } = previousSnapshot; + const delta = now.getTime() - +createdAt - intervalMillis; + + if (delta > maxBlockedMillis) snapshot.isOverloaded = true; + snapshot.exceededMillis = Math.max(delta - maxBlockedMillis, 0); + } + + store.push(snapshot, now); + intervalCallback(); + }, + }); + + return signal; +} + +/** @internal Return type for backward compat in Snapshotter facade */ +export type EventLoopLoadSignal = ReturnType; diff --git a/packages/core/src/autoscaling/index.ts b/packages/core/src/autoscaling/index.ts index 991e454b1988..431921d78873 100644 --- a/packages/core/src/autoscaling/index.ts +++ b/packages/core/src/autoscaling/index.ts @@ -1,3 +1,8 @@ export * from './autoscaled_pool'; +export * from './client_load_signal'; +export * from './cpu_load_signal'; +export * from './event_loop_load_signal'; +export * from './load_signal'; +export * from './memory_load_signal'; export * from './snapshotter'; export * from './system_status'; diff --git a/packages/core/src/autoscaling/load_signal.ts b/packages/core/src/autoscaling/load_signal.ts new file mode 100644 index 000000000000..ecbd5029e776 --- /dev/null +++ b/packages/core/src/autoscaling/load_signal.ts @@ -0,0 +1,215 @@ +import { weightedAvg } from '@crawlee/utils'; + +import type { BetterIntervalID } from '@apify/utilities'; +import { betterClearInterval, betterSetInterval } from '@apify/utilities'; + +import type { EventManager, EventTypeName } from '../events/event_manager'; +import type { ClientInfo } from './system_status'; + +/** + * A snapshot of a resource's overload state at a point in time. + */ +export interface LoadSnapshot { + createdAt: Date; + isOverloaded: boolean; +} + +/** + * A signal that reports whether a particular resource is overloaded. + * + * `SystemStatus` aggregates multiple `LoadSignal` instances to determine + * overall system health. The built-in signals cover memory, CPU, event loop, + * and API client rate limits. You can implement this interface to add + * custom overload signals (e.g. navigation timeouts, proxy health). + */ +export interface LoadSignal { + /** Human-readable name used in logging and `SystemInfo` keys. */ + readonly name: string; + + /** + * Maximum ratio of overloaded snapshots in a sample before the signal + * is considered overloaded. For example, `0.2` means the signal fires + * when more than 20% of the sample window is overloaded. + */ + readonly overloadedRatio: number; + + /** Start collecting snapshots. Called when the pool starts. */ + start(): Promise; + + /** Stop collecting snapshots. Called when the pool shuts down. */ + stop(): Promise; + + /** + * Return snapshots for a recent time window (used for "current" status). + * @param sampleDurationMillis How far back to look, in milliseconds. + */ + getSample(sampleDurationMillis?: number): LoadSnapshot[]; +} + +/** + * A time-pruning, time-windowed store for `LoadSnapshot` values. + * Signals compose with this instead of inheriting from a base class. + */ +export class SnapshotStore { + private snapshots: T[] = []; + private readonly historyMillis: number; + + constructor(historyMillis = 30_000) { + this.historyMillis = historyMillis; + } + + /** + * Add a snapshot and prune entries older than the history window. + */ + push(snapshot: T, now: Date = snapshot.createdAt): void { + // Inline pruning to avoid private-method transpilation issues + let oldCount = 0; + for (let i = 0; i < this.snapshots.length; i++) { + const { createdAt } = this.snapshots[i]; + if (now.getTime() - new Date(createdAt).getTime() > this.historyMillis) oldCount++; + else break; + } + if (oldCount) this.snapshots.splice(0, oldCount); + + this.snapshots.push(snapshot); + } + + /** + * Return all snapshots, or only those within the given time window. + */ + getSample(sampleDurationMillis?: number): T[] { + if (!sampleDurationMillis) return this.snapshots; + + const sample: T[] = []; + let idx = this.snapshots.length; + if (!idx) return sample; + + const latestTime = this.snapshots[idx - 1].createdAt; + while (idx--) { + const snapshot = this.snapshots[idx]; + if (+latestTime - +snapshot.createdAt <= sampleDurationMillis) { + sample.unshift(snapshot); + } else { + break; + } + } + + return sample; + } + + /** + * Direct access to the underlying array (for backward-compat getters). + */ + getAll(): T[] { + return this.snapshots; + } + + /** + * Create a `LoadSignal` that snapshots on a `betterSetInterval` tick. + * + * The `handler` receives the store (to read previous snapshots) and the + * interval callback (which it **must** call when done). It should call + * `store.push()` to record a snapshot. + */ + static fromInterval(options: { + name: string; + overloadedRatio: number; + intervalMillis: number; + snapshotHistoryMillis?: number; + handler: (store: SnapshotStore, intervalCallback: () => unknown) => void; + }): Omit & { + store: SnapshotStore; + handle: (cb: () => unknown) => void; + getSample(sampleDurationMillis?: number): T[]; + } { + const store = new SnapshotStore(options.snapshotHistoryMillis); + let interval: BetterIntervalID = null!; + + const handle = (cb: () => unknown) => options.handler(store, cb); + + return { + name: options.name, + overloadedRatio: options.overloadedRatio, + store, + handle, + getSample: (ms) => store.getSample(ms), + async start() { + interval = betterSetInterval(handle, options.intervalMillis); + }, + async stop() { + betterClearInterval(interval); + }, + }; + } + + /** + * Create a `LoadSignal` that snapshots in response to an `EventManager` event. + * + * The `handler` receives the event payload and the store. It should call + * `store.push()` to record a snapshot. + */ + static fromEvent(options: { + name: string; + overloadedRatio: number; + events: EventManager; + event: EventTypeName; + snapshotHistoryMillis?: number; + handler: (store: SnapshotStore, payload: E) => void; + }): Omit & { + store: SnapshotStore; + handle: (payload: E) => void; + getSample(sampleDurationMillis?: number): T[]; + } { + const store = new SnapshotStore(options.snapshotHistoryMillis); + + const handle = (payload: E) => options.handler(store, payload); + + return { + name: options.name, + overloadedRatio: options.overloadedRatio, + store, + handle, + getSample: (ms) => store.getSample(ms), + async start() { + options.events.on(options.event, handle); + }, + async stop() { + options.events.off(options.event, handle); + }, + }; + } +} + +/** + * Evaluate whether a sample of `LoadSnapshot` values exceeds the given + * overloaded ratio, using a time-weighted average. This is the shared + * evaluation logic used by `SystemStatus` for all signal types. + */ +export function evaluateLoadSignalSample(sample: LoadSnapshot[], overloadedRatio: number): ClientInfo { + if (sample.length === 0) { + return { + isOverloaded: false, + limitRatio: overloadedRatio, + actualRatio: 0, + }; + } + + const weights: number[] = []; + const values: number[] = []; + + for (let i = 1; i < sample.length; i++) { + const previous = sample[i - 1]; + const current = sample[i]; + const weight = +current.createdAt - +previous.createdAt; + weights.push(weight || 1); // Prevent errors from 0ms long intervals (sync) between snapshots. + values.push(+current.isOverloaded); + } + + const wAvg = sample.length === 1 ? +sample[0].isOverloaded : weightedAvg(values, weights); + + return { + isOverloaded: wAvg > overloadedRatio, + limitRatio: overloadedRatio, + actualRatio: Math.round(wAvg * 1000) / 1000, + }; +} diff --git a/packages/core/src/autoscaling/memory_load_signal.ts b/packages/core/src/autoscaling/memory_load_signal.ts new file mode 100644 index 000000000000..7eaa47e689e7 --- /dev/null +++ b/packages/core/src/autoscaling/memory_load_signal.ts @@ -0,0 +1,148 @@ +import { getMemoryInfo, getMemoryInfoV2, isContainerized } from '@crawlee/utils'; + +import type { Log } from '@apify/log'; + +import type { Configuration } from '../configuration'; +import type { EventManager } from '../events/event_manager'; +import { EventType } from '../events/event_manager'; +import { log as defaultLog } from '../log'; +import type { LoadSignal, LoadSnapshot } from './load_signal'; +import { SnapshotStore } from './load_signal'; +import type { SystemInfo } from './system_status'; + +const RESERVE_MEMORY_RATIO = 0.5; +const CRITICAL_OVERLOAD_RATE_LIMIT_MILLIS = 10_000; + +export interface MemorySnapshot extends LoadSnapshot { + usedBytes?: number; +} + +export interface MemoryLoadSignalOptions { + maxUsedMemoryRatio?: number; + overloadedRatio?: number; + snapshotHistoryMillis?: number; + config: Configuration; + log?: Log; +} + +/** + * Tracks memory usage via `SYSTEM_INFO` events and reports overload when + * the used-to-available memory ratio exceeds a threshold. + */ +export class MemoryLoadSignal implements LoadSignal { + readonly name = 'memInfo'; + readonly overloadedRatio: number; + + private readonly store: SnapshotStore; + private readonly config: Configuration; + private readonly events: EventManager; + private readonly log: Log; + private readonly maxUsedMemoryRatio: number; + private maxMemoryRatio: number | undefined; + private maxMemoryBytes!: number; + private lastLoggedCriticalMemoryOverloadAt: Date | null = null; + + constructor(options: MemoryLoadSignalOptions) { + this.store = new SnapshotStore(options.snapshotHistoryMillis); + this.config = options.config; + this.events = this.config.getEventManager(); + this.log = options.log ?? defaultLog.child({ prefix: 'MemoryLoadSignal' }); + this.maxUsedMemoryRatio = options.maxUsedMemoryRatio ?? 0.9; + this.overloadedRatio = options.overloadedRatio ?? 0.2; + this._onSystemInfo = this._onSystemInfo.bind(this); + } + + async start(): Promise { + const memoryMbytes = this.config.get('memoryMbytes', 0); + + if (memoryMbytes > 0) { + this.maxMemoryBytes = memoryMbytes * 1024 * 1024; + } else { + this.maxMemoryRatio = this.config.get('availableMemoryRatio'); + if (!this.maxMemoryRatio) { + throw new Error('availableMemoryRatio is not set in configuration.'); + } else { + this.log.debug( + `Setting max memory of this run to ${this.maxMemoryRatio * 100} % of available memory. ` + + 'Use the CRAWLEE_MEMORY_MBYTES or CRAWLEE_AVAILABLE_MEMORY_RATIO environment variable to override it.', + ); + } + // Fallback memory measurement in case memTotalBytes is missing from SystemInfo. + this.maxMemoryBytes = await this._getTotalMemoryBytes(); + } + + this.events.on(EventType.SYSTEM_INFO, this._onSystemInfo); + } + + async stop(): Promise { + this.events.off(EventType.SYSTEM_INFO, this._onSystemInfo); + } + + getSample(sampleDurationMillis?: number): MemorySnapshot[] { + return this.store.getSample(sampleDurationMillis); + } + + /** + * Returns typed memory snapshots for backward compatibility with `Snapshotter`. + */ + getMemorySnapshots(): MemorySnapshot[] { + return this.store.getAll(); + } + + /** @internal */ + _onSystemInfo(systemInfo: SystemInfo): void { + const createdAt = systemInfo.createdAt ? new Date(systemInfo.createdAt) : new Date(); + const { memCurrentBytes, memTotalBytes } = systemInfo; + + let maxMemoryBytes = this.maxMemoryBytes!; + if (this.maxMemoryRatio !== undefined && this.maxMemoryRatio > 0) { + maxMemoryBytes = this.maxMemoryRatio * (memTotalBytes ?? this.maxMemoryBytes); + } + + const snapshot: MemorySnapshot = { + createdAt, + isOverloaded: memCurrentBytes! / maxMemoryBytes > this.maxUsedMemoryRatio, + usedBytes: memCurrentBytes, + }; + + this.store.push(snapshot, createdAt); + this._memoryOverloadWarning(systemInfo, maxMemoryBytes); + } + + /** @internal */ + _memoryOverloadWarning(systemInfo: SystemInfo, maxMemoryBytes?: number): void { + const effectiveMax = maxMemoryBytes ?? this.maxMemoryBytes!; + const { memCurrentBytes } = systemInfo; + const createdAt = systemInfo.createdAt ? new Date(systemInfo.createdAt) : new Date(); + if ( + this.lastLoggedCriticalMemoryOverloadAt && + +createdAt < +this.lastLoggedCriticalMemoryOverloadAt + CRITICAL_OVERLOAD_RATE_LIMIT_MILLIS + ) + return; + + const maxDesiredMemoryBytes = this.maxUsedMemoryRatio * effectiveMax; + const reserveMemory = effectiveMax * (1 - this.maxUsedMemoryRatio) * RESERVE_MEMORY_RATIO; + const criticalOverloadBytes = maxDesiredMemoryBytes + reserveMemory; + const isCriticalOverload = memCurrentBytes! > criticalOverloadBytes; + + if (isCriticalOverload) { + const usedPercentage = Math.round((memCurrentBytes! / effectiveMax) * 100); + const toMb = (bytes: number) => Math.round(bytes / 1024 ** 2); + this.log.warning( + 'Memory is critically overloaded. ' + + `Using ${toMb(memCurrentBytes!)} MB of ${toMb( + effectiveMax, + )} MB (${usedPercentage}%). Consider increasing available memory.`, + ); + this.lastLoggedCriticalMemoryOverloadAt = createdAt; + } + } + + private async _getTotalMemoryBytes(): Promise { + if (this.config.get('systemInfoV2')) { + const containerized = this.config.get('containerized', await isContainerized()); + return (await getMemoryInfoV2(containerized)).totalBytes; + } + return (await getMemoryInfo()).totalBytes; + } +} diff --git a/packages/core/src/autoscaling/snapshotter.ts b/packages/core/src/autoscaling/snapshotter.ts index 4261a616f197..213b5bb33050 100644 --- a/packages/core/src/autoscaling/snapshotter.ts +++ b/packages/core/src/autoscaling/snapshotter.ts @@ -1,21 +1,22 @@ import type { StorageClient } from '@crawlee/types'; -import { getMemoryInfo, getMemoryInfoV2, isContainerized } from '@crawlee/utils'; import ow from 'ow'; import type { Log } from '@apify/log'; -import type { BetterIntervalID } from '@apify/utilities'; -import { betterClearInterval, betterSetInterval } from '@apify/utilities'; import { Configuration } from '../configuration'; import type { EventManager } from '../events/event_manager'; -import { EventType } from '../events/event_manager'; import { log as defaultLog } from '../log'; +import type { ClientLoadSignal, ClientSnapshot } from './client_load_signal'; +import { createClientLoadSignal } from './client_load_signal'; +import type { CpuLoadSignal, CpuSnapshot } from './cpu_load_signal'; +import { createCpuLoadSignal } from './cpu_load_signal'; +import type { EventLoopLoadSignal, EventLoopSnapshot } from './event_loop_load_signal'; +import { createEventLoopLoadSignal } from './event_loop_load_signal'; +import type { LoadSignal } from './load_signal'; +import type { MemorySnapshot } from './memory_load_signal'; +import { MemoryLoadSignal } from './memory_load_signal'; import type { SystemInfo } from './system_status'; -const RESERVE_MEMORY_RATIO = 0.5; -const CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT = 2; -const CRITICAL_OVERLOAD_RATE_LIMIT_MILLIS = 10000; - export interface SnapshotterOptions { /** * Defines the interval of measuring the event loop response time. @@ -68,28 +69,6 @@ export interface SnapshotterOptions { config?: Configuration; } -interface MemorySnapshot { - createdAt: Date; - isOverloaded: boolean; - usedBytes?: number; -} -interface CpuSnapshot { - createdAt: Date; - isOverloaded: boolean; - usedRatio: number; - ticks?: { idle: number; total: number }; -} -interface EventLoopSnapshot { - createdAt: Date; - isOverloaded: boolean; - exceededMillis: number; -} -interface ClientSnapshot { - createdAt: Date; - isOverloaded: boolean; - rateLimitErrorCount: number; -} - /** * Creates snapshots of system resources at given intervals and marks the resource * as either overloaded or not during the last interval. Keeps a history of the snapshots. @@ -113,6 +92,7 @@ interface ClientSnapshot { * * Client becomes overloaded when rate limit errors (429 - Too Many Requests), * typically received from the request queue, exceed the set limit within the set interval. + * * @category Scaling */ export class Snapshotter { @@ -120,24 +100,36 @@ export class Snapshotter { client: StorageClient; config: Configuration; events: EventManager; - eventLoopSnapshotIntervalMillis: number; - clientSnapshotIntervalMillis: number; - snapshotHistoryMillis: number; - maxBlockedMillis: number; - maxUsedMemoryRatio: number; - maxClientErrors: number; - maxMemoryBytes!: number; - private maxMemoryRatio: number | undefined; - cpuSnapshots: CpuSnapshot[] = []; - eventLoopSnapshots: EventLoopSnapshot[] = []; - memorySnapshots: MemorySnapshot[] = []; - clientSnapshots: ClientSnapshot[] = []; + private readonly memorySignal: MemoryLoadSignal; + private readonly eventLoopSignal: EventLoopLoadSignal; + private readonly cpuSignal: CpuLoadSignal; + private readonly clientSignal: ClientLoadSignal; - eventLoopInterval: BetterIntervalID = null!; - clientInterval: BetterIntervalID = null!; + /** + * Returns the four built-in signals as an array, so `SystemStatus` can + * iterate them alongside any custom `LoadSignal` instances. + */ + getLoadSignals(): LoadSignal[] { + return [this.memorySignal, this.eventLoopSignal, this.cpuSignal, this.clientSignal]; + } + + // Legacy public properties kept for backward compat (tests read these directly) + get cpuSnapshots(): CpuSnapshot[] { + return this.cpuSignal.store.getAll(); + } + + get eventLoopSnapshots(): EventLoopSnapshot[] { + return this.eventLoopSignal.store.getAll(); + } + + get memorySnapshots(): MemorySnapshot[] { + return this.memorySignal.getMemorySnapshots(); + } - lastLoggedCriticalMemoryOverloadAt: Date | null = null; + get clientSnapshots(): ClientSnapshot[] { + return this.clientSignal.store.getAll(); + } /** * @param [options] All `Snapshotter` configuration options. @@ -175,60 +167,52 @@ export class Snapshotter { this.config = config; this.events = this.config.getEventManager(); - this.eventLoopSnapshotIntervalMillis = eventLoopSnapshotIntervalSecs * 1000; - this.clientSnapshotIntervalMillis = clientSnapshotIntervalSecs * 1000; - this.snapshotHistoryMillis = snapshotHistorySecs * 1000; - this.maxBlockedMillis = maxBlockedMillis; - this.maxUsedMemoryRatio = maxUsedMemoryRatio; - this.maxClientErrors = maxClientErrors; - // We need to pre-bind those functions to be able to successfully remove listeners. - this._snapshotCpu = this._snapshotCpu.bind(this); - this._snapshotMemory = this._snapshotMemory.bind(this); + const snapshotHistoryMillis = snapshotHistorySecs * 1000; + + this.memorySignal = new MemoryLoadSignal({ + maxUsedMemoryRatio, + snapshotHistoryMillis, + config: this.config, + log: this.log, + }); + + this.eventLoopSignal = createEventLoopLoadSignal({ + eventLoopSnapshotIntervalSecs, + maxBlockedMillis, + snapshotHistoryMillis, + }); + + this.cpuSignal = createCpuLoadSignal({ + snapshotHistoryMillis, + config: this.config, + }); + + this.clientSignal = createClientLoadSignal({ + client: this.client, + clientSnapshotIntervalSecs, + maxClientErrors, + snapshotHistoryMillis, + }); } /** * Starts capturing snapshots at configured intervals. */ async start(): Promise { - const memoryMbytes = this.config.get('memoryMbytes', 0); - - if (memoryMbytes > 0) { - this.maxMemoryBytes = memoryMbytes * 1024 * 1024; - } else { - this.maxMemoryRatio = this.config.get('availableMemoryRatio'); - if (!this.maxMemoryRatio) { - throw new Error('availableMemoryRatio is not set in configuration.'); - } else { - this.log.debug( - `Setting max memory of this run to ${this.maxMemoryRatio * 100} % of available memory. ` + - 'Use the CRAWLEE_MEMORY_MBYTES or CRAWLEE_AVAILABLE_MEMORY_RATIO environment variable to override it.', - ); - } - // Create a fallback memory measurement in case of missing memTotalBytes in SystemInfo. Weak types of - // SystemInfo do not guarantee that memTotalBytes is always present, and without it, we cannot compute the - // maxMemoryBytes. - // This does not happen in practice, but code allows it. - this.maxMemoryBytes = await this.getTotalMemoryBytes(); - } - - // Start snapshotting. - this.eventLoopInterval = betterSetInterval( - this._snapshotEventLoop.bind(this), - this.eventLoopSnapshotIntervalMillis, - ); - this.clientInterval = betterSetInterval(this._snapshotClient.bind(this), this.clientSnapshotIntervalMillis); - this.events.on(EventType.SYSTEM_INFO, this._snapshotCpu); - this.events.on(EventType.SYSTEM_INFO, this._snapshotMemory); + await this.memorySignal.start(); + await this.eventLoopSignal.start(); + await this.cpuSignal.start(); + await this.clientSignal.start(); } /** * Stops all resource capturing. */ async stop(): Promise { - betterClearInterval(this.eventLoopInterval); - betterClearInterval(this.clientInterval); - this.events.off(EventType.SYSTEM_INFO, this._snapshotCpu); - this.events.off(EventType.SYSTEM_INFO, this._snapshotMemory); + await this.memorySignal.stop(); + await this.eventLoopSignal.stop(); + await this.cpuSignal.stop(); + await this.clientSignal.stop(); // Allow microtask queue to unwind before stop returns. await new Promise((resolve) => { setImmediate(resolve); @@ -239,206 +223,73 @@ export class Snapshotter { * Returns a sample of latest memory snapshots, with the size of the sample defined * by the sampleDurationMillis parameter. If omitted, it returns a full snapshot history. */ - getMemorySample(sampleDurationMillis?: number) { - return this._getSample(this.memorySnapshots, sampleDurationMillis); + getMemorySample(sampleDurationMillis?: number): MemorySnapshot[] { + return this.memorySignal.getSample(sampleDurationMillis); } /** * Returns a sample of latest event loop snapshots, with the size of the sample defined * by the sampleDurationMillis parameter. If omitted, it returns a full snapshot history. */ - getEventLoopSample(sampleDurationMillis?: number) { - return this._getSample(this.eventLoopSnapshots, sampleDurationMillis); + getEventLoopSample(sampleDurationMillis?: number): EventLoopSnapshot[] { + return this.eventLoopSignal.getSample(sampleDurationMillis); } /** * Returns a sample of latest CPU snapshots, with the size of the sample defined * by the sampleDurationMillis parameter. If omitted, it returns a full snapshot history. */ - getCpuSample(sampleDurationMillis?: number) { - return this._getSample(this.cpuSnapshots, sampleDurationMillis); + getCpuSample(sampleDurationMillis?: number): CpuSnapshot[] { + return this.cpuSignal.getSample(sampleDurationMillis); } /** * Returns a sample of latest Client snapshots, with the size of the sample defined * by the sampleDurationMillis parameter. If omitted, it returns a full snapshot history. */ - getClientSample(sampleDurationMillis?: number) { - return this._getSample(this.clientSnapshots, sampleDurationMillis); - } - - /** - * Finds the latest snapshots by sampleDurationMillis in the provided array. - */ - protected _getSample(snapshots: T[], sampleDurationMillis?: number): T[] { - if (!sampleDurationMillis) return snapshots; - - const sample: T[] = []; - let idx = snapshots.length; - if (!idx) return sample; - - const latestTime = snapshots[idx - 1].createdAt; - while (idx--) { - const snapshot = snapshots[idx]; - if (+latestTime - +snapshot.createdAt <= sampleDurationMillis) { - sample.unshift(snapshot); - } else { - break; - } - } - - return sample; + getClientSample(sampleDurationMillis?: number): ClientSnapshot[] { + return this.clientSignal.getSample(sampleDurationMillis); } /** - * Creates a snapshot of current memory usage - * using the Apify platform `systemInfo` event. + * @deprecated Kept for backward compatibility. */ protected _snapshotMemory(systemInfo: SystemInfo) { - const createdAt = systemInfo.createdAt ? new Date(systemInfo.createdAt) : new Date(); - this._pruneSnapshots(this.memorySnapshots, createdAt); - const { memCurrentBytes, memTotalBytes } = systemInfo; - - let maxMemoryBytes = this.maxMemoryBytes!; - if (this.maxMemoryRatio !== undefined && this.maxMemoryRatio > 0) { - maxMemoryBytes = this.maxMemoryRatio * (memTotalBytes ?? this.maxMemoryBytes); - } - - const snapshot: MemorySnapshot = { - createdAt, - isOverloaded: memCurrentBytes! / maxMemoryBytes > this.maxUsedMemoryRatio, - usedBytes: memCurrentBytes, - }; - - this.memorySnapshots.push(snapshot); - this._memoryOverloadWarning(systemInfo, maxMemoryBytes); + this.memorySignal._onSystemInfo(systemInfo); } /** - * Checks for critical memory overload and logs it to the console. + * @deprecated Kept for backward compatibility. */ - protected _memoryOverloadWarning(systemInfo: SystemInfo, maxMemoryBytes: number) { - const { memCurrentBytes } = systemInfo; - const createdAt = systemInfo.createdAt ? new Date(systemInfo.createdAt) : new Date(); - if ( - this.lastLoggedCriticalMemoryOverloadAt && - +createdAt < +this.lastLoggedCriticalMemoryOverloadAt + CRITICAL_OVERLOAD_RATE_LIMIT_MILLIS - ) - return; - - const maxDesiredMemoryBytes = this.maxUsedMemoryRatio * maxMemoryBytes; - const reserveMemory = maxMemoryBytes * (1 - this.maxUsedMemoryRatio) * RESERVE_MEMORY_RATIO; - const criticalOverloadBytes = maxDesiredMemoryBytes + reserveMemory; - const isCriticalOverload = memCurrentBytes! > criticalOverloadBytes; - - if (isCriticalOverload) { - const usedPercentage = Math.round((memCurrentBytes! / maxMemoryBytes) * 100); - const toMb = (bytes: number) => Math.round(bytes / 1024 ** 2); - this.log.warning( - 'Memory is critically overloaded. ' + - `Using ${toMb(memCurrentBytes!)} MB of ${toMb( - maxMemoryBytes, - )} MB (${usedPercentage}%). Consider increasing available memory.`, - ); - this.lastLoggedCriticalMemoryOverloadAt = createdAt; - } + protected _memoryOverloadWarning(systemInfo: SystemInfo) { + this.memorySignal._memoryOverloadWarning(systemInfo); } /** - * Creates a snapshot of current event loop delay. + * @deprecated Kept for backward compatibility. */ protected _snapshotEventLoop(intervalCallback: () => unknown) { - const now = new Date(); - this._pruneSnapshots(this.eventLoopSnapshots, now); - - const snapshot = { - createdAt: now, - isOverloaded: false, - exceededMillis: 0, - }; - - const previousSnapshot = this.eventLoopSnapshots[this.eventLoopSnapshots.length - 1]; - if (previousSnapshot) { - const { createdAt } = previousSnapshot; - const delta = now.getTime() - +createdAt - this.eventLoopSnapshotIntervalMillis; - - if (delta > this.maxBlockedMillis) snapshot.isOverloaded = true; - snapshot.exceededMillis = Math.max(delta - this.maxBlockedMillis, 0); - } - - this.eventLoopSnapshots.push(snapshot); - intervalCallback(); + this.eventLoopSignal.handle(intervalCallback); } /** - * Creates a snapshot of current CPU usage using the Apify platform `systemInfo` event. + * @deprecated Kept for backward compatibility. */ protected _snapshotCpu(systemInfo: SystemInfo) { - const { cpuCurrentUsage, isCpuOverloaded } = systemInfo; - const createdAt = systemInfo.createdAt ? new Date(systemInfo.createdAt) : new Date(); - this._pruneSnapshots(this.cpuSnapshots, createdAt); - - this.cpuSnapshots.push({ - createdAt, - isOverloaded: isCpuOverloaded!, - usedRatio: Math.ceil(cpuCurrentUsage! / 100), - }); + this.cpuSignal.handle(systemInfo); } /** - * Creates a snapshot of current API state by checking for - * rate limit errors. Only errors produced by a 2nd retry - * of the API call are considered for snapshotting since - * earlier errors may just be caused by a random spike in - * number of requests and do not necessarily signify API - * overloading. + * @deprecated Kept for backward compatibility. */ protected _snapshotClient(intervalCallback: () => unknown) { - const now = new Date(); - this._pruneSnapshots(this.clientSnapshots, now); - - const allErrorCounts = this.client.stats?.rateLimitErrors ?? []; // storage client might not support this - const currentErrCount = allErrorCounts[CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT] || 0; - - // Handle empty snapshots array - const snapshot = { - createdAt: now, - isOverloaded: false, - rateLimitErrorCount: currentErrCount, - }; - const previousSnapshot = this.clientSnapshots[this.clientSnapshots.length - 1]; - if (previousSnapshot) { - const { rateLimitErrorCount } = previousSnapshot; - const delta = currentErrCount - rateLimitErrorCount; - if (delta > this.maxClientErrors) snapshot.isOverloaded = true; - } - - this.clientSnapshots.push(snapshot); - intervalCallback(); + this.clientSignal.handle(intervalCallback); } /** - * Removes snapshots that are older than the snapshotHistorySecs option - * from the array (destructively - in place). + * @deprecated Pruning is now handled by individual signals. */ - protected _pruneSnapshots( - snapshots: MemorySnapshot[] | CpuSnapshot[] | EventLoopSnapshot[] | ClientSnapshot[], - now: Date, - ) { - let oldCount = 0; - for (let i = 0; i < snapshots.length; i++) { - const { createdAt } = snapshots[i]; - if (now.getTime() - new Date(createdAt).getTime() > this.snapshotHistoryMillis) oldCount++; - else break; - } - snapshots.splice(0, oldCount); - } - - protected async getTotalMemoryBytes() { - if (this.config.get('systemInfoV2')) { - const containerized = this.config.get('containerized', await isContainerized()); - return (await getMemoryInfoV2(containerized)).totalBytes; - } - return (await getMemoryInfo()).totalBytes; + protected _pruneSnapshots(_snapshots: any[], _now: Date) { + // no-op — signals prune themselves } } diff --git a/packages/core/src/autoscaling/system_status.ts b/packages/core/src/autoscaling/system_status.ts index 8c369a8166c0..969bd2ab3860 100644 --- a/packages/core/src/autoscaling/system_status.ts +++ b/packages/core/src/autoscaling/system_status.ts @@ -1,7 +1,8 @@ -import { weightedAvg } from '@crawlee/utils'; import ow from 'ow'; import type { Configuration } from '../configuration'; +import type { LoadSignal } from './load_signal'; +import { evaluateLoadSignalSample } from './load_signal'; import { Snapshotter } from './snapshotter'; /** @@ -31,6 +32,12 @@ export interface SystemInfo { * @internal */ createdAt?: Date; + + /** + * Status of additional load signals beyond the built-in four. + * Keys are `LoadSignal.name` values, values are overload info. + */ + loadSignalInfo?: Record; } export interface SystemStatusOptions { @@ -73,6 +80,14 @@ export interface SystemStatusOptions { */ snapshotter?: Snapshotter; + /** + * Additional load signals to include in the system status evaluation. + * These are evaluated alongside the built-in memory, CPU, event loop, + * and client signals. If any signal reports overload, the system is + * considered overloaded. + */ + loadSignals?: LoadSignal[]; + /** @internal */ config?: Configuration; } @@ -96,6 +111,9 @@ export interface FinalStatistics { crawlerRuntimeMillis: number; } +/** The four built-in signal names that map to typed `SystemInfo` fields. */ +const BUILTIN_SIGNAL_NAMES = new Set(['memInfo', 'eventLoopInfo', 'cpuInfo', 'clientInfo']); + /** * Provides a simple interface to reading system status from a {@apilink Snapshotter} instance. * It only exposes two functions {@apilink SystemStatus.getCurrentStatus} @@ -120,11 +138,15 @@ export interface FinalStatistics { */ export class SystemStatus { private readonly currentHistoryMillis: number; - private readonly maxMemoryOverloadedRatio: number; - private readonly maxEventLoopOverloadedRatio: number; - private readonly maxCpuOverloadedRatio: number; - private readonly maxClientOverloadedRatio: number; private readonly snapshotter: Snapshotter; + private readonly signals: LoadSignal[]; + + /** + * Per-signal ratio overrides. The built-in four get their overrides from + * the legacy `max*OverloadedRatio` options; custom signals use their own + * `overloadedRatio`. + */ + private ratioOverrides: Record; constructor(options: SystemStatusOptions = {}) { ow( @@ -136,6 +158,7 @@ export class SystemStatus { maxCpuOverloadedRatio: ow.optional.number, maxClientOverloadedRatio: ow.optional.number, snapshotter: ow.optional.object, + loadSignals: ow.optional.array, config: ow.optional.object, }), ); @@ -147,15 +170,23 @@ export class SystemStatus { maxCpuOverloadedRatio = 0.4, maxClientOverloadedRatio = 0.3, snapshotter, + loadSignals = [], config, } = options; this.currentHistoryMillis = currentHistorySecs * 1000; - this.maxMemoryOverloadedRatio = maxMemoryOverloadedRatio; - this.maxEventLoopOverloadedRatio = maxEventLoopOverloadedRatio; - this.maxCpuOverloadedRatio = maxCpuOverloadedRatio; - this.maxClientOverloadedRatio = maxClientOverloadedRatio; this.snapshotter = snapshotter || new Snapshotter({ config }); + + // Built-in signals from the snapshotter + any custom signals + this.signals = [...this.snapshotter.getLoadSignals(), ...loadSignals]; + + // Allow legacy options to override the built-in signal ratios + this.ratioOverrides = { + memInfo: maxMemoryOverloadedRatio, + eventLoopInfo: maxEventLoopOverloadedRatio, + cpuInfo: maxCpuOverloadedRatio, + clientInfo: maxClientOverloadedRatio, + }; } /** @@ -202,92 +233,37 @@ export class SystemStatus { * Returns a system status object. */ protected _isSystemIdle(sampleDurationMillis?: number): SystemInfo { - const memInfo = this._isMemoryOverloaded(sampleDurationMillis); - const eventLoopInfo = this._isEventLoopOverloaded(sampleDurationMillis); - const cpuInfo = this._isCpuOverloaded(sampleDurationMillis); - const clientInfo = this._isClientOverloaded(sampleDurationMillis); - return { - isSystemIdle: - !memInfo.isOverloaded && - !eventLoopInfo.isOverloaded && - !cpuInfo.isOverloaded && - !clientInfo.isOverloaded, - memInfo, - eventLoopInfo, - cpuInfo, - clientInfo, + const result: SystemInfo = { + isSystemIdle: true, + memInfo: { isOverloaded: false, limitRatio: 0, actualRatio: 0 }, + eventLoopInfo: { isOverloaded: false, limitRatio: 0, actualRatio: 0 }, + cpuInfo: { isOverloaded: false, limitRatio: 0, actualRatio: 0 }, + clientInfo: { isOverloaded: false, limitRatio: 0, actualRatio: 0 }, }; - } - - /** - * Returns an object with an isOverloaded property set to true - * if the memory has been overloaded in the last sampleDurationMillis. - */ - protected _isMemoryOverloaded(sampleDurationMillis?: number) { - const sample = this.snapshotter.getMemorySample(sampleDurationMillis); - return this._isSampleOverloaded(sample, this.maxMemoryOverloadedRatio); - } - /** - * Returns an object with an isOverloaded property set to true - * if the event loop has been overloaded in the last sampleDurationMillis. - */ - protected _isEventLoopOverloaded(sampleDurationMillis?: number) { - const sample = this.snapshotter.getEventLoopSample(sampleDurationMillis); - return this._isSampleOverloaded(sample, this.maxEventLoopOverloadedRatio); - } + let loadSignalInfo: Record | undefined; - /** - * Returns an object with an isOverloaded property set to true - * if the CPU has been overloaded in the last sampleDurationMillis. - */ - protected _isCpuOverloaded(sampleDurationMillis?: number) { - const sample = this.snapshotter.getCpuSample(sampleDurationMillis); - return this._isSampleOverloaded(sample, this.maxCpuOverloadedRatio); - } + for (const signal of this.signals) { + const ratio = this.ratioOverrides[signal.name] ?? signal.overloadedRatio; + const sample = signal.getSample(sampleDurationMillis); + const info = evaluateLoadSignalSample(sample, ratio); - /** - * Returns an object with an isOverloaded property set to true - * if the client has been overloaded in the last sampleDurationMillis. - */ - protected _isClientOverloaded(sampleDurationMillis?: number): ClientInfo { - const sample = this.snapshotter.getClientSample(sampleDurationMillis); - return this._isSampleOverloaded(sample, this.maxClientOverloadedRatio); - } + if (info.isOverloaded) { + result.isSystemIdle = false; + } - /** - * Returns an object with sample information and an isOverloaded property - * set to true if at least the ratio of snapshots in the sample are overloaded. - */ - protected _isSampleOverloaded( - sample: T[], - ratio: number, - ): ClientInfo { - if (sample.length === 0) { - return { - isOverloaded: false, - limitRatio: ratio, - actualRatio: 0, - }; + if (BUILTIN_SIGNAL_NAMES.has(signal.name)) { + (result as any)[signal.name] = info; + } else { + loadSignalInfo ??= {}; + loadSignalInfo[signal.name] = info; + } } - const weights: number[] = []; - const values: number[] = []; - - for (let i = 1; i < sample.length; i++) { - const previous = sample[i - 1]; - const current = sample[i]; - const weight = +current.createdAt - +previous.createdAt; - weights.push(weight || 1); // Prevent errors from 0ms long intervals (sync) between snapshots. - values.push(+current.isOverloaded); + if (loadSignalInfo) { + result.loadSignalInfo = loadSignalInfo; } - const wAvg = sample.length === 1 ? +sample[0].isOverloaded : weightedAvg(values, weights); - - return { - isOverloaded: wAvg > ratio, - limitRatio: ratio, - actualRatio: Math.round(wAvg * 1000) / 1000, - }; + return result; } } diff --git a/test/core/autoscaling/autoscaled_pool.test.ts b/test/core/autoscaling/autoscaled_pool.test.ts index 0118a30325f3..96e03ab9b402 100644 --- a/test/core/autoscaling/autoscaled_pool.test.ts +++ b/test/core/autoscaling/autoscaled_pool.test.ts @@ -1,3 +1,4 @@ +import type { LoadSignal, LoadSnapshot } from '@crawlee/core'; import { AutoscaledPool } from '@crawlee/core'; import { sleep } from '@crawlee/utils'; @@ -559,4 +560,71 @@ describe('AutoscaledPool', () => { await expect(pool.run()).resolves.toBeUndefined(); expect(Date.now() - now).toBeGreaterThanOrEqual(1e3); }, 10e3); + + describe('custom load signals', () => { + /** Creates a minimal fake LoadSignal with static pre-seeded snapshots. */ + function createFakeLoadSignal(name: string, { overloadedRatio = 0.3, isOverloaded = false } = {}): LoadSignal { + const now = Date.now(); + const snapshots: LoadSnapshot[] = Array.from({ length: 5 }, (_, i) => ({ + createdAt: new Date(now - (5 - i) * 100), + isOverloaded, + })); + + return { + name, + overloadedRatio, + async start() {}, + async stop() {}, + getSample(sampleDurationMillis?: number) { + if (!sampleDurationMillis) return snapshots; + const cutoff = Date.now() - sampleDurationMillis; + return snapshots.filter((s) => +s.createdAt >= cutoff); + }, + }; + } + + test('overloaded signal prevents concurrency from scaling up', async () => { + const signal = createFakeLoadSignal('proxyHealth', { isOverloaded: true }); + + let count = 0; + const pool = new AutoscaledPool({ + minConcurrency: 1, + maxConcurrency: 10, + runTaskFunction: async () => { + count++; + await sleep(10); + }, + isFinishedFunction: async () => count >= 50, + isTaskReadyFunction: async () => count < 50, + systemStatusOptions: { loadSignals: [signal] }, + }); + + await pool.run(); + + expect(pool.desiredConcurrency).toBe(1); + }); + + test('signal info appears in SystemStatus.getCurrentStatus()', async () => { + const signal = createFakeLoadSignal('navTimeout', { overloadedRatio: 0.2, isOverloaded: true }); + + let count = 0; + const pool = new AutoscaledPool({ + minConcurrency: 1, + maxConcurrency: 10, + runTaskFunction: async () => { + count++; + await sleep(10); + }, + isFinishedFunction: async () => count >= 10, + isTaskReadyFunction: async () => count < 10, + systemStatusOptions: { loadSignals: [signal] }, + }); + + // @ts-expect-error Accessing private prop + const status = pool.systemStatus.getCurrentStatus(); + expect(status.loadSignalInfo?.navTimeout?.isOverloaded).toBe(true); + + await pool.run(); + }); + }); }); diff --git a/test/core/autoscaling/system_status.test.ts b/test/core/autoscaling/system_status.test.ts index 4dddab23aab5..1cbb2965c9e0 100644 --- a/test/core/autoscaling/system_status.test.ts +++ b/test/core/autoscaling/system_status.test.ts @@ -13,6 +13,18 @@ describe('SystemStatus', () => { log.setLevel(logLevel); }); + function mockSignal(name: string, snapshots: any[]) { + return { + name, + overloadedRatio: 0, // overridden by SystemStatusOptions anyway + getSample(sampleDurationMillis?: number) { + return sampleDurationMillis ? snapshots.slice(-sampleDurationMillis) : snapshots; + }, + async start() {}, + async stop() {}, + }; + } + class MockSnapshotter { constructor( readonly memSnapshots: any[], @@ -21,6 +33,15 @@ describe('SystemStatus', () => { readonly clientSnapshots: any[], ) {} + getLoadSignals() { + return [ + mockSignal('memInfo', this.memSnapshots), + mockSignal('eventLoopInfo', this.loopSnapshots), + mockSignal('cpuInfo', this.cpuSnapshots), + mockSignal('clientInfo', this.clientSnapshots), + ]; + } + getMemorySample(offset: number) { return this.memSnapshots.slice(-offset); } @@ -119,8 +140,11 @@ describe('SystemStatus', () => { test('should overload when threshold is crossed', () => { const snaps = generateSnapsSync(50, true); - const systemStatus = new SystemStatus({ - snapshotter: new MockSnapshotter(snaps, snaps, snaps, snaps) as any, + const mock = new MockSnapshotter(snaps, snaps, snaps, snaps) as any; + + // At exactly 0.5, the 50% overloaded sample should NOT trigger (uses >) + let systemStatus = new SystemStatus({ + snapshotter: mock, maxMemoryOverloadedRatio: 0.5, maxEventLoopOverloadedRatio: 0.5, maxCpuOverloadedRatio: 0.5, @@ -129,36 +153,36 @@ describe('SystemStatus', () => { expect(systemStatus.getCurrentStatus().isSystemIdle).toBe(true); expect(systemStatus.getHistoricalStatus().isSystemIdle).toBe(true); - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxMemoryOverloadedRatio = 0.49; - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxEventLoopOverloadedRatio = 0.49; - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxCpuOverloadedRatio = 0.49; - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxClientOverloadedRatio = 0.49; + // Drop all thresholds below 0.5 → all four overloaded + systemStatus = new SystemStatus({ + snapshotter: mock, + maxMemoryOverloadedRatio: 0.49, + maxEventLoopOverloadedRatio: 0.49, + maxCpuOverloadedRatio: 0.49, + maxClientOverloadedRatio: 0.49, + }); expect(systemStatus.getCurrentStatus().isSystemIdle).toBe(false); expect(systemStatus.getHistoricalStatus().isSystemIdle).toBe(false); - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxMemoryOverloadedRatio = 0.5; - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxEventLoopOverloadedRatio = 0.5; - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxCpuOverloadedRatio = 0.49; - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxClientOverloadedRatio = 0.49; + // Memory & eventLoop at threshold, CPU & client below → still overloaded + systemStatus = new SystemStatus({ + snapshotter: mock, + maxMemoryOverloadedRatio: 0.5, + maxEventLoopOverloadedRatio: 0.5, + maxCpuOverloadedRatio: 0.49, + maxClientOverloadedRatio: 0.49, + }); expect(systemStatus.getCurrentStatus().isSystemIdle).toBe(false); expect(systemStatus.getHistoricalStatus().isSystemIdle).toBe(false); - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxMemoryOverloadedRatio = 1; - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxEventLoopOverloadedRatio = 1; - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxCpuOverloadedRatio = 1; - // @ts-expect-error Overwriting readonly private prop - systemStatus.maxClientOverloadedRatio = 1; + // All thresholds well above → idle + systemStatus = new SystemStatus({ + snapshotter: mock, + maxMemoryOverloadedRatio: 1, + maxEventLoopOverloadedRatio: 1, + maxCpuOverloadedRatio: 1, + maxClientOverloadedRatio: 1, + }); expect(systemStatus.getCurrentStatus().isSystemIdle).toBe(true); expect(systemStatus.getHistoricalStatus().isSystemIdle).toBe(true); });