-
Notifications
You must be signed in to change notification settings - Fork 17
Optional cross-worker lock for shared adaptor repo #1416
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2518c23
0ccdc90
7fe2707
e52257e
66c1ea7
09503ae
5509997
81eefab
f5c4abf
0d0fd3b
dc8f04f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| '@openfn/engine-multi': minor | ||
| '@openfn/ws-worker': minor | ||
| --- | ||
|
|
||
| Add a filesystem-based lock so multiple workers can safely share a single adaptor repo directory (e.g. an NFS mount or k8s PVC). The lock lives inside engine-multi's autoinstall and serialises installs across processes via a per-adaptor lockfile. It is enabled by default and can be disabled with `WORKER_NO_REPO_LOCK=true` (or `--no-repo-lock`) on ws-worker. The engine's `isInstalled` check has also been tightened to verify the adaptor's `node_modules` entry exists on disk, not just the repo `package.json` dependency entry — this catches half-installed adaptors left behind by a crashed worker. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| import path from 'node:path'; | ||
| import { stat } from 'node:fs/promises'; | ||
| import { | ||
| ensureRepo, | ||
| getAliasedName, | ||
|
|
@@ -13,98 +15,102 @@ import type { Logger } from '@openfn/logger'; | |
| import { AUTOINSTALL_COMPLETE, AUTOINSTALL_ERROR } from '../events'; | ||
| import { AutoinstallError } from '../errors'; | ||
| import ExecutionContext from '../classes/ExecutionContext'; | ||
| import { withInstallLock } from '../util/repo-lock'; | ||
|
|
||
| // none of these options should be on the plan actually | ||
| export type AutoinstallOptions = { | ||
| skipRepoValidation?: boolean; | ||
| handleInstall?(fn: string, repoDir: string, logger: Logger): Promise<void>; | ||
| handleIsInstalled?( | ||
| fn: string, | ||
| repoDir: string, | ||
| logger: Logger | ||
| ): Promise<boolean>; | ||
| lockRepo?: boolean; | ||
| versionLookup?: (specifier: string) => Promise<string>; | ||
| }; | ||
|
|
||
| const pending: Record<string, Promise<void>> = {}; | ||
| // Per-entry options are pinned at enqueue time so the worker that drains the | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this comment valid? It looks random. |
||
| // queue doesn't accidentally use the *first* caller's closure. See PR #1416. | ||
| type QueueEntry = { | ||
| adaptors: string[]; | ||
| callback: (err?: any) => void; | ||
| context: ExecutionContext; | ||
| repoDir: string; | ||
| logger: Logger; | ||
| useLock: boolean; | ||
| }; | ||
|
|
||
| let busy = false; | ||
| const queue: QueueEntry[] = []; | ||
|
|
||
| const processQueue = async () => { | ||
| const next = queue.shift(); | ||
| if (next) { | ||
| busy = true; | ||
| await doAutoinstall(next); | ||
| processQueue(); | ||
| } else { | ||
| busy = false; | ||
| } | ||
| }; | ||
|
|
||
| const queue: Array<{ adaptors: string[]; callback: (err?: any) => void }> = []; | ||
|
|
||
| const enqueue = (adaptors: string[]) => | ||
| new Promise((resolve) => { | ||
| queue.push({ adaptors, callback: resolve }); | ||
| }); | ||
| const doAutoinstall = async (entry: QueueEntry) => { | ||
| const { adaptors, callback, context, repoDir, logger, useLock } = entry; | ||
|
|
||
| // Install any modules for an Execution Plan that are not already installed | ||
| // This will enforce a queue ensuring only one module is installed at a time | ||
| // This fixes https://github.com/OpenFn/kit/issues/503 | ||
| const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => { | ||
| // TODO not a huge fan of these functions in the closure, but it's ok for now | ||
| const processQueue = async () => { | ||
| const next = queue.shift(); | ||
| if (next) { | ||
| busy = true; | ||
| const { adaptors, callback } = next; | ||
| await doAutoinstall(adaptors, callback); | ||
| processQueue(); | ||
| } else { | ||
| // do nothing | ||
| busy = false; | ||
| for (const a of adaptors) { | ||
| const { name, version } = getNameAndVersion(a); | ||
| if (await isInstalled(a, repoDir, logger)) { | ||
| continue; | ||
| } | ||
| }; | ||
|
|
||
| // This will actually do the autoinstall for an run (all adaptors) | ||
| const doAutoinstall = async ( | ||
| adaptors: string[], | ||
| onComplete: (err?: any) => void | ||
| ) => { | ||
| // Check whether we still need to do any work | ||
| for (const a of adaptors) { | ||
| const { name, version } = getNameAndVersion(a); | ||
| if (await isInstalledFn(a, repoDir, logger)) { | ||
| continue; | ||
| } | ||
|
|
||
| const startTime = Date.now(); | ||
| try { | ||
| await installFn(a, repoDir, logger); | ||
|
|
||
| const duration = Date.now() - startTime; | ||
| logger.success(`autoinstalled ${a} in ${duration / 1000}s`); | ||
| context.emit(AUTOINSTALL_COMPLETE, { | ||
| module: name, | ||
| version: version!, | ||
| duration, | ||
| }); | ||
| } catch (e: any) { | ||
| delete pending[a]; | ||
|
|
||
| logger.error(`ERROR autoinstalling ${a}: ${e.message}`); | ||
| logger.error(e); | ||
| const duration = Date.now() - startTime; | ||
| context.emit(AUTOINSTALL_ERROR, { | ||
| module: name, | ||
| version: version!, | ||
| duration, | ||
| message: e.message || e.toString(), | ||
| const startTime = Date.now(); | ||
| try { | ||
| if (useLock) { | ||
| // Re-check inside the lock so we skip work a peer worker | ||
| // completed while we were waiting. | ||
| await withInstallLock(repoDir, getAliasedName(a), logger, async () => { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm. Why is the last argument a callback? Can't we just let the promise return? |
||
| if (await isInstalled(a, repoDir, logger)) { | ||
| logger.debug( | ||
| `another worker installed ${a} while waiting for lock; skipping` | ||
| ); | ||
| return; | ||
| } | ||
| await install(a, repoDir, logger); | ||
| }); | ||
|
|
||
| // Abort on the first error | ||
| return onComplete(new AutoinstallError(a, e)); | ||
| } else { | ||
| await install(a, repoDir, logger); | ||
| } | ||
|
|
||
| const duration = Date.now() - startTime; | ||
| logger.success(`autoinstalled ${a} in ${duration / 1000}s`); | ||
| context.emit(AUTOINSTALL_COMPLETE, { | ||
| module: name, | ||
| version: version!, | ||
| duration, | ||
| }); | ||
| } catch (e: any) { | ||
| logger.error(`ERROR autoinstalling ${a}: ${e.message}`); | ||
| logger.error(e); | ||
| const duration = Date.now() - startTime; | ||
| context.emit(AUTOINSTALL_ERROR, { | ||
| module: name, | ||
| version: version!, | ||
| duration, | ||
| message: e.message || e.toString(), | ||
| }); | ||
|
|
||
| // Abort on the first error | ||
| return callback(new AutoinstallError(a, e)); | ||
| } | ||
| onComplete(); | ||
| }; | ||
| } | ||
| callback(); | ||
| }; | ||
|
|
||
| // Install any modules for an Execution Plan that are not already installed | ||
| // This will enforce a queue ensuring only one module is installed at a time | ||
| // This fixes https://github.com/OpenFn/kit/issues/503 | ||
| const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => { | ||
| const { logger, state, options } = context; | ||
| const { plan } = state; | ||
| const { repoDir, whitelist } = options; | ||
| const autoinstallOptions = options.autoinstall || {}; | ||
| const useLock = autoinstallOptions.lockRepo !== false; | ||
|
|
||
| const installFn = autoinstallOptions?.handleInstall || install; | ||
| const isInstalledFn = autoinstallOptions?.handleIsInstalled || isInstalled; | ||
| const versionlookup = autoinstallOptions?.versionLookup || getLatestVersion; | ||
|
|
||
| let didValidateRepo = false; | ||
|
|
@@ -124,7 +130,7 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => { | |
| const adaptors = Array.from(identifyAdaptors(plan)); | ||
| const paths: ModulePaths = {}; | ||
|
|
||
| const adaptorsToLoad = []; | ||
| const adaptorsToLoad: string[] = []; | ||
| for (const a of adaptors) { | ||
| // Ensure that this is not blacklisted | ||
| if (whitelist && !whitelist.find((r) => r.exec(a))) { | ||
|
|
@@ -166,7 +172,7 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => { | |
| version: v, | ||
| }; | ||
|
|
||
| if (!(await isInstalledFn(resolvedAdaptorName, repoDir, logger))) { | ||
| if (!(await isInstalled(resolvedAdaptorName, repoDir, logger))) { | ||
| adaptorsToLoad.push(resolvedAdaptorName); | ||
| } | ||
| } | ||
|
|
@@ -185,8 +191,16 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => { | |
| } | ||
|
|
||
| if (adaptorsToLoad.length) { | ||
| // Add this to the queue | ||
| const p = enqueue(adaptorsToLoad); | ||
| const p = new Promise((resolve) => { | ||
| queue.push({ | ||
| adaptors: adaptorsToLoad, | ||
| callback: resolve, | ||
| context, | ||
| repoDir, | ||
| logger, | ||
| useLock, | ||
| }); | ||
| }); | ||
|
|
||
| if (!busy) { | ||
| processQueue(); | ||
|
|
@@ -210,9 +224,17 @@ export default autoinstall; | |
| const install = (specifier: string, repoDir: string, logger: Logger) => | ||
| runtimeInstall([specifier], repoDir, logger); | ||
|
|
||
| // The actual isInstalled function is not unit tested | ||
| // TODO this should probably all be handled (and tested) in @openfn/runtime | ||
| const isInstalled = async ( | ||
| const fileExists = async (p: string) => { | ||
| try { | ||
| await stat(p); | ||
| return true; | ||
| } catch { | ||
| return false; | ||
| } | ||
| }; | ||
|
|
||
| // Exported for unit testing | ||
| export const isInstalled = async ( | ||
| specifier: string, | ||
| repoDir: string, | ||
| logger: Logger | ||
|
|
@@ -232,7 +254,12 @@ const isInstalled = async ( | |
| const pkg = await loadRepoPkg(repoDir); | ||
| if (pkg) { | ||
| const { dependencies } = pkg; | ||
| return dependencies.hasOwnProperty(alias); | ||
| if (!dependencies.hasOwnProperty(alias)) { | ||
| return false; | ||
| } | ||
| return fileExists( | ||
| path.join(repoDir, 'node_modules', alias, 'package.json') | ||
| ); | ||
| } | ||
| }; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| import path from 'node:path'; | ||
| import { mkdir, writeFile } from 'node:fs/promises'; | ||
| import lockfile from 'proper-lockfile'; | ||
| import type { Logger } from '@openfn/logger'; | ||
|
|
||
| // k8s CPU throttling can delay the heartbeat setTimeout; 5 min avoids false-stale on tight clusters | ||
| const STALE_MS = 5 * 60_000; | ||
| // Retry ceiling exceeds STALE_MS so a dead lock-holder's stale window expires before we give up | ||
| const MAX_LOCK_WAIT_MS = STALE_MS + 60_000; | ||
| const LOCK_INTERVAL_MS = 2_000; | ||
| const UPDATE_MS = 5_000; | ||
|
|
||
| const LOCK_RETRY_OPTIONS = { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Surprises me to see this declared as a constant. I get the convention, but an object isn't really a constant. It just feels weird. Also, none of this is configurable? The lock uses hard-coded values and that's it? Not a blocker for me, but we might want to open an issue to make this configurable later. Not because I want to configure it today, but because it's weird that I am unable to. |
||
| retries: MAX_LOCK_WAIT_MS / LOCK_INTERVAL_MS, | ||
| factor: 1, | ||
| minTimeout: LOCK_INTERVAL_MS, | ||
| maxTimeout: LOCK_INTERVAL_MS, | ||
| }; | ||
|
|
||
| const ensureLockTarget = async (target: string) => { | ||
| await mkdir(path.dirname(target), { recursive: true }); | ||
| try { | ||
| await writeFile(target, '', { flag: 'wx' }); | ||
| } catch (e: any) { | ||
| if (e.code !== 'EEXIST') throw e; | ||
| } | ||
| }; | ||
|
|
||
| export const withInstallLock = async ( | ||
| repoDir: string, | ||
| alias: string, | ||
| logger: Logger, | ||
| fn: () => Promise<void> | ||
| ): Promise<void> => { | ||
| // Defence-in-depth: refuse aliases that could escape the .locks directory. | ||
| // Upstream whitelist filtering should make this unreachable. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should it? Where? If this is true, I expect to see some shared logic which sanitises aliases to be safe. |
||
| if (alias.split(/[/\\]/).includes('..') || path.isAbsolute(alias)) { | ||
| throw new Error(`Invalid alias for install lock: ${alias}`); | ||
| } | ||
|
|
||
| const locksDir = path.join(repoDir, '.locks'); | ||
| const target = path.join(locksDir, `${alias}.lock`); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since locksDir isn't used anywhere else you can simplify to |
||
|
|
||
| await ensureLockTarget(target); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hold up, this function creates a lock file at the target path. But that's exactly what So how does that work? Doesn't |
||
|
|
||
| logger.debug(`acquiring install lock for ${alias}`); | ||
| let release: () => Promise<void>; | ||
|
|
||
| const lockOpts = { stale: STALE_MS, update: UPDATE_MS, realpath: false }; | ||
|
|
||
| try { | ||
| release = await lockfile.lock(target, { ...lockOpts, retries: 0 }); | ||
| } catch (e: any) { | ||
| if (e.code !== 'ELOCKED') throw e; | ||
|
|
||
| logger.info( | ||
| `waiting for install lock on \`${alias}\` (another worker is installing)` | ||
| ); | ||
|
|
||
| try { | ||
| release = await lockfile.lock(target, { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So we call lockfile.lock twice here. Once, I guess, to see if we can acquire the lock - and if we can't, we print this log line and then try again with a number of retries? First observation is that we're then in effect retrying the lock LOCK_RETRY_OPTIONS + 1 times. Second observation is - is this worth it? Are we better calling |
||
| ...lockOpts, | ||
| retries: LOCK_RETRY_OPTIONS, | ||
| }); | ||
| } catch (e2: any) { | ||
| if (e2.code === 'ELOCKED') { | ||
| throw new Error( | ||
| `Lock acquisition timed out after ${ | ||
| MAX_LOCK_WAIT_MS / 1000 | ||
| }s waiting for ${alias}; another worker likely still installing (lock: ${target})` | ||
| ); | ||
| } | ||
| throw e2; | ||
| } | ||
| } | ||
|
|
||
| logger.debug(`acquired install lock for ${alias}`); | ||
|
|
||
| try { | ||
| await fn(); | ||
| } finally { | ||
| try { | ||
| await release(); | ||
| } catch (e) { | ||
| logger.warn(`failed to release install lock for ${alias}:`, e); | ||
| } | ||
| } | ||
| }; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer the release notes to be way more terse. No explanation, no justification - just the headline. Maybe a migration guide if breaking, but obviously that doesn't apply here.