Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 223 additions & 58 deletions README.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"event-sourcing",
"event-versioning",
"event-persistence",
"integration-events",
"event-serialization",
"transactional-outbox"
],
Expand All @@ -31,7 +32,7 @@
"php": "^8.5",
"doctrine/dbal": "^4.4",
"ramsey/uuid": "^4.9",
"tiny-blocks/building-blocks": "^3.0",
"tiny-blocks/building-blocks": "^4.0",
"tiny-blocks/collection": "^2.3"
},
"require-dev": {
Expand Down
37 changes: 28 additions & 9 deletions src/DoctrineOutboxRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use TinyBlocks\BuildingBlocks\Event\EventRecord;
use TinyBlocks\BuildingBlocks\Event\EventRecords;
use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslators;
use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion;
use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent;
use TinyBlocks\Outbox\Exceptions\OutboxRequiresActiveTransaction;
Expand All @@ -22,6 +24,7 @@

public function __construct(
private Connection $connection,
private IntegrationEventTranslators $translators,
private PayloadSerializers $serializers,
?TableLayout $tableLayout = null
) {
Expand All @@ -34,16 +37,29 @@ public function push(EventRecords $records): void
throw OutboxRequiresActiveTransaction::asMissing();
}

$records->each(actions: function (EventRecord $record): void {
$payloadSerializer = $this->serializers->findFor(record: $record);
$records->each(actions: function (EventRecord $eventRecord): void {
$translator = $this->translators->findFor(record: $eventRecord);

if (is_null($translator)) {
return;
}

$integrationEventRecord = IntegrationEventRecord::from(
eventRecord: $eventRecord,
integrationEvent: $translator->translate(record: $eventRecord)
);

$payloadSerializer = $this->serializers->findFor(record: $integrationEventRecord);

if (is_null($payloadSerializer)) {
throw PayloadSerializerNotConfigured::forEventClass(eventClass: $record->event::class);
throw PayloadSerializerNotConfigured::forEventClass(
eventClass: $integrationEventRecord->event::class
);
}

$insert = OutboxInsert::from(
record: $record,
payload: $payloadSerializer->serialize(record: $record),
record: $integrationEventRecord,
payload: $payloadSerializer->serialize(record: $integrationEventRecord),
tableLayout: $this->tableLayout
);

Expand All @@ -53,13 +69,16 @@ public function push(EventRecords $records): void
if ($this->tableLayout->uniqueConstraint->isViolatedBy(exception: $exception)) {
throw DuplicateAggregateVersion::forRecord(
previous: $exception,
aggregateId: $record->aggregateId->identityValue(),
aggregateType: $record->aggregateType,
aggregateVersion: $record->aggregateVersion->value
aggregateId: $integrationEventRecord->aggregateId->identityValue(),
aggregateType: $integrationEventRecord->aggregateType,
aggregateVersion: $integrationEventRecord->aggregateVersion->value
);
}

throw DuplicateOutboxEvent::forRecord(eventId: $record->id, previous: $exception);
throw DuplicateOutboxEvent::forRecord(
eventId: $integrationEventRecord->id,
previous: $exception
);
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions src/Internal/OutboxInsert.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace TinyBlocks\Outbox\Internal;

use TinyBlocks\BuildingBlocks\Event\EventRecord;
use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
use TinyBlocks\Outbox\Schema\TableLayout;
use TinyBlocks\Outbox\Serialization\SerializedPayload;

Expand All @@ -15,7 +15,7 @@ private function __construct(public string $sql, public array $parameters)
}

public static function from(
EventRecord $record,
IntegrationEventRecord $record,
SerializedPayload $payload,
TableLayout $tableLayout
): OutboxInsert {
Expand Down
23 changes: 20 additions & 3 deletions src/OutboxRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
namespace TinyBlocks\Outbox;

use TinyBlocks\BuildingBlocks\Event\EventRecords;
use TinyBlocks\BuildingBlocks\Event\IntegrationEvent;
use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslator;
use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion;
use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent;
use TinyBlocks\Outbox\Exceptions\InvalidPayloadJson;
use TinyBlocks\Outbox\Exceptions\OutboxRequiresActiveTransaction;
use TinyBlocks\Outbox\Exceptions\PayloadSerializerNotConfigured;
use TinyBlocks\Outbox\Serialization\PayloadSerializer;

/**
* Producer-side contract: persists outbox records as part of the caller's open transaction.
Expand All @@ -23,14 +26,28 @@ interface OutboxRepository
/**
* Persists the given records as part of the caller's open transaction.
*
* <p>The implementation must not open or commit a transaction. It is the caller's responsibility
* to ensure this call happens inside the same unit of work as the aggregate state change.</p>
* <p>The input carries domain events from the aggregate's recorded-events buffer.
* The implementation filters each record through the registered
* {@see IntegrationEventTranslator} collection: domain events without a matching
* translator are silently skipped, because the absence of a translator is the canonical
* declaration that the event is internal to the bounded context and must not cross its
* boundary.</p>
*
* <p>Matched domain events are translated into {@see IntegrationEvent} envelopes via
* the Anti-Corruption Layer and only then serialized and persisted. The
* {@see PayloadSerializer} operates on the integration event record, never on the
* domain event directly.</p>
*
* <p>The implementation must not open or commit a transaction. It is the caller's
* responsibility to ensure this call happens inside the same unit of work as the
* aggregate state change.</p>
*
* @param EventRecords $records The records to persist.
* @throws InvalidPayloadJson When a serializer produces an invalid JSON payload.
* @throws DuplicateOutboxEvent When a record with a duplicate id already exists in the outbox.
* @throws DuplicateAggregateVersion When two records share the same aggregate type, id, and aggregate version.
* @throws PayloadSerializerNotConfigured When no serializer supports the event class.
* @throws PayloadSerializerNotConfigured When a translator matched a domain event but no serializer supports the
* produced integration event.
* @throws OutboxRequiresActiveTransaction When called outside an active transaction.
*/
public function push(EventRecords $records): void;
Expand Down
16 changes: 8 additions & 8 deletions src/Serialization/PayloadSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@

namespace TinyBlocks\Outbox\Serialization;

use TinyBlocks\BuildingBlocks\Event\EventRecord;
use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;

interface PayloadSerializer
{
/**
* Tells whether this serializer handles the event in the given record.
* Tells whether this serializer handles the integration event in the given record.
*
* @param EventRecord $record The record being serialized.
* @return bool True if this serializer can produce the payload for the event.
* @param IntegrationEventRecord $record The record being serialized.
* @return bool True if this serializer can produce the payload for the integration event.
*/
public function supports(EventRecord $record): bool;
public function supports(IntegrationEventRecord $record): bool;

/**
* Produces the persistent payload for the event in the record.
* Produces the persistent payload for the integration event in the record.
*
* @param EventRecord $record The record being serialized.
* @param IntegrationEventRecord $record The record being serialized.
* @return SerializedPayload The serialized payload.
*/
public function serialize(EventRecord $record): SerializedPayload;
public function serialize(IntegrationEventRecord $record): SerializedPayload;
}
6 changes: 3 additions & 3 deletions src/Serialization/PayloadSerializerReflection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@

namespace TinyBlocks\Outbox\Serialization;

use TinyBlocks\BuildingBlocks\Event\EventRecord;
use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;

final readonly class PayloadSerializerReflection implements PayloadSerializer
{
public function supports(EventRecord $record): bool
public function supports(IntegrationEventRecord $record): bool
{
return true;
}

public function serialize(EventRecord $record): SerializedPayload
public function serialize(IntegrationEventRecord $record): SerializedPayload
{
return SerializedPayload::fromArray(payload: get_object_vars($record->event));
}
Expand Down
6 changes: 3 additions & 3 deletions src/Serialization/PayloadSerializers.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@

namespace TinyBlocks\Outbox\Serialization;

use TinyBlocks\BuildingBlocks\Event\EventRecord;
use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord;
use TinyBlocks\Collection\Collection;

final class PayloadSerializers extends Collection
{
/**
* Returns the first payload serializer that supports the given record, or null when none matches.
*
* @param EventRecord $record The record whose payload serializer is being resolved.
* @param IntegrationEventRecord $record The record whose payload serializer is being resolved.
* @return PayloadSerializer|null The matching serializer, or null when no element supports the record.
*/
public function findFor(EventRecord $record): ?PayloadSerializer
public function findFor(IntegrationEventRecord $record): ?PayloadSerializer
{
$serializer = $this->findBy(
predicates: static fn(PayloadSerializer $serializer): bool => $serializer->supports(record: $record)
Expand Down
26 changes: 13 additions & 13 deletions src/Serialization/SerializedPayload.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,6 @@ private function __construct(private string $payload)
{
}

/**
* Creates a SerializedPayload from an associative array, encoding it as JSON.
*
* @param array<int|string, mixed> $payload The associative array to encode as the serialized payload.
* @return SerializedPayload The serialized payload with the JSON-encoded representation.
*/
public static function fromArray(array $payload): SerializedPayload
{
$json = json_encode($payload, JSON_THROW_ON_ERROR);

return new SerializedPayload(payload: $json);
}

/**
* Creates a SerializedPayload from a raw JSON string.
*
Expand All @@ -41,6 +28,19 @@ public static function from(string $payload): SerializedPayload
return new SerializedPayload(payload: $payload);
}

/**
* Creates a SerializedPayload from an associative array, encoding it as JSON.
*
* @param array<int|string, mixed> $payload The associative array to encode as the serialized payload.
* @return SerializedPayload The serialized payload with the JSON-encoded representation.
*/
public static function fromArray(array $payload): SerializedPayload
{
$json = json_encode($payload, JSON_THROW_ON_ERROR);

return new SerializedPayload(payload: $json);
}

/**
* Returns the SerializedPayload as its JSON string representation.
*
Expand Down
Loading