Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changeset/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"@objectstack/plugin-sharing",
"@objectstack/plugin-webhooks",
"@objectstack/plugin-trigger-record-change",
"@objectstack/plugin-trigger-schedule",
"@objectstack/connector-mcp",
"@objectstack/connector-rest",
"@objectstack/connector-slack",
Expand Down
28 changes: 28 additions & 0 deletions .changeset/schedule-flow-trigger.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
"@objectstack/plugin-trigger-schedule": minor
---

Schedule flow trigger — auto-launch flows on a cron/interval/once schedule.

The sibling of `@objectstack/plugin-trigger-record-change`: it completes the
*time-based* arm of the automation engine's `FlowTrigger` extension point. The
engine already parses a flow's start node into a `schedule` binding
(`flow.type === 'schedule'` or a start-node `config.schedule` descriptor); this
plugin registers the concrete `schedule` trigger and delegates timing to the
platform `IJobService` (the `'job'` service), so it stays adapter-agnostic — the
job service selects a cron-capable adapter (durable `DbJobAdapter` /
`CronJobAdapter`) for cron schedules and the interval adapter otherwise.

- `normalizeSchedule` accepts the canonical `JobSchedule` plus shorthands (a
bare cron string, `{ cron }` / `{ expression }`, `{ every }` / `{ intervalMs }`,
`{ at }`).
- When a job fires, the flow runs with `event: 'schedule'` and
`params: { jobId, flowName, schedule }`; the engine's start-condition gate
still applies.
- Error-isolated (a flow failure never crashes the job runner); per-flow job
name so `stop()` cancels exactly one flow; the job service is resolved lazily
per bind so adapter upgrades are picked up; graceful degrade when the
automation or job service is absent.

No engine change required — the `schedule` binding shipped with the
record-change trigger PR.
68 changes: 68 additions & 0 deletions packages/plugins/plugin-trigger-schedule/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# @objectstack/plugin-trigger-schedule

Auto-launch ObjectStack flows on a schedule (cron / interval / once).

The automation engine ships the `FlowTrigger` extension point and the wiring
that turns a flow's `start` node into a normalized trigger binding — but the
*concrete* schedule trigger lives here, as a plugin. It delegates timing to the
platform `IJobService` (the `'job'` service), so it stays adapter-agnostic: the
job service selects a cron-capable adapter (e.g. the durable `DbJobAdapter` or
`CronJobAdapter`) for cron schedules and the interval adapter for the rest.

This is the sibling of `@objectstack/plugin-trigger-record-change` — same
engine baseline, a different event source.

## What it does

A flow whose `start` node declares a schedule:

```ts
{
type: 'start',
config: {
schedule: { type: 'cron', expression: '0 1 * * *', timezone: 'UTC' },
condition: "...", // optional start-condition gate
},
}
// or simply: a flow with `type: 'schedule'` and a start-node schedule descriptor
```

auto-launches on that schedule — no manual `engine.execute()`. When it fires,
the flow runs with `event: 'schedule'` and `params: { jobId, flowName, schedule }`
in its context.

### Schedule shapes

`normalizeSchedule` accepts the canonical `JobSchedule` plus shorthands:

| Input | Normalized |
| ---------------------------------------------- | ---------------------------------------- |
| `{ type: 'cron', expression, timezone? }` | cron |
| `'0 1 * * *'` (bare string) | `{ type: 'cron', expression: '0 1 * * *' }` |
| `{ cron }` / `{ expression }` | cron |
| `{ type: 'interval', intervalMs }` / `{ every }` | interval |
| `{ type: 'once', at }` / `{ at }` | once |

## Usage

```ts
import { AutomationServicePlugin } from '@objectstack/service-automation';
import { JobServicePlugin } from '@objectstack/service-job';
import { ScheduleTriggerPlugin } from '@objectstack/plugin-trigger-schedule';

kernel
.use(new AutomationServicePlugin()) // engine + flows
.use(new JobServicePlugin()) // the 'job' service (cron/interval/db)
.use(new ScheduleTriggerPlugin()); // ← makes schedule flows live
```

Depends on the job service plugin (`com.objectstack.service.job`) so its
`kernel:ready` adapter upgrade runs first; the job service is nonetheless
resolved lazily per bind, so adapter upgrades are always picked up. If the
automation or job service is unavailable, the plugin logs a warning and no-ops
rather than failing startup.

## Error isolation

