Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/core/src/autoscaling/autoscaled_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { betterClearInterval, betterSetInterval } from '@apify/utilities';
import { Configuration } from '../configuration';
import { CriticalError } from '../errors';
import { log as defaultLog } from '../log';
import type { LoadSignal } from './load_signal';
import type { SnapshotterOptions } from './snapshotter';
import { Snapshotter } from './snapshotter';
import type { SystemInfo, SystemStatusOptions } from './system_status';
Expand Down Expand Up @@ -203,6 +204,10 @@ export class AutoscaledPool {
private resolve: ((val?: unknown) => void) | null = null;
private reject: ((reason?: unknown) => void) | null = null;
private snapshotter: Snapshotter;

/** Additional SystemStatus loadSignals - tracked here for initialization and cleanup */
private loadSignals: LoadSignal[];

private systemStatus: SystemStatus;
private autoscaleInterval!: BetterIntervalID;
private maybeRunInterval!: BetterIntervalID;
Expand Down Expand Up @@ -295,6 +300,7 @@ export class AutoscaledPool {
});
ssoCopy.config ??= this.config;
this.snapshotter = ssoCopy.snapshotter;
this.loadSignals = ssoCopy.loadSignals ?? [];
this.systemStatus = new SystemStatus(ssoCopy);
}

Expand Down Expand Up @@ -366,6 +372,7 @@ export class AutoscaledPool {
});

await this.snapshotter.start();
await Promise.all(this.loadSignals.map((s) => s.start()));

// This interval checks the system status and updates the desired concurrency accordingly.
this.autoscaleInterval = betterSetInterval(this._autoscale, this.autoscaleIntervalMillis);
Expand Down Expand Up @@ -699,6 +706,7 @@ export class AutoscaledPool {
betterClearInterval(this.maybeRunInterval);
if (this.tasksDonePerSecondInterval) betterClearInterval(this.tasksDonePerSecondInterval);
if (this.snapshotter) await this.snapshotter.stop();
await Promise.all(this.loadSignals.map((s) => s.stop()));
}

