From 4200057e5ccd1fedb3b7b3ea5bd2a65f1d0592a6 Mon Sep 17 00:00:00 2001 From: Jack Zhuang <277994282+os-zhuang@users.noreply.github.com> Date: Mon, 1 Jun 2026 08:31:46 +0800 Subject: [PATCH 1/2] feat(messaging,cli): wire messaging + triggers capability tokens; resolve notify-by-email to user id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related platform improvements that make the `notify` flow node and auto-firing flows usable from a plain `defineStack({ requires: [...] })`, without hand-wiring plugin instances. CLI / runtime — new capability tokens • `messaging` → MessagingServicePlugin, so the `notify` node delivers to a user's inbox channel (sys_inbox_message rows) instead of degrading to a logged no-op. • `triggers` → RecordChangeTriggerPlugin (+ ScheduleTriggerPlugin extra), so autolaunched / schedule flows actually fire. The automation engine ships the FlowTrigger wiring; these plugins are the concrete triggers. Pair `triggers` with `job` for cron/interval schedules. Mirrored in both the CLI CAPABILITY_PROVIDERS table (serve.ts) and the runtime capability-loader so dev `objectstack serve` and the cloud artifact kernel resolve them identically. CLI package.json gains the workspace deps so the dynamic imports resolve. Inbox channel — notify-by-email lands in the right inbox Flows commonly address recipients by email (e.g. a `{record.assignee}` field), but sys_inbox_message is keyed by user id. The inbox channel now resolves an email-shaped recipient to its sys_user.id via findOne, with a verbatim fallback when the recipient is not email-shaped, no user matches, or the lookup fails — so a failed resolution can never drop the row. Adds InboxChannelOptions.userObject for the lookup target. Fully covered by new unit tests. Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/cli/package.json | 3 + packages/cli/src/commands/serve.ts | 24 +++++++ .../runtime/src/cloud/capability-loader.ts | 13 ++++ .../src/inbox-channel.test.ts | 62 ++++++++++++++++++- .../service-messaging/src/inbox-channel.ts | 50 ++++++++++++++- pnpm-lock.yaml | 9 +++ 6 files changed, 157 insertions(+), 4 deletions(-) diff --git a/packages/cli/package.json b/packages/cli/package.json index 80edae019..687b2640a 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -61,6 +61,8 @@ "@objectstack/plugin-reports": "workspace:*", "@objectstack/plugin-security": "workspace:*", "@objectstack/plugin-sharing": "workspace:*", + "@objectstack/plugin-trigger-record-change": "workspace:*", + "@objectstack/plugin-trigger-schedule": "workspace:*", "@objectstack/plugin-webhooks": "workspace:*", "@objectstack/rest": "workspace:*", "@objectstack/runtime": "workspace:^", @@ -71,6 +73,7 @@ "@objectstack/service-external-datasource": "workspace:*", "@objectstack/service-feed": "workspace:*", "@objectstack/service-job": "workspace:*", + "@objectstack/service-messaging": "workspace:*", "@objectstack/service-package": "workspace:*", "@objectstack/service-queue": "workspace:*", "@objectstack/service-realtime": "workspace:*", diff --git a/packages/cli/src/commands/serve.ts b/packages/cli/src/commands/serve.ts index 0c3d3db31..705d2a697 100644 --- a/packages/cli/src/commands/serve.ts +++ b/packages/cli/src/commands/serve.ts @@ -1357,6 +1357,30 @@ export default class Serve extends Command { export: 'JobServicePlugin', nameMatch: ['service-job', 'JobServicePlugin'], }, + messaging: { + // Backs the `notify` flow node (ADR-0012): delivers to a user's + // channels (inbox by default → `sys_inbox_message` rows). Without + // this the notify node degrades to a logged no-op. + pkg: '@objectstack/service-messaging', + export: 'MessagingServicePlugin', + nameMatch: ['service-messaging', 'MessagingServicePlugin'], + }, + triggers: { + // Makes autolaunched flows actually fire. The automation engine ships + // the `FlowTrigger` wiring; these plugins are the concrete triggers: + // record-change (ObjectQL lifecycle hooks) + schedule (cron/interval + // via the job service — so pair `triggers` with `job`). + pkg: '@objectstack/plugin-trigger-record-change', + export: 'RecordChangeTriggerPlugin', + nameMatch: ['trigger-record-change', 'RecordChangeTriggerPlugin'], + extras: [ + { + pkg: '@objectstack/plugin-trigger-schedule', + export: 'ScheduleTriggerPlugin', + nameMatch: ['trigger-schedule', 'ScheduleTriggerPlugin'], + }, + ], + }, realtime: { pkg: '@objectstack/service-realtime', export: 'RealtimeServicePlugin', diff --git a/packages/runtime/src/cloud/capability-loader.ts b/packages/runtime/src/cloud/capability-loader.ts index c1d22dce0..4927d8b47 100644 --- a/packages/runtime/src/cloud/capability-loader.ts +++ b/packages/runtime/src/cloud/capability-loader.ts @@ -81,6 +81,19 @@ export const CAPABILITY_PROVIDERS: Record = { pkg: '@objectstack/service-job', export: 'JobServicePlugin', }, + messaging: { + // Backs the `notify` flow node (ADR-0012): delivers to a user's + // channels (inbox by default → `sys_inbox_message` rows). + pkg: '@objectstack/service-messaging', + export: 'MessagingServicePlugin', + }, + triggers: { + // Concrete flow triggers — record-change (ObjectQL hooks) + schedule + // (cron/interval via the job service; pair `triggers` with `job`). + pkg: '@objectstack/plugin-trigger-record-change', + export: 'RecordChangeTriggerPlugin', + extras: [{ pkg: '@objectstack/plugin-trigger-schedule', export: 'ScheduleTriggerPlugin' }], + }, realtime: { pkg: '@objectstack/service-realtime', export: 'RealtimeServicePlugin', diff --git a/packages/services/service-messaging/src/inbox-channel.test.ts b/packages/services/service-messaging/src/inbox-channel.test.ts index d5f3e0836..44773290d 100644 --- a/packages/services/service-messaging/src/inbox-channel.test.ts +++ b/packages/services/service-messaging/src/inbox-channel.test.ts @@ -24,18 +24,26 @@ function delivery(overrides: Partial = {}, recipient = }; } -/** A fake data engine capturing inserts. */ -function fakeData(insertImpl?: (obj: string, row: any) => any) { +/** A fake data engine capturing inserts (and optionally answering findOne). */ +function fakeData( + insertImpl?: (obj: string, row: any) => any, + findOneImpl?: (obj: string, query: any) => any, +) { const inserts: Array<{ object: string; row: any }> = []; + const findOnes: Array<{ object: string; query: any }> = []; return { inserts, + findOnes, engine: { async insert(object: string, row: any) { inserts.push({ object, row }); return insertImpl ? insertImpl(object, row) : { id: 'inbox_1', ...row }; }, async find() { return []; }, - async findOne() { return null; }, + async findOne(object: string, query: any) { + findOnes.push({ object, query }); + return findOneImpl ? findOneImpl(object, query) : null; + }, async update() { return {}; }, async delete() { return {}; }, } as any, @@ -104,4 +112,52 @@ describe('inbox channel', () => { const ch = createInboxChannel({ getData: () => undefined }); expect(ch.classifyError?.(new Error('x'))).toBe('retryable'); }); + + // ── email → user id resolution (notify-by-email lands in the right inbox) ── + + it('resolves an email-shaped recipient to its sys_user id', async () => { + const data = fakeData(undefined, (obj, _q) => + obj === 'sys_user' ? { id: 'usr_abc123' } : null, + ); + const ch = createInboxChannel({ getData: () => data.engine }); + + await ch.send(silentCtx(), delivery({}, 'ada@example.com')); + + expect(data.findOnes).toHaveLength(1); + expect(data.findOnes[0].object).toBe('sys_user'); + expect(data.findOnes[0].query).toEqual({ where: { email: 'ada@example.com' }, fields: ['id'] }); + expect(data.inserts[0].row.user_id).toBe('usr_abc123'); + }); + + it('honours a userObject override for resolution', async () => { + const data = fakeData(undefined, () => ({ id: 'usr_xyz' })); + const ch = createInboxChannel({ getData: () => data.engine, userObject: 'crm_contact' }); + await ch.send(silentCtx(), delivery({}, 'ada@example.com')); + expect(data.findOnes[0].object).toBe('crm_contact'); + expect(data.inserts[0].row.user_id).toBe('usr_xyz'); + }); + + it('keys by the recipient verbatim when it is not email-shaped (no lookup)', async () => { + const data = fakeData(); + const ch = createInboxChannel({ getData: () => data.engine }); + await ch.send(silentCtx(), delivery({}, 'usr_42')); + expect(data.findOnes).toHaveLength(0); + expect(data.inserts[0].row.user_id).toBe('usr_42'); + }); + + it('falls back to the email verbatim when no user matches', async () => { + const data = fakeData(undefined, () => null); + const ch = createInboxChannel({ getData: () => data.engine }); + await ch.send(silentCtx(), delivery({}, 'ghost@example.com')); + expect(data.findOnes).toHaveLength(1); + expect(data.inserts[0].row.user_id).toBe('ghost@example.com'); + }); + + it('falls back to the email verbatim when the lookup throws', async () => { + const data = fakeData(undefined, () => { throw new Error('user table locked'); }); + const ch = createInboxChannel({ getData: () => data.engine }); + const result = await ch.send(silentCtx(), delivery({}, 'ada@example.com')); + expect(result.ok).toBe(true); + expect(data.inserts[0].row.user_id).toBe('ada@example.com'); + }); }); diff --git a/packages/services/service-messaging/src/inbox-channel.ts b/packages/services/service-messaging/src/inbox-channel.ts index 5b4be7fb7..f62282abc 100644 --- a/packages/services/service-messaging/src/inbox-channel.ts +++ b/packages/services/service-messaging/src/inbox-channel.ts @@ -12,6 +12,12 @@ import type { /** The object the inbox channel writes rows to. */ export const INBOX_OBJECT = 'sys_inbox_message'; +/** The user identity object an email-shaped recipient is resolved against. */ +export const USER_OBJECT = 'sys_user'; + +/** Cheap RFC-ish heuristic — "looks like an email" so we attempt id resolution. */ +const EMAIL_SHAPE = /^[^\s@]+@[^\s@]+\.[^\s@]+$/; + export interface InboxChannelOptions { /** * Resolve the runtime data engine. Returns `undefined` when no data layer @@ -22,6 +28,15 @@ export interface InboxChannelOptions { getData(): IDataEngine | undefined; /** Object name override (default {@link INBOX_OBJECT}). */ objectName?: string; + /** + * User identity object used to resolve an email-shaped recipient to its + * id (default {@link USER_OBJECT}). The inbox is keyed by user id, but + * flows commonly address recipients by email (e.g. an `assignee` field), + * so a recipient matching {@link EMAIL_SHAPE} is looked up by `email` and + * replaced with the matching user's `id`. Verbatim fallback applies when + * the recipient is not email-shaped, no user matches, or the lookup fails. + */ + userObject?: string; /** Clock injection for deterministic tests. Defaults to `new Date()`. */ now?(): string; } @@ -36,8 +51,39 @@ export interface InboxChannelOptions { */ export function createInboxChannel(opts: InboxChannelOptions): MessagingChannel { const objectName = opts.objectName ?? INBOX_OBJECT; + const userObject = opts.userObject ?? USER_OBJECT; const now = opts.now ?? (() => new Date().toISOString()); + /** + * Map an email-shaped recipient to its user id; return the recipient + * unchanged for non-email recipients, on no match, or on any lookup error + * (the inbox row is best-effort keyed and must never fail on resolution). + */ + async function resolveRecipient( + ctx: MessagingChannelContext, + data: IDataEngine, + recipient: string, + ): Promise { + if (!EMAIL_SHAPE.test(recipient)) return recipient; + try { + const user = await data.findOne(userObject, { + where: { email: recipient }, + fields: ['id'], + }); + const id = user?.id; + if (id != null && String(id).length > 0) return String(id); + ctx.logger.warn( + `[inbox] no '${userObject}' matched email '${recipient}'; keying inbox row by the email verbatim`, + ); + return recipient; + } catch (err) { + ctx.logger.warn( + `[inbox] failed to resolve '${recipient}' to a user id (${(err as Error).message}); keying by the email verbatim`, + ); + return recipient; + } + } + return { id: 'inbox', @@ -52,8 +98,10 @@ export function createInboxChannel(opts: InboxChannelOptions): MessagingChannel return { ok: true }; } + const userId = await resolveRecipient(ctx, data, delivery.recipient); + const row: Record = { - user_id: delivery.recipient, + user_id: userId, topic: n.topic, title: n.title, body_md: n.body, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9a0784a7e..a401ff107 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -499,6 +499,12 @@ importers: '@objectstack/plugin-sharing': specifier: workspace:* version: link:../plugins/plugin-sharing + '@objectstack/plugin-trigger-record-change': + specifier: workspace:* + version: link:../plugins/plugin-trigger-record-change + '@objectstack/plugin-trigger-schedule': + specifier: workspace:* + version: link:../plugins/plugin-trigger-schedule '@objectstack/plugin-webhooks': specifier: workspace:* version: link:../plugins/plugin-webhooks @@ -529,6 +535,9 @@ importers: '@objectstack/service-job': specifier: workspace:* version: link:../services/service-job + '@objectstack/service-messaging': + specifier: workspace:* + version: link:../services/service-messaging '@objectstack/service-package': specifier: workspace:* version: link:../services/service-package From 065d7d9f1abc5a3d091d91bbaa720d7dbaf231ba Mon Sep 17 00:00:00 2001 From: Jack Zhuang <277994282+os-zhuang@users.noreply.github.com> Date: Mon, 1 Jun 2026 08:33:46 +0800 Subject: [PATCH 2/2] chore: add changeset for messaging + triggers capability tokens Co-Authored-By: Claude Opus 4.8 (1M context) --- .../messaging-triggers-capability-tokens.md | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 .changeset/messaging-triggers-capability-tokens.md diff --git a/.changeset/messaging-triggers-capability-tokens.md b/.changeset/messaging-triggers-capability-tokens.md new file mode 100644 index 000000000..c9125d8a9 --- /dev/null +++ b/.changeset/messaging-triggers-capability-tokens.md @@ -0,0 +1,24 @@ +--- +"@objectstack/service-messaging": minor +"@objectstack/cli": minor +"@objectstack/runtime": minor +--- + +Messaging + triggers capability tokens, and notify-by-email recipient resolution. + +Make the `notify` flow node and auto-firing flows usable from a plain +`defineStack({ requires: [...] })` — no hand-wired plugin instances. + +- **CLI / runtime — new capability tokens.** `messaging` → + `MessagingServicePlugin` (the `notify` node delivers to the inbox channel + instead of degrading to a logged no-op); `triggers` → + `RecordChangeTriggerPlugin` + `ScheduleTriggerPlugin` (autolaunched / schedule + flows actually fire — pair `triggers` with `job` for cron/interval). Wired + identically in the CLI `CAPABILITY_PROVIDERS` table and the runtime + `capability-loader`. +- **Inbox channel — notify-by-email.** Flows commonly address recipients by + email (e.g. `{record.assignee}`), but `sys_inbox_message` is keyed by user id. + The inbox channel now resolves an email-shaped recipient to its `sys_user.id` + (configurable via `InboxChannelOptions.userObject`), with a verbatim fallback + when the recipient is not email-shaped, no user matches, or the lookup fails — + so a failed resolution can never drop the row.