Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/distributed-adaptor-install-lock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@openfn/engine-multi': minor
'@openfn/ws-worker': minor
---

Add a filesystem-based lock so multiple workers can safely share a single adaptor repo directory (e.g. an NFS mount or k8s PVC). The lock lives inside engine-multi's autoinstall and serialises installs across processes via a per-adaptor lockfile. It is enabled by default and can be disabled with `WORKER_NO_REPO_LOCK=true` (or `--no-repo-lock`) on ws-worker. The engine's `isInstalled` check has also been tightened to verify the adaptor's `node_modules` entry exists on disk, not just the repo `package.json` dependency entry — this catches half-installed adaptors left behind by a crashed worker.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the release notes to be way more terse. No explanation, no justification - just the headline. Maybe a migration guide if breaking but obviously that doesn't apply here

5 changes: 4 additions & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
"@openfn/logger": "workspace:*",
"@openfn/runtime": "workspace:*",
"fast-safe-stringify": "^2.1.1",
"json-stream-stringify": "^3.1.6"
"json-stream-stringify": "^3.1.6",
"proper-lockfile": "^4.1.2"
},
"devDependencies": {
"@types/node": "^18.19.130",
"@types/proper-lockfile": "^4.1.4",
"ava": "5.3.1",
"esmock": "^2.7.5",
"tslib": "^2.8.1",
"tsm": "^2.3.0",
"tsup": "^8.5.1",
Expand Down
183 changes: 105 additions & 78 deletions packages/engine-multi/src/api/autoinstall.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import path from 'node:path';
import { stat } from 'node:fs/promises';
import {
ensureRepo,
getAliasedName,
Expand All @@ -13,98 +15,102 @@ import type { Logger } from '@openfn/logger';
import { AUTOINSTALL_COMPLETE, AUTOINSTALL_ERROR } from '../events';
import { AutoinstallError } from '../errors';
import ExecutionContext from '../classes/ExecutionContext';
import { withInstallLock } from '../util/repo-lock';

// none of these options should be on the plan actually
export type AutoinstallOptions = {
skipRepoValidation?: boolean;
handleInstall?(fn: string, repoDir: string, logger: Logger): Promise<void>;
handleIsInstalled?(
fn: string,
repoDir: string,
logger: Logger
): Promise<boolean>;
lockRepo?: boolean;
versionLookup?: (specifier: string) => Promise<string>;
};

const pending: Record<string, Promise<void>> = {};
// Per-entry options are pinned at enqueue time so the worker that drains the
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment valid? Looks random?

// queue doesn't accidentally use the *first* caller's closure. See PR #1416.
type QueueEntry = {
adaptors: string[];
callback: (err?: any) => void;
context: ExecutionContext;
repoDir: string;
logger: Logger;
useLock: boolean;
};

let busy = false;
const queue: QueueEntry[] = [];

const processQueue = async () => {
const next = queue.shift();
if (next) {
busy = true;
await doAutoinstall(next);
processQueue();
} else {
busy = false;
}
};

const queue: Array<{ adaptors: string[]; callback: (err?: any) => void }> = [];

const enqueue = (adaptors: string[]) =>
new Promise((resolve) => {
queue.push({ adaptors, callback: resolve });
});
const doAutoinstall = async (entry: QueueEntry) => {
const { adaptors, callback, context, repoDir, logger, useLock } = entry;

// Install any modules for an Execution Plan that are not already installed
// This will enforce a queue ensuring only one module is installed at a time
// This fixes https://github.com/OpenFn/kit/issues/503
const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
// TODO not a huge fan of these functions in the closure, but it's ok for now
const processQueue = async () => {
const next = queue.shift();
if (next) {
busy = true;
const { adaptors, callback } = next;
await doAutoinstall(adaptors, callback);
processQueue();
} else {
// do nothing
busy = false;
for (const a of adaptors) {
const { name, version } = getNameAndVersion(a);
if (await isInstalled(a, repoDir, logger)) {
continue;
}
};

// This will actually do the autoinstall for an run (all adaptors)
const doAutoinstall = async (
adaptors: string[],
onComplete: (err?: any) => void
) => {
// Check whether we still need to do any work
for (const a of adaptors) {
const { name, version } = getNameAndVersion(a);
if (await isInstalledFn(a, repoDir, logger)) {
continue;
}

const startTime = Date.now();
try {
await installFn(a, repoDir, logger);

const duration = Date.now() - startTime;
logger.success(`autoinstalled ${a} in ${duration / 1000}s`);
context.emit(AUTOINSTALL_COMPLETE, {
module: name,
version: version!,
duration,
});
} catch (e: any) {
delete pending[a];

logger.error(`ERROR autoinstalling ${a}: ${e.message}`);
logger.error(e);
const duration = Date.now() - startTime;
context.emit(AUTOINSTALL_ERROR, {
module: name,
version: version!,
duration,
message: e.message || e.toString(),
const startTime = Date.now();
try {
if (useLock) {
// Re-check inside the lock so we skip work a peer worker
// completed while we were waiting.
await withInstallLock(repoDir, getAliasedName(a), logger, async () => {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. Why is the last argument a callback? We can't just let the promise return?

if (await isInstalled(a, repoDir, logger)) {
logger.debug(
`another worker installed ${a} while waiting for lock; skipping`
);
return;
}
await install(a, repoDir, logger);
});

// Abort on the first error
return onComplete(new AutoinstallError(a, e));
} else {
await install(a, repoDir, logger);
}

const duration = Date.now() - startTime;
logger.success(`autoinstalled ${a} in ${duration / 1000}s`);
context.emit(AUTOINSTALL_COMPLETE, {
module: name,
version: version!,
duration,
});
} catch (e: any) {
logger.error(`ERROR autoinstalling ${a}: ${e.message}`);
logger.error(e);
const duration = Date.now() - startTime;
context.emit(AUTOINSTALL_ERROR, {
module: name,
version: version!,
duration,
message: e.message || e.toString(),
});

// Abort on the first error
return callback(new AutoinstallError(a, e));
}
onComplete();
};
}
callback();
};

// Install any modules for an Execution Plan that are not already installed
// This will enforce a queue ensuring only one module is installed at a time
// This fixes https://github.com/OpenFn/kit/issues/503
const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
const { logger, state, options } = context;
const { plan } = state;
const { repoDir, whitelist } = options;
const autoinstallOptions = options.autoinstall || {};
const useLock = autoinstallOptions.lockRepo !== false;

const installFn = autoinstallOptions?.handleInstall || install;
const isInstalledFn = autoinstallOptions?.handleIsInstalled || isInstalled;
const versionlookup = autoinstallOptions?.versionLookup || getLatestVersion;

let didValidateRepo = false;
Expand All @@ -124,7 +130,7 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
const adaptors = Array.from(identifyAdaptors(plan));
const paths: ModulePaths = {};

const adaptorsToLoad = [];
const adaptorsToLoad: string[] = [];
for (const a of adaptors) {
// Ensure that this is not blacklisted
if (whitelist && !whitelist.find((r) => r.exec(a))) {
Expand Down Expand Up @@ -166,7 +172,7 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
version: v,
};

if (!(await isInstalledFn(resolvedAdaptorName, repoDir, logger))) {
if (!(await isInstalled(resolvedAdaptorName, repoDir, logger))) {
adaptorsToLoad.push(resolvedAdaptorName);
}
}
Expand All @@ -185,8 +191,16 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
}

if (adaptorsToLoad.length) {
// Add this to the queue
const p = enqueue(adaptorsToLoad);
const p = new Promise((resolve) => {
queue.push({
adaptors: adaptorsToLoad,
callback: resolve,
context,
repoDir,
logger,
useLock,
});
});

if (!busy) {
processQueue();
Expand All @@ -210,9 +224,17 @@ export default autoinstall;
const install = (specifier: string, repoDir: string, logger: Logger) =>
runtimeInstall([specifier], repoDir, logger);

// The actual isInstalled function is not unit tested
// TODO this should probably all be handled (and tested) in @openfn/runtime
const isInstalled = async (
const fileExists = async (p: string) => {
try {
await stat(p);
return true;
} catch {
return false;
}
};

// Exported for unit testing
export const isInstalled = async (
specifier: string,
repoDir: string,
logger: Logger
Expand All @@ -232,7 +254,12 @@ const isInstalled = async (
const pkg = await loadRepoPkg(repoDir);
if (pkg) {
const { dependencies } = pkg;
return dependencies.hasOwnProperty(alias);
if (!dependencies.hasOwnProperty(alias)) {
return false;
}
return fileExists(
path.join(repoDir, 'node_modules', alias, 'package.json')
);
}
};

Expand Down
88 changes: 88 additions & 0 deletions packages/engine-multi/src/util/repo-lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import path from 'node:path';
import { mkdir, writeFile } from 'node:fs/promises';
import lockfile from 'proper-lockfile';
import type { Logger } from '@openfn/logger';

// k8s CPU throttling can delay the heartbeat setTimeout; 5 min avoids false-stale on tight clusters
const STALE_MS = 5 * 60_000;
// Retry ceiling exceeds STALE_MS so a dead lock-holder's stale window expires before we give up
const MAX_LOCK_WAIT_MS = STALE_MS + 60_000;
const LOCK_INTERVAL_MS = 2_000;
const UPDATE_MS = 5_000;

const LOCK_RETRY_OPTIONS = {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surprises me to see this declared as a constant. I get the convention but an object isn't really a constant. Just feels wierd.

Also none of this is configurable? The lock uses hard-coded values and that's it? Not a blocker for me but we might want to open an issue to make this configurable later.

Not because I want to configure it today - but because it's weird that I am unable to.

retries: MAX_LOCK_WAIT_MS / LOCK_INTERVAL_MS,
factor: 1,
minTimeout: LOCK_INTERVAL_MS,
maxTimeout: LOCK_INTERVAL_MS,
};

const ensureLockTarget = async (target: string) => {
await mkdir(path.dirname(target), { recursive: true });
try {
await writeFile(target, '', { flag: 'wx' });
} catch (e: any) {
if (e.code !== 'EEXIST') throw e;
}
};

export const withInstallLock = async (
repoDir: string,
alias: string,
logger: Logger,
fn: () => Promise<void>
): Promise<void> => {
// Defence-in-depth: refuse aliases that could escape the .locks directory.
// Upstream whitelist filtering should make this unreachable.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it? Where?

If this is true I expect to see some shared logic which sanitises aliases to be safe

if (alias.split(/[/\\]/).includes('..') || path.isAbsolute(alias)) {
throw new Error(`Invalid alias for install lock: ${alias}`);
}

const locksDir = path.join(repoDir, '.locks');
const target = path.join(locksDir, `${alias}.lock`);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since locksDir isn't used anywhere else you can simplify to

const target = path.join(repoDir, '.locks', `${alias}.lock`);


await ensureLockTarget(target);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hold up, this function creates a lock file at the target path. But that's exactly what lockfile.lock does for us right?

So how does that work? Doesn't lockfile just assume the file is always locked because we've just created the lock?


logger.debug(`acquiring install lock for ${alias}`);
let release: () => Promise<void>;

const lockOpts = { stale: STALE_MS, update: UPDATE_MS, realpath: false };

try {
release = await lockfile.lock(target, { ...lockOpts, retries: 0 });
} catch (e: any) {
if (e.code !== 'ELOCKED') throw e;

logger.info(
`waiting for install lock on \`${alias}\` (another worker is installing)`
);

try {
release = await lockfile.lock(target, {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we call lockfile.lock twice here. Once I guess to see if we can aquire the lock - and if we can't, we print this log line and then try again with a number of retries?

First observation is that we're then in effect retrying the lock LOCK_RETRY_OPTIONS + 1 times

Second observation is - is this worth it? Are we better calling lockfile.check() first, and then doing the log-and-retry call if a lock already exists?

...lockOpts,
retries: LOCK_RETRY_OPTIONS,
});
} catch (e2: any) {
if (e2.code === 'ELOCKED') {
throw new Error(
`Lock acquisition timed out after ${
MAX_LOCK_WAIT_MS / 1000
}s waiting for ${alias}; another worker likely still installing (lock: ${target})`
);
}
throw e2;
}
}

logger.debug(`acquired install lock for ${alias}`);

try {
await fn();
} finally {
try {
await release();
} catch (e) {
logger.warn(`failed to release install lock for ${alias}:`, e);
}
}
};
Loading