diff --git a/README.md b/README.md
index cb6b146..314e322 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,7 @@
+ [Expected table schema](#expected-table-schema)
+ [Wiring the repository](#wiring-the-repository)
+ [Producing events from an aggregate](#producing-events-from-an-aggregate)
+ + [Declaring integration events and translators](#declaring-integration-events-and-translators)
+ [Customizing the table layout](#customizing-the-table-layout)
+ [Writing a custom payload serializer](#writing-a-custom-payload-serializer)
+ [Event schema versioning](#event-schema-versioning)
@@ -18,16 +19,21 @@
## Overview
The **Transactional Outbox** pattern solves the dual-write problem: persisting an aggregate state change and publishing
-a domain event must happen atomically. Doing both independently risks a crash leaving one side committed and the other
-lost. The outbox pattern records both in the same database transaction, delegating event delivery to a separate relay
-process.
+an integration event must happen atomically. Doing both independently risks a crash leaving one side committed and the
+other lost. The outbox pattern records both in the same database transaction, delegating event delivery to a separate
+relay process.
-This library is the write-side adapter. It persists outbox records via Doctrine DBAL and is opinionated on correctness.
-Transactions are always required and JSON validity is always checked, while leaving every schema decision to you: table
-name, column names, and identity column storage type are all configurable.
+This library is the write-side adapter. It persists integration events atomically with aggregate state via Doctrine
+DBAL. The pipeline is: `DomainEvent → IntegrationEventTranslator → IntegrationEvent → PayloadSerializer → INSERT`.
+Domain events without a registered translator are silently skipped: their absence from `IntegrationEventTranslators`
+is the canonical declaration that the event is internal to the bounded context and must not cross its boundary.
+
+The library is opinionated on correctness: transactions are always required, JSON validity is always checked, and every
+schema decision is left to you: table name, column names, and identity column storage type are all configurable.
The library composes with [`tiny-blocks/building-blocks`](https://github.com/tiny-blocks/building-blocks), which
-contributes `DomainEvent`, `DomainEventBehavior`, `EventRecord`, `EventRecords`, `EventType`, `Revision`,
+contributes `DomainEvent`, `EventRecord`, `EventRecords`, `IntegrationEvent`, `IntegrationEventBehavior`,
+`IntegrationEventRecord`, `IntegrationEventTranslator`, `IntegrationEventTranslators`, `EventType`, `Revision`,
`AggregateVersion`, and the `EventualAggregateRoot` family. This library provides the persistence step only.
## Installation
@@ -49,8 +55,8 @@ CREATE TABLE outbox_events
(
id BINARY(16) NOT NULL COMMENT 'The event identifier in Version 4 UUID format (e.g. 123e4567-e89b-12d3-a456-426614174000).',
payload JSON NOT NULL COMMENT 'The event payload serialized as a JSON object (e.g. {"transaction_id":"..."}).',
- revision INT NOT NULL COMMENT 'The positive integer indicating the payload schema revision of the event (e.g. 1).',
- event_type VARCHAR(255) NOT NULL COMMENT 'The event class name in CamelCase (e.g. TransactionConfirmed).',
+ revision INT NOT NULL COMMENT 'The positive integer indicating the payload schema revision of the integration event (e.g. 1).',
+ event_type VARCHAR(255) NOT NULL COMMENT 'The integration event class name in CamelCase (e.g. PaymentConfirmed). Reflects the public contract, not the internal domain event.',
occurred_at TIMESTAMP(6) NOT NULL COMMENT 'The UTC date and time when the event occurred in ISO 8601 format (e.g. 2026-02-13T08:49:44.931408+00:00).',
aggregate_id BINARY(16) NOT NULL COMMENT 'The aggregate root identifier in Version 4 UUID format (e.g. 123e4567-e89b-12d3-a456-426614174000).',
aggregate_type VARCHAR(255) NOT NULL COMMENT 'The aggregate root class name that produced the event in CamelCase (e.g. Transaction).',
@@ -81,41 +87,47 @@ CREATE TABLE outbox_events
### Wiring the repository
-`DoctrineOutboxRepository` requires a Doctrine DBAL `Connection` and a `PayloadSerializers` collection. The table layout
-defaults to table `outbox_events` with BINARY(16) identity columns.
+`DoctrineOutboxRepository` requires a Doctrine DBAL `Connection`, an `IntegrationEventTranslators` collection, and a
+`PayloadSerializers` collection. The table layout defaults to table `outbox_events` with BINARY(16) identity columns.
```php
event instanceof OrderPlaced;
+ }
+
+ public function translate(EventRecord $record): IntegrationEvent
+ {
+ return new OrderShipped(orderId: $record->event->orderId);
+ }
+}
+```
+
+#### Registering translators
+
+Pass an `IntegrationEventTranslators` collection to the repository constructor. Lookup follows first-match-wins
+semantics: the first translator whose `supports()` returns `true` is used.
+
+```php
+event`,
-`$record->aggregateType`, `$record->aggregateId`, `$record->aggregateVersion`, and all other fields when routing or
-shaping the payload.
+Both `supports()` and `serialize()` receive the full `IntegrationEventRecord`, giving access to `$record->event`
+(the `IntegrationEvent`), `$record->aggregateType`, `$record->aggregateId`, `$record->aggregateVersion`, and all
+other envelope fields when routing or shaping the payload.
-Use `match (true)` in `serialize()` to handle multiple event types from the same aggregate in a single serializer:
+Use `match (true)` in `serialize()` to handle multiple integration event types from the same aggregate in a single
+serializer:
```php
event instanceof OrderPlaced || $record->event instanceof OrderShipped;
+ return $record->event instanceof OrderShipped || $record->event instanceof OrderCanceled;
}
- public function serialize(EventRecord $record): SerializedPayload
+ public function serialize(IntegrationEventRecord $record): SerializedPayload
{
return match (true) {
- $record->event instanceof OrderPlaced => SerializedPayload::from(
+ $record->event instanceof OrderShipped => SerializedPayload::from(
payload: json_encode(['orderId' => $record->event->orderId], JSON_THROW_ON_ERROR)
),
- $record->event instanceof OrderShipped => SerializedPayload::from(
+ $record->event instanceof OrderCanceled => SerializedPayload::from(
payload: json_encode(
- ['orderId' => $record->event->orderId, 'shippedAt' => $record->event->shippedAt],
+ ['orderId' => $record->event->orderId, 'reason' => $record->event->reason],
JSON_THROW_ON_ERROR
)
)
@@ -268,7 +408,11 @@ final readonly class OrderEventSerializer implements PayloadSerializer
# PayloadSerializerReflection always returns true from supports(), so it must come last.
$repository = new DoctrineOutboxRepository(
connection: $connection,
- serializers: serializers::createFrom(elements: [
+ translators: IntegrationEventTranslators::createFrom(elements: [
+ new OrderPlacedTranslator(),
+ new OrderCanceledTranslator()
+ ]),
+ serializers: PayloadSerializers::createFrom(elements: [
new OrderEventSerializer(),
new PayloadSerializerReflection()
])
@@ -282,24 +426,26 @@ library handles encoding internally.
### Event schema versioning
-Each domain event declares its schema revision via `DomainEvent::revision()`. `DomainEventBehavior` provides the
-default implementation, returning revision 1. Override `revision()` when the event's payload structure changes:
+Each integration event declares its schema revision via `IntegrationEvent::revision()`. `IntegrationEventBehavior`
+provides the default implementation, returning revision 1. Override `revision()` when the integration event's payload
+structure changes in a backward-incompatible way:
```php
aggregateType === 'Order'`), or the payload shaping may vary
-based on the aggregate version (`$record->aggregateVersion`). Receiving the full `EventRecord` in both `supports()` and
-`serialize()` gives serializers access to all available context without requiring any additional indirection.
+Routing and shaping decisions often depend on context beyond the event payload itself. For example, a single serializer
+may handle integration events from a specific aggregate type (`$record->aggregateType === 'Order'`), or the payload
+shaping may vary based on the aggregate version (`$record->aggregateVersion`). Receiving the full
+`IntegrationEventRecord` in both `supports()` and `serialize()` gives serializers access to all available envelope
+context without requiring any additional indirection.
### 09. How does the library handle transient database errors?
@@ -381,6 +529,23 @@ caller.
The consumer is responsible for any retry policy. A common pattern is to wrap the unit of work (aggregate save +
outbox push) in a retry loop that catches transient exceptions and re-executes the entire transaction.
+### 10. Why are domain events without a translator silently skipped instead of failing loudly?
+
+The absence of a translator is the canonical declaration that the event is internal to the bounded context. Failing
+loudly would force every internal domain event to either register a no-op translator or be filtered before the call to
+`push()`, both of which leak the public-contract concern into code that should not know about it. Silent skip keeps the
+boundary one-sided: registering a translator is the explicit opt-in for cross-context publication. Code that does not
+register a translator is correct by design.
+
+### 11. Why doesn't the library accept a DomainEvent directly for publication?
+
+The Anti-Corruption Layer rationale established in `tiny-blocks/building-blocks` applies here (see Vaughn Vernon,
+*Implementing Domain-Driven Design*, Addison-Wesley, 2013, Chapter 3, "Context Maps"). The public contract must evolve
+independently of the internal model. Accepting a domain event directly reintroduces the coupling that the integration
+event abstraction exists to eliminate: every change to the domain event's shape would affect external consumers. The
+cost of writing a translator is small and the gain in contract stability is the entire reason for the design. See
+the `IntegrationEventTranslator` documentation in `tiny-blocks/building-blocks` for the full rationale.
+
## License
Outbox is licensed under [MIT](LICENSE).
diff --git a/composer.json b/composer.json
index 50f24b5..a06f54d 100644
--- a/composer.json
+++ b/composer.json
@@ -13,6 +13,7 @@
"event-sourcing",
"event-versioning",
"event-persistence",
+ "integration-events",
"event-serialization",
"transactional-outbox"
],
@@ -31,7 +32,7 @@
"php": "^8.5",
"doctrine/dbal": "^4.4",
"ramsey/uuid": "^4.9",
- "tiny-blocks/building-blocks": "^3.0",
+ "tiny-blocks/building-blocks": "^4.0",
"tiny-blocks/collection": "^2.3"
},
"require-dev": {
diff --git a/src/DoctrineOutboxRepository.php b/src/DoctrineOutboxRepository.php
index 7f1e7fa..ab5dac2 100644
--- a/src/DoctrineOutboxRepository.php
+++ b/src/DoctrineOutboxRepository.php
@@ -8,6 +8,8 @@
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use TinyBlocks\BuildingBlocks\Event\EventRecord;
use TinyBlocks\BuildingBlocks\Event\EventRecords;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslators;
use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion;
use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent;
use TinyBlocks\Outbox\Exceptions\OutboxRequiresActiveTransaction;
@@ -22,6 +24,7 @@
public function __construct(
private Connection $connection,
+ private IntegrationEventTranslators $translators,
private PayloadSerializers $serializers,
?TableLayout $tableLayout = null
) {
@@ -34,16 +37,29 @@ public function push(EventRecords $records): void
throw OutboxRequiresActiveTransaction::asMissing();
}
- $records->each(actions: function (EventRecord $record): void {
- $payloadSerializer = $this->serializers->findFor(record: $record);
+ $records->each(actions: function (EventRecord $eventRecord): void {
+ $translator = $this->translators->findFor(record: $eventRecord);
+
+ if (is_null($translator)) {
+ return;
+ }
+
+ $integrationEventRecord = IntegrationEventRecord::from(
+ eventRecord: $eventRecord,
+ integrationEvent: $translator->translate(record: $eventRecord)
+ );
+
+ $payloadSerializer = $this->serializers->findFor(record: $integrationEventRecord);
if (is_null($payloadSerializer)) {
- throw PayloadSerializerNotConfigured::forEventClass(eventClass: $record->event::class);
+ throw PayloadSerializerNotConfigured::forEventClass(
+ eventClass: $integrationEventRecord->event::class
+ );
}
$insert = OutboxInsert::from(
- record: $record,
- payload: $payloadSerializer->serialize(record: $record),
+ record: $integrationEventRecord,
+ payload: $payloadSerializer->serialize(record: $integrationEventRecord),
tableLayout: $this->tableLayout
);
@@ -53,13 +69,16 @@ public function push(EventRecords $records): void
if ($this->tableLayout->uniqueConstraint->isViolatedBy(exception: $exception)) {
throw DuplicateAggregateVersion::forRecord(
previous: $exception,
- aggregateId: $record->aggregateId->identityValue(),
- aggregateType: $record->aggregateType,
- aggregateVersion: $record->aggregateVersion->value
+ aggregateId: $integrationEventRecord->aggregateId->identityValue(),
+ aggregateType: $integrationEventRecord->aggregateType,
+ aggregateVersion: $integrationEventRecord->aggregateVersion->value
);
}
- throw DuplicateOutboxEvent::forRecord(eventId: $record->id, previous: $exception);
+ throw DuplicateOutboxEvent::forRecord(
+ eventId: $integrationEventRecord->id,
+ previous: $exception
+ );
}
});
}
diff --git a/src/Internal/OutboxInsert.php b/src/Internal/OutboxInsert.php
index aadc1b2..7864542 100644
--- a/src/Internal/OutboxInsert.php
+++ b/src/Internal/OutboxInsert.php
@@ -4,7 +4,7 @@
namespace TinyBlocks\Outbox\Internal;
-use TinyBlocks\BuildingBlocks\Event\EventRecord;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
use TinyBlocks\Outbox\Schema\TableLayout;
use TinyBlocks\Outbox\Serialization\SerializedPayload;
@@ -15,7 +15,7 @@ private function __construct(public string $sql, public array $parameters)
}
public static function from(
- EventRecord $record,
+ IntegrationEventRecord $record,
SerializedPayload $payload,
TableLayout $tableLayout
): OutboxInsert {
diff --git a/src/OutboxRepository.php b/src/OutboxRepository.php
index 1bc9160..de09e2b 100644
--- a/src/OutboxRepository.php
+++ b/src/OutboxRepository.php
@@ -5,11 +5,14 @@
namespace TinyBlocks\Outbox;
use TinyBlocks\BuildingBlocks\Event\EventRecords;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEvent;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslator;
use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion;
use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent;
use TinyBlocks\Outbox\Exceptions\InvalidPayloadJson;
use TinyBlocks\Outbox\Exceptions\OutboxRequiresActiveTransaction;
use TinyBlocks\Outbox\Exceptions\PayloadSerializerNotConfigured;
+use TinyBlocks\Outbox\Serialization\PayloadSerializer;
/**
* Producer-side contract: persists outbox records as part of the caller's open transaction.
@@ -23,14 +26,28 @@ interface OutboxRepository
/**
* Persists the given records as part of the caller's open transaction.
*
- *
The implementation must not open or commit a transaction. It is the caller's responsibility
- * to ensure this call happens inside the same unit of work as the aggregate state change.
+ * The input carries domain events from the aggregate's recorded-events buffer.
+ * The implementation filters each record through the registered
+ * {@see IntegrationEventTranslator} collection: domain events without a matching
+ * translator are silently skipped, because the absence of a translator is the canonical
+ * declaration that the event is internal to the bounded context and must not cross its
+ * boundary.
+ *
+ * Matched domain events are translated into {@see IntegrationEvent} envelopes via
+ * the Anti-Corruption Layer and only then serialized and persisted. The
+ * {@see PayloadSerializer} operates on the integration event record, never on the
+ * domain event directly.
+ *
+ * The implementation must not open or commit a transaction. It is the caller's
+ * responsibility to ensure this call happens inside the same unit of work as the
+ * aggregate state change.
*
* @param EventRecords $records The records to persist.
* @throws InvalidPayloadJson When a serializer produces an invalid JSON payload.
* @throws DuplicateOutboxEvent When a record with a duplicate id already exists in the outbox.
* @throws DuplicateAggregateVersion When two records share the same aggregate type, id, and aggregate version.
- * @throws PayloadSerializerNotConfigured When no serializer supports the event class.
+ * @throws PayloadSerializerNotConfigured When a translator matched a domain event but no serializer supports the
+ * produced integration event.
* @throws OutboxRequiresActiveTransaction When called outside an active transaction.
*/
public function push(EventRecords $records): void;
diff --git a/src/Serialization/PayloadSerializer.php b/src/Serialization/PayloadSerializer.php
index f4662f1..f3348b2 100644
--- a/src/Serialization/PayloadSerializer.php
+++ b/src/Serialization/PayloadSerializer.php
@@ -4,23 +4,23 @@
namespace TinyBlocks\Outbox\Serialization;
-use TinyBlocks\BuildingBlocks\Event\EventRecord;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
interface PayloadSerializer
{
/**
- * Tells whether this serializer handles the event in the given record.
+ * Tells whether this serializer handles the integration event in the given record.
*
- * @param EventRecord $record The record being serialized.
- * @return bool True if this serializer can produce the payload for the event.
+ * @param IntegrationEventRecord $record The record being serialized.
+ * @return bool True if this serializer can produce the payload for the integration event.
*/
- public function supports(EventRecord $record): bool;
+ public function supports(IntegrationEventRecord $record): bool;
/**
- * Produces the persistent payload for the event in the record.
+ * Produces the persistent payload for the integration event in the record.
*
- * @param EventRecord $record The record being serialized.
+ * @param IntegrationEventRecord $record The record being serialized.
* @return SerializedPayload The serialized payload.
*/
- public function serialize(EventRecord $record): SerializedPayload;
+ public function serialize(IntegrationEventRecord $record): SerializedPayload;
}
diff --git a/src/Serialization/PayloadSerializerReflection.php b/src/Serialization/PayloadSerializerReflection.php
index 3ad1a87..12fa155 100644
--- a/src/Serialization/PayloadSerializerReflection.php
+++ b/src/Serialization/PayloadSerializerReflection.php
@@ -4,16 +4,16 @@
namespace TinyBlocks\Outbox\Serialization;
-use TinyBlocks\BuildingBlocks\Event\EventRecord;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
final readonly class PayloadSerializerReflection implements PayloadSerializer
{
- public function supports(EventRecord $record): bool
+ public function supports(IntegrationEventRecord $record): bool
{
return true;
}
- public function serialize(EventRecord $record): SerializedPayload
+ public function serialize(IntegrationEventRecord $record): SerializedPayload
{
return SerializedPayload::fromArray(payload: get_object_vars($record->event));
}
diff --git a/src/Serialization/PayloadSerializers.php b/src/Serialization/PayloadSerializers.php
index ba6e962..b93964f 100644
--- a/src/Serialization/PayloadSerializers.php
+++ b/src/Serialization/PayloadSerializers.php
@@ -4,7 +4,7 @@
namespace TinyBlocks\Outbox\Serialization;
-use TinyBlocks\BuildingBlocks\Event\EventRecord;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
use TinyBlocks\Collection\Collection;
final class PayloadSerializers extends Collection
@@ -12,10 +12,10 @@ final class PayloadSerializers extends Collection
/**
* Returns the first payload serializer that supports the given record, or null when none matches.
*
- * @param EventRecord $record The record whose payload serializer is being resolved.
+ * @param IntegrationEventRecord $record The record whose payload serializer is being resolved.
* @return PayloadSerializer|null The matching serializer, or null when no element supports the record.
*/
- public function findFor(EventRecord $record): ?PayloadSerializer
+ public function findFor(IntegrationEventRecord $record): ?PayloadSerializer
{
$serializer = $this->findBy(
predicates: static fn(PayloadSerializer $serializer): bool => $serializer->supports(record: $record)
diff --git a/src/Serialization/SerializedPayload.php b/src/Serialization/SerializedPayload.php
index b5211f9..bfaaeff 100644
--- a/src/Serialization/SerializedPayload.php
+++ b/src/Serialization/SerializedPayload.php
@@ -12,19 +12,6 @@ private function __construct(private string $payload)
{
}
- /**
- * Creates a SerializedPayload from an associative array, encoding it as JSON.
- *
- * @param array $payload The associative array to encode as the serialized payload.
- * @return SerializedPayload The serialized payload with the JSON-encoded representation.
- */
- public static function fromArray(array $payload): SerializedPayload
- {
- $json = json_encode($payload, JSON_THROW_ON_ERROR);
-
- return new SerializedPayload(payload: $json);
- }
-
/**
* Creates a SerializedPayload from a raw JSON string.
*
@@ -41,6 +28,19 @@ public static function from(string $payload): SerializedPayload
return new SerializedPayload(payload: $payload);
}
+ /**
+ * Creates a SerializedPayload from an associative array, encoding it as JSON.
+ *
+ * @param array $payload The associative array to encode as the serialized payload.
+ * @return SerializedPayload The serialized payload with the JSON-encoded representation.
+ */
+ public static function fromArray(array $payload): SerializedPayload
+ {
+ $json = json_encode($payload, JSON_THROW_ON_ERROR);
+
+ return new SerializedPayload(payload: $json);
+ }
+
/**
* Returns the SerializedPayload as its JSON string representation.
*
diff --git a/tests/Integration/DoctrineOutboxRepositoryTest.php b/tests/Integration/DoctrineOutboxRepositoryTest.php
index 1c61b13..832d7dc 100644
--- a/tests/Integration/DoctrineOutboxRepositoryTest.php
+++ b/tests/Integration/DoctrineOutboxRepositoryTest.php
@@ -10,12 +10,15 @@
use Test\TinyBlocks\Outbox\Models\EventRecordFactory;
use Test\TinyBlocks\Outbox\Models\Order;
use Test\TinyBlocks\Outbox\Models\OrderPlaced;
+use Test\TinyBlocks\Outbox\Models\OrderPlacedTranslator;
use Test\TinyBlocks\Outbox\Models\RefundIssued;
+use Test\TinyBlocks\Outbox\Models\RefundIssuedTranslator;
use Test\TinyBlocks\Outbox\Unit\DriverExceptionStub;
use Test\TinyBlocks\Outbox\Unit\InvalidPayloadSerializer;
use Test\TinyBlocks\Outbox\Unit\OrderPlacedSerializer;
use TinyBlocks\BuildingBlocks\Aggregate\AggregateVersion;
use TinyBlocks\BuildingBlocks\Event\EventRecords;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslators;
use TinyBlocks\BuildingBlocks\Event\Revision;
use TinyBlocks\Outbox\DoctrineOutboxRepository;
use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion;
@@ -46,6 +49,7 @@ public function testPushWhenNoTransactionThenOutboxRequiresActiveTransaction():
/** @Given a repository */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -68,9 +72,10 @@ public function testPushWhenNoTransactionThenOutboxRequiresActiveTransaction():
public function testPushWhenMultipleSerializersAndFirstMatchesThenFirstIsUsed(): void
{
- /** @Given a repository with two serializers supporting the same event */
+ /** @Given a repository with a translator and two serializers supporting the same integration event */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [
new OrderPlacedSerializer(),
new FallbackOrderPlacedSerializer()
@@ -98,9 +103,10 @@ public function testPushWhenMultipleSerializersAndFirstMatchesThenFirstIsUsed():
public function testPushWhenReflectionPayloadSerializerThenEventPropertiesAreEncoded(): void
{
- /** @Given a repository using ReflectionPayloadSerializer as the only serializer */
+ /** @Given a repository with a translator and a reflection payload serializer */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()])
);
@@ -119,7 +125,7 @@ public function testPushWhenReflectionPayloadSerializerThenEventPropertiesAreEnc
/** @And the transaction is committed */
self::$connection->commit();
- /** @Then the payload reflects the event's public properties */
+ /** @Then the payload reflects the integration event's public properties */
self::assertSame('[]', self::$connection->fetchOne('SELECT payload FROM outbox_events LIMIT 1'));
}
@@ -128,6 +134,7 @@ public function testPushWhenCallerRollsBackThenNoRecordPersisted(): void
/** @Given a repository */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -155,6 +162,7 @@ public function testPushWhenTwoRecordsThenBothPersisted(): void
/** @Given a repository */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -190,6 +198,7 @@ public function testPushWhenUuidWithNullBytesThenBytesPreservedInStorage(): void
/** @And a repository */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -216,9 +225,10 @@ public function testPushWhenUuidWithNullBytesThenBytesPreservedInStorage(): void
public function testPushWhenSerializerReturnsInvalidJsonThenInvalidPayloadJson(): void
{
- /** @Given a repository with a serializer that produces invalid JSON */
+ /** @Given a repository with a translator and a serializer that produces invalid JSON */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new InvalidPayloadSerializer()])
);
@@ -244,9 +254,13 @@ public function testPushWhenSerializerReturnsInvalidJsonThenInvalidPayloadJson()
public function testPushWhenMultipleSerializersAndSecondMatchesThenCorrectSerializerIsUsed(): void
{
- /** @Given a repository with an order serializer followed by a refund serializer */
+ /** @Given a repository with translators for both event types and matching serializers */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [
+ new OrderPlacedTranslator(),
+ new RefundIssuedTranslator()
+ ]),
serializers: PayloadSerializers::createFrom(elements: [
new OrderPlacedSerializer(),
new RefundIssuedSerializer()
@@ -275,41 +289,12 @@ public function testPushWhenMultipleSerializersAndSecondMatchesThenCorrectSerial
);
}
- public function testPushWhenNoSerializerSupportsThenPayloadSerializerNotConfigured(): void
+ public function testPushWhenSerializerDoesNotSupportIntegrationEventThenPayloadSerializerNotConfigured(): void
{
- /** @Given a repository with no serializers */
- $repository = new DoctrineOutboxRepository(
- connection: self::$connection,
- serializers: PayloadSerializers::createFromEmpty()
- );
-
- /** @And the connection has an active transaction */
- self::$connection->beginTransaction();
-
- /** @And a record to push */
- $records = EventRecords::createFrom(elements: [
- EventRecordFactory::create(
- event: new OrderPlaced(),
- aggregateType: 'Order',
- eventTypeName: 'OrderPlaced'
- )
- ]);
-
- /** @Then an exception indicating no serializer is configured is thrown */
- $this->expectException(PayloadSerializerNotConfigured::class);
- $this->expectExceptionMessage(
- 'No payload serializer configured for event class .'
- );
-
- /** @When pushing the record */
- $repository->push(records: $records);
- }
-
- public function testPushWhenSerializerDoesNotSupportEventThenPayloadSerializerNotConfigured(): void
- {
- /** @Given a repository with only an order serializer */
+ /** @Given a repository with a refund translator but only an order serializer */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new RefundIssuedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -328,7 +313,7 @@ public function testPushWhenSerializerDoesNotSupportEventThenPayloadSerializerNo
/** @Then an exception indicating no serializer is configured is thrown */
$this->expectException(PayloadSerializerNotConfigured::class);
$this->expectExceptionMessage(
- 'No payload serializer configured for event class .'
+ 'No payload serializer configured for event class .'
);
/** @When pushing the unsupported event */
@@ -340,6 +325,7 @@ public function testPushWhenDuplicateEventIdThenDuplicateOutboxEvent(): void
/** @Given a repository */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -370,6 +356,7 @@ public function testPushWhenDuplicateAggregateVersionThenDuplicateAggregateVersi
/** @Given a repository */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -430,6 +417,7 @@ public function testPushWhenCustomTableNameThenRecordStoredInCustomTable(): void
/** @And a repository using the custom layout */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]),
tableLayout: $tableLayout
);
@@ -484,6 +472,7 @@ public function testPushWhenStringIdentityTypeStoredThenIdIsUuidString(): void
/** @And a repository using the string layout */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]),
tableLayout: $tableLayout
);
@@ -536,6 +525,7 @@ public function testPushWhenNonUuidAggregateIdWithStringTypeThenStoredAsOriginal
/** @And a repository using the string layout */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]),
tableLayout: $tableLayout
);
@@ -565,9 +555,10 @@ public function testPushWhenNonUuidAggregateIdWithStringTypeThenStoredAsOriginal
public function testPushWhenSingleRecordThenAllFieldsPersistedCorrectly(): void
{
- /** @Given a repository with an order placed serializer */
+ /** @Given a repository with a translator and an order placed serializer */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -598,11 +589,11 @@ public function testPushWhenSingleRecordThenAllFieldsPersistedCorrectly(): void
/** @And the aggregate_id is stored as 16-byte binary */
self::assertSame(16, strlen($row['aggregate_id']));
- /** @And the event_type is correct */
- self::assertSame('OrderPlaced', $row['event_type']);
+ /** @And the event_type reflects the integration event class */
+ self::assertSame('OrderShipped', $row['event_type']);
/** @And the revision is correct */
- self::assertSame(2, (int)$row['revision']);
+ self::assertSame(1, (int)$row['revision']);
/** @And the aggregate_version is correct */
self::assertSame(3, (int)$row['aggregate_version']);
@@ -622,6 +613,7 @@ public function testPushWhenKnownIdThenPersistedIdMatchesOriginal(): void
/** @Given a repository */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -687,6 +679,7 @@ public function testPushWhenAllColumnNamesAreCustomThenRecordStoredInCustomColum
/** @And a repository using this layout */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]),
tableLayout: $tableLayout
);
@@ -719,8 +712,8 @@ public function testPushWhenAllColumnNamesAreCustomThenRecordStoredInCustomColum
/** @And event_aggregate_type is correct */
self::assertSame('Order', $row['event_aggregate_type']);
- /** @And event_event_type is correct */
- self::assertSame('OrderPlaced', $row['event_event_type']);
+ /** @And event_event_type reflects the integration event class */
+ self::assertSame('OrderShipped', $row['event_event_type']);
/** @And event_revision is correct */
self::assertSame(1, (int)$row['event_revision']);
@@ -743,6 +736,7 @@ public function testPushWhenEventRecordsIsEmptyThenNoInsertIsExecuted(): void
/** @Given a repository */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -761,9 +755,10 @@ public function testPushWhenEventRecordsIsEmptyThenNoInsertIsExecuted(): void
public function testPushWhenRealEventualAggregateRootThenEventRecordIsPersistedCorrectly(): void
{
- /** @Given a repository with a reflection payload serializer */
+ /** @Given a repository with a translator and a reflection payload serializer */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()])
);
@@ -800,6 +795,7 @@ public function testPushWhenNullTableLayoutThenSqlUsesDefaultTableName(): void
/** @When pushing a record using the default table layout */
new DoctrineOutboxRepository(
connection: $connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
)->push(
records: EventRecords::createFrom(elements: [
@@ -832,6 +828,7 @@ public function testPushWhenCustomTableLayoutThenSqlUsesCustomTableName(): void
/** @When pushing a record using the custom table layout */
new DoctrineOutboxRepository(
connection: $connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]),
tableLayout: $tableLayout
)->push(
@@ -879,6 +876,7 @@ function (string $sql, array $params) use (&$capturedParameters): int {
/** @When pushing a record with all deterministic fields */
new DoctrineOutboxRepository(
connection: $connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]),
tableLayout: $tableLayout
)->push(
@@ -900,7 +898,7 @@ function (string $sql, array $params) use (&$capturedParameters): int {
'id' => '550e8400-e29b-41d4-a716-446655440000',
'aggregateId' => '6ba7b810-9dad-11d1-80b4-00c04fd430c8',
'aggregateType' => 'Order',
- 'eventType' => 'OrderPlaced',
+ 'eventType' => 'OrderShipped',
'revision' => 1,
'aggregateVersion' => 1,
'payload' => '{}',
@@ -934,6 +932,7 @@ public function testPushWhenUniqueConstraintOnAggregateVersionThenDuplicateAggre
/** @When pushing the record */
new DoctrineOutboxRepository(
connection: $connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
)->push(
records: EventRecords::createFrom(elements: [
@@ -967,6 +966,7 @@ public function testPushWhenUniqueConstraintOnEventIdThenDuplicateOutboxEventIsT
/** @When pushing the record */
new DoctrineOutboxRepository(
connection: $connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
)->push(
records: EventRecords::createFrom(elements: [
@@ -1003,6 +1003,7 @@ public function testPushWhenUniqueConstraintWithCustomNameThenDuplicateAggregate
/** @When pushing a record with the custom table layout */
new DoctrineOutboxRepository(
connection: $connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]),
tableLayout: $tableLayout
)->push(
@@ -1039,6 +1040,7 @@ public function testPushWhenConstraintNameIsCustomThenDuplicateAggregateVersion(
/** @And a repository using the custom table layout */
$repository = new DoctrineOutboxRepository(
connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]),
tableLayout: $tableLayout
);
@@ -1074,4 +1076,135 @@ public function testPushWhenConstraintNameIsCustomThenDuplicateAggregateVersion(
)
]));
}
+
+ public function testPushWhenNoTranslatorsRegisteredThenRecordIsSilentlySkipped(): void
+ {
+ /** @Given a repository with no translators and a reflection serializer */
+ $repository = new DoctrineOutboxRepository(
+ connection: self::$connection,
+ translators: IntegrationEventTranslators::createFromEmpty(),
+ serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()])
+ );
+
+ /** @And the connection has an active transaction */
+ self::$connection->beginTransaction();
+
+ /** @When pushing an order placed record */
+ $repository->push(records: EventRecords::createFrom(elements: [
+ EventRecordFactory::create(
+ event: new OrderPlaced(),
+ aggregateType: 'Order',
+ eventTypeName: 'OrderPlaced'
+ )
+ ]));
+
+ /** @And the transaction is committed */
+ self::$connection->commit();
+
+ /** @Then no records are persisted */
+ self::assertSame(0, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events'));
+ }
+
+ public function testPushWhenOnlyOrderTranslatorRegisteredThenRefundEventIsSkipped(): void
+ {
+ /** @Given a repository with only an order translator and both serializers */
+ $repository = new DoctrineOutboxRepository(
+ connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
+ serializers: PayloadSerializers::createFrom(elements: [
+ new OrderPlacedSerializer(),
+ new RefundIssuedSerializer()
+ ])
+ );
+
+ /** @And the connection has an active transaction */
+ self::$connection->beginTransaction();
+
+ /** @When pushing one order placed and one refund issued record */
+ $repository->push(records: EventRecords::createFrom(elements: [
+ EventRecordFactory::create(
+ event: new OrderPlaced(),
+ aggregateType: 'Order',
+ eventTypeName: 'OrderPlaced',
+ aggregateVersion: AggregateVersion::of(value: 1)
+ ),
+ EventRecordFactory::create(
+ event: new RefundIssued(),
+ aggregateType: 'Refund',
+ eventTypeName: 'RefundIssued',
+ aggregateVersion: AggregateVersion::of(value: 1)
+ )
+ ]));
+
+ /** @And the transaction is committed */
+ self::$connection->commit();
+
+ /** @Then exactly one row is persisted */
+ self::assertSame(1, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events'));
+
+ /** @And the persisted event_type is the order integration event */
+ self::assertSame('OrderShipped', self::$connection->fetchOne('SELECT event_type FROM outbox_events LIMIT 1'));
+ }
+
+ public function testPushWhenTwoTranslatorsSupportSameEventThenFirstTranslatorWins(): void
+ {
+ /** @Given a repository with two translators both supporting order placed, and a matching serializer */
+ $repository = new DoctrineOutboxRepository(
+ connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [
+ new OrderPlacedTranslator(),
+ new DuplicateOrderPlacedTranslator()
+ ]),
+ serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
+ );
+
+ /** @And the connection has an active transaction */
+ self::$connection->beginTransaction();
+
+ /** @When pushing an order placed record */
+ $repository->push(records: EventRecords::createFrom(elements: [
+ EventRecordFactory::create(
+ event: new OrderPlaced(),
+ aggregateType: 'Order',
+ eventTypeName: 'OrderPlaced'
+ )
+ ]));
+
+ /** @And the transaction is committed */
+ self::$connection->commit();
+
+ /** @Then the persisted event_type reflects the first translator's output */
+ self::assertSame('OrderShipped', self::$connection->fetchOne('SELECT event_type FROM outbox_events LIMIT 1'));
+ }
+
+ public function testPushWhenTranslatorMatchesButNoSerializerThenPayloadSerializerNotConfigured(): void
+ {
+ /** @Given a repository with a translator but no serializer that matches the produced integration event */
+ $repository = new DoctrineOutboxRepository(
+ connection: self::$connection,
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
+ serializers: PayloadSerializers::createFromEmpty()
+ );
+
+ /** @And the connection has an active transaction */
+ self::$connection->beginTransaction();
+
+ /** @And a record to push */
+ $records = EventRecords::createFrom(elements: [
+ EventRecordFactory::create(
+ event: new OrderPlaced(),
+ aggregateType: 'Order',
+ eventTypeName: 'OrderPlaced'
+ )
+ ]);
+
+ /** @Then an exception indicating no serializer is configured is thrown */
+ $this->expectException(PayloadSerializerNotConfigured::class);
+ $this->expectExceptionMessage(
+ 'No payload serializer configured for event class .'
+ );
+
+ /** @When pushing the record */
+ $repository->push(records: $records);
+ }
}
diff --git a/tests/Integration/DuplicateOrderPlacedTranslator.php b/tests/Integration/DuplicateOrderPlacedTranslator.php
new file mode 100644
index 0000000..2fa1b2c
--- /dev/null
+++ b/tests/Integration/DuplicateOrderPlacedTranslator.php
@@ -0,0 +1,23 @@
+event instanceof OrderPlaced;
+ }
+
+ public function translate(EventRecord $record): IntegrationEvent
+ {
+ return new DuplicateOrderShipped();
+ }
+}
diff --git a/tests/Integration/DuplicateOrderShipped.php b/tests/Integration/DuplicateOrderShipped.php
new file mode 100644
index 0000000..3889d7d
--- /dev/null
+++ b/tests/Integration/DuplicateOrderShipped.php
@@ -0,0 +1,13 @@
+event instanceof OrderPlaced;
+ return $record->event instanceof OrderShipped;
}
- public function serialize(EventRecord $record): SerializedPayload
+ public function serialize(IntegrationEventRecord $record): SerializedPayload
{
return SerializedPayload::from(payload: '{"fallback":true}');
}
diff --git a/tests/Integration/RefundIssuedSerializer.php b/tests/Integration/RefundIssuedSerializer.php
index 8796ae2..6b3f7df 100644
--- a/tests/Integration/RefundIssuedSerializer.php
+++ b/tests/Integration/RefundIssuedSerializer.php
@@ -4,19 +4,19 @@
namespace Test\TinyBlocks\Outbox\Integration;
-use Test\TinyBlocks\Outbox\Models\RefundIssued;
-use TinyBlocks\BuildingBlocks\Event\EventRecord;
+use Test\TinyBlocks\Outbox\Models\RefundCompleted;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
use TinyBlocks\Outbox\Serialization\PayloadSerializer;
use TinyBlocks\Outbox\Serialization\SerializedPayload;
final readonly class RefundIssuedSerializer implements PayloadSerializer
{
- public function supports(EventRecord $record): bool
+ public function supports(IntegrationEventRecord $record): bool
{
- return $record->event instanceof RefundIssued;
+ return $record->event instanceof RefundCompleted;
}
- public function serialize(EventRecord $record): SerializedPayload
+ public function serialize(IntegrationEventRecord $record): SerializedPayload
{
return SerializedPayload::from(payload: '{"type":"refund"}');
}
diff --git a/tests/Models/OrderPlacedTranslator.php b/tests/Models/OrderPlacedTranslator.php
new file mode 100644
index 0000000..03014e9
--- /dev/null
+++ b/tests/Models/OrderPlacedTranslator.php
@@ -0,0 +1,22 @@
+event instanceof OrderPlaced;
+ }
+
+ public function translate(EventRecord $record): IntegrationEvent
+ {
+ return new OrderShipped();
+ }
+}
diff --git a/tests/Models/OrderShipped.php b/tests/Models/OrderShipped.php
new file mode 100644
index 0000000..9b0349f
--- /dev/null
+++ b/tests/Models/OrderShipped.php
@@ -0,0 +1,13 @@
+event instanceof RefundIssued;
+ }
+
+ public function translate(EventRecord $record): IntegrationEvent
+ {
+ return new RefundCompleted();
+ }
+}
diff --git a/tests/Unit/InMemoryOutboxRepositoryMock.php b/tests/Unit/InMemoryOutboxRepositoryMock.php
index c45bf44..f0d391c 100644
--- a/tests/Unit/InMemoryOutboxRepositoryMock.php
+++ b/tests/Unit/InMemoryOutboxRepositoryMock.php
@@ -7,6 +7,8 @@
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use TinyBlocks\BuildingBlocks\Event\EventRecord;
use TinyBlocks\BuildingBlocks\Event\EventRecords;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslators;
use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion;
use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent;
use TinyBlocks\Outbox\Exceptions\OutboxRequiresActiveTransaction;
@@ -20,8 +22,10 @@ final class InMemoryOutboxRepositoryMock implements OutboxRepository
private bool $transactionActive = false;
private array $aggregateVersions = [];
- public function __construct(private readonly PayloadSerializers $payloadSerializers)
- {
+ public function __construct(
+ private readonly IntegrationEventTranslators $translators,
+ private readonly PayloadSerializers $payloadSerializers
+ ) {
}
public function beginTransaction(): void
@@ -52,36 +56,49 @@ public function push(EventRecords $records): void
throw OutboxRequiresActiveTransaction::asMissing();
}
- $records->each(actions: function (EventRecord $record): void {
- $payloadSerializer = $this->payloadSerializers->findFor(record: $record);
+ $records->each(actions: function (EventRecord $eventRecord): void {
+ $translator = $this->translators->findFor(record: $eventRecord);
+
+ if (is_null($translator)) {
+ return;
+ }
+
+ $integrationEventRecord = IntegrationEventRecord::from(
+ eventRecord: $eventRecord,
+ integrationEvent: $translator->translate(record: $eventRecord)
+ );
+
+ $payloadSerializer = $this->payloadSerializers->findFor(record: $integrationEventRecord);
if (is_null($payloadSerializer)) {
- throw PayloadSerializerNotConfigured::forEventClass(eventClass: $record->event::class);
+ throw PayloadSerializerNotConfigured::forEventClass(
+ eventClass: $integrationEventRecord->event::class
+ );
}
- $payloadSerializer->serialize(record: $record);
+ $payloadSerializer->serialize(record: $integrationEventRecord);
$aggregateKey = sprintf(
'%s|%s|%d',
- $record->aggregateType,
- $record->aggregateId->identityValue(),
- $record->aggregateVersion->value
+ $integrationEventRecord->aggregateType,
+ $integrationEventRecord->aggregateId->identityValue(),
+ $integrationEventRecord->aggregateVersion->value
);
if (isset($this->aggregateVersions[$aggregateKey])) {
throw DuplicateAggregateVersion::forRecord(
previous: null,
- aggregateId: $record->aggregateId->identityValue(),
- aggregateType: $record->aggregateType,
- aggregateVersion: $record->aggregateVersion->value
+ aggregateId: $integrationEventRecord->aggregateId->identityValue(),
+ aggregateType: $integrationEventRecord->aggregateType,
+ aggregateVersion: $integrationEventRecord->aggregateVersion->value
);
}
- $eventId = (string)$record->id;
+ $eventId = (string)$integrationEventRecord->id;
if (isset($this->records[$eventId])) {
throw DuplicateOutboxEvent::forRecord(
- eventId: $record->id,
+ eventId: $integrationEventRecord->id,
previous: new UniqueConstraintViolationException(
new DriverExceptionStub('Duplicate entry for key PRIMARY'),
null
@@ -90,7 +107,7 @@ public function push(EventRecords $records): void
}
$this->aggregateVersions[$aggregateKey] = true;
- $this->records[$eventId] = $record;
+ $this->records[$eventId] = $integrationEventRecord;
});
}
}
diff --git a/tests/Unit/InMemoryOutboxRepositoryTest.php b/tests/Unit/InMemoryOutboxRepositoryTest.php
index 442ad23..0556eba 100644
--- a/tests/Unit/InMemoryOutboxRepositoryTest.php
+++ b/tests/Unit/InMemoryOutboxRepositoryTest.php
@@ -9,8 +9,10 @@
use Test\TinyBlocks\Outbox\Models\EventRecordFactory;
use Test\TinyBlocks\Outbox\Models\Order;
use Test\TinyBlocks\Outbox\Models\OrderPlaced;
+use Test\TinyBlocks\Outbox\Models\OrderPlacedTranslator;
use TinyBlocks\BuildingBlocks\Aggregate\AggregateVersion;
use TinyBlocks\BuildingBlocks\Event\EventRecords;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslators;
use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion;
use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent;
use TinyBlocks\Outbox\Exceptions\InvalidPayloadJson;
@@ -23,8 +25,9 @@ final class InMemoryOutboxRepositoryTest extends TestCase
{
public function testPushWhenSingleRecordThenItIsPersisted(): void
{
- /** @Given an in-memory repository with configured serializers */
+ /** @Given an in-memory repository with a configured translator and serializer */
$outbox = new InMemoryOutboxRepositoryMock(
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -49,8 +52,9 @@ public function testPushWhenSingleRecordThenItIsPersisted(): void
public function testPushWhenMultipleRecordsThenAllArePersistedInOrder(): void
{
- /** @Given an in-memory repository with configured serializers */
+ /** @Given an in-memory repository with a configured translator and serializer */
$outbox = new InMemoryOutboxRepositoryMock(
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -84,6 +88,7 @@ public function testPushWhenNoTransactionIsActiveThenOutboxRequiresActiveTransac
{
/** @Given an in-memory repository without an active transaction */
$outbox = new InMemoryOutboxRepositoryMock(
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -102,8 +107,9 @@ public function testPushWhenNoTransactionIsActiveThenOutboxRequiresActiveTransac
public function testPushWhenNoPayloadSerializerMatchesThenPayloadSerializerNotConfigured(): void
{
- /** @Given an in-memory repository with no payload serializers configured */
+ /** @Given an in-memory repository with a translator but no payload serializers */
$outbox = new InMemoryOutboxRepositoryMock(
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
payloadSerializers: PayloadSerializers::createFromEmpty()
);
@@ -113,7 +119,7 @@ public function testPushWhenNoPayloadSerializerMatchesThenPayloadSerializerNotCo
/** @Then an exception indicating no configured payload serializer is thrown */
$this->expectException(PayloadSerializerNotConfigured::class);
- /** @When pushing a record whose event type has no matching serializer */
+ /** @When pushing a record whose integration event class has no matching serializer */
$outbox->push(records: EventRecords::createFrom(elements: [
EventRecordFactory::create(
event: new OrderPlaced(),
@@ -125,8 +131,9 @@ public function testPushWhenNoPayloadSerializerMatchesThenPayloadSerializerNotCo
public function testPushWhenPayloadSerializerReturnsInvalidJsonThenInvalidPayloadJson(): void
{
- /** @Given an in-memory repository with a serializer that produces invalid JSON */
+ /** @Given an in-memory repository with a translator and a serializer that produces invalid JSON */
$outbox = new InMemoryOutboxRepositoryMock(
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
payloadSerializers: PayloadSerializers::createFrom(elements: [new InvalidPayloadSerializer()])
);
@@ -148,8 +155,9 @@ public function testPushWhenPayloadSerializerReturnsInvalidJsonThenInvalidPayloa
public function testPushWhenTwoRecordsShareTheSameIdThenDuplicateOutboxEvent(): void
{
- /** @Given an in-memory repository with configured serializers */
+ /** @Given an in-memory repository with a configured translator and serializer */
$outbox = new InMemoryOutboxRepositoryMock(
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -186,8 +194,9 @@ public function testPushWhenTwoRecordsShareTheSameIdThenDuplicateOutboxEvent():
public function testPushWhenTwoRecordsShareTheSameAggregateVersionThenDuplicateAggregateVersion(): void
{
- /** @Given an in-memory repository with configured serializers */
+ /** @Given an in-memory repository with a configured translator and serializer */
$outbox = new InMemoryOutboxRepositoryMock(
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -225,8 +234,9 @@ public function testPushWhenTwoRecordsShareTheSameAggregateVersionThenDuplicateA
public function testPushWhenEventRecordsIsEmptyThenNoRecordIsPersisted(): void
{
- /** @Given an in-memory repository with configured serializers */
+ /** @Given an in-memory repository with a configured translator and serializer */
$outbox = new InMemoryOutboxRepositoryMock(
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
);
@@ -245,8 +255,9 @@ public function testPushWhenEventRecordsIsEmptyThenNoRecordIsPersisted(): void
public function testPushWhenRealEventualAggregateRootThenEventRecordIsPersisted(): void
{
- /** @Given an in-memory repository with a reflection payload serializer */
+ /** @Given an in-memory repository with a translator and a reflection payload serializer */
$outbox = new InMemoryOutboxRepositoryMock(
+ translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]),
payloadSerializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()])
);
@@ -262,4 +273,31 @@ public function testPushWhenRealEventualAggregateRootThenEventRecordIsPersisted(
/** @Then one event record is persisted in the repository */
self::assertCount(1, $outbox->persistedRecords());
}
+
+ public function testPushWhenNoTranslatorMatchesThenRecordIsSilentlySkipped(): void
+ {
+ /** @Given an in-memory repository with no translators registered */
+ $outbox = new InMemoryOutboxRepositoryMock(
+ translators: IntegrationEventTranslators::createFromEmpty(),
+ payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()])
+ );
+
+ /** @And a transaction is started */
+ $outbox->beginTransaction();
+
+ /** @When a record whose event has no matching translator is pushed */
+ $outbox->push(records: EventRecords::createFrom(elements: [
+ EventRecordFactory::create(
+ event: new OrderPlaced(),
+ aggregateType: 'Order',
+ eventTypeName: 'OrderPlaced'
+ )
+ ]));
+
+ /** @And the transaction is committed */
+ $outbox->commit();
+
+ /** @Then no records are persisted and no exception is raised */
+ self::assertCount(0, $outbox->persistedRecords());
+ }
}
diff --git a/tests/Unit/InvalidPayloadSerializer.php b/tests/Unit/InvalidPayloadSerializer.php
index ac6ac21..d1a3e49 100644
--- a/tests/Unit/InvalidPayloadSerializer.php
+++ b/tests/Unit/InvalidPayloadSerializer.php
@@ -4,19 +4,19 @@
namespace Test\TinyBlocks\Outbox\Unit;
-use Test\TinyBlocks\Outbox\Models\OrderPlaced;
-use TinyBlocks\BuildingBlocks\Event\EventRecord;
+use Test\TinyBlocks\Outbox\Models\OrderShipped;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
use TinyBlocks\Outbox\Serialization\PayloadSerializer;
use TinyBlocks\Outbox\Serialization\SerializedPayload;
final readonly class InvalidPayloadSerializer implements PayloadSerializer
{
- public function supports(EventRecord $record): bool
+ public function supports(IntegrationEventRecord $record): bool
{
- return $record->event instanceof OrderPlaced;
+ return $record->event instanceof OrderShipped;
}
- public function serialize(EventRecord $record): SerializedPayload
+ public function serialize(IntegrationEventRecord $record): SerializedPayload
{
return SerializedPayload::from(payload: 'not json');
}
diff --git a/tests/Unit/OrderPlacedSerializer.php b/tests/Unit/OrderPlacedSerializer.php
index 9cc81a3..6427199 100644
--- a/tests/Unit/OrderPlacedSerializer.php
+++ b/tests/Unit/OrderPlacedSerializer.php
@@ -4,19 +4,19 @@
namespace Test\TinyBlocks\Outbox\Unit;
-use Test\TinyBlocks\Outbox\Models\OrderPlaced;
-use TinyBlocks\BuildingBlocks\Event\EventRecord;
+use Test\TinyBlocks\Outbox\Models\OrderShipped;
+use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
use TinyBlocks\Outbox\Serialization\PayloadSerializer;
use TinyBlocks\Outbox\Serialization\SerializedPayload;
final readonly class OrderPlacedSerializer implements PayloadSerializer
{
- public function supports(EventRecord $record): bool
+ public function supports(IntegrationEventRecord $record): bool
{
- return $record->event instanceof OrderPlaced;
+ return $record->event instanceof OrderShipped;
}
- public function serialize(EventRecord $record): SerializedPayload
+ public function serialize(IntegrationEventRecord $record): SerializedPayload
{
return SerializedPayload::from(payload: '{}');
}