Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 115 additions & 37 deletions guides/events/event-queues-new.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ status: released

# Transactional Event Queues

Persist events in the same database transaction as your business data. Process them asynchronously with retries, ordering, and a dead letter queue.
Persist events in the same database transaction as your business data. Process them asynchronously with retries and a dead letter queue.
{.subtitle}

{{ $frontmatter.synopsis }}
Expand Down Expand Up @@ -48,7 +48,7 @@ The core principle is straightforward:
1. Instead of executing side effects directly, you write an event message into a database table — **within the current transaction**.
2. Once the transaction commits, a background runner reads pending messages and dispatches them to the respective service.
3. If processing succeeds, the message is deleted. If it fails, the system retries with exponentially increasing delays.
4. After a configurable maximum number of attempts, the message is moved to the dead letter queue for manual intervention.
4. After a configurable maximum number of attempts, the message becomes a dead letter and requires manual intervention.

```mermaid
sequenceDiagram
Expand Down Expand Up @@ -82,6 +82,64 @@ Because the event message and your business data share the same database transac
- **No lost events** — if the transaction commits, the event is guaranteed to be processed eventually.


### Callback Events { #callback-events }

When a queued message finishes processing, the system emits a callback event on the same service:

- **`<event>/#succeeded`** — emitted when processing completes successfully
- **`<event>/#failed`** — emitted when the message becomes a dead letter (after all retries are exhausted)

Callback events allow you to react to outcomes — for example, to trigger compensation logic after a failure or to replicate data after a successful remote call.

- **Queued transactionally** — callback events are written to the queue in the same transaction as the main event processing, so they only exist if processing commits.
- **Processed asynchronously** — like any queued event, they run in their own transaction afterwards.

> [!note]
> Callback events are emitted by the background runner, not during the original request that queued the message.


### Single-Tenancy vs Multi-Tenancy { #tenancy }

Event queues work in both single-tenant and multi-tenant deployments.

[Learn more about multitenancy.](../multitenancy/){.learn-more}

#### Single-Tenancy

Messages are stored in queue, which reside in the application database. A background runner starts when your application starts and processes messages continuously. When a transaction commits, processing is triggered immediately.

#### Multi-Tenancy

Each tenant has its own database. To coordinate across tenants, the system writes lightweight markers to a central (provider) database whenever messages are queued. A central runner periodically checks these markers and triggers processing for each tenant that has pending work.

This ensures that:
- Pending messages are recovered after application restarts or shutdowns
- Each tenant's messages are processed independently


### The Data Model { #data-model }

Your database model is automatically extended with the following entity, which is used by the persistent queue:

```cds
namespace cds.outbox;

entity Messages {
key ID : UUID; // Unique message identifier
timestamp : Timestamp; // When the message was queued
target : String; // Target service/queue name
msg : LargeString; // Serialized event payload
attempts : Integer default 0; // Number of processing attempts
partition : Integer default 0; // Reserved, currently unused
lastError : LargeString; // Error from last failed attempt
lastAttemptTimestamp : Timestamp; // When last attempt occurred
status : String(23); // Current processing status
task : String(255); // Task name for named/singleton tasks
appid : String(255); // Application ID for shared HDI containers
}
```



## Use Cases

Expand All @@ -104,7 +162,7 @@ this.after('CREATE', 'Travels', async (travel) => {
})
```
```java [Java]
@Autowired @Qualifier("MyCustomOutbox")
@Autowired @Qualifier("FlightsOutbox")
OutboxService outbox;

