Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .changeset/messaging-triggers-capability-tokens.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"@objectstack/service-messaging": minor
"@objectstack/cli": minor
"@objectstack/runtime": minor
---

Messaging + triggers capability tokens, and notify-by-email recipient resolution.

Make the `notify` flow node and auto-firing flows usable from a plain
`defineStack({ requires: [...] })` — no hand-wired plugin instances.

- **CLI / runtime — new capability tokens.** `messaging` →
`MessagingServicePlugin` (the `notify` node delivers to the inbox channel
instead of degrading to a logged no-op); `triggers` →
`RecordChangeTriggerPlugin` + `ScheduleTriggerPlugin` (autolaunched / schedule
flows actually fire — pair `triggers` with `job` for cron/interval). Wired
identically in the CLI `CAPABILITY_PROVIDERS` table and the runtime
`capability-loader`.
- **Inbox channel — notify-by-email.** Flows commonly address recipients by
email (e.g. `{record.assignee}`), but `sys_inbox_message` is keyed by user id.
The inbox channel now resolves an email-shaped recipient to its `sys_user.id`
(configurable via `InboxChannelOptions.userObject`), with a verbatim fallback
when the recipient is not email-shaped, no user matches, or the lookup fails —
so a failed resolution can never drop the row.
3 changes: 3 additions & 0 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
"@objectstack/plugin-reports": "workspace:*",
"@objectstack/plugin-security": "workspace:*",
"@objectstack/plugin-sharing": "workspace:*",
"@objectstack/plugin-trigger-record-change": "workspace:*",
"@objectstack/plugin-trigger-schedule": "workspace:*",
"@objectstack/plugin-webhooks": "workspace:*",
"@objectstack/rest": "workspace:*",
"@objectstack/runtime": "workspace:^",
Expand All @@ -71,6 +73,7 @@
"@objectstack/service-external-datasource": "workspace:*",
"@objectstack/service-feed": "workspace:*",
"@objectstack/service-job": "workspace:*",
"@objectstack/service-messaging": "workspace:*",
"@objectstack/service-package": "workspace:*",
"@objectstack/service-queue": "workspace:*",
"@objectstack/service-realtime": "workspace:*",
Expand Down
24 changes: 24 additions & 0 deletions packages/cli/src/commands/serve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,30 @@ export default class Serve extends Command {
export: 'JobServicePlugin',
nameMatch: ['service-job', 'JobServicePlugin'],
},
messaging: {
// Backs the `notify` flow node (ADR-0012): delivers to a user's
// channels (inbox by default → `sys_inbox_message` rows). Without
// this the notify node degrades to a logged no-op.
pkg: '@objectstack/service-messaging',
export: 'MessagingServicePlugin',
nameMatch: ['service-messaging', 'MessagingServicePlugin'],
},
triggers: {
// Makes autolaunched flows actually fire. The automation engine ships
// the `FlowTrigger` wiring; these plugins are the concrete triggers:
// record-change (ObjectQL lifecycle hooks) + schedule (cron/interval
// via the job service — so pair `triggers` with `job`).
pkg: '@objectstack/plugin-trigger-record-change',
export: 'RecordChangeTriggerPlugin',
nameMatch: ['trigger-record-change', 'RecordChangeTriggerPlugin'],
extras: [
{
pkg: '@objectstack/plugin-trigger-schedule',
export: 'ScheduleTriggerPlugin',
nameMatch: ['trigger-schedule', 'ScheduleTriggerPlugin'],
},
],
},
realtime: {
pkg: '@objectstack/service-realtime',
export: 'RealtimeServicePlugin',
Expand Down
13 changes: 13 additions & 0 deletions packages/runtime/src/cloud/capability-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ export const CAPABILITY_PROVIDERS: Record<string, CapabilitySpec> = {
pkg: '@objectstack/service-job',
export: 'JobServicePlugin',
},
messaging: {
// Backs the `notify` flow node (ADR-0012): delivers to a user's
// channels (inbox by default → `sys_inbox_message` rows).
pkg: '@objectstack/service-messaging',
export: 'MessagingServicePlugin',
},
triggers: {
// Concrete flow triggers — record-change (ObjectQL hooks) + schedule
// (cron/interval via the job service; pair `triggers` with `job`).
pkg: '@objectstack/plugin-trigger-record-change',
export: 'RecordChangeTriggerPlugin',
extras: [{ pkg: '@objectstack/plugin-trigger-schedule', export: 'ScheduleTriggerPlugin' }],
},
realtime: {
pkg: '@objectstack/service-realtime',
export: 'RealtimeServicePlugin',
Expand Down
62 changes: 59 additions & 3 deletions packages/services/service-messaging/src/inbox-channel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,26 @@ function delivery(overrides: Partial<Delivery['notification']> = {}, recipient =
};
}

/** A fake data engine capturing inserts. */
function fakeData(insertImpl?: (obj: string, row: any) => any) {
/** A fake data engine capturing inserts (and optionally answering findOne). */
function fakeData(
insertImpl?: (obj: string, row: any) => any,
findOneImpl?: (obj: string, query: any) => any,
) {
const inserts: Array<{ object: string; row: any }> = [];
const findOnes: Array<{ object: string; query: any }> = [];
return {
inserts,
findOnes,
engine: {
async insert(object: string, row: any) {
inserts.push({ object, row });
return insertImpl ? insertImpl(object, row) : { id: 'inbox_1', ...row };
},
async find() { return []; },
async findOne() { return null; },
async findOne(object: string, query: any) {
findOnes.push({ object, query });
return findOneImpl ? findOneImpl(object, query) : null;
},
async update() { return {}; },
async delete() { return {}; },
} as any,
Expand Down Expand Up @@ -104,4 +112,52 @@ describe('inbox channel', () => {
const ch = createInboxChannel({ getData: () => undefined });
expect(ch.classifyError?.(new Error('x'))).toBe('retryable');
});

// ── email → user id resolution (notify-by-email lands in the right inbox) ──

it('resolves an email-shaped recipient to its sys_user id', async () => {
const data = fakeData(undefined, (obj, _q) =>
obj === 'sys_user' ? { id: 'usr_abc123' } : null,
);
const ch = createInboxChannel({ getData: () => data.engine });

await ch.send(silentCtx(), delivery({}, 'ada@example.com'));

expect(data.findOnes).toHaveLength(1);
expect(data.findOnes[0].object).toBe('sys_user');
expect(data.findOnes[0].query).toEqual({ where: { email: 'ada@example.com' }, fields: ['id'] });
expect(data.inserts[0].row.user_id).toBe('usr_abc123');
});

it('honours a userObject override for resolution', async () => {
const data = fakeData(undefined, () => ({ id: 'usr_xyz' }));
const ch = createInboxChannel({ getData: () => data.engine, userObject: 'crm_contact' });
await ch.send(silentCtx(), delivery({}, 'ada@example.com'));
expect(data.findOnes[0].object).toBe('crm_contact');
expect(data.inserts[0].row.user_id).toBe('usr_xyz');
});

it('keys by the recipient verbatim when it is not email-shaped (no lookup)', async () => {
const data = fakeData();
const ch = createInboxChannel({ getData: () => data.engine });
await ch.send(silentCtx(), delivery({}, 'usr_42'));
expect(data.findOnes).toHaveLength(0);
expect(data.inserts[0].row.user_id).toBe('usr_42');
});

it('falls back to the email verbatim when no user matches', async () => {
const data = fakeData(undefined, () => null);
const ch = createInboxChannel({ getData: () => data.engine });
await ch.send(silentCtx(), delivery({}, 'ghost@example.com'));
expect(data.findOnes).toHaveLength(1);
expect(data.inserts[0].row.user_id).toBe('ghost@example.com');
});

it('falls back to the email verbatim when the lookup throws', async () => {
const data = fakeData(undefined, () => { throw new Error('user table locked'); });
const ch = createInboxChannel({ getData: () => data.engine });
const result = await ch.send(silentCtx(), delivery({}, 'ada@example.com'));
expect(result.ok).toBe(true);
expect(data.inserts[0].row.user_id).toBe('ada@example.com');
});
});
50 changes: 49 additions & 1 deletion packages/services/service-messaging/src/inbox-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
/** The object the inbox channel writes rows to. */
export const INBOX_OBJECT = 'sys_inbox_message';

/** The user identity object an email-shaped recipient is resolved against. */
export const USER_OBJECT = 'sys_user';

/** Cheap RFC-ish heuristic — "looks like an email" so we attempt id resolution. */
const EMAIL_SHAPE = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;

export interface InboxChannelOptions {
/**
* Resolve the runtime data engine. Returns `undefined` when no data layer
Expand All @@ -22,6 +28,15 @@
getData(): IDataEngine | undefined;
/** Object name override (default {@link INBOX_OBJECT}). */
objectName?: string;
/**
* User identity object used to resolve an email-shaped recipient to its
* id (default {@link USER_OBJECT}). The inbox is keyed by user id, but
* flows commonly address recipients by email (e.g. an `assignee` field),
* so a recipient matching {@link EMAIL_SHAPE} is looked up by `email` and
* replaced with the matching user's `id`. Verbatim fallback applies when
* the recipient is not email-shaped, no user matches, or the lookup fails.
*/
userObject?: string;
/** Clock injection for deterministic tests. Defaults to `new Date()`. */
now?(): string;
}
Expand All @@ -36,8 +51,39 @@
*/
export function createInboxChannel(opts: InboxChannelOptions): MessagingChannel {
const objectName = opts.objectName ?? INBOX_OBJECT;
const userObject = opts.userObject ?? USER_OBJECT;
const now = opts.now ?? (() => new Date().toISOString());

/**
* Map an email-shaped recipient to its user id; return the recipient
* unchanged for non-email recipients, on no match, or on any lookup error
* (the inbox row is best-effort keyed and must never fail on resolution).
*/
async function resolveRecipient(
ctx: MessagingChannelContext,
data: IDataEngine,
recipient: string,
): Promise<string> {
if (!EMAIL_SHAPE.test(recipient)) return recipient;

Check failure

Code scanning / CodeQL

Polynomial regular expression used on uncontrolled data High

This
regular expression
that depends on
library input
may run slow on strings starting with '!@!.' and with many repetitions of '!.'.
try {
const user = await data.findOne(userObject, {
where: { email: recipient },
fields: ['id'],
});
const id = user?.id;
if (id != null && String(id).length > 0) return String(id);
ctx.logger.warn(
`[inbox] no '${userObject}' matched email '${recipient}'; keying inbox row by the email verbatim`,
);
return recipient;
} catch (err) {
ctx.logger.warn(
`[inbox] failed to resolve '${recipient}' to a user id (${(err as Error).message}); keying by the email verbatim`,
);
return recipient;
}
}

return {
id: 'inbox',

Expand All @@ -52,8 +98,10 @@
return { ok: true };
}

const userId = await resolveRecipient(ctx, data, delivery.recipient);

const row: Record<string, unknown> = {
user_id: delivery.recipient,
user_id: userId,
topic: n.topic,
title: n.title,
body_md: n.body,
Expand Down
9 changes: 9 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading