diff --git a/README.md b/README.md index cb6b146..314e322 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ + [Expected table schema](#expected-table-schema) + [Wiring the repository](#wiring-the-repository) + [Producing events from an aggregate](#producing-events-from-an-aggregate) + + [Declaring integration events and translators](#declaring-integration-events-and-translators) + [Customizing the table layout](#customizing-the-table-layout) + [Writing a custom payload serializer](#writing-a-custom-payload-serializer) + [Event schema versioning](#event-schema-versioning) @@ -18,16 +19,21 @@ ## Overview The **Transactional Outbox** pattern solves the dual-write problem: persisting an aggregate state change and publishing -a domain event must happen atomically. Doing both independently risks a crash leaving one side committed and the other -lost. The outbox pattern records both in the same database transaction, delegating event delivery to a separate relay -process. +an integration event must happen atomically. Doing both independently risks a crash leaving one side committed and the +other lost. The outbox pattern records both in the same database transaction, delegating event delivery to a separate +relay process. -This library is the write-side adapter. It persists outbox records via Doctrine DBAL and is opinionated on correctness. -Transactions are always required and JSON validity is always checked, while leaving every schema decision to you: table -name, column names, and identity column storage type are all configurable. +This library is the write-side adapter. It persists integration events atomically with aggregate state via Doctrine +DBAL. The pipeline is: `DomainEvent → IntegrationEventTranslator → IntegrationEvent → PayloadSerializer → INSERT`. +Domain events without a registered translator are silently skipped: their absence from `IntegrationEventTranslators` +is the canonical declaration that the event is internal to the bounded context and must not cross its boundary. + +The library is opinionated on correctness: transactions are always required, JSON validity is always checked, and every +schema decision is left to you: table name, column names, and identity column storage type are all configurable. The library composes with [`tiny-blocks/building-blocks`](https://github.com/tiny-blocks/building-blocks), which -contributes `DomainEvent`, `DomainEventBehavior`, `EventRecord`, `EventRecords`, `EventType`, `Revision`, +contributes `DomainEvent`, `EventRecord`, `EventRecords`, `IntegrationEvent`, `IntegrationEventBehavior`, +`IntegrationEventRecord`, `IntegrationEventTranslator`, `IntegrationEventTranslators`, `EventType`, `Revision`, `AggregateVersion`, and the `EventualAggregateRoot` family. This library provides the persistence step only. ## Installation @@ -49,8 +55,8 @@ CREATE TABLE outbox_events ( id BINARY(16) NOT NULL COMMENT 'The event identifier in Version 4 UUID format (e.g. 123e4567-e89b-12d3-a456-426614174000).', payload JSON NOT NULL COMMENT 'The event payload serialized as a JSON object (e.g. {"transaction_id":"..."}).', - revision INT NOT NULL COMMENT 'The positive integer indicating the payload schema revision of the event (e.g. 1).', - event_type VARCHAR(255) NOT NULL COMMENT 'The event class name in CamelCase (e.g. TransactionConfirmed).', + revision INT NOT NULL COMMENT 'The positive integer indicating the payload schema revision of the integration event (e.g. 1).', + event_type VARCHAR(255) NOT NULL COMMENT 'The integration event class name in CamelCase (e.g. PaymentConfirmed). Reflects the public contract, not the internal domain event.', occurred_at TIMESTAMP(6) NOT NULL COMMENT 'The UTC date and time when the event occurred in ISO 8601 format (e.g. 2026-02-13T08:49:44.931408+00:00).', aggregate_id BINARY(16) NOT NULL COMMENT 'The aggregate root identifier in Version 4 UUID format (e.g. 123e4567-e89b-12d3-a456-426614174000).', aggregate_type VARCHAR(255) NOT NULL COMMENT 'The aggregate root class name that produced the event in CamelCase (e.g. Transaction).', @@ -81,41 +87,47 @@ CREATE TABLE outbox_events ### Wiring the repository -`DoctrineOutboxRepository` requires a Doctrine DBAL `Connection` and a `PayloadSerializers` collection. The table layout -defaults to table `outbox_events` with BINARY(16) identity columns. +`DoctrineOutboxRepository` requires a Doctrine DBAL `Connection`, an `IntegrationEventTranslators` collection, and a +`PayloadSerializers` collection. The table layout defaults to table `outbox_events` with BINARY(16) identity columns. ```php event instanceof OrderPlaced; + } + + public function translate(EventRecord $record): IntegrationEvent + { + return new OrderShipped(orderId: $record->event->orderId); + } +} +``` + +#### Registering translators + +Pass an `IntegrationEventTranslators` collection to the repository constructor. Lookup follows first-match-wins +semantics: the first translator whose `supports()` returns `true` is used. + +```php +event`, -`$record->aggregateType`, `$record->aggregateId`, `$record->aggregateVersion`, and all other fields when routing or -shaping the payload. +Both `supports()` and `serialize()` receive the full `IntegrationEventRecord`, giving access to `$record->event` +(the `IntegrationEvent`), `$record->aggregateType`, `$record->aggregateId`, `$record->aggregateVersion`, and all +other envelope fields when routing or shaping the payload. -Use `match (true)` in `serialize()` to handle multiple event types from the same aggregate in a single serializer: +Use `match (true)` in `serialize()` to handle multiple integration event types from the same aggregate in a single +serializer: ```php event instanceof OrderPlaced || $record->event instanceof OrderShipped; + return $record->event instanceof OrderShipped || $record->event instanceof OrderCanceled; } - public function serialize(EventRecord $record): SerializedPayload + public function serialize(IntegrationEventRecord $record): SerializedPayload { return match (true) { - $record->event instanceof OrderPlaced => SerializedPayload::from( + $record->event instanceof OrderShipped => SerializedPayload::from( payload: json_encode(['orderId' => $record->event->orderId], JSON_THROW_ON_ERROR) ), - $record->event instanceof OrderShipped => SerializedPayload::from( + $record->event instanceof OrderCanceled => SerializedPayload::from( payload: json_encode( - ['orderId' => $record->event->orderId, 'shippedAt' => $record->event->shippedAt], + ['orderId' => $record->event->orderId, 'reason' => $record->event->reason], JSON_THROW_ON_ERROR ) ) @@ -268,7 +408,11 @@ final readonly class OrderEventSerializer implements PayloadSerializer # PayloadSerializerReflection always returns true from supports(), so it must come last. $repository = new DoctrineOutboxRepository( connection: $connection, - serializers: serializers::createFrom(elements: [ + translators: IntegrationEventTranslators::createFrom(elements: [ + new OrderPlacedTranslator(), + new OrderCanceledTranslator() + ]), + serializers: PayloadSerializers::createFrom(elements: [ new OrderEventSerializer(), new PayloadSerializerReflection() ]) @@ -282,24 +426,26 @@ library handles encoding internally. ### Event schema versioning -Each domain event declares its schema revision via `DomainEvent::revision()`. `DomainEventBehavior` provides the -default implementation, returning revision 1. Override `revision()` when the event's payload structure changes: +Each integration event declares its schema revision via `IntegrationEvent::revision()`. `IntegrationEventBehavior` +provides the default implementation, returning revision 1. Override `revision()` when the integration event's payload +structure changes in a backward-incompatible way: ```php aggregateType === 'Order'`), or the payload shaping may vary -based on the aggregate version (`$record->aggregateVersion`). Receiving the full `EventRecord` in both `supports()` and -`serialize()` gives serializers access to all available context without requiring any additional indirection. +Routing and shaping decisions often depend on context beyond the event payload itself. For example, a single serializer +may handle integration events from a specific aggregate type (`$record->aggregateType === 'Order'`), or the payload +shaping may vary based on the aggregate version (`$record->aggregateVersion`). Receiving the full +`IntegrationEventRecord` in both `supports()` and `serialize()` gives serializers access to all available envelope +context without requiring any additional indirection. ### 09. How does the library handle transient database errors? @@ -381,6 +529,23 @@ caller. The consumer is responsible for any retry policy. A common pattern is to wrap the unit of work (aggregate save + outbox push) in a retry loop that catches transient exceptions and re-executes the entire transaction. +### 10. Why are domain events without a translator silently skipped instead of failing loudly? + +The absence of a translator is the canonical declaration that the event is internal to the bounded context. Failing +loudly would force every internal domain event to either register a no-op translator or be filtered before the call to +`push()`, both of which leak the public-contract concern into code that should not know about it. Silent skip keeps the +boundary one-sided: registering a translator is the explicit opt-in for cross-context publication. Code that does not +register a translator is correct by design. + +### 11. Why doesn't the library accept a DomainEvent directly for publication? + +The Anti-Corruption Layer rationale established in `tiny-blocks/building-blocks` applies here (see Vaughn Vernon, +*Implementing Domain-Driven Design*, Addison-Wesley, 2013, Chapter 3, "Context Maps"). The public contract must evolve +independently of the internal model. Accepting a domain event directly reintroduces the coupling that the integration +event abstraction exists to eliminate: every change to the domain event's shape would affect external consumers. The +cost of writing a translator is small and the gain in contract stability is the entire reason for the design. See +the `IntegrationEventTranslator` documentation in `tiny-blocks/building-blocks` for the full rationale. + ## License Outbox is licensed under [MIT](LICENSE). diff --git a/composer.json b/composer.json index 50f24b5..a06f54d 100644 --- a/composer.json +++ b/composer.json @@ -13,6 +13,7 @@ "event-sourcing", "event-versioning", "event-persistence", + "integration-events", "event-serialization", "transactional-outbox" ], @@ -31,7 +32,7 @@ "php": "^8.5", "doctrine/dbal": "^4.4", "ramsey/uuid": "^4.9", - "tiny-blocks/building-blocks": "^3.0", + "tiny-blocks/building-blocks": "^4.0", "tiny-blocks/collection": "^2.3" }, "require-dev": { diff --git a/src/DoctrineOutboxRepository.php b/src/DoctrineOutboxRepository.php index 7f1e7fa..ab5dac2 100644 --- a/src/DoctrineOutboxRepository.php +++ b/src/DoctrineOutboxRepository.php @@ -8,6 +8,8 @@ use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use TinyBlocks\BuildingBlocks\Event\EventRecord; use TinyBlocks\BuildingBlocks\Event\EventRecords; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslators; use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion; use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent; use TinyBlocks\Outbox\Exceptions\OutboxRequiresActiveTransaction; @@ -22,6 +24,7 @@ public function __construct( private Connection $connection, + private IntegrationEventTranslators $translators, private PayloadSerializers $serializers, ?TableLayout $tableLayout = null ) { @@ -34,16 +37,29 @@ public function push(EventRecords $records): void throw OutboxRequiresActiveTransaction::asMissing(); } - $records->each(actions: function (EventRecord $record): void { - $payloadSerializer = $this->serializers->findFor(record: $record); + $records->each(actions: function (EventRecord $eventRecord): void { + $translator = $this->translators->findFor(record: $eventRecord); + + if (is_null($translator)) { + return; + } + + $integrationEventRecord = IntegrationEventRecord::from( + eventRecord: $eventRecord, + integrationEvent: $translator->translate(record: $eventRecord) + ); + + $payloadSerializer = $this->serializers->findFor(record: $integrationEventRecord); if (is_null($payloadSerializer)) { - throw PayloadSerializerNotConfigured::forEventClass(eventClass: $record->event::class); + throw PayloadSerializerNotConfigured::forEventClass( + eventClass: $integrationEventRecord->event::class + ); } $insert = OutboxInsert::from( - record: $record, - payload: $payloadSerializer->serialize(record: $record), + record: $integrationEventRecord, + payload: $payloadSerializer->serialize(record: $integrationEventRecord), tableLayout: $this->tableLayout ); @@ -53,13 +69,16 @@ public function push(EventRecords $records): void if ($this->tableLayout->uniqueConstraint->isViolatedBy(exception: $exception)) { throw DuplicateAggregateVersion::forRecord( previous: $exception, - aggregateId: $record->aggregateId->identityValue(), - aggregateType: $record->aggregateType, - aggregateVersion: $record->aggregateVersion->value + aggregateId: $integrationEventRecord->aggregateId->identityValue(), + aggregateType: $integrationEventRecord->aggregateType, + aggregateVersion: $integrationEventRecord->aggregateVersion->value ); } - throw DuplicateOutboxEvent::forRecord(eventId: $record->id, previous: $exception); + throw DuplicateOutboxEvent::forRecord( + eventId: $integrationEventRecord->id, + previous: $exception + ); } }); } diff --git a/src/Internal/OutboxInsert.php b/src/Internal/OutboxInsert.php index aadc1b2..7864542 100644 --- a/src/Internal/OutboxInsert.php +++ b/src/Internal/OutboxInsert.php @@ -4,7 +4,7 @@ namespace TinyBlocks\Outbox\Internal; -use TinyBlocks\BuildingBlocks\Event\EventRecord; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord; use TinyBlocks\Outbox\Schema\TableLayout; use TinyBlocks\Outbox\Serialization\SerializedPayload; @@ -15,7 +15,7 @@ private function __construct(public string $sql, public array $parameters) } public static function from( - EventRecord $record, + IntegrationEventRecord $record, SerializedPayload $payload, TableLayout $tableLayout ): OutboxInsert { diff --git a/src/OutboxRepository.php b/src/OutboxRepository.php index 1bc9160..de09e2b 100644 --- a/src/OutboxRepository.php +++ b/src/OutboxRepository.php @@ -5,11 +5,14 @@ namespace TinyBlocks\Outbox; use TinyBlocks\BuildingBlocks\Event\EventRecords; +use TinyBlocks\BuildingBlocks\Event\IntegrationEvent; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslator; use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion; use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent; use TinyBlocks\Outbox\Exceptions\InvalidPayloadJson; use TinyBlocks\Outbox\Exceptions\OutboxRequiresActiveTransaction; use TinyBlocks\Outbox\Exceptions\PayloadSerializerNotConfigured; +use TinyBlocks\Outbox\Serialization\PayloadSerializer; /** * Producer-side contract: persists outbox records as part of the caller's open transaction. @@ -23,14 +26,28 @@ interface OutboxRepository /** * Persists the given records as part of the caller's open transaction. * - *

The implementation must not open or commit a transaction. It is the caller's responsibility - * to ensure this call happens inside the same unit of work as the aggregate state change.

+ *

The input carries domain events from the aggregate's recorded-events buffer. + * The implementation filters each record through the registered + * {@see IntegrationEventTranslator} collection: domain events without a matching + * translator are silently skipped, because the absence of a translator is the canonical + * declaration that the event is internal to the bounded context and must not cross its + * boundary.

+ * + *

Matched domain events are translated into {@see IntegrationEvent} envelopes via + * the Anti-Corruption Layer and only then serialized and persisted. The + * {@see PayloadSerializer} operates on the integration event record, never on the + * domain event directly.

+ * + *

The implementation must not open or commit a transaction. It is the caller's + * responsibility to ensure this call happens inside the same unit of work as the + * aggregate state change.

* * @param EventRecords $records The records to persist. * @throws InvalidPayloadJson When a serializer produces an invalid JSON payload. * @throws DuplicateOutboxEvent When a record with a duplicate id already exists in the outbox. * @throws DuplicateAggregateVersion When two records share the same aggregate type, id, and aggregate version. - * @throws PayloadSerializerNotConfigured When no serializer supports the event class. + * @throws PayloadSerializerNotConfigured When a translator matched a domain event but no serializer supports the + * produced integration event. * @throws OutboxRequiresActiveTransaction When called outside an active transaction. */ public function push(EventRecords $records): void; diff --git a/src/Serialization/PayloadSerializer.php b/src/Serialization/PayloadSerializer.php index f4662f1..f3348b2 100644 --- a/src/Serialization/PayloadSerializer.php +++ b/src/Serialization/PayloadSerializer.php @@ -4,23 +4,23 @@ namespace TinyBlocks\Outbox\Serialization; -use TinyBlocks\BuildingBlocks\Event\EventRecord; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord; interface PayloadSerializer { /** - * Tells whether this serializer handles the event in the given record. + * Tells whether this serializer handles the integration event in the given record. * - * @param EventRecord $record The record being serialized. - * @return bool True if this serializer can produce the payload for the event. + * @param IntegrationEventRecord $record The record being serialized. + * @return bool True if this serializer can produce the payload for the integration event. */ - public function supports(EventRecord $record): bool; + public function supports(IntegrationEventRecord $record): bool; /** - * Produces the persistent payload for the event in the record. + * Produces the persistent payload for the integration event in the record. * - * @param EventRecord $record The record being serialized. + * @param IntegrationEventRecord $record The record being serialized. * @return SerializedPayload The serialized payload. */ - public function serialize(EventRecord $record): SerializedPayload; + public function serialize(IntegrationEventRecord $record): SerializedPayload; } diff --git a/src/Serialization/PayloadSerializerReflection.php b/src/Serialization/PayloadSerializerReflection.php index 3ad1a87..12fa155 100644 --- a/src/Serialization/PayloadSerializerReflection.php +++ b/src/Serialization/PayloadSerializerReflection.php @@ -4,16 +4,16 @@ namespace TinyBlocks\Outbox\Serialization; -use TinyBlocks\BuildingBlocks\Event\EventRecord; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord; final readonly class PayloadSerializerReflection implements PayloadSerializer { - public function supports(EventRecord $record): bool + public function supports(IntegrationEventRecord $record): bool { return true; } - public function serialize(EventRecord $record): SerializedPayload + public function serialize(IntegrationEventRecord $record): SerializedPayload { return SerializedPayload::fromArray(payload: get_object_vars($record->event)); } diff --git a/src/Serialization/PayloadSerializers.php b/src/Serialization/PayloadSerializers.php index ba6e962..b93964f 100644 --- a/src/Serialization/PayloadSerializers.php +++ b/src/Serialization/PayloadSerializers.php @@ -4,7 +4,7 @@ namespace TinyBlocks\Outbox\Serialization; -use TinyBlocks\BuildingBlocks\Event\EventRecord; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord; use TinyBlocks\Collection\Collection; final class PayloadSerializers extends Collection @@ -12,10 +12,10 @@ final class PayloadSerializers extends Collection /** * Returns the first payload serializer that supports the given record, or null when none matches. * - * @param EventRecord $record The record whose payload serializer is being resolved. + * @param IntegrationEventRecord $record The record whose payload serializer is being resolved. * @return PayloadSerializer|null The matching serializer, or null when no element supports the record. */ - public function findFor(EventRecord $record): ?PayloadSerializer + public function findFor(IntegrationEventRecord $record): ?PayloadSerializer { $serializer = $this->findBy( predicates: static fn(PayloadSerializer $serializer): bool => $serializer->supports(record: $record) diff --git a/src/Serialization/SerializedPayload.php b/src/Serialization/SerializedPayload.php index b5211f9..bfaaeff 100644 --- a/src/Serialization/SerializedPayload.php +++ b/src/Serialization/SerializedPayload.php @@ -12,19 +12,6 @@ private function __construct(private string $payload) { } - /** - * Creates a SerializedPayload from an associative array, encoding it as JSON. - * - * @param array $payload The associative array to encode as the serialized payload. - * @return SerializedPayload The serialized payload with the JSON-encoded representation. - */ - public static function fromArray(array $payload): SerializedPayload - { - $json = json_encode($payload, JSON_THROW_ON_ERROR); - - return new SerializedPayload(payload: $json); - } - /** * Creates a SerializedPayload from a raw JSON string. * @@ -41,6 +28,19 @@ public static function from(string $payload): SerializedPayload return new SerializedPayload(payload: $payload); } + /** + * Creates a SerializedPayload from an associative array, encoding it as JSON. + * + * @param array $payload The associative array to encode as the serialized payload. + * @return SerializedPayload The serialized payload with the JSON-encoded representation. + */ + public static function fromArray(array $payload): SerializedPayload + { + $json = json_encode($payload, JSON_THROW_ON_ERROR); + + return new SerializedPayload(payload: $json); + } + /** * Returns the SerializedPayload as its JSON string representation. * diff --git a/tests/Integration/DoctrineOutboxRepositoryTest.php b/tests/Integration/DoctrineOutboxRepositoryTest.php index 1c61b13..832d7dc 100644 --- a/tests/Integration/DoctrineOutboxRepositoryTest.php +++ b/tests/Integration/DoctrineOutboxRepositoryTest.php @@ -10,12 +10,15 @@ use Test\TinyBlocks\Outbox\Models\EventRecordFactory; use Test\TinyBlocks\Outbox\Models\Order; use Test\TinyBlocks\Outbox\Models\OrderPlaced; +use Test\TinyBlocks\Outbox\Models\OrderPlacedTranslator; use Test\TinyBlocks\Outbox\Models\RefundIssued; +use Test\TinyBlocks\Outbox\Models\RefundIssuedTranslator; use Test\TinyBlocks\Outbox\Unit\DriverExceptionStub; use Test\TinyBlocks\Outbox\Unit\InvalidPayloadSerializer; use Test\TinyBlocks\Outbox\Unit\OrderPlacedSerializer; use TinyBlocks\BuildingBlocks\Aggregate\AggregateVersion; use TinyBlocks\BuildingBlocks\Event\EventRecords; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslators; use TinyBlocks\BuildingBlocks\Event\Revision; use TinyBlocks\Outbox\DoctrineOutboxRepository; use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion; @@ -46,6 +49,7 @@ public function testPushWhenNoTransactionThenOutboxRequiresActiveTransaction(): /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -68,9 +72,10 @@ public function testPushWhenNoTransactionThenOutboxRequiresActiveTransaction(): public function testPushWhenMultipleSerializersAndFirstMatchesThenFirstIsUsed(): void { - /** @Given a repository with two serializers supporting the same event */ + /** @Given a repository with a translator and two serializers supporting the same integration event */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [ new OrderPlacedSerializer(), new FallbackOrderPlacedSerializer() @@ -98,9 +103,10 @@ public function testPushWhenMultipleSerializersAndFirstMatchesThenFirstIsUsed(): public function testPushWhenReflectionPayloadSerializerThenEventPropertiesAreEncoded(): void { - /** @Given a repository using ReflectionPayloadSerializer as the only serializer */ + /** @Given a repository with a translator and a reflection payload serializer */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()]) ); @@ -119,7 +125,7 @@ public function testPushWhenReflectionPayloadSerializerThenEventPropertiesAreEnc /** @And the transaction is committed */ self::$connection->commit(); - /** @Then the payload reflects the event's public properties */ + /** @Then the payload reflects the integration event's public properties */ self::assertSame('[]', self::$connection->fetchOne('SELECT payload FROM outbox_events LIMIT 1')); } @@ -128,6 +134,7 @@ public function testPushWhenCallerRollsBackThenNoRecordPersisted(): void /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -155,6 +162,7 @@ public function testPushWhenTwoRecordsThenBothPersisted(): void /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -190,6 +198,7 @@ public function testPushWhenUuidWithNullBytesThenBytesPreservedInStorage(): void /** @And a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -216,9 +225,10 @@ public function testPushWhenUuidWithNullBytesThenBytesPreservedInStorage(): void public function testPushWhenSerializerReturnsInvalidJsonThenInvalidPayloadJson(): void { - /** @Given a repository with a serializer that produces invalid JSON */ + /** @Given a repository with a translator and a serializer that produces invalid JSON */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new InvalidPayloadSerializer()]) ); @@ -244,9 +254,13 @@ public function testPushWhenSerializerReturnsInvalidJsonThenInvalidPayloadJson() public function testPushWhenMultipleSerializersAndSecondMatchesThenCorrectSerializerIsUsed(): void { - /** @Given a repository with an order serializer followed by a refund serializer */ + /** @Given a repository with translators for both event types and matching serializers */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [ + new OrderPlacedTranslator(), + new RefundIssuedTranslator() + ]), serializers: PayloadSerializers::createFrom(elements: [ new OrderPlacedSerializer(), new RefundIssuedSerializer() @@ -275,41 +289,12 @@ public function testPushWhenMultipleSerializersAndSecondMatchesThenCorrectSerial ); } - public function testPushWhenNoSerializerSupportsThenPayloadSerializerNotConfigured(): void + public function testPushWhenSerializerDoesNotSupportIntegrationEventThenPayloadSerializerNotConfigured(): void { - /** @Given a repository with no serializers */ - $repository = new DoctrineOutboxRepository( - connection: self::$connection, - serializers: PayloadSerializers::createFromEmpty() - ); - - /** @And the connection has an active transaction */ - self::$connection->beginTransaction(); - - /** @And a record to push */ - $records = EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order', - eventTypeName: 'OrderPlaced' - ) - ]); - - /** @Then an exception indicating no serializer is configured is thrown */ - $this->expectException(PayloadSerializerNotConfigured::class); - $this->expectExceptionMessage( - 'No payload serializer configured for event class .' - ); - - /** @When pushing the record */ - $repository->push(records: $records); - } - - public function testPushWhenSerializerDoesNotSupportEventThenPayloadSerializerNotConfigured(): void - { - /** @Given a repository with only an order serializer */ + /** @Given a repository with a refund translator but only an order serializer */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new RefundIssuedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -328,7 +313,7 @@ public function testPushWhenSerializerDoesNotSupportEventThenPayloadSerializerNo /** @Then an exception indicating no serializer is configured is thrown */ $this->expectException(PayloadSerializerNotConfigured::class); $this->expectExceptionMessage( - 'No payload serializer configured for event class .' + 'No payload serializer configured for event class .' ); /** @When pushing the unsupported event */ @@ -340,6 +325,7 @@ public function testPushWhenDuplicateEventIdThenDuplicateOutboxEvent(): void /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -370,6 +356,7 @@ public function testPushWhenDuplicateAggregateVersionThenDuplicateAggregateVersi /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -430,6 +417,7 @@ public function testPushWhenCustomTableNameThenRecordStoredInCustomTable(): void /** @And a repository using the custom layout */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), tableLayout: $tableLayout ); @@ -484,6 +472,7 @@ public function testPushWhenStringIdentityTypeStoredThenIdIsUuidString(): void /** @And a repository using the string layout */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), tableLayout: $tableLayout ); @@ -536,6 +525,7 @@ public function testPushWhenNonUuidAggregateIdWithStringTypeThenStoredAsOriginal /** @And a repository using the string layout */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), tableLayout: $tableLayout ); @@ -565,9 +555,10 @@ public function testPushWhenNonUuidAggregateIdWithStringTypeThenStoredAsOriginal public function testPushWhenSingleRecordThenAllFieldsPersistedCorrectly(): void { - /** @Given a repository with an order placed serializer */ + /** @Given a repository with a translator and an order placed serializer */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -598,11 +589,11 @@ public function testPushWhenSingleRecordThenAllFieldsPersistedCorrectly(): void /** @And the aggregate_id is stored as 16-byte binary */ self::assertSame(16, strlen($row['aggregate_id'])); - /** @And the event_type is correct */ - self::assertSame('OrderPlaced', $row['event_type']); + /** @And the event_type reflects the integration event class */ + self::assertSame('OrderShipped', $row['event_type']); /** @And the revision is correct */ - self::assertSame(2, (int)$row['revision']); + self::assertSame(1, (int)$row['revision']); /** @And the aggregate_version is correct */ self::assertSame(3, (int)$row['aggregate_version']); @@ -622,6 +613,7 @@ public function testPushWhenKnownIdThenPersistedIdMatchesOriginal(): void /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -687,6 +679,7 @@ public function testPushWhenAllColumnNamesAreCustomThenRecordStoredInCustomColum /** @And a repository using this layout */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), tableLayout: $tableLayout ); @@ -719,8 +712,8 @@ public function testPushWhenAllColumnNamesAreCustomThenRecordStoredInCustomColum /** @And event_aggregate_type is correct */ self::assertSame('Order', $row['event_aggregate_type']); - /** @And event_event_type is correct */ - self::assertSame('OrderPlaced', $row['event_event_type']); + /** @And event_event_type reflects the integration event class */ + self::assertSame('OrderShipped', $row['event_event_type']); /** @And event_revision is correct */ self::assertSame(1, (int)$row['event_revision']); @@ -743,6 +736,7 @@ public function testPushWhenEventRecordsIsEmptyThenNoInsertIsExecuted(): void /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -761,9 +755,10 @@ public function testPushWhenEventRecordsIsEmptyThenNoInsertIsExecuted(): void public function testPushWhenRealEventualAggregateRootThenEventRecordIsPersistedCorrectly(): void { - /** @Given a repository with a reflection payload serializer */ + /** @Given a repository with a translator and a reflection payload serializer */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()]) ); @@ -800,6 +795,7 @@ public function testPushWhenNullTableLayoutThenSqlUsesDefaultTableName(): void /** @When pushing a record using the default table layout */ new DoctrineOutboxRepository( connection: $connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) )->push( records: EventRecords::createFrom(elements: [ @@ -832,6 +828,7 @@ public function testPushWhenCustomTableLayoutThenSqlUsesCustomTableName(): void /** @When pushing a record using the custom table layout */ new DoctrineOutboxRepository( connection: $connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), tableLayout: $tableLayout )->push( @@ -879,6 +876,7 @@ function (string $sql, array $params) use (&$capturedParameters): int { /** @When pushing a record with all deterministic fields */ new DoctrineOutboxRepository( connection: $connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), tableLayout: $tableLayout )->push( @@ -900,7 +898,7 @@ function (string $sql, array $params) use (&$capturedParameters): int { 'id' => '550e8400-e29b-41d4-a716-446655440000', 'aggregateId' => '6ba7b810-9dad-11d1-80b4-00c04fd430c8', 'aggregateType' => 'Order', - 'eventType' => 'OrderPlaced', + 'eventType' => 'OrderShipped', 'revision' => 1, 'aggregateVersion' => 1, 'payload' => '{}', @@ -934,6 +932,7 @@ public function testPushWhenUniqueConstraintOnAggregateVersionThenDuplicateAggre /** @When pushing the record */ new DoctrineOutboxRepository( connection: $connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) )->push( records: EventRecords::createFrom(elements: [ @@ -967,6 +966,7 @@ public function testPushWhenUniqueConstraintOnEventIdThenDuplicateOutboxEventIsT /** @When pushing the record */ new DoctrineOutboxRepository( connection: $connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) )->push( records: EventRecords::createFrom(elements: [ @@ -1003,6 +1003,7 @@ public function testPushWhenUniqueConstraintWithCustomNameThenDuplicateAggregate /** @When pushing a record with the custom table layout */ new DoctrineOutboxRepository( connection: $connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), tableLayout: $tableLayout )->push( @@ -1039,6 +1040,7 @@ public function testPushWhenConstraintNameIsCustomThenDuplicateAggregateVersion( /** @And a repository using the custom table layout */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), tableLayout: $tableLayout ); @@ -1074,4 +1076,135 @@ public function testPushWhenConstraintNameIsCustomThenDuplicateAggregateVersion( ) ])); } + + public function testPushWhenNoTranslatorsRegisteredThenRecordIsSilentlySkipped(): void + { + /** @Given a repository with no translators and a reflection serializer */ + $repository = new DoctrineOutboxRepository( + connection: self::$connection, + translators: IntegrationEventTranslators::createFromEmpty(), + serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()]) + ); + + /** @And the connection has an active transaction */ + self::$connection->beginTransaction(); + + /** @When pushing an order placed record */ + $repository->push(records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order', + eventTypeName: 'OrderPlaced' + ) + ])); + + /** @And the transaction is committed */ + self::$connection->commit(); + + /** @Then no records are persisted */ + self::assertSame(0, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); + } + + public function testPushWhenOnlyOrderTranslatorRegisteredThenRefundEventIsSkipped(): void + { + /** @Given a repository with only an order translator and both serializers */ + $repository = new DoctrineOutboxRepository( + connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), + serializers: PayloadSerializers::createFrom(elements: [ + new OrderPlacedSerializer(), + new RefundIssuedSerializer() + ]) + ); + + /** @And the connection has an active transaction */ + self::$connection->beginTransaction(); + + /** @When pushing one order placed and one refund issued record */ + $repository->push(records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order', + eventTypeName: 'OrderPlaced', + aggregateVersion: AggregateVersion::of(value: 1) + ), + EventRecordFactory::create( + event: new RefundIssued(), + aggregateType: 'Refund', + eventTypeName: 'RefundIssued', + aggregateVersion: AggregateVersion::of(value: 1) + ) + ])); + + /** @And the transaction is committed */ + self::$connection->commit(); + + /** @Then exactly one row is persisted */ + self::assertSame(1, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); + + /** @And the persisted event_type is the order integration event */ + self::assertSame('OrderShipped', self::$connection->fetchOne('SELECT event_type FROM outbox_events LIMIT 1')); + } + + public function testPushWhenTwoTranslatorsSupportSameEventThenFirstTranslatorWins(): void + { + /** @Given a repository with two translators both supporting order placed, and a matching serializer */ + $repository = new DoctrineOutboxRepository( + connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [ + new OrderPlacedTranslator(), + new DuplicateOrderPlacedTranslator() + ]), + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) + ); + + /** @And the connection has an active transaction */ + self::$connection->beginTransaction(); + + /** @When pushing an order placed record */ + $repository->push(records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order', + eventTypeName: 'OrderPlaced' + ) + ])); + + /** @And the transaction is committed */ + self::$connection->commit(); + + /** @Then the persisted event_type reflects the first translator's output */ + self::assertSame('OrderShipped', self::$connection->fetchOne('SELECT event_type FROM outbox_events LIMIT 1')); + } + + public function testPushWhenTranslatorMatchesButNoSerializerThenPayloadSerializerNotConfigured(): void + { + /** @Given a repository with a translator but no serializer that matches the produced integration event */ + $repository = new DoctrineOutboxRepository( + connection: self::$connection, + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), + serializers: PayloadSerializers::createFromEmpty() + ); + + /** @And the connection has an active transaction */ + self::$connection->beginTransaction(); + + /** @And a record to push */ + $records = EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order', + eventTypeName: 'OrderPlaced' + ) + ]); + + /** @Then an exception indicating no serializer is configured is thrown */ + $this->expectException(PayloadSerializerNotConfigured::class); + $this->expectExceptionMessage( + 'No payload serializer configured for event class .' + ); + + /** @When pushing the record */ + $repository->push(records: $records); + } } diff --git a/tests/Integration/DuplicateOrderPlacedTranslator.php b/tests/Integration/DuplicateOrderPlacedTranslator.php new file mode 100644 index 0000000..2fa1b2c --- /dev/null +++ b/tests/Integration/DuplicateOrderPlacedTranslator.php @@ -0,0 +1,23 @@ +event instanceof OrderPlaced; + } + + public function translate(EventRecord $record): IntegrationEvent + { + return new DuplicateOrderShipped(); + } +} diff --git a/tests/Integration/DuplicateOrderShipped.php b/tests/Integration/DuplicateOrderShipped.php new file mode 100644 index 0000000..3889d7d --- /dev/null +++ b/tests/Integration/DuplicateOrderShipped.php @@ -0,0 +1,13 @@ +event instanceof OrderPlaced; + return $record->event instanceof OrderShipped; } - public function serialize(EventRecord $record): SerializedPayload + public function serialize(IntegrationEventRecord $record): SerializedPayload { return SerializedPayload::from(payload: '{"fallback":true}'); } diff --git a/tests/Integration/RefundIssuedSerializer.php b/tests/Integration/RefundIssuedSerializer.php index 8796ae2..6b3f7df 100644 --- a/tests/Integration/RefundIssuedSerializer.php +++ b/tests/Integration/RefundIssuedSerializer.php @@ -4,19 +4,19 @@ namespace Test\TinyBlocks\Outbox\Integration; -use Test\TinyBlocks\Outbox\Models\RefundIssued; -use TinyBlocks\BuildingBlocks\Event\EventRecord; +use Test\TinyBlocks\Outbox\Models\RefundCompleted; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord; use TinyBlocks\Outbox\Serialization\PayloadSerializer; use TinyBlocks\Outbox\Serialization\SerializedPayload; final readonly class RefundIssuedSerializer implements PayloadSerializer { - public function supports(EventRecord $record): bool + public function supports(IntegrationEventRecord $record): bool { - return $record->event instanceof RefundIssued; + return $record->event instanceof RefundCompleted; } - public function serialize(EventRecord $record): SerializedPayload + public function serialize(IntegrationEventRecord $record): SerializedPayload { return SerializedPayload::from(payload: '{"type":"refund"}'); } diff --git a/tests/Models/OrderPlacedTranslator.php b/tests/Models/OrderPlacedTranslator.php new file mode 100644 index 0000000..03014e9 --- /dev/null +++ b/tests/Models/OrderPlacedTranslator.php @@ -0,0 +1,22 @@ +event instanceof OrderPlaced; + } + + public function translate(EventRecord $record): IntegrationEvent + { + return new OrderShipped(); + } +} diff --git a/tests/Models/OrderShipped.php b/tests/Models/OrderShipped.php new file mode 100644 index 0000000..9b0349f --- /dev/null +++ b/tests/Models/OrderShipped.php @@ -0,0 +1,13 @@ +event instanceof RefundIssued; + } + + public function translate(EventRecord $record): IntegrationEvent + { + return new RefundCompleted(); + } +} diff --git a/tests/Unit/InMemoryOutboxRepositoryMock.php b/tests/Unit/InMemoryOutboxRepositoryMock.php index c45bf44..f0d391c 100644 --- a/tests/Unit/InMemoryOutboxRepositoryMock.php +++ b/tests/Unit/InMemoryOutboxRepositoryMock.php @@ -7,6 +7,8 @@ use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use TinyBlocks\BuildingBlocks\Event\EventRecord; use TinyBlocks\BuildingBlocks\Event\EventRecords; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslators; use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion; use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent; use TinyBlocks\Outbox\Exceptions\OutboxRequiresActiveTransaction; @@ -20,8 +22,10 @@ final class InMemoryOutboxRepositoryMock implements OutboxRepository private bool $transactionActive = false; private array $aggregateVersions = []; - public function __construct(private readonly PayloadSerializers $payloadSerializers) - { + public function __construct( + private readonly IntegrationEventTranslators $translators, + private readonly PayloadSerializers $payloadSerializers + ) { } public function beginTransaction(): void @@ -52,36 +56,49 @@ public function push(EventRecords $records): void throw OutboxRequiresActiveTransaction::asMissing(); } - $records->each(actions: function (EventRecord $record): void { - $payloadSerializer = $this->payloadSerializers->findFor(record: $record); + $records->each(actions: function (EventRecord $eventRecord): void { + $translator = $this->translators->findFor(record: $eventRecord); + + if (is_null($translator)) { + return; + } + + $integrationEventRecord = IntegrationEventRecord::from( + eventRecord: $eventRecord, + integrationEvent: $translator->translate(record: $eventRecord) + ); + + $payloadSerializer = $this->payloadSerializers->findFor(record: $integrationEventRecord); if (is_null($payloadSerializer)) { - throw PayloadSerializerNotConfigured::forEventClass(eventClass: $record->event::class); + throw PayloadSerializerNotConfigured::forEventClass( + eventClass: $integrationEventRecord->event::class + ); } - $payloadSerializer->serialize(record: $record); + $payloadSerializer->serialize(record: $integrationEventRecord); $aggregateKey = sprintf( '%s|%s|%d', - $record->aggregateType, - $record->aggregateId->identityValue(), - $record->aggregateVersion->value + $integrationEventRecord->aggregateType, + $integrationEventRecord->aggregateId->identityValue(), + $integrationEventRecord->aggregateVersion->value ); if (isset($this->aggregateVersions[$aggregateKey])) { throw DuplicateAggregateVersion::forRecord( previous: null, - aggregateId: $record->aggregateId->identityValue(), - aggregateType: $record->aggregateType, - aggregateVersion: $record->aggregateVersion->value + aggregateId: $integrationEventRecord->aggregateId->identityValue(), + aggregateType: $integrationEventRecord->aggregateType, + aggregateVersion: $integrationEventRecord->aggregateVersion->value ); } - $eventId = (string)$record->id; + $eventId = (string)$integrationEventRecord->id; if (isset($this->records[$eventId])) { throw DuplicateOutboxEvent::forRecord( - eventId: $record->id, + eventId: $integrationEventRecord->id, previous: new UniqueConstraintViolationException( new DriverExceptionStub('Duplicate entry for key PRIMARY'), null @@ -90,7 +107,7 @@ public function push(EventRecords $records): void } $this->aggregateVersions[$aggregateKey] = true; - $this->records[$eventId] = $record; + $this->records[$eventId] = $integrationEventRecord; }); } } diff --git a/tests/Unit/InMemoryOutboxRepositoryTest.php b/tests/Unit/InMemoryOutboxRepositoryTest.php index 442ad23..0556eba 100644 --- a/tests/Unit/InMemoryOutboxRepositoryTest.php +++ b/tests/Unit/InMemoryOutboxRepositoryTest.php @@ -9,8 +9,10 @@ use Test\TinyBlocks\Outbox\Models\EventRecordFactory; use Test\TinyBlocks\Outbox\Models\Order; use Test\TinyBlocks\Outbox\Models\OrderPlaced; +use Test\TinyBlocks\Outbox\Models\OrderPlacedTranslator; use TinyBlocks\BuildingBlocks\Aggregate\AggregateVersion; use TinyBlocks\BuildingBlocks\Event\EventRecords; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslators; use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion; use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent; use TinyBlocks\Outbox\Exceptions\InvalidPayloadJson; @@ -23,8 +25,9 @@ final class InMemoryOutboxRepositoryTest extends TestCase { public function testPushWhenSingleRecordThenItIsPersisted(): void { - /** @Given an in-memory repository with configured serializers */ + /** @Given an in-memory repository with a configured translator and serializer */ $outbox = new InMemoryOutboxRepositoryMock( + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -49,8 +52,9 @@ public function testPushWhenSingleRecordThenItIsPersisted(): void public function testPushWhenMultipleRecordsThenAllArePersistedInOrder(): void { - /** @Given an in-memory repository with configured serializers */ + /** @Given an in-memory repository with a configured translator and serializer */ $outbox = new InMemoryOutboxRepositoryMock( + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -84,6 +88,7 @@ public function testPushWhenNoTransactionIsActiveThenOutboxRequiresActiveTransac { /** @Given an in-memory repository without an active transaction */ $outbox = new InMemoryOutboxRepositoryMock( + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -102,8 +107,9 @@ public function testPushWhenNoTransactionIsActiveThenOutboxRequiresActiveTransac public function testPushWhenNoPayloadSerializerMatchesThenPayloadSerializerNotConfigured(): void { - /** @Given an in-memory repository with no payload serializers configured */ + /** @Given an in-memory repository with a translator but no payload serializers */ $outbox = new InMemoryOutboxRepositoryMock( + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), payloadSerializers: PayloadSerializers::createFromEmpty() ); @@ -113,7 +119,7 @@ public function testPushWhenNoPayloadSerializerMatchesThenPayloadSerializerNotCo /** @Then an exception indicating no configured payload serializer is thrown */ $this->expectException(PayloadSerializerNotConfigured::class); - /** @When pushing a record whose event type has no matching serializer */ + /** @When pushing a record whose integration event class has no matching serializer */ $outbox->push(records: EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), @@ -125,8 +131,9 @@ public function testPushWhenNoPayloadSerializerMatchesThenPayloadSerializerNotCo public function testPushWhenPayloadSerializerReturnsInvalidJsonThenInvalidPayloadJson(): void { - /** @Given an in-memory repository with a serializer that produces invalid JSON */ + /** @Given an in-memory repository with a translator and a serializer that produces invalid JSON */ $outbox = new InMemoryOutboxRepositoryMock( + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), payloadSerializers: PayloadSerializers::createFrom(elements: [new InvalidPayloadSerializer()]) ); @@ -148,8 +155,9 @@ public function testPushWhenPayloadSerializerReturnsInvalidJsonThenInvalidPayloa public function testPushWhenTwoRecordsShareTheSameIdThenDuplicateOutboxEvent(): void { - /** @Given an in-memory repository with configured serializers */ + /** @Given an in-memory repository with a configured translator and serializer */ $outbox = new InMemoryOutboxRepositoryMock( + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -186,8 +194,9 @@ public function testPushWhenTwoRecordsShareTheSameIdThenDuplicateOutboxEvent(): public function testPushWhenTwoRecordsShareTheSameAggregateVersionThenDuplicateAggregateVersion(): void { - /** @Given an in-memory repository with configured serializers */ + /** @Given an in-memory repository with a configured translator and serializer */ $outbox = new InMemoryOutboxRepositoryMock( + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -225,8 +234,9 @@ public function testPushWhenTwoRecordsShareTheSameAggregateVersionThenDuplicateA public function testPushWhenEventRecordsIsEmptyThenNoRecordIsPersisted(): void { - /** @Given an in-memory repository with configured serializers */ + /** @Given an in-memory repository with a configured translator and serializer */ $outbox = new InMemoryOutboxRepositoryMock( + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) ); @@ -245,8 +255,9 @@ public function testPushWhenEventRecordsIsEmptyThenNoRecordIsPersisted(): void public function testPushWhenRealEventualAggregateRootThenEventRecordIsPersisted(): void { - /** @Given an in-memory repository with a reflection payload serializer */ + /** @Given an in-memory repository with a translator and a reflection payload serializer */ $outbox = new InMemoryOutboxRepositoryMock( + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), payloadSerializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()]) ); @@ -262,4 +273,31 @@ public function testPushWhenRealEventualAggregateRootThenEventRecordIsPersisted( /** @Then one event record is persisted in the repository */ self::assertCount(1, $outbox->persistedRecords()); } + + public function testPushWhenNoTranslatorMatchesThenRecordIsSilentlySkipped(): void + { + /** @Given an in-memory repository with no translators registered */ + $outbox = new InMemoryOutboxRepositoryMock( + translators: IntegrationEventTranslators::createFromEmpty(), + payloadSerializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) + ); + + /** @And a transaction is started */ + $outbox->beginTransaction(); + + /** @When a record whose event has no matching translator is pushed */ + $outbox->push(records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order', + eventTypeName: 'OrderPlaced' + ) + ])); + + /** @And the transaction is committed */ + $outbox->commit(); + + /** @Then no records are persisted and no exception is raised */ + self::assertCount(0, $outbox->persistedRecords()); + } } diff --git a/tests/Unit/InvalidPayloadSerializer.php b/tests/Unit/InvalidPayloadSerializer.php index ac6ac21..d1a3e49 100644 --- a/tests/Unit/InvalidPayloadSerializer.php +++ b/tests/Unit/InvalidPayloadSerializer.php @@ -4,19 +4,19 @@ namespace Test\TinyBlocks\Outbox\Unit; -use Test\TinyBlocks\Outbox\Models\OrderPlaced; -use TinyBlocks\BuildingBlocks\Event\EventRecord; +use Test\TinyBlocks\Outbox\Models\OrderShipped; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord; use TinyBlocks\Outbox\Serialization\PayloadSerializer; use TinyBlocks\Outbox\Serialization\SerializedPayload; final readonly class InvalidPayloadSerializer implements PayloadSerializer { - public function supports(EventRecord $record): bool + public function supports(IntegrationEventRecord $record): bool { - return $record->event instanceof OrderPlaced; + return $record->event instanceof OrderShipped; } - public function serialize(EventRecord $record): SerializedPayload + public function serialize(IntegrationEventRecord $record): SerializedPayload { return SerializedPayload::from(payload: 'not json'); } diff --git a/tests/Unit/OrderPlacedSerializer.php b/tests/Unit/OrderPlacedSerializer.php index 9cc81a3..6427199 100644 --- a/tests/Unit/OrderPlacedSerializer.php +++ b/tests/Unit/OrderPlacedSerializer.php @@ -4,19 +4,19 @@ namespace Test\TinyBlocks\Outbox\Unit; -use Test\TinyBlocks\Outbox\Models\OrderPlaced; -use TinyBlocks\BuildingBlocks\Event\EventRecord; +use Test\TinyBlocks\Outbox\Models\OrderShipped; +use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord; use TinyBlocks\Outbox\Serialization\PayloadSerializer; use TinyBlocks\Outbox\Serialization\SerializedPayload; final readonly class OrderPlacedSerializer implements PayloadSerializer { - public function supports(EventRecord $record): bool + public function supports(IntegrationEventRecord $record): bool { - return $record->event instanceof OrderPlaced; + return $record->event instanceof OrderShipped; } - public function serialize(EventRecord $record): SerializedPayload + public function serialize(IntegrationEventRecord $record): SerializedPayload { return SerializedPayload::from(payload: '{}'); }