@Autowired @Qualifier(CqnService.DEFAULT_NAME)
Expand All @@ -118,6 +176,8 @@ void notifyFlights(List<Travels> travels) {
```
:::

[See the xtravels sample for a comparable scenario.](https://github.com/capire/xtravels){.learn-more}

Some services are outboxed automatically, including `cds.MessagingService` and `cds.AuditLogService`.
You don't need to call `cds.queued()` or configure anything extra for these — they use the persistent queue by default.

Expand Down Expand Up @@ -264,7 +324,7 @@ In Java, use `AsyncCqnService.of(srv, outbox)` to wrap any CAP service with an o
::: code-group
```java [Java]
OutboxService outbox = runtime.getServiceCatalog()
.getService(OutboxService.class, "MyCustomOutbox");
.getService(OutboxService.class, "FlightsOutbox");
CqnService remote = runtime.getServiceCatalog()
.getService(CqnService.class, "RemoteService");

Expand Down Expand Up @@ -331,7 +391,7 @@ You can queue any service through configuration without changing code:
cds:
outbox:
services:
MyCustomOutbox:
FlightsOutbox:
maxAttempts: 10
```
:::
Expand Down Expand Up @@ -368,9 +428,11 @@ In the unlikely event of a process crash immediately after successful processing
Because the event message is written within the same database transaction as your business data, a rollback of the transaction also removes the event message.
No event is ever dispatched for a transaction that didn't commit.

### Guaranteed Order { #guaranteed-order }
### Processing Order { #processing-order }

In Node.js, messages are processed in the order they were submitted, per service and tenant.
By default, messages are processed in parallel with a chunk size of 10 for better performance. This means processing order is not guaranteed.

To enforce strict ordering, set `parallel: false` in your configuration. However, this processes messages one at a time and is not recommended for production scenarios.

In Java, the `DefaultOutboxOrdered` outbox processes entries in submission order.
The `DefaultOutboxUnordered` outbox may process entries in parallel across application instances.
Expand Down Expand Up @@ -428,26 +490,6 @@ void process(OutboxMessageEventContext context) {
Messages that exceed the maximum retry count remain in the `cds.outbox.Messages` database table with their error information intact.
These entries form the _dead letter queue_ and require manual intervention — either to fix the underlying issue and retry, or to discard the message.

### The Data Model

Your database model is automatically extended with the following entity:

```cds
namespace cds.outbox;

entity Messages {
key ID : UUID;
timestamp : Timestamp;
target : String;
msg : LargeString;
attempts : Integer default 0;
partition : Integer default 0;
lastError : LargeString;
lastAttemptTimestamp: Timestamp @cds.on.update: $now;
status : String(23);
}
```


### Managing Dead Letters

Expand Down Expand Up @@ -580,7 +622,22 @@ If you need to enforce authorization in queued processing, encode the necessary

## Configuration

### Persistent Queue (Default) { #persistent-queue }
### Scheduling vs Legacy Implementation { #scheduling }

Node.js has two queue implementations:

- **Scheduling-based** (recommended) — enabled by configuring `cds.requires.scheduling`. Uses markers for coordination, especially important for multi-tenancy.
- **Legacy** — runs when `scheduling` is not configured. Deprecated and will be removed in a future release.

::: warning Use the scheduling-based implementation
The legacy implementation is deprecated. Enable scheduling in your configuration:
```json
{ "cds": { "requires": { "scheduling": {} } } }
```
:::


### Queue Options { #queue-options }

The persistent queue is enabled by default. Messages are stored in a database table within the current transaction.

Expand All @@ -589,11 +646,14 @@ The persistent queue is enabled by default. Messages are stored in a database ta
{
"cds": {
"requires": {
"scheduling": {
"markerInterval": "1h",
"flushInterval": "1h"
},
"queue": {
"kind": "persistent-queue",
"maxAttempts": 20,
"storeLastError": true,
"timeout": "1h"
"chunkSize": 10,
"parallel": true
}
}
}
Expand All @@ -612,13 +672,26 @@ cds:
```
:::

Configuration options for Node.js:
Queue options for Node.js (`cds.requires.queue`):

| Option | Default | Description |
|--------|---------|-------------|
| `maxAttempts` | `20` | Maximum retries before moving to dead letter queue |
| `maxAttempts` | `20` | Maximum retries before message becomes a dead letter |
| `chunkSize` | `10` | Number of messages to process per batch |
| `parallel` | `true` | Process messages in parallel; set to `false` for strict ordering |
| `storeLastError` | `true` | Store error information of the last failed attempt |
| `timeout` | `"1h"` | Time after which a `processing` message is considered abandoned and eligible for reprocessing |
| `timeout` | `"1h"` | Time after which a `processing` message is considered abandoned |
| `legacyLocking` | `false` | Backward compatibility with @sap/cds v9; will be removed in a future release |


### Scheduling Options { #scheduling-options }

Scheduling options control the background runner for multi-tenancy (`cds.requires.scheduling`):

| Option | Default | Description |
|--------|---------|-------------|
| `markerInterval` | `"1h"` | Offset added to marker timestamp before processing |
| `flushInterval` | `"1h"` | How often the central runner checks for tenants with pending work |

Configuration options for Java:

Expand Down Expand Up @@ -683,12 +756,17 @@ Or disable queueing for a specific service:

## Manual Processing { #flush }

After an application restart or crash, pending events in the database are not automatically picked up until a new outbox write occurs for the same service and tenant.
You can trigger reprocessing manually using the `flush` method, for example from a startup hook or admin endpoint:
In single-tenancy, the background runner starts on application startup and processes pending messages automatically. In multi-tenancy, the central runner periodically checks markers and triggers processing.

You can also trigger processing manually using `cds.flush()`:

::: code-group
```js [Node.js]
// Flush a specific queue
const srv = await cds.connect.to('RemoteService')
await cds.queued(srv).flush()
await cds.flush(srv.name)

// Flush all queues
await cds.flush()
```
:::
Loading