protected _incrementTasksDonePerSecond(intervalCallback: () => void) {
Expand Down
60 changes: 60 additions & 0 deletions packages/core/src/autoscaling/client_load_signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type { StorageClient } from '@crawlee/types';

import type { LoadSnapshot } from './load_signal';
import { SnapshotStore } from './load_signal';

/**
 * Index into the storage client's `stats.rateLimitErrors` array whose value is
 * sampled by the client load signal.
 * NOTE(review): presumably each array index corresponds to a retry attempt number - confirm against the client implementation.
 */
const CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT = 2;

/** Snapshot recorded by the client load signal on every sampling tick. */
export interface ClientSnapshot extends LoadSnapshot {
/** Value of the sampled rate-limit error counter at the moment the snapshot was taken. */
rateLimitErrorCount: number;
}

/** Options accepted by `createClientLoadSignal`. */
export interface ClientLoadSignalOptions {
/** Storage client whose `stats.rateLimitErrors` counters are sampled. */
client: StorageClient;
/** Sampling interval in seconds. Defaults to `1`. */
clientSnapshotIntervalSecs?: number;
/**
 * A snapshot is marked as overloaded when the error counter grew by more than
 * this amount since the previous snapshot. Defaults to `3`.
 */
maxClientErrors?: number;
/**
 * Forwarded to the snapshot store as `overloadedRatio`. Defaults to `0.3`.
 * NOTE(review): presumably the share of overloaded snapshots above which the signal reports overload - confirm in `SnapshotStore`.
 */
overloadedRatio?: number;
/**
 * Forwarded to the snapshot store unchanged.
 * NOTE(review): presumably the retention window of stored snapshots in milliseconds - confirm in `SnapshotStore`.
 */
snapshotHistoryMillis?: number;
}

/**
 * Creates a load signal that samples the storage client's rate-limit error
 * counter on a fixed interval.
 *
 * Each tick stores a snapshot with the current counter value. The snapshot is
 * flagged as overloaded when the counter grew by more than `maxClientErrors`
 * (default `3`) compared to the most recent stored snapshot. The very first
 * snapshot has nothing to compare against and is never flagged.
 *
 * @param options Client to observe plus optional tuning parameters.
 * @returns The signal produced by `SnapshotStore.fromInterval`.
 */
export function createClientLoadSignal(options: ClientLoadSignalOptions) {
    const errorGrowthLimit = options.maxClientErrors ?? 3;

    return SnapshotStore.fromInterval<ClientSnapshot>({
        name: 'clientInfo',
        overloadedRatio: options.overloadedRatio ?? 0.3,
        intervalMillis: (options.clientSnapshotIntervalSecs ?? 1) * 1000,
        snapshotHistoryMillis: options.snapshotHistoryMillis,
        handler: (store, intervalCallback) => {
            const takenAt = new Date();

            // The client may not expose stats at all; treat that as "no errors so far".
            const errorCounters = options.client.stats?.rateLimitErrors ?? [];
            const rateLimitErrorCount = errorCounters[CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT] || 0;

            // Compare against the latest stored snapshot, if there is one.
            const history = store.getAll();
            const latest = history[history.length - 1];
            let isOverloaded = false;
            if (latest) {
                const growth = rateLimitErrorCount - latest.rateLimitErrorCount;
                isOverloaded = growth > errorGrowthLimit;
            }

            const entry: ClientSnapshot = {
                createdAt: takenAt,
                isOverloaded,
                rateLimitErrorCount,
            };
            store.push(entry, takenAt);

            // Tell the interval helper that this tick has finished.
            intervalCallback();
        },
    });
}

/**
 * @internal
 * Type of the signal returned by `createClientLoadSignal`; kept as a named
 * return type for backward compatibility in the Snapshotter facade.
 */
export type ClientLoadSignal = ReturnType<typeof createClientLoadSignal>;
45 changes: 45 additions & 0 deletions packages/core/src/autoscaling/cpu_load_signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import type { Configuration } from '../configuration';
import { EventType } from '../events/event_manager';
import type { LoadSnapshot } from './load_signal';
import { SnapshotStore } from './load_signal';
import type { SystemInfo } from './system_status';

/** Snapshot recorded by the CPU load signal for every received system-info event. */
export interface CpuSnapshot extends LoadSnapshot {
/** Set by `createCpuLoadSignal` to `Math.ceil(cpuCurrentUsage / 100)` of the received system info. */
usedRatio: number;
/** Optional tick counters; `createCpuLoadSignal` does not populate this field. */
ticks?: { idle: number; total: number };
}

/** Options accepted by `createCpuLoadSignal`. */
export interface CpuLoadSignalOptions {
/**
 * Forwarded to the snapshot store as `overloadedRatio`. Defaults to `0.4`.
 * NOTE(review): presumably the share of overloaded snapshots above which the signal reports overload - confirm in `SnapshotStore`.
 */
overloadedRatio?: number;
/**
 * Forwarded to the snapshot store unchanged.
 * NOTE(review): presumably the retention window of stored snapshots in milliseconds - confirm in `SnapshotStore`.
 */
snapshotHistoryMillis?: number;
/** Configuration whose event manager (`getEventManager()`) delivers the `SYSTEM_INFO` events. */
config: Configuration;
}

/**
 * Creates a load signal fed by `SYSTEM_INFO` events from the configuration's
 * event manager.
 *
 * Every event is turned into one snapshot: the overload flag is taken
 * verbatim from `isCpuOverloaded`, and the snapshot timestamp is the event's
 * `createdAt` when present, otherwise the current time.
 *
 * @param options Configuration providing the event manager plus optional tuning parameters.
 * @returns The signal produced by `SnapshotStore.fromEvent`.
 */
export function createCpuLoadSignal(options: CpuLoadSignalOptions) {
    return SnapshotStore.fromEvent<CpuSnapshot, SystemInfo>({
        name: 'cpuInfo',
        overloadedRatio: options.overloadedRatio ?? 0.4,
        events: options.config.getEventManager(),
        event: EventType.SYSTEM_INFO,
        snapshotHistoryMillis: options.snapshotHistoryMillis,
        handler: (store, systemInfo) => {
            const usage = systemInfo.cpuCurrentUsage;
            const overloaded = systemInfo.isCpuOverloaded;
            const timestamp = systemInfo.createdAt ? new Date(systemInfo.createdAt) : new Date();

            // NOTE(review): Math.ceil rounds every positive value up to a whole number, so
            // (assuming `cpuCurrentUsage` is a percentage) any usage in (0, 100] is stored
            // as 1 - confirm this is intended for a field named `usedRatio`.
            const entry: CpuSnapshot = {
                createdAt: timestamp,
                isOverloaded: overloaded!,
                usedRatio: Math.ceil(usage! / 100),
            };

            store.push(entry, timestamp);
        },
    });
}

/**
 * @internal
 * Type of the signal returned by `createCpuLoadSignal`; kept as a named
 * return type for backward compatibility in the Snapshotter facade.
 */
export type CpuLoadSignal = ReturnType<typeof createCpuLoadSignal>;
56 changes: 56 additions & 0 deletions packages/core/src/autoscaling/event_loop_load_signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import type { LoadSnapshot } from './load_signal';
import { SnapshotStore } from './load_signal';

/** Snapshot recorded by the event loop load signal on every sampling tick. */
export interface EventLoopSnapshot extends LoadSnapshot {
/** Milliseconds by which the measured delay exceeded `maxBlockedMillis`; `0` when it did not. */
exceededMillis: number;
}

/** Options accepted by `createEventLoopLoadSignal`. */
export interface EventLoopLoadSignalOptions {
/** Sampling interval in seconds. Defaults to `0.5`. */
eventLoopSnapshotIntervalSecs?: number;
/**
 * Delay beyond the sampling interval, in milliseconds, that is tolerated before
 * a snapshot is marked as overloaded. Defaults to `50`.
 */
maxBlockedMillis?: number;
/**
 * Forwarded to the snapshot store as `overloadedRatio`. Defaults to `0.6`.
 * NOTE(review): presumably the share of overloaded snapshots above which the signal reports overload - confirm in `SnapshotStore`.
 */
overloadedRatio?: number;
/**
 * Forwarded to the snapshot store unchanged.
 * NOTE(review): presumably the retention window of stored snapshots in milliseconds - confirm in `SnapshotStore`.
 */
snapshotHistoryMillis?: number;
}

/**
 * Creates a load signal that estimates event loop delay from the spacing of
 * its own sampling ticks.
 *
 * On each tick the time elapsed since the previous snapshot is compared with
 * the configured interval. If the tick arrived more than `maxBlockedMillis`
 * (default `50`) late, the snapshot is flagged as overloaded and the excess
 * is recorded in `exceededMillis`. The first snapshot has no predecessor and
 * is stored as not overloaded with `exceededMillis` of `0`.
 *
 * @param options Optional tuning parameters; all fields have defaults.
 * @returns The signal produced by `SnapshotStore.fromInterval`.
 */
export function createEventLoopLoadSignal(options: EventLoopLoadSignalOptions = {}) {
    const intervalMillis = (options.eventLoopSnapshotIntervalSecs ?? 0.5) * 1000;
    const maxBlockedMillis = options.maxBlockedMillis ?? 50;

    return SnapshotStore.fromInterval<EventLoopSnapshot>({
        name: 'eventLoopInfo',
        overloadedRatio: options.overloadedRatio ?? 0.6,
        intervalMillis,
        snapshotHistoryMillis: options.snapshotHistoryMillis,
        handler: (store, intervalCallback) => {
            const takenAt = new Date();

            let isOverloaded = false;
            let exceededMillis = 0;

            const history = store.getAll();
            const latest = history[history.length - 1];
            if (latest) {
                // How much later than scheduled this tick fired.
                const lateness = takenAt.getTime() - +latest.createdAt - intervalMillis;

                isOverloaded = lateness > maxBlockedMillis;
                exceededMillis = Math.max(lateness - maxBlockedMillis, 0);
            }

            const entry: EventLoopSnapshot = {
                createdAt: takenAt,
                isOverloaded,
                exceededMillis,
            };
            store.push(entry, takenAt);

            // Tell the interval helper that this tick has finished.
            intervalCallback();
        },
    });
}

/**
 * @internal
 * Type of the signal returned by `createEventLoopLoadSignal`; kept as a named
 * return type for backward compatibility in the Snapshotter facade.
 */
export type EventLoopLoadSignal = ReturnType<typeof createEventLoopLoadSignal>;
5 changes: 5 additions & 0 deletions packages/core/src/autoscaling/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
export * from './autoscaled_pool';
export * from './client_load_signal';
export * from './cpu_load_signal';
export * from './event_loop_load_signal';
export * from './load_signal';
export * from './memory_load_signal';
export * from './snapshotter';
export * from './system_status';
Loading
Loading