From e5b9511b75b3eda7fe14a02e9ef257c69fe2c1ae Mon Sep 17 00:00:00 2001 From: Jack Zhuang <277994282+os-zhuang@users.noreply.github.com> Date: Mon, 1 Jun 2026 06:52:45 +0800 Subject: [PATCH] =?UTF-8?q?feat(automation):=20schedule=20flow=20trigger?= =?UTF-8?q?=20=E2=80=94=20auto-launch=20flows=20on=20cron/interval/once?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The time-based sibling of plugin-trigger-record-change. The automation engine already parses a flow's start node into a `schedule` binding (flow.type === 'schedule' or a start-node `config.schedule` descriptor) — shipped with the record-change trigger PR — so this is purely the concrete trigger, with no engine change. New plugin @objectstack/plugin-trigger-schedule: - ScheduleTrigger implements the engine's FlowTrigger extension point and delegates timing to the platform IJobService (the 'job' service), staying adapter-agnostic: the job service selects a cron-capable adapter (DbJobAdapter / CronJobAdapter) for cron and the interval adapter otherwise. - normalizeSchedule accepts the canonical JobSchedule plus shorthands (bare cron string, { cron }/{ expression }, { every }/{ intervalMs }, { at }). - On fire the flow runs with event:'schedule' + params:{ jobId, flowName, schedule }; the engine's start-condition gate still applies. - Error-isolated (a flow failure never crashes the job runner); per-flow job name so stop() cancels exactly one flow; job service resolved lazily per bind so adapter upgrades (interval → durable Db) are picked up; graceful degrade when automation/job service absent (kernel:ready + getService). Depends on com.objectstack.service.job so its adapter upgrade runs first. Tests: 17 (normalizeSchedule shapes/shorthands/rejects, schedule/cancel, config.schedule fallback, fire→context, error isolation, idempotency, plugin wiring + graceful degrade + lazy resolution). Builds clean (full DTS). Adds the package to the changeset fixed group. --- .changeset/config.json | 1 + .changeset/schedule-flow-trigger.md | 28 ++ .../plugins/plugin-trigger-schedule/README.md | 68 ++++ .../plugin-trigger-schedule/package.json | 55 ++++ .../plugin-trigger-schedule/src/index.ts | 10 + .../plugin-trigger-schedule/src/plugin.ts | 83 +++++ .../src/schedule-trigger.test.ts | 295 ++++++++++++++++++ .../src/schedule-trigger.ts | 208 ++++++++++++ .../plugin-trigger-schedule/tsconfig.json | 18 ++ pnpm-lock.yaml | 19 ++ 10 files changed, 785 insertions(+) create mode 100644 .changeset/schedule-flow-trigger.md create mode 100644 packages/plugins/plugin-trigger-schedule/README.md create mode 100644 packages/plugins/plugin-trigger-schedule/package.json create mode 100644 packages/plugins/plugin-trigger-schedule/src/index.ts create mode 100644 packages/plugins/plugin-trigger-schedule/src/plugin.ts create mode 100644 packages/plugins/plugin-trigger-schedule/src/schedule-trigger.test.ts create mode 100644 packages/plugins/plugin-trigger-schedule/src/schedule-trigger.ts create mode 100644 packages/plugins/plugin-trigger-schedule/tsconfig.json diff --git a/.changeset/config.json b/.changeset/config.json index c1a4cf6be..87fa3f1c8 100644 --- a/.changeset/config.json +++ b/.changeset/config.json @@ -38,6 +38,7 @@ "@objectstack/plugin-sharing", "@objectstack/plugin-webhooks", "@objectstack/plugin-trigger-record-change", + "@objectstack/plugin-trigger-schedule", "@objectstack/connector-mcp", "@objectstack/connector-rest", "@objectstack/connector-slack", diff --git a/.changeset/schedule-flow-trigger.md b/.changeset/schedule-flow-trigger.md new file mode 100644 index 000000000..6dd29c0a0 --- /dev/null +++ b/.changeset/schedule-flow-trigger.md @@ -0,0 +1,28 @@ +--- +"@objectstack/plugin-trigger-schedule": minor +--- + +Schedule flow trigger — auto-launch flows on a cron/interval/once schedule. + +The sibling of `@objectstack/plugin-trigger-record-change`: it completes the +*time-based* arm of the automation engine's `FlowTrigger` extension point. The +engine already parses a flow's start node into a `schedule` binding +(`flow.type === 'schedule'` or a start-node `config.schedule` descriptor); this +plugin registers the concrete `schedule` trigger and delegates timing to the +platform `IJobService` (the `'job'` service), so it stays adapter-agnostic — the +job service selects a cron-capable adapter (durable `DbJobAdapter` / +`CronJobAdapter`) for cron schedules and the interval adapter otherwise. + +- `normalizeSchedule` accepts the canonical `JobSchedule` plus shorthands (a + bare cron string, `{ cron }` / `{ expression }`, `{ every }` / `{ intervalMs }`, + `{ at }`). +- When a job fires, the flow runs with `event: 'schedule'` and + `params: { jobId, flowName, schedule }`; the engine's start-condition gate + still applies. +- Error-isolated (a flow failure never crashes the job runner); per-flow job + name so `stop()` cancels exactly one flow; the job service is resolved lazily + per bind so adapter upgrades are picked up; graceful degrade when the + automation or job service is absent. + +No engine change required — the `schedule` binding shipped with the +record-change trigger PR. diff --git a/packages/plugins/plugin-trigger-schedule/README.md b/packages/plugins/plugin-trigger-schedule/README.md new file mode 100644 index 000000000..d426d3035 --- /dev/null +++ b/packages/plugins/plugin-trigger-schedule/README.md @@ -0,0 +1,68 @@ +# @objectstack/plugin-trigger-schedule + +Auto-launch ObjectStack flows on a schedule (cron / interval / once). + +The automation engine ships the `FlowTrigger` extension point and the wiring +that turns a flow's `start` node into a normalized trigger binding — but the +*concrete* schedule trigger lives here, as a plugin. It delegates timing to the +platform `IJobService` (the `'job'` service), so it stays adapter-agnostic: the +job service selects a cron-capable adapter (e.g. the durable `DbJobAdapter` or +`CronJobAdapter`) for cron schedules and the interval adapter for the rest. + +This is the sibling of `@objectstack/plugin-trigger-record-change` — same +engine baseline, a different event source. + +## What it does + +A flow whose `start` node declares a schedule: + +```ts +{ + type: 'start', + config: { + schedule: { type: 'cron', expression: '0 1 * * *', timezone: 'UTC' }, + condition: "...", // optional start-condition gate + }, +} +// or simply: a flow with `type: 'schedule'` and a start-node schedule descriptor +``` + +auto-launches on that schedule — no manual `engine.execute()`. When it fires, +the flow runs with `event: 'schedule'` and `params: { jobId, flowName, schedule }` +in its context. + +### Schedule shapes + +`normalizeSchedule` accepts the canonical `JobSchedule` plus shorthands: + +| Input | Normalized | +| ---------------------------------------------- | ---------------------------------------- | +| `{ type: 'cron', expression, timezone? }` | cron | +| `'0 1 * * *'` (bare string) | `{ type: 'cron', expression: '0 1 * * *' }` | +| `{ cron }` / `{ expression }` | cron | +| `{ type: 'interval', intervalMs }` / `{ every }` | interval | +| `{ type: 'once', at }` / `{ at }` | once | + +## Usage + +```ts +import { AutomationServicePlugin } from '@objectstack/service-automation'; +import { JobServicePlugin } from '@objectstack/service-job'; +import { ScheduleTriggerPlugin } from '@objectstack/plugin-trigger-schedule'; + +kernel + .use(new AutomationServicePlugin()) // engine + flows + .use(new JobServicePlugin()) // the 'job' service (cron/interval/db) + .use(new ScheduleTriggerPlugin()); // ← makes schedule flows live +``` + +Depends on the job service plugin (`com.objectstack.service.job`) so its +`kernel:ready` adapter upgrade runs first; the job service is nonetheless +resolved lazily per bind, so adapter upgrades are always picked up. If the +automation or job service is unavailable, the plugin logs a warning and no-ops +rather than failing startup. + +## Error isolation + +A flow that throws during a scheduled run is logged and swallowed — it never +crashes the job runner. diff --git a/packages/plugins/plugin-trigger-schedule/package.json b/packages/plugins/plugin-trigger-schedule/package.json new file mode 100644 index 000000000..73893ccee --- /dev/null +++ b/packages/plugins/plugin-trigger-schedule/package.json @@ -0,0 +1,55 @@ +{ + "name": "@objectstack/plugin-trigger-schedule", + "version": "7.3.0", + "license": "Apache-2.0", + "description": "Schedule flow trigger for ObjectStack — auto-launches flows on a cron/interval/once schedule via the IJobService (ADR-0018)", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.mjs", + "require": "./dist/index.js" + } + }, + "scripts": { + "build": "tsup --config ../../../tsup.config.ts", + "test": "vitest run --passWithNoTests" + }, + "dependencies": { + "@objectstack/core": "workspace:*", + "@objectstack/spec": "workspace:*" + }, + "devDependencies": { + "@types/node": "^25.9.1", + "typescript": "^6.0.3", + "vitest": "^4.1.7" + }, + "keywords": [ + "objectstack", + "plugin", + "automation", + "flow", + "trigger", + "schedule", + "cron" + ], + "author": "ObjectStack", + "repository": { + "type": "git", + "url": "https://github.com/objectstack-ai/framework.git", + "directory": "packages/plugins/plugin-trigger-schedule" + }, + "homepage": "https://objectstack.ai/docs", + "bugs": "https://github.com/objectstack-ai/framework/issues", + "publishConfig": { + "access": "public" + }, + "files": [ + "dist", + "README.md" + ], + "engines": { + "node": ">=18.0.0" + } +} diff --git a/packages/plugins/plugin-trigger-schedule/src/index.ts b/packages/plugins/plugin-trigger-schedule/src/index.ts new file mode 100644 index 000000000..a639fe7ca --- /dev/null +++ b/packages/plugins/plugin-trigger-schedule/src/index.ts @@ -0,0 +1,10 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +export { ScheduleTriggerPlugin } from './plugin.js'; +export { ScheduleTrigger, normalizeSchedule } from './schedule-trigger.js'; +export type { + FlowTrigger, + FlowTriggerBinding, + JobServiceSurface, + TriggerLogger, +} from './schedule-trigger.js'; diff --git a/packages/plugins/plugin-trigger-schedule/src/plugin.ts b/packages/plugins/plugin-trigger-schedule/src/plugin.ts new file mode 100644 index 000000000..89cf794c3 --- /dev/null +++ b/packages/plugins/plugin-trigger-schedule/src/plugin.ts @@ -0,0 +1,83 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import type { Plugin, PluginContext } from '@objectstack/core'; +import { ScheduleTrigger } from './schedule-trigger.js'; +import type { FlowTrigger, JobServiceSurface } from './schedule-trigger.js'; + +/** + * The slice of the automation engine this plugin needs: register a trigger on + * its `FlowTrigger` extension point. Declared structurally so the plugin does + * not take a build dependency on `@objectstack/service-automation`. + */ +interface AutomationTriggerRegistry { + registerTrigger(trigger: FlowTrigger): void; + unregisterTrigger?(type: string): void; +} + +/** + * ScheduleTriggerPlugin + * + * Makes schedule-triggered flows actually fire. The automation engine ships the + * `FlowTrigger` wiring (it parses each flow's start node — `flow.type === + * 'schedule'` or a start-node `config.schedule` descriptor — into a binding and + * calls `trigger.start(...)`), but the *concrete* schedule trigger lives here as + * a plugin and delegates timing to the platform `IJobService` (the `'job'` + * service). This mirrors the connector / record-change split (engine baseline + + * trigger plugin). + * + * With this plugin (and a job service) installed, a flow whose start node + * declares `config: { schedule: { type: 'cron', expression: '0 1 * * *' } }` + * auto-launches on that schedule — no manual `engine.execute()`. + * + * Depends on the job service plugin so its `kernel:ready` upgrade (to the + * durable DbJobAdapter) runs before ours; the job service is nonetheless + * resolved lazily per `start()` so we always use its current adapter. + */ +export class ScheduleTriggerPlugin implements Plugin { + name = 'com.objectstack.trigger.schedule'; + type = 'standard'; + version = '7.3.0'; + dependencies = ['com.objectstack.service.job']; + + async init(ctx: PluginContext): Promise { + ctx.logger.info('Schedule trigger plugin initialized'); + } + + async start(ctx: PluginContext): Promise { + // The automation service + job service are resolvable once the kernel is + // ready (kernel:ready fires after AutomationServicePlugin.start() has + // pulled flows in and after the job service upgrades its adapter). + ctx.hook('kernel:ready', async () => { + const automation = this.resolveService(ctx, 'automation'); + if (!automation || typeof automation.registerTrigger !== 'function') { + ctx.logger.warn( + 'ScheduleTriggerPlugin: automation service not available — schedule trigger NOT installed', + ); + return; + } + + // Probe once for a clear startup warning; the trigger re-resolves + // lazily on each start() so adapter upgrades are always picked up. + if (!this.resolveService(ctx, 'job')) { + ctx.logger.warn( + 'ScheduleTriggerPlugin: job service not available — scheduled flows will not run until one is registered', + ); + } + + const trigger = new ScheduleTrigger( + () => this.resolveService(ctx, 'job'), + ctx.logger, + ); + automation.registerTrigger(trigger); + ctx.logger.info('ScheduleTriggerPlugin: schedule trigger registered'); + }); + } + + private resolveService(ctx: PluginContext, name: string): T | null { + try { + return ctx.getService(name) ?? null; + } catch { + return null; + } + } +} diff --git a/packages/plugins/plugin-trigger-schedule/src/schedule-trigger.test.ts b/packages/plugins/plugin-trigger-schedule/src/schedule-trigger.test.ts new file mode 100644 index 000000000..502245cd7 --- /dev/null +++ b/packages/plugins/plugin-trigger-schedule/src/schedule-trigger.test.ts @@ -0,0 +1,295 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect, vi } from 'vitest'; +import type { AutomationContext, JobSchedule, JobHandler } from '@objectstack/spec/contracts'; +import { + ScheduleTrigger, + normalizeSchedule, + type FlowTriggerBinding, + type JobServiceSurface, + type TriggerLogger, +} from './schedule-trigger.js'; +import { ScheduleTriggerPlugin } from './plugin.js'; + +// ─── Test doubles ─────────────────────────────────────────────────── + +interface ScheduledJob { + name: string; + schedule: JobSchedule; + handler: JobHandler; +} + +/** Fake IJobService slice: records schedule()/cancel() and can fire a job. */ +function fakeJobService() { + const jobs = new Map(); + const service: JobServiceSurface = { + async schedule(name, schedule, handler) { + jobs.set(name, { name, schedule, handler }); + }, + async cancel(name) { + jobs.delete(name); + }, + }; + return { + service, + jobs, + async fire(name: string, jobId = 'run1') { + await jobs.get(name)?.handler({ jobId }); + }, + }; +} + +function silentLogger(): TriggerLogger { + return { info: () => {}, warn: () => {}, debug: () => {} }; +} + +function binding(overrides: Partial = {}): FlowTriggerBinding { + return { + flowName: 'nightly_health_sweep', + schedule: { type: 'cron', expression: '0 1 * * *', timezone: 'UTC' }, + ...overrides, + }; +} + +const flush = () => new Promise((r) => setTimeout(r, 0)); + +// ─── normalizeSchedule ────────────────────────────────────────────── + +describe('normalizeSchedule', () => { + it('passes through canonical cron/interval/once shapes', () => { + expect(normalizeSchedule({ type: 'cron', expression: '* * * * *', timezone: 'UTC' })).toEqual({ + type: 'cron', + expression: '* * * * *', + timezone: 'UTC', + }); + expect(normalizeSchedule({ type: 'interval', intervalMs: 5000 })).toEqual({ + type: 'interval', + intervalMs: 5000, + }); + expect(normalizeSchedule({ type: 'once', at: '2026-01-01T00:00:00Z' })).toEqual({ + type: 'once', + at: '2026-01-01T00:00:00Z', + }); + }); + + it('treats a bare string as a cron expression', () => { + expect(normalizeSchedule('0 1 * * *')).toEqual({ type: 'cron', expression: '0 1 * * *' }); + }); + + it('accepts shorthands { cron } / { expression } / { every } / { at }', () => { + expect(normalizeSchedule({ cron: '*/5 * * * *' })).toEqual({ type: 'cron', expression: '*/5 * * * *' }); + expect(normalizeSchedule({ expression: '0 0 * * *' })).toEqual({ type: 'cron', expression: '0 0 * * *' }); + expect(normalizeSchedule({ every: 1000 })).toEqual({ type: 'interval', intervalMs: 1000 }); + expect(normalizeSchedule({ at: '2026-06-01T00:00:00Z' })).toEqual({ + type: 'once', + at: '2026-06-01T00:00:00Z', + }); + }); + + it('returns null for missing / unusable descriptors', () => { + expect(normalizeSchedule(undefined)).toBeNull(); + expect(normalizeSchedule(null)).toBeNull(); + expect(normalizeSchedule('')).toBeNull(); + expect(normalizeSchedule({ type: 'cron' })).toBeNull(); // no expression + expect(normalizeSchedule({ type: 'interval', intervalMs: 0 })).toBeNull(); + expect(normalizeSchedule({ type: 'once' })).toBeNull(); // no at + expect(normalizeSchedule(42)).toBeNull(); + }); +}); + +// ─── ScheduleTrigger ──────────────────────────────────────────────── + +describe('ScheduleTrigger', () => { + it('schedules a job for the flow with the normalized schedule', async () => { + const job = fakeJobService(); + const trigger = new ScheduleTrigger(() => job.service, silentLogger()); + + trigger.start(binding(), async () => {}); + await flush(); + + expect(job.jobs.size).toBe(1); + const scheduled = job.jobs.get('flow-schedule:nightly_health_sweep'); + expect(scheduled?.schedule).toEqual({ type: 'cron', expression: '0 1 * * *', timezone: 'UTC' }); + }); + + it('reads schedule from binding.config.schedule as a fallback', async () => { + const job = fakeJobService(); + const trigger = new ScheduleTrigger(() => job.service, silentLogger()); + + trigger.start( + binding({ schedule: undefined, config: { schedule: { type: 'interval', intervalMs: 2000 } } }), + async () => {}, + ); + await flush(); + + expect(job.jobs.get('flow-schedule:nightly_health_sweep')?.schedule).toEqual({ + type: 'interval', + intervalMs: 2000, + }); + }); + + it('does not schedule when no schedule descriptor is present', async () => { + const job = fakeJobService(); + const trigger = new ScheduleTrigger(() => job.service, silentLogger()); + + trigger.start(binding({ schedule: undefined }), async () => {}); + await flush(); + + expect(job.jobs.size).toBe(0); + }); + + it('does not schedule when the job service is unavailable', async () => { + const trigger = new ScheduleTrigger(() => null, silentLogger()); + expect(() => trigger.start(binding(), async () => {})).not.toThrow(); + }); + + it('fires the callback with a schedule context when the job runs', async () => { + const job = fakeJobService(); + const trigger = new ScheduleTrigger(() => job.service, silentLogger()); + const seen: AutomationContext[] = []; + + trigger.start(binding(), async (ctx) => { + seen.push(ctx); + }); + await flush(); + await job.fire('flow-schedule:nightly_health_sweep', 'run42'); + + expect(seen).toHaveLength(1); + expect(seen[0].event).toBe('schedule'); + expect(seen[0].params).toMatchObject({ jobId: 'run42', flowName: 'nightly_health_sweep' }); + }); + + it('isolates flow errors so the job runner is never broken', async () => { + const job = fakeJobService(); + const warn = vi.fn(); + const trigger = new ScheduleTrigger(() => job.service, { info: () => {}, warn, debug: () => {} }); + + trigger.start(binding(), async () => { + throw new Error('flow blew up'); + }); + await flush(); + + await expect(job.fire('flow-schedule:nightly_health_sweep')).resolves.toBeUndefined(); + expect(warn).toHaveBeenCalled(); + }); + + it('stop() cancels the flow\'s job', async () => { + const job = fakeJobService(); + const trigger = new ScheduleTrigger(() => job.service, silentLogger()); + + trigger.start(binding(), async () => {}); + await flush(); + expect(job.jobs.size).toBe(1); + + trigger.stop('nightly_health_sweep'); + await flush(); + expect(job.jobs.size).toBe(0); + }); + + it('re-binding the same flow is idempotent (one job)', async () => { + const job = fakeJobService(); + const trigger = new ScheduleTrigger(() => job.service, silentLogger()); + + trigger.start(binding(), async () => {}); + await flush(); + trigger.start(binding({ schedule: { type: 'interval', intervalMs: 9000 } }), async () => {}); + await flush(); + + expect(job.jobs.size).toBe(1); + expect(job.jobs.get('flow-schedule:nightly_health_sweep')?.schedule).toEqual({ + type: 'interval', + intervalMs: 9000, + }); + }); + + it('stop() on an unknown flow is a no-op', () => { + const job = fakeJobService(); + const trigger = new ScheduleTrigger(() => job.service, silentLogger()); + expect(() => trigger.stop('never_bound')).not.toThrow(); + }); +}); + +// ─── ScheduleTriggerPlugin ────────────────────────────────────────── + +describe('ScheduleTriggerPlugin', () => { + interface FakeCtx { + readyHandlers: Array<() => Promise | void>; + ctx: { + logger: TriggerLogger; + getService: (name: string) => T; + hook: (event: string, handler: () => Promise | void) => void; + }; + } + + function fakePluginCtx(services: Record): FakeCtx { + const readyHandlers: Array<() => Promise | void> = []; + return { + readyHandlers, + ctx: { + logger: silentLogger() as TriggerLogger, + getService(name: string): T { + if (!(name in services)) throw new Error(`no service '${name}'`); + return services[name] as T; + }, + hook(event: string, handler: () => Promise | void) { + if (event === 'kernel:ready') readyHandlers.push(handler); + }, + }, + }; + } + + it('registers the trigger when automation + job services exist', async () => { + const registerTrigger = vi.fn(); + const job = fakeJobService(); + const fake = fakePluginCtx({ automation: { registerTrigger }, job: job.service }); + + const plugin = new ScheduleTriggerPlugin(); + await plugin.start(fake.ctx as never); + await fake.readyHandlers[0](); + + expect(registerTrigger).toHaveBeenCalledTimes(1); + expect((registerTrigger.mock.calls[0][0] as ScheduleTrigger).type).toBe('schedule'); + }); + + it('still registers the trigger when the job service is missing (warns)', async () => { + const registerTrigger = vi.fn(); + const fake = fakePluginCtx({ automation: { registerTrigger } }); + + const plugin = new ScheduleTriggerPlugin(); + await plugin.start(fake.ctx as never); + await fake.readyHandlers[0](); + + // Registered so it can lazily pick up a job service later. + expect(registerTrigger).toHaveBeenCalledTimes(1); + }); + + it('skips gracefully when the automation service is absent', async () => { + const job = fakeJobService(); + const fake = fakePluginCtx({ job: job.service }); + + const plugin = new ScheduleTriggerPlugin(); + await plugin.start(fake.ctx as never); + await expect(fake.readyHandlers[0]()).resolves.toBeUndefined(); + }); + + it('lazily resolves the job service at fire time (adapter upgrade)', async () => { + const registerTrigger = vi.fn(); + const job = fakeJobService(); + // Job service appears AFTER the trigger is registered. + const services: Record = { automation: { registerTrigger } }; + const fake = fakePluginCtx(services); + + const plugin = new ScheduleTriggerPlugin(); + await plugin.start(fake.ctx as never); + await fake.readyHandlers[0](); + + // Now the job service becomes available. + services.job = job.service; + + const trigger = registerTrigger.mock.calls[0][0] as ScheduleTrigger; + trigger.start(binding(), async () => {}); + await flush(); + + expect(job.jobs.size).toBe(1); + }); +}); diff --git a/packages/plugins/plugin-trigger-schedule/src/schedule-trigger.ts b/packages/plugins/plugin-trigger-schedule/src/schedule-trigger.ts new file mode 100644 index 000000000..e95355291 --- /dev/null +++ b/packages/plugins/plugin-trigger-schedule/src/schedule-trigger.ts @@ -0,0 +1,208 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import type { AutomationContext } from '@objectstack/spec/contracts'; +import type { JobSchedule, JobHandler } from '@objectstack/spec/contracts'; + +/** + * Structural mirror of the automation engine's `FlowTriggerBinding` + * (service-automation/src/engine.ts). Declared locally so this trigger plugin + * stays decoupled from the automation package — same pattern the record-change + * trigger and the connector / messaging integrations use. The engine parses the + * flow's start node and hands us a binding whose `schedule` carries the + * cron/interval/once descriptor. + */ +export interface FlowTriggerBinding { + readonly flowName: string; + readonly object?: string; + readonly event?: string; + readonly condition?: string | { dialect?: string; source?: string; ast?: unknown }; + readonly schedule?: unknown; + readonly config?: Record; +} + +/** + * Structural mirror of the engine's `FlowTrigger` extension point. The engine + * calls {@link start} with a parsed binding + a callback that runs the flow, + * and {@link stop} when the flow is unregistered/disabled. + */ +export interface FlowTrigger { + readonly type: string; + start(binding: FlowTriggerBinding, callback: (ctx: AutomationContext) => Promise): void; + stop(flowName: string): void; +} + +/** + * The slice of `IJobService` this trigger needs: schedule a named job and + * cancel it. Typed structurally so the plugin depends on the spec contract + * shape, not a concrete adapter. + */ +export interface JobServiceSurface { + schedule(name: string, schedule: JobSchedule, handler: JobHandler): Promise; + cancel(name: string): Promise; +} + +/** Minimal logger surface (matches core's `ctx.logger`). */ +export interface TriggerLogger { + info(msg: string, ...args: unknown[]): void; + warn(msg: string, ...args: unknown[]): void; + debug?(msg: string, ...args: unknown[]): void; +} + +const JOB_PREFIX = 'flow-schedule'; + +/** + * Normalize a flow's raw `schedule` descriptor into a {@link JobSchedule}, or + * `null` if it can't be understood. Accepts the canonical + * `{ type: 'cron'|'interval'|'once', ... }` shape plus a few ergonomic + * shorthands (a bare cron string, `{ cron }`, `{ expression }`, `{ every }` / + * `{ intervalMs }`, `{ at }`). + */ +export function normalizeSchedule(raw: unknown): JobSchedule | null { + if (raw == null) return null; + + // Bare cron string, e.g. '0 1 * * *'. + if (typeof raw === 'string') { + const expr = raw.trim(); + return expr ? { type: 'cron', expression: expr } : null; + } + + if (typeof raw !== 'object') return null; + const s = raw as Record; + + const type = typeof s.type === 'string' ? s.type : undefined; + + if (type === 'cron' || (!type && (typeof s.cron === 'string' || typeof s.expression === 'string'))) { + const expression = + (typeof s.expression === 'string' && s.expression) || + (typeof s.cron === 'string' && s.cron) || + undefined; + if (!expression) return null; + const out: JobSchedule = { type: 'cron', expression }; + if (typeof s.timezone === 'string') out.timezone = s.timezone; + return out; + } + + if (type === 'interval' || (!type && (typeof s.intervalMs === 'number' || typeof s.every === 'number'))) { + const intervalMs = + (typeof s.intervalMs === 'number' && s.intervalMs) || + (typeof s.every === 'number' && s.every) || + undefined; + if (!intervalMs || intervalMs <= 0) return null; + return { type: 'interval', intervalMs }; + } + + if (type === 'once' || (!type && typeof s.at === 'string')) { + const at = typeof s.at === 'string' ? s.at : undefined; + if (!at) return null; + return { type: 'once', at }; + } + + return null; +} + +/** + * ScheduleTrigger + * + * Bridges the automation engine's {@link FlowTrigger} extension point to the + * platform {@link JobServiceSurface}. For each schedule-triggered flow the + * engine activates, it registers a job whose handler runs the flow; the job + * service owns the actual cron/interval/once timing (so this trigger stays + * adapter-agnostic — cron schedules need a cron-capable adapter, which the + * job service selects). + * + * The job service is resolved lazily (per `start()`) via the supplied accessor, + * so we always pick up the job service's *upgraded* adapter (e.g. the durable + * DbJobAdapter that replaces the bootstrap interval adapter on `kernel:ready`). + */ +export class ScheduleTrigger implements FlowTrigger { + readonly type = 'schedule'; + + private readonly getJobService: () => JobServiceSurface | null; + private readonly logger: TriggerLogger; + /** flowName → job name registered for it, so stop() can cancel it. */ + private readonly bound = new Map(); + + constructor(getJobService: () => JobServiceSurface | null, logger: TriggerLogger) { + this.getJobService = getJobService; + this.logger = logger; + } + + start(binding: FlowTriggerBinding, callback: (ctx: AutomationContext) => Promise): void { + const raw = binding.schedule ?? (binding.config as Record | undefined)?.schedule; + const schedule = normalizeSchedule(raw); + if (!schedule) { + this.logger.warn( + `[schedule] flow '${binding.flowName}' has no recognizable schedule descriptor — not bound`, + ); + return; + } + + const jobService = this.getJobService(); + if (!jobService || typeof jobService.schedule !== 'function') { + this.logger.warn( + `[schedule] job service unavailable — flow '${binding.flowName}' not scheduled`, + ); + return; + } + + // Idempotent: drop any prior schedule for this flow before re-binding + // (covers disable→enable cycles and hot reload). + this.stop(binding.flowName); + + const jobName = `${JOB_PREFIX}:${binding.flowName}`; + + const handler: JobHandler = async ({ jobId }) => { + try { + const ctx: AutomationContext = { + event: 'schedule', + params: { + jobId, + flowName: binding.flowName, + schedule, + }, + }; + await callback(ctx); + } catch (err) { + // Error isolation: a scheduled flow failure must not crash the + // job runner / ticker. Log and swallow. + this.logger.warn( + `[schedule] flow '${binding.flowName}' execution failed: ${(err as Error)?.message ?? String(err)}`, + ); + } + }; + + this.bound.set(binding.flowName, jobName); + // FlowTrigger.start is sync; the job service's schedule() is async. + // Fire-and-forget with error logging. + void Promise.resolve(jobService.schedule(jobName, schedule, handler)) + .then(() => { + this.logger.info( + `[schedule] bound flow '${binding.flowName}' → ${schedule.type}` + + (schedule.expression ? ` '${schedule.expression}'` : '') + + (schedule.intervalMs ? ` every ${schedule.intervalMs}ms` : '') + + (schedule.at ? ` at ${schedule.at}` : ''), + ); + }) + .catch((err) => { + this.bound.delete(binding.flowName); + this.logger.warn( + `[schedule] failed to schedule flow '${binding.flowName}': ${(err as Error)?.message ?? String(err)}`, + ); + }); + } + + stop(flowName: string): void { + const jobName = this.bound.get(flowName); + if (!jobName) return; + this.bound.delete(flowName); + const jobService = this.getJobService(); + if (!jobService || typeof jobService.cancel !== 'function') return; + void Promise.resolve(jobService.cancel(jobName)) + .then(() => this.logger.debug?.(`[schedule] unbound flow '${flowName}'`)) + .catch((err) => { + this.logger.warn( + `[schedule] failed to unbind flow '${flowName}': ${(err as Error)?.message ?? String(err)}`, + ); + }); + } +} diff --git a/packages/plugins/plugin-trigger-schedule/tsconfig.json b/packages/plugins/plugin-trigger-schedule/tsconfig.json new file mode 100644 index 000000000..f6a1e8bad --- /dev/null +++ b/packages/plugins/plugin-trigger-schedule/tsconfig.json @@ -0,0 +1,18 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "outDir": "./dist", + "rootDir": "./src", + "types": [ + "node" + ] + }, + "include": [ + "src/**/*" + ], + "exclude": [ + "dist", + "node_modules", + "**/*.test.ts" + ] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 050cde93c..9a0784a7e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1489,6 +1489,25 @@ importers: specifier: ^4.1.7 version: 4.1.7(@opentelemetry/api@1.9.1)(@types/node@25.9.1)(@vitest/coverage-v8@4.1.7)(happy-dom@20.9.0)(msw@2.14.6(@types/node@25.9.1)(typescript@6.0.3))(vite@8.0.14(@types/node@25.9.1)(esbuild@0.28.0)(jiti@2.7.0)(tsx@4.22.3)(yaml@2.9.0)) + packages/plugins/plugin-trigger-schedule: + dependencies: + '@objectstack/core': + specifier: workspace:* + version: link:../../core + '@objectstack/spec': + specifier: workspace:* + version: link:../../spec + devDependencies: + '@types/node': + specifier: ^25.9.1 + version: 25.9.1 + typescript: + specifier: ^6.0.3 + version: 6.0.3 + vitest: + specifier: ^4.1.7 + version: 4.1.7(@opentelemetry/api@1.9.1)(@types/node@25.9.1)(@vitest/coverage-v8@4.1.7)(happy-dom@20.9.0)(msw@2.14.6(@types/node@25.9.1)(typescript@6.0.3))(vite@8.0.14(@types/node@25.9.1)(esbuild@0.28.0)(jiti@2.7.0)(tsx@4.22.3)(yaml@2.9.0)) + packages/plugins/plugin-webhooks: dependencies: '@objectstack/core':