A flow that throws during a scheduled run is logged and swallowed — it never
crashes the job runner.
55 changes: 55 additions & 0 deletions packages/plugins/plugin-trigger-schedule/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"name": "@objectstack/plugin-trigger-schedule",
"version": "7.3.0",
"license": "Apache-2.0",
"description": "Schedule flow trigger for ObjectStack — auto-launches flows on a cron/interval/once schedule via the IJobService (ADR-0018)",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.mjs",
"require": "./dist/index.js"
}
},
"scripts": {
"build": "tsup --config ../../../tsup.config.ts",
"test": "vitest run --passWithNoTests"
},
"dependencies": {
"@objectstack/core": "workspace:*",
"@objectstack/spec": "workspace:*"
},
"devDependencies": {
"@types/node": "^25.9.1",
"typescript": "^6.0.3",
"vitest": "^4.1.7"
},
"keywords": [
"objectstack",
"plugin",
"automation",
"flow",
"trigger",
"schedule",
"cron"
],
"author": "ObjectStack",
"repository": {
"type": "git",
"url": "https://github.com/objectstack-ai/framework.git",
"directory": "packages/plugins/plugin-trigger-schedule"
},
"homepage": "https://objectstack.ai/docs",
"bugs": "https://github.com/objectstack-ai/framework/issues",
"publishConfig": {
"access": "public"
},
"files": [
"dist",
"README.md"
],
"engines": {
"node": ">=18.0.0"
}
}
10 changes: 10 additions & 0 deletions packages/plugins/plugin-trigger-schedule/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

export { ScheduleTriggerPlugin } from './plugin.js';
export { ScheduleTrigger, normalizeSchedule } from './schedule-trigger.js';
export type {
FlowTrigger,
FlowTriggerBinding,
JobServiceSurface,
TriggerLogger,
} from './schedule-trigger.js';
83 changes: 83 additions & 0 deletions packages/plugins/plugin-trigger-schedule/src/plugin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import type { Plugin, PluginContext } from '@objectstack/core';
import { ScheduleTrigger } from './schedule-trigger.js';
import type { FlowTrigger, JobServiceSurface } from './schedule-trigger.js';

/**
* The slice of the automation engine this plugin needs: register a trigger on
* its `FlowTrigger` extension point. Declared structurally so the plugin does
* not take a build dependency on `@objectstack/service-automation`.
*/
interface AutomationTriggerRegistry {
registerTrigger(trigger: FlowTrigger): void;
unregisterTrigger?(type: string): void;
}

/**
* ScheduleTriggerPlugin
*
* Makes schedule-triggered flows actually fire. The automation engine ships the
* `FlowTrigger` wiring (it parses each flow's start node — `flow.type ===
* 'schedule'` or a start-node `config.schedule` descriptor — into a binding and
* calls `trigger.start(...)`), but the *concrete* schedule trigger lives here as
* a plugin and delegates timing to the platform `IJobService` (the `'job'`
* service). This mirrors the connector / record-change split (engine baseline +
* trigger plugin).
*
* With this plugin (and a job service) installed, a flow whose start node
* declares `config: { schedule: { type: 'cron', expression: '0 1 * * *' } }`
* auto-launches on that schedule — no manual `engine.execute()`.
*
* Depends on the job service plugin so its `kernel:ready` upgrade (to the
* durable DbJobAdapter) runs before ours; the job service is nonetheless
* resolved lazily per `start()` so we always use its current adapter.
*/
export class ScheduleTriggerPlugin implements Plugin {
name = 'com.objectstack.trigger.schedule';
type = 'standard';
version = '7.3.0';
dependencies = ['com.objectstack.service.job'];

async init(ctx: PluginContext): Promise<void> {
ctx.logger.info('Schedule trigger plugin initialized');
}

async start(ctx: PluginContext): Promise<void> {
// The automation service + job service are resolvable once the kernel is
// ready (kernel:ready fires after AutomationServicePlugin.start() has
// pulled flows in and after the job service upgrades its adapter).
ctx.hook('kernel:ready', async () => {
const automation = this.resolveService<AutomationTriggerRegistry>(ctx, 'automation');
if (!automation || typeof automation.registerTrigger !== 'function') {
ctx.logger.warn(
'ScheduleTriggerPlugin: automation service not available — schedule trigger NOT installed',
);
return;
}

// Probe once for a clear startup warning; the trigger re-resolves
// lazily on each start() so adapter upgrades are always picked up.
if (!this.resolveService<JobServiceSurface>(ctx, 'job')) {
ctx.logger.warn(
'ScheduleTriggerPlugin: job service not available — scheduled flows will not run until one is registered',
);
}

const trigger = new ScheduleTrigger(
() => this.resolveService<JobServiceSurface>(ctx, 'job'),
ctx.logger,
);
automation.registerTrigger(trigger);
ctx.logger.info('ScheduleTriggerPlugin: schedule trigger registered');
});
}

private resolveService<T>(ctx: PluginContext, name: string): T | null {
try {
return ctx.getService<T>(name) ?? null;
} catch {
return null;
}
}
}
Loading
Loading