diff --git a/src/Activity.php b/src/Activity.php index 55908e27..a9cf9c55 100644 --- a/src/Activity.php +++ b/src/Activity.php @@ -54,11 +54,20 @@ public function __construct( ) { $this->arguments = $arguments; - if (property_exists($this, 'connection')) { + $options = $this->storedWorkflow->workflowOptions(); + $connection = $options->connection; + + if ($connection !== null) { + $this->onConnection($connection); + } elseif (property_exists($this, 'connection')) { $this->onConnection($this->connection); } - if (property_exists($this, 'queue')) { + $queue = $options->queue; + + if ($queue !== null) { + $this->onQueue($queue); + } elseif (property_exists($this, 'queue')) { $this->onQueue($this->queue); } @@ -102,7 +111,7 @@ public function handle() $this->container = App::make(Container::class); - if ($this->storedWorkflow->logs()->whereIndex($this->index)->exists()) { + if ($this->storedWorkflow->hasLogByIndex($this->index)) { return; } diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 3eb8cbdf..2ebf41e6 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -28,9 +28,7 @@ public static function make($activity, ...$arguments): PromiseInterface { $context = WorkflowStub::getContext(); - $log = $context->storedWorkflow->logs() - ->whereIndex($context->index) - ->first(); + $log = $context->storedWorkflow->findLogByIndex($context->index); if (WorkflowStub::faked()) { $mocks = WorkflowStub::mocks(); @@ -38,15 +36,14 @@ public static function make($activity, ...$arguments): PromiseInterface if (! $log && array_key_exists($activity, $mocks)) { $result = $mocks[$activity]; - $log = $context->storedWorkflow->logs() - ->create([ - 'index' => $context->index, - 'now' => $context->now, - 'class' => $activity, - 'result' => Serializer::serialize( - is_callable($result) ? $result($context, ...$arguments) : $result - ), - ]); + $log = $context->storedWorkflow->createLog([ + 'index' => $context->index, + 'now' => $context->now, + 'class' => $activity, + 'result' => Serializer::serialize( + is_callable($result) ? $result($context, ...$arguments) : $result + ), + ]); WorkflowStub::recordDispatched($activity, $arguments); } diff --git a/src/ChildWorkflow.php b/src/ChildWorkflow.php index 23e8336a..04f15b4d 100644 --- a/src/ChildWorkflow.php +++ b/src/ChildWorkflow.php @@ -38,8 +38,11 @@ public function __construct( $connection = null, $queue = null ) { - $connection = $connection ?? config('queue.default'); - $queue = $queue ?? config('queue.connections.' . $connection . '.queue', 'default'); + $connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default'); + $queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config( + 'queue.connections.' . $connection . '.queue', + 'default' + ); $this->onConnection($connection); $this->onQueue($queue); } @@ -54,7 +57,7 @@ public function handle() $workflow = $this->parentWorkflow->toWorkflow(); try { - if ($this->parentWorkflow->logs()->whereIndex($this->index)->exists()) { + if ($this->parentWorkflow->hasLogByIndex($this->index)) { $workflow->resume(); } else { $workflow->next($this->index, $this->now, $this->storedWorkflow->class, $this->return); diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 36e70bb9..f9c1e6e4 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -21,9 +21,7 @@ public static function make($workflow, ...$arguments): PromiseInterface { $context = WorkflowStub::getContext(); - $log = $context->storedWorkflow->logs() - ->whereIndex($context->index) - ->first(); + $log = $context->storedWorkflow->findLogByIndex($context->index); if (WorkflowStub::faked()) { $mocks = WorkflowStub::mocks(); @@ -31,15 +29,14 @@ public static function make($workflow, ...$arguments): PromiseInterface if (! $log && array_key_exists($workflow, $mocks)) { $result = $mocks[$workflow]; - $log = $context->storedWorkflow->logs() - ->create([ - 'index' => $context->index, - 'now' => $context->now, - 'class' => $workflow, - 'result' => Serializer::serialize( - is_callable($result) ? $result($context, ...$arguments) : $result - ), - ]); + $log = $context->storedWorkflow->createLog([ + 'index' => $context->index, + 'now' => $context->now, + 'class' => $workflow, + 'result' => Serializer::serialize( + is_callable($result) ? $result($context, ...$arguments) : $result + ), + ]); WorkflowStub::recordDispatched($workflow, $arguments); } @@ -58,6 +55,17 @@ public static function make($workflow, ...$arguments): PromiseInterface $childWorkflow = $storedChildWorkflow ? $storedChildWorkflow->toWorkflow() : WorkflowStub::make($workflow); + $hasOptions = collect($arguments) + ->contains(static fn ($argument): bool => $argument instanceof WorkflowOptions); + + if (! $hasOptions) { + $options = new WorkflowOptions(WorkflowStub::connection(), WorkflowStub::queue()); + + if ($options->connection !== null || $options->queue !== null) { + $arguments[] = $options; + } + } + if ($childWorkflow->running() && ! $childWorkflow->created()) { try { $childWorkflow->resume(); diff --git a/src/Exception.php b/src/Exception.php index 3cfda11b..d5613a21 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -36,8 +36,11 @@ public function __construct( $connection = null, $queue = null ) { - $connection = $connection ?? config('queue.default'); - $queue = $queue ?? config('queue.connections.' . $connection . '.queue', 'default'); + $connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default'); + $queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config( + 'queue.connections.' . $connection . '.queue', + 'default' + ); $this->onConnection($connection); $this->onQueue($queue); } @@ -47,7 +50,7 @@ public function handle() $workflow = $this->storedWorkflow->toWorkflow(); try { - if ($this->storedWorkflow->logs()->whereIndex($this->index)->exists()) { + if ($this->storedWorkflow->hasLogByIndex($this->index)) { $workflow->resume(); } else { $workflow->next($this->index, $this->now, self::class, $this->exception); diff --git a/src/Models/StoredWorkflow.php b/src/Models/StoredWorkflow.php index cb91200d..aba0b86b 100644 --- a/src/Models/StoredWorkflow.php +++ b/src/Models/StoredWorkflow.php @@ -5,12 +5,16 @@ namespace Workflow\Models; use Illuminate\Database\Eloquent\Builder; +use Illuminate\Database\Eloquent\Collection; use Illuminate\Database\Eloquent\Model; use Illuminate\Database\Eloquent\Prunable; use Illuminate\Database\Eloquent\Relations\BelongsToMany; +use Illuminate\Support\Arr; use Spatie\ModelStates\HasStates; use Workflow\States\WorkflowContinuedStatus; use Workflow\States\WorkflowStatus; +use Workflow\WorkflowMetadata; +use Workflow\WorkflowOptions; use Workflow\WorkflowStub; class StoredWorkflow extends Model @@ -52,12 +56,130 @@ public function toWorkflow() return WorkflowStub::fromStoredWorkflow($this); } + public function workflowMetadata(): WorkflowMetadata + { + $arguments = $this->arguments; + + if ($arguments === null) { + return new WorkflowMetadata([]); + } + + return WorkflowMetadata::fromSerializedArguments( + \Workflow\Serializers\Serializer::unserialize($arguments) + ); + } + + /** + * @return array + */ + public function workflowArguments(): array + { + return $this->workflowMetadata() +->arguments; + } + + public function workflowOptions(): WorkflowOptions + { + return $this->workflowMetadata() +->options; + } + + public function effectiveConnection(): ?string + { + $connection = $this->workflowOptions() +->connection; + + if ($connection !== null) { + return $connection; + } + + if (! is_string($this->class) || $this->class === '') { + return null; + } + + return Arr::get(WorkflowStub::getDefaultProperties($this->class), 'connection'); + } + + public function effectiveQueue(): ?string + { + $queue = $this->workflowOptions() +->queue; + + if ($queue !== null) { + return $queue; + } + + if (! is_string($this->class) || $this->class === '') { + return null; + } + + $connection = $this->effectiveConnection() ?? config('queue.default'); + + return Arr::get(WorkflowStub::getDefaultProperties($this->class), 'queue') + ?? config('queue.connections.' . $connection . '.queue', 'default'); + } + public function logs(): \Illuminate\Database\Eloquent\Relations\HasMany { return $this->hasMany(config('workflows.stored_workflow_log_model', StoredWorkflowLog::class)) ->orderBy('id'); } + public function findLogByIndex(int $index, bool $fresh = false): ?StoredWorkflowLog + { + if ($fresh) { + $log = $this->logs() + ->whereIndex($index) + ->first(); + + if ($this->relationLoaded('logs') && $log !== null) { + /** @var Collection $logs */ + $logs = $this->getRelation('logs'); + if (! $logs->contains('id', $log->id)) { + $this->setRelation('logs', $logs->push($log)->sortBy('id')->values()); + } + } + + return $log; + } + + if ($this->relationLoaded('logs')) { + /** @var Collection $logs */ + $logs = $this->getRelation('logs'); + return $logs->firstWhere('index', $index); + } + + return $this->logs() + ->whereIndex($index) + ->first(); + } + + public function hasLogByIndex(int $index): bool + { + if ($this->relationLoaded('logs')) { + return $this->findLogByIndex($index) !== null; + } + + return $this->logs() + ->whereIndex($index) + ->exists(); + } + + public function createLog(array $attributes): StoredWorkflowLog + { + /** @var StoredWorkflowLog $log */ + $log = $this->logs() + ->create($attributes); + + if ($this->relationLoaded('logs')) { + /** @var Collection $logs */ + $logs = $this->getRelation('logs'); + $this->setRelation('logs', $logs->push($log)->sortBy('id')->values()); + } + + return $log; + } + public function signals(): \Illuminate\Database\Eloquent\Relations\HasMany { return $this->hasMany(config('workflows.stored_workflow_signal_model', StoredWorkflowSignal::class)) @@ -70,6 +192,48 @@ public function timers(): \Illuminate\Database\Eloquent\Relations\HasMany ->orderBy('id'); } + public function findTimerByIndex(int $index): ?StoredWorkflowTimer + { + if ($this->relationLoaded('timers')) { + /** @var Collection $timers */ + $timers = $this->getRelation('timers'); + return $timers->firstWhere('index', $index); + } + + return $this->timers() + ->whereIndex($index) + ->first(); + } + + public function createTimer(array $attributes): StoredWorkflowTimer + { + /** @var StoredWorkflowTimer $timer */ + $timer = $this->timers() + ->create($attributes); + + if ($this->relationLoaded('timers')) { + /** @var Collection $timers */ + $timers = $this->getRelation('timers'); + $this->setRelation('timers', $timers->push($timer)->sortBy('id')->values()); + } + + return $timer; + } + + public function orderedSignals(): Collection + { + if ($this->relationLoaded('signals')) { + /** @var Collection $signals */ + $signals = $this->getRelation('signals'); + return $signals->sortBy('created_at') + ->values(); + } + + return $this->signals() + ->orderBy('created_at') + ->get(); + } + public function exceptions(): \Illuminate\Database\Eloquent\Relations\HasMany { return $this->hasMany(config('workflows.stored_workflow_exception_model', StoredWorkflowException::class)) diff --git a/src/Serializers/AbstractSerializer.php b/src/Serializers/AbstractSerializer.php index fd633106..05353dfe 100644 --- a/src/Serializers/AbstractSerializer.php +++ b/src/Serializers/AbstractSerializer.php @@ -32,9 +32,7 @@ public static function serializeModels($data) { if (is_array($data)) { $self = static::getInstance(); - foreach ($data as $key => $value) { - $data[$key] = $self->getSerializedPropertyValue($value); - } + $data = $self->serializeValue($data); } elseif ($data instanceof Throwable) { $data = [ 'class' => get_class($data), @@ -54,9 +52,7 @@ public static function unserializeModels($data) { if (is_array($data)) { $self = static::getInstance(); - foreach ($data as $key => $value) { - $data[$key] = $self->getRestoredPropertyValue($value); - } + $data = $self->unserializeValue($data); } return $data; } @@ -77,4 +73,30 @@ public static function unserialize(string $data) } return static::unserializeModels($unserialized); } + + private function serializeValue(mixed $value): mixed + { + if (is_array($value)) { + foreach ($value as $key => $nested) { + $value[$key] = $this->serializeValue($nested); + } + + return $value; + } + + return $this->getSerializedPropertyValue($value); + } + + private function unserializeValue(mixed $value): mixed + { + if (is_array($value)) { + foreach ($value as $key => $nested) { + $value[$key] = $this->unserializeValue($nested); + } + + return $value; + } + + return $this->getRestoredPropertyValue($value); + } } diff --git a/src/Signal.php b/src/Signal.php index 531719b2..df0b05ea 100644 --- a/src/Signal.php +++ b/src/Signal.php @@ -29,8 +29,11 @@ public function __construct( $connection = null, $queue = null ) { - $connection = $connection ?? config('queue.default'); - $queue = $queue ?? config('queue.connections.' . $connection . '.queue', 'default'); + $connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default'); + $queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config( + 'queue.connections.' . $connection . '.queue', + 'default' + ); $this->onConnection($connection); $this->onQueue($queue); } diff --git a/src/Traits/AwaitWithTimeouts.php b/src/Traits/AwaitWithTimeouts.php index f68f5810..bf9aaa33 100644 --- a/src/Traits/AwaitWithTimeouts.php +++ b/src/Traits/AwaitWithTimeouts.php @@ -15,9 +15,7 @@ trait AwaitWithTimeouts { public static function awaitWithTimeout(int|string|CarbonInterval $seconds, $condition): PromiseInterface { - $log = self::$context->storedWorkflow->logs() - ->whereIndex(self::$context->index) - ->first(); + $log = self::$context->storedWorkflow->findLogByIndex(self::$context->index); if ($log) { ++self::$context->index; @@ -29,17 +27,14 @@ public static function awaitWithTimeout(int|string|CarbonInterval $seconds, $con if ($result === true) { if (! self::$context->replaying) { try { - self::$context->storedWorkflow->logs() - ->create([ - 'index' => self::$context->index, - 'now' => self::$context->now, - 'class' => Signal::class, - 'result' => Serializer::serialize($result), - ]); + self::$context->storedWorkflow->createLog([ + 'index' => self::$context->index, + 'now' => self::$context->now, + 'class' => Signal::class, + 'result' => Serializer::serialize($result), + ]); } catch (QueryException $exception) { - $log = self::$context->storedWorkflow->logs() - ->whereIndex(self::$context->index) - ->first(); + $log = self::$context->storedWorkflow->findLogByIndex(self::$context->index, true); if ($log) { ++self::$context->index; diff --git a/src/Traits/Awaits.php b/src/Traits/Awaits.php index 45a870d4..3bd5ae17 100644 --- a/src/Traits/Awaits.php +++ b/src/Traits/Awaits.php @@ -15,9 +15,7 @@ trait Awaits { public static function await($condition): PromiseInterface { - $log = self::$context->storedWorkflow->logs() - ->whereIndex(self::$context->index) - ->first(); + $log = self::$context->storedWorkflow->findLogByIndex(self::$context->index); if ($log) { ++self::$context->index; @@ -29,17 +27,14 @@ public static function await($condition): PromiseInterface if ($result === true) { if (! self::$context->replaying) { try { - self::$context->storedWorkflow->logs() - ->create([ - 'index' => self::$context->index, - 'now' => self::$context->now, - 'class' => Signal::class, - 'result' => Serializer::serialize($result), - ]); + self::$context->storedWorkflow->createLog([ + 'index' => self::$context->index, + 'now' => self::$context->now, + 'class' => Signal::class, + 'result' => Serializer::serialize($result), + ]); } catch (QueryException $exception) { - $log = self::$context->storedWorkflow->logs() - ->whereIndex(self::$context->index) - ->first(); + $log = self::$context->storedWorkflow->findLogByIndex(self::$context->index, true); if ($log) { ++self::$context->index; diff --git a/src/Traits/Continues.php b/src/Traits/Continues.php index 2294b63a..c2684733 100644 --- a/src/Traits/Continues.php +++ b/src/Traits/Continues.php @@ -8,6 +8,7 @@ use function React\Promise\resolve; use Workflow\ContinuedWorkflow; use Workflow\Models\StoredWorkflow; +use Workflow\WorkflowOptions; trait Continues { @@ -63,6 +64,14 @@ public static function continueAsNew(...$arguments): PromiseInterface ]); } + if (! collect($arguments)->contains(static fn ($argument): bool => $argument instanceof WorkflowOptions)) { + $options = $context->storedWorkflow->workflowOptions(); + + if ($options->connection !== null || $options->queue !== null) { + $arguments[] = $options; + } + } + $newWorkflow->start(...$arguments); } diff --git a/src/Traits/SideEffects.php b/src/Traits/SideEffects.php index c052daca..e4a4db0c 100644 --- a/src/Traits/SideEffects.php +++ b/src/Traits/SideEffects.php @@ -13,9 +13,7 @@ trait SideEffects { public static function sideEffect($callable): PromiseInterface { - $log = self::$context->storedWorkflow->logs() - ->whereIndex(self::$context->index) - ->first(); + $log = self::$context->storedWorkflow->findLogByIndex(self::$context->index); if ($log) { ++self::$context->index; @@ -26,17 +24,14 @@ public static function sideEffect($callable): PromiseInterface if (! self::$context->replaying) { try { - self::$context->storedWorkflow->logs() - ->create([ - 'index' => self::$context->index, - 'now' => self::$context->now, - 'class' => self::$context->storedWorkflow->class, - 'result' => Serializer::serialize($result), - ]); + self::$context->storedWorkflow->createLog([ + 'index' => self::$context->index, + 'now' => self::$context->now, + 'class' => self::$context->storedWorkflow->class, + 'result' => Serializer::serialize($result), + ]); } catch (QueryException $exception) { - $log = self::$context->storedWorkflow->logs() - ->whereIndex(self::$context->index) - ->first(); + $log = self::$context->storedWorkflow->findLogByIndex(self::$context->index, true); if ($log) { ++self::$context->index; diff --git a/src/Traits/Timers.php b/src/Traits/Timers.php index b807ee61..538f9502 100644 --- a/src/Traits/Timers.php +++ b/src/Traits/Timers.php @@ -26,29 +26,26 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa return resolve(true); } - $log = self::$context->storedWorkflow->logs() - ->whereIndex(self::$context->index) - ->first(); + $log = self::$context->storedWorkflow->findLogByIndex(self::$context->index); if ($log) { ++self::$context->index; return resolve(Serializer::unserialize($log->result)); } - $timer = self::$context->storedWorkflow->timers() - ->whereIndex(self::$context->index) - ->first(); + self::$context->storedWorkflow->loadMissing('timers'); + + $timer = self::$context->storedWorkflow->findTimerByIndex(self::$context->index); if ($timer === null) { $when = self::$context->now->copy() ->addSeconds($seconds); if (! self::$context->replaying) { - $timer = self::$context->storedWorkflow->timers() - ->create([ - 'index' => self::$context->index, - 'stop_at' => $when, - ]); + $timer = self::$context->storedWorkflow->createTimer([ + 'index' => self::$context->index, + 'stop_at' => $when, + ]); } else { ++self::$context->index; $deferred = new Deferred(); @@ -62,13 +59,12 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa if ($result === true) { if (! self::$context->replaying) { try { - self::$context->storedWorkflow->logs() - ->create([ - 'index' => self::$context->index, - 'now' => self::$context->now, - 'class' => Timer::class, - 'result' => Serializer::serialize(true), - ]); + self::$context->storedWorkflow->createLog([ + 'index' => self::$context->index, + 'now' => self::$context->now, + 'class' => Timer::class, + 'result' => Serializer::serialize(true), + ]); } catch (\Illuminate\Database\UniqueConstraintViolationException $exception) { // already logged } diff --git a/src/Traits/Versions.php b/src/Traits/Versions.php index baf9c724..d8fd011f 100644 --- a/src/Traits/Versions.php +++ b/src/Traits/Versions.php @@ -17,9 +17,7 @@ public static function getVersion( int $minSupported = self::DEFAULT_VERSION, int $maxSupported = 1 ): PromiseInterface { - $log = self::$context->storedWorkflow->logs() - ->whereIndex(self::$context->index) - ->first(); + $log = self::$context->storedWorkflow->findLogByIndex(self::$context->index); if ($log) { $version = Serializer::unserialize($log->result); @@ -39,17 +37,14 @@ public static function getVersion( if (! self::$context->replaying) { try { - self::$context->storedWorkflow->logs() - ->create([ - 'index' => self::$context->index, - 'now' => self::$context->now, - 'class' => 'version:' . $changeId, - 'result' => Serializer::serialize($version), - ]); + self::$context->storedWorkflow->createLog([ + 'index' => self::$context->index, + 'now' => self::$context->now, + 'class' => 'version:' . $changeId, + 'result' => Serializer::serialize($version), + ]); } catch (QueryException $exception) { - $log = self::$context->storedWorkflow->logs() - ->whereIndex(self::$context->index) - ->first(); + $log = self::$context->storedWorkflow->findLogByIndex(self::$context->index, true); if ($log) { $version = Serializer::unserialize($log->result); diff --git a/src/Workflow.php b/src/Workflow.php index 0942dd99..5ebecc0d 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -76,11 +76,19 @@ public function __construct( $this->arguments = $arguments; - if (property_exists($this, 'connection')) { + $connection = $this->storedWorkflow->effectiveConnection(); + + if ($connection !== null) { + $this->onConnection($connection); + } elseif (property_exists($this, 'connection')) { $this->onConnection($this->connection); } - if (property_exists($this, 'queue')) { + $queue = $this->storedWorkflow->effectiveQueue(); + + if ($queue !== null) { + $this->onQueue($queue); + } elseif (property_exists($this, 'queue')) { $this->onQueue($this->queue); } @@ -179,9 +187,9 @@ public function handle(): void ->wherePivot('parent_index', '!=', StoredWorkflow::ACTIVE_WORKFLOW_INDEX) ->first(); - $log = $this->storedWorkflow->logs() - ->where('index', $this->index) - ->first(); + $this->storedWorkflow->loadMissing(['logs', 'signals']); + + $log = $this->storedWorkflow->findLogByIndex($this->index); $this->storedWorkflow ->signals() @@ -216,11 +224,7 @@ public function handle(): void while ($this->coroutine->valid()) { $this->index = WorkflowStub::getContext()->index; - $previousLog = $log; - - $log = $this->storedWorkflow->logs() - ->where('index', $this->index) - ->first(); + $log = $this->storedWorkflow->findLogByIndex($this->index); $this->now = $log ? $log->now : Carbon::now(); @@ -286,9 +290,11 @@ public function handle(): void ); if ($parentWorkflow) { - $properties = WorkflowStub::getDefaultProperties($parentWorkflow->class); - $connection = $properties['connection'] ?? config('queue.default'); - $queue = $properties['queue'] ?? config('queue.connections.' . $connection . '.queue', 'default'); + $connection = $parentWorkflow->effectiveConnection() ?? config('queue.default'); + $queue = $parentWorkflow->effectiveQueue() ?? config( + 'queue.connections.' . $connection . '.queue', + 'default' + ); ChildWorkflow::dispatch( $parentWorkflow->pivot->parent_index, diff --git a/src/WorkflowMetadata.php b/src/WorkflowMetadata.php new file mode 100644 index 00000000..96818fc8 --- /dev/null +++ b/src/WorkflowMetadata.php @@ -0,0 +1,74 @@ + $arguments + */ + public function __construct( + public array $arguments, + public WorkflowOptions $options = new WorkflowOptions(), + ) { + } + + /** + * @return array{ + * arguments: array, + * options: array{connection: ?string, queue: ?string} + * } + */ + public function toArray(): array + { + return [ + 'arguments' => $this->arguments, + 'options' => [ + 'connection' => $this->options->connection, + 'queue' => $this->options->queue, + ], + ]; + } + + public static function fromSerializedArguments(mixed $serialized): self + { + if ($serialized instanceof self) { + return $serialized; + } + + if (is_array($serialized)) { + if (array_key_exists('arguments', $serialized) || array_key_exists('options', $serialized)) { + $arguments = $serialized['arguments'] ?? []; + $options = $serialized['options'] ?? []; + + return new self( + is_array($arguments) ? array_values($arguments) : [], + is_array($options) ? WorkflowOptions::set($options) : new WorkflowOptions(), + ); + } + + return new self(array_values($serialized)); + } + + return new self([]); + } + + /** + * @param array $arguments + */ + public static function fromStartArguments(array $arguments, ?WorkflowOptions $fallback = null): self + { + $options = $fallback; + + foreach ($arguments as $index => $argument) { + if ($argument instanceof WorkflowOptions) { + $options = $argument; + unset($arguments[$index]); + } + } + + return new self(array_values($arguments), $options ?? new WorkflowOptions()); + } +} diff --git a/src/WorkflowOptions.php b/src/WorkflowOptions.php new file mode 100644 index 00000000..cccc96ed --- /dev/null +++ b/src/WorkflowOptions.php @@ -0,0 +1,22 @@ +storedWorkflow->class, $method)) { $activeWorkflow = $this->storedWorkflow->active(); - return (new $activeWorkflow->class( - $activeWorkflow, - ...Serializer::unserialize($activeWorkflow->arguments), - )) + return (new $activeWorkflow->class($activeWorkflow, ...$activeWorkflow->workflowArguments())) ->query($method); } if (self::isUpdateMethod($this->storedWorkflow->class, $method)) { $activeWorkflow = $this->storedWorkflow->active(); - $workflow = new $activeWorkflow->class( - $activeWorkflow, - ...Serializer::unserialize($activeWorkflow->arguments), - ); + $workflow = new $activeWorkflow->class($activeWorkflow, ...$activeWorkflow->workflowArguments()); $result = $workflow->query($method); if ($workflow->outboxWasConsumed) { @@ -123,12 +116,12 @@ public function __call($method, $arguments) public static function connection() { - return Arr::get(self::getDefaultProperties(self::$context->storedWorkflow->class), 'connection'); + return self::$context->storedWorkflow->effectiveConnection(); } public static function queue() { - return Arr::get(self::getDefaultProperties(self::$context->storedWorkflow->class), 'queue'); + return self::$context->storedWorkflow->effectiveQueue(); } public static function getDefaultProperties(string $class): array @@ -245,7 +238,11 @@ public function resume(): void public function start(...$arguments): void { - $this->storedWorkflow->arguments = Serializer::serialize($arguments); + $fallbackOptions = $this->storedWorkflow->workflowOptions(); + + $metadata = WorkflowMetadata::fromStartArguments($arguments, $fallbackOptions); + + $this->storedWorkflow->arguments = Serializer::serialize($metadata->toArray()); $this->dispatch(); } @@ -302,13 +299,12 @@ public function fail($exception): void public function next($index, $now, $class, $result, bool $shouldSignal = true): void { try { - $this->storedWorkflow->logs() - ->create([ - 'index' => $index, - 'now' => $now, - 'class' => $class, - 'result' => Serializer::serialize($result), - ]); + $this->storedWorkflow->createLog([ + 'index' => $index, + 'now' => $now, + 'class' => $class, + 'result' => Serializer::serialize($result), + ]); } catch (\Illuminate\Database\UniqueConstraintViolationException $exception) { // already logged } @@ -375,7 +371,7 @@ private function dispatch(): void WorkflowStarted::dispatch( $this->storedWorkflow->id, $this->storedWorkflow->class, - json_encode(Serializer::unserialize($this->storedWorkflow->arguments)), + json_encode($this->storedWorkflow->workflowArguments()), now() ->format('Y-m-d\TH:i:s.u\Z') ); @@ -395,7 +391,7 @@ private function dispatch(): void $this->storedWorkflow->class::$dispatch( $this->storedWorkflow, - ...Serializer::unserialize($this->storedWorkflow->arguments) + ...$this->storedWorkflow->workflowArguments() ); } } diff --git a/tests/Fixtures/TestBadConnectionWorkflow.php b/tests/Fixtures/TestBadConnectionWorkflow.php index a3e626b5..b9b92146 100644 --- a/tests/Fixtures/TestBadConnectionWorkflow.php +++ b/tests/Fixtures/TestBadConnectionWorkflow.php @@ -8,7 +8,7 @@ use Workflow\QueryMethod; use Workflow\SignalMethod; use Workflow\Workflow; -use function Workflow\{activity, await}; +use function Workflow\{activity, await, sideEffect}; class TestBadConnectionWorkflow extends Workflow { @@ -35,13 +35,13 @@ public function execute(Application $app, $shouldAssert = false) assert($app->runningInConsole()); if ($shouldAssert) { - assert(! $this->canceled); + assert(yield sideEffect(fn (): bool => ! $this->canceled)); } $otherResult = yield activity(TestOtherActivity::class, 'other'); if ($shouldAssert) { - assert(! $this->canceled); + assert(yield sideEffect(fn (): bool => ! $this->canceled)); } yield await(fn (): bool => $this->canceled); diff --git a/tests/Fixtures/TestWebhookWorkflow.php b/tests/Fixtures/TestWebhookWorkflow.php index 1a893a84..06e691d7 100644 --- a/tests/Fixtures/TestWebhookWorkflow.php +++ b/tests/Fixtures/TestWebhookWorkflow.php @@ -9,7 +9,7 @@ use Workflow\SignalMethod; use Workflow\Webhook; use Workflow\Workflow; -use function Workflow\{activity, await}; +use function Workflow\{activity, await, sideEffect}; #[Webhook] class TestWebhookWorkflow extends Workflow @@ -38,13 +38,13 @@ public function execute(Application $app, $shouldAssert = false) assert($app->runningInConsole()); if ($shouldAssert) { - assert(! $this->canceled); + assert(yield sideEffect(fn (): bool => ! $this->canceled)); } $otherResult = yield activity(TestOtherActivity::class, 'other'); if ($shouldAssert) { - assert(! $this->canceled); + assert(yield sideEffect(fn (): bool => ! $this->canceled)); } yield await(fn (): bool => $this->canceled); diff --git a/tests/Fixtures/TestWorkflow.php b/tests/Fixtures/TestWorkflow.php index 49c2e22f..be5b031d 100644 --- a/tests/Fixtures/TestWorkflow.php +++ b/tests/Fixtures/TestWorkflow.php @@ -9,7 +9,7 @@ use Workflow\SignalMethod; use Workflow\Webhook; use Workflow\Workflow; -use function Workflow\{activity, await}; +use function Workflow\{activity, await, sideEffect}; #[Webhook] class TestWorkflow extends Workflow @@ -38,7 +38,7 @@ public function execute(Application $app, $shouldAssert = false) assert($app->runningInConsole()); if ($shouldAssert) { - assert(! $this->canceled); + assert(yield sideEffect(fn (): bool => ! $this->canceled)); } $otherResult = yield activity(TestOtherActivity::class, 'other'); diff --git a/tests/Unit/ActivityTest.php b/tests/Unit/ActivityTest.php index 383522ec..08784962 100644 --- a/tests/Unit/ActivityTest.php +++ b/tests/Unit/ActivityTest.php @@ -39,6 +39,24 @@ public function testActivity(): void $this->assertSame($activity->timeout, pcntl_alarm(0)); } + public function testActivityUsesWorkflowOptionConnection(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->arguments = Serializer::serialize([ + 'arguments' => [], + 'options' => [ + 'connection' => 'sync', + 'queue' => null, + ], + ]); + $storedWorkflow->save(); + + $activity = new TestOtherActivity(0, now()->toDateTimeString(), $storedWorkflow, ['other']); + + $this->assertSame('sync', $activity->connection); + } + public function testInvalidActivity(): void { $this->expectException(BadMethodCallException::class); diff --git a/tests/Unit/ChildWorkflowStubTest.php b/tests/Unit/ChildWorkflowStubTest.php index f3d90dc3..6f5a3f9c 100644 --- a/tests/Unit/ChildWorkflowStubTest.php +++ b/tests/Unit/ChildWorkflowStubTest.php @@ -93,15 +93,6 @@ public function testLoadsChildWorkflow(): void public function testIgnoresTransitionNotFoundWhenChildResumeThrows(): void { - $logs = Mockery::mock(); - $logs->shouldReceive('whereIndex') - ->once() - ->with(0) - ->andReturnSelf(); - $logs->shouldReceive('first') - ->once() - ->andReturn(null); - $childWorkflow = new class() { public function running(): bool { @@ -143,12 +134,19 @@ public function startAsChild(...$arguments): void ->andReturn($storedChildWorkflow); $storedWorkflow = Mockery::mock(); - $storedWorkflow->shouldReceive('logs') + $storedWorkflow->shouldReceive('findLogByIndex') ->once() - ->andReturn($logs); + ->with(0) + ->andReturn(null); $storedWorkflow->shouldReceive('children') ->once() ->andReturn($children); + $storedWorkflow->shouldReceive('effectiveConnection') + ->once() + ->andReturn(null); + $storedWorkflow->shouldReceive('effectiveQueue') + ->once() + ->andReturn(null); WorkflowStub::setContext([ 'storedWorkflow' => $storedWorkflow, diff --git a/tests/Unit/Models/StoredWorkflowTest.php b/tests/Unit/Models/StoredWorkflowTest.php index 95a463f3..ec6e533e 100644 --- a/tests/Unit/Models/StoredWorkflowTest.php +++ b/tests/Unit/Models/StoredWorkflowTest.php @@ -5,6 +5,7 @@ namespace Tests\Unit\Models; use Illuminate\Support\Carbon; +use Illuminate\Support\Facades\DB; use Tests\Fixtures\TestContinueAsNewWorkflow; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; @@ -249,4 +250,212 @@ public function testActiveWithContinuedStatusButNoActiveChild(): void $this->assertNotNull($active); $this->assertSame($workflow->id, $active->id); } + + public function testEffectiveConnectionUsesWorkflowOptionConnection(): void + { + $workflow = StoredWorkflow::create([ + 'class' => TestWorkflow::class, + 'status' => 'running', + 'arguments' => Serializer::serialize([ + 'arguments' => [], + 'options' => [ + 'connection' => 'sync', + 'queue' => null, + ], + ]), + ]); + + $this->assertSame('sync', $workflow->effectiveConnection()); + } + + public function testFindLogByIndexFreshSyncsLoadedLogsRelation(): void + { + $workflow = StoredWorkflow::create([ + 'class' => 'TestWorkflow', + 'status' => 'running', + ]); + + $existingLog = $workflow->logs() + ->create([ + 'index' => 0, + 'now' => now(), + 'class' => 'test', + ]); + + $freshLog = $workflow->logs() + ->create([ + 'index' => 1, + 'now' => now(), + 'class' => 'test', + ]); + + $workflow->setRelation('logs', $workflow->logs()->whereKey($existingLog->id)->get()); + + $log = $workflow->findLogByIndex(1, true); + + $this->assertNotNull($log); + $this->assertSame($freshLog->id, $log->id); + $this->assertSame([$existingLog->id, $freshLog->id], $workflow->getRelation('logs')->pluck('id')->toArray()); + } + + public function testFindLogByIndexUsesLoadedLogsRelation(): void + { + $workflow = StoredWorkflow::create([ + 'class' => 'TestWorkflow', + 'status' => 'running', + ]); + + $workflow->logs() + ->create([ + 'index' => 0, + 'now' => now(), + 'class' => 'test', + ]); + + $workflow->load('logs'); + + DB::flushQueryLog(); + DB::enableQueryLog(); + + $log = $workflow->findLogByIndex(0); + + $this->assertNotNull($log); + $this->assertCount(0, DB::getQueryLog()); + } + + public function testCreateLogSyncsLoadedLogsRelation(): void + { + $workflow = StoredWorkflow::create([ + 'class' => 'TestWorkflow', + 'status' => 'running', + ]); + + $workflow->load('logs'); + + $workflow->createLog([ + 'index' => 2, + 'now' => now(), + 'class' => 'test', + ]); + + DB::flushQueryLog(); + DB::enableQueryLog(); + + $log = $workflow->findLogByIndex(2); + + $this->assertNotNull($log); + $this->assertCount(0, DB::getQueryLog()); + } + + public function testFindTimerByIndexUsesLoadedTimersRelation(): void + { + $workflow = StoredWorkflow::create([ + 'class' => 'TestWorkflow', + 'status' => 'running', + ]); + + $workflow->timers() + ->create([ + 'index' => 3, + 'stop_at' => now() + ->addSecond(), + ]); + + $workflow->load('timers'); + + DB::flushQueryLog(); + DB::enableQueryLog(); + + $timer = $workflow->findTimerByIndex(3); + + $this->assertNotNull($timer); + $this->assertCount(0, DB::getQueryLog()); + } + + public function testFindTimerByIndexQueriesWhenTimersRelationIsNotLoaded(): void + { + $workflow = StoredWorkflow::create([ + 'class' => 'TestWorkflow', + 'status' => 'running', + ]); + + $workflow->timers() + ->create([ + 'index' => 3, + 'stop_at' => now() + ->addSecond(), + ]); + + DB::flushQueryLog(); + DB::enableQueryLog(); + + $timer = $workflow->findTimerByIndex(3); + + $this->assertNotNull($timer); + $this->assertGreaterThan(0, count(DB::getQueryLog())); + } + + public function testOrderedSignalsUsesLoadedSignalsRelation(): void + { + $workflow = StoredWorkflow::create([ + 'class' => 'TestWorkflow', + 'status' => 'running', + ]); + + $workflow->signals() + ->create([ + 'method' => 'first', + 'arguments' => serialize([]), + 'created_at' => now() + ->subSecond(), + ]); + + $workflow->signals() + ->create([ + 'method' => 'second', + 'arguments' => serialize([]), + 'created_at' => now(), + ]); + + $workflow->load('signals'); + + DB::flushQueryLog(); + DB::enableQueryLog(); + + $signals = $workflow->orderedSignals(); + + $this->assertSame(['first', 'second'], $signals->pluck('method')->toArray()); + $this->assertCount(0, DB::getQueryLog()); + } + + public function testOrderedSignalsQueriesWhenSignalsRelationIsNotLoaded(): void + { + $workflow = StoredWorkflow::create([ + 'class' => 'TestWorkflow', + 'status' => 'running', + ]); + + $workflow->signals() + ->create([ + 'method' => 'first', + 'arguments' => serialize([]), + 'created_at' => now() + ->subSecond(), + ]); + + $workflow->signals() + ->create([ + 'method' => 'second', + 'arguments' => serialize([]), + 'created_at' => now(), + ]); + + DB::flushQueryLog(); + DB::enableQueryLog(); + + $signals = $workflow->orderedSignals(); + + $this->assertSame(['first', 'second'], $signals->pluck('method')->toArray()); + $this->assertGreaterThan(0, count(DB::getQueryLog())); + } } diff --git a/tests/Unit/Traits/AwaitWithTimeoutsTest.php b/tests/Unit/Traits/AwaitWithTimeoutsTest.php index bb8589bf..cd584e0a 100644 --- a/tests/Unit/Traits/AwaitWithTimeoutsTest.php +++ b/tests/Unit/Traits/AwaitWithTimeoutsTest.php @@ -121,23 +121,18 @@ public function testThrowsQueryExceptionWhenNotDuplicateKey(): void $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); - $mockLogs = Mockery::mock(\Illuminate\Database\Eloquent\Relations\HasMany::class) - ->shouldReceive('whereIndex') - ->twice() - ->andReturnSelf() - ->shouldReceive('first') - ->twice() - ->andReturn(null) - ->shouldReceive('create') - ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Some other error'))) - ->getMock(); - $mockStoredWorkflow = Mockery::spy($storedWorkflow); - - $mockStoredWorkflow->shouldReceive('logs') - ->andReturnUsing(static function () use ($mockLogs) { - return $mockLogs; - }); + $mockStoredWorkflow->shouldReceive('findLogByIndex') + ->once() + ->with(0) + ->andReturn(null); + $mockStoredWorkflow->shouldReceive('createLog') + ->once() + ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Some other error'))); + $mockStoredWorkflow->shouldReceive('findLogByIndex') + ->once() + ->with(0, true) + ->andReturn(null); WorkflowStub::setContext([ 'storedWorkflow' => $mockStoredWorkflow, diff --git a/tests/Unit/Traits/AwaitsTest.php b/tests/Unit/Traits/AwaitsTest.php index cf5677da..12b2a8bd 100644 --- a/tests/Unit/Traits/AwaitsTest.php +++ b/tests/Unit/Traits/AwaitsTest.php @@ -107,23 +107,18 @@ public function testThrowsQueryExceptionWhenNotDuplicateKey(): void $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); - $mockLogs = Mockery::mock(\Illuminate\Database\Eloquent\Relations\HasMany::class) - ->shouldReceive('whereIndex') - ->twice() - ->andReturnSelf() - ->shouldReceive('first') - ->twice() - ->andReturn(null) - ->shouldReceive('create') - ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Some other error'))) - ->getMock(); - $mockStoredWorkflow = Mockery::spy($storedWorkflow); - - $mockStoredWorkflow->shouldReceive('logs') - ->andReturnUsing(static function () use ($mockLogs) { - return $mockLogs; - }); + $mockStoredWorkflow->shouldReceive('findLogByIndex') + ->once() + ->with(0) + ->andReturn(null); + $mockStoredWorkflow->shouldReceive('createLog') + ->once() + ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Some other error'))); + $mockStoredWorkflow->shouldReceive('findLogByIndex') + ->once() + ->with(0, true) + ->andReturn(null); WorkflowStub::setContext([ 'storedWorkflow' => $mockStoredWorkflow, diff --git a/tests/Unit/Traits/SideEffectsTest.php b/tests/Unit/Traits/SideEffectsTest.php index 1ff8c369..9e121000 100644 --- a/tests/Unit/Traits/SideEffectsTest.php +++ b/tests/Unit/Traits/SideEffectsTest.php @@ -93,25 +93,18 @@ public function testThrowsQueryExceptionWhenNotDuplicateKey(): void $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); - $mockLogs = Mockery::mock(\Illuminate\Database\Eloquent\Relations\HasMany::class) - ->shouldReceive('whereIndex') - ->twice() - ->andReturnSelf() - ->shouldReceive('first') - ->twice() - ->andReturn(null) - ->shouldReceive('create') - ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Some other error'))) - ->getMock(); - $mockStoredWorkflow = Mockery::spy($storedWorkflow); - - $mockStoredWorkflow->shouldReceive('logs') - ->andReturnUsing(static function () use ($mockLogs) { - return $mockLogs; - }); - - $mockStoredWorkflow->class = TestWorkflow::class; + $mockStoredWorkflow->shouldReceive('findLogByIndex') + ->once() + ->with(0) + ->andReturn(null); + $mockStoredWorkflow->shouldReceive('createLog') + ->once() + ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Some other error'))); + $mockStoredWorkflow->shouldReceive('findLogByIndex') + ->once() + ->with(0, true) + ->andReturn(null); WorkflowStub::setContext([ 'storedWorkflow' => $mockStoredWorkflow, diff --git a/tests/Unit/Traits/TimersTest.php b/tests/Unit/Traits/TimersTest.php index 36a701d1..1e0dfad7 100644 --- a/tests/Unit/Traits/TimersTest.php +++ b/tests/Unit/Traits/TimersTest.php @@ -6,7 +6,6 @@ use Carbon\CarbonInterval; use Exception; -use Illuminate\Database\Eloquent\Relations\HasMany; use Illuminate\Database\UniqueConstraintViolationException; use Illuminate\Support\Facades\Bus; use Mockery; @@ -163,23 +162,14 @@ public function testHandlesDuplicateLogInsertionProperly(): void 'result' => Serializer::serialize(true), ]); - $mockLogs = Mockery::mock(HasMany::class) - ->shouldReceive('whereIndex') + $mockStoredWorkflow = Mockery::spy($storedWorkflow); + $mockStoredWorkflow->shouldReceive('findLogByIndex') ->once() - ->andReturnSelf() - ->shouldReceive('first') + ->with(0) + ->andReturn(null); + $mockStoredWorkflow->shouldReceive('createLog') ->once() - ->andReturn(null) - ->shouldReceive('create') - ->andThrow(new UniqueConstraintViolationException('', '', [], new Exception())) - ->getMock(); - - $mockStoredWorkflow = Mockery::spy($storedWorkflow); - - $mockStoredWorkflow->shouldReceive('logs') - ->andReturnUsing(static function () use ($mockLogs) { - return $mockLogs; - }); + ->andThrow(new UniqueConstraintViolationException('', '', [], new Exception())); WorkflowStub::setContext([ 'storedWorkflow' => $mockStoredWorkflow, diff --git a/tests/Unit/Traits/VersionsTest.php b/tests/Unit/Traits/VersionsTest.php index 8198ae19..1fe909f9 100644 --- a/tests/Unit/Traits/VersionsTest.php +++ b/tests/Unit/Traits/VersionsTest.php @@ -9,6 +9,7 @@ use Tests\TestCase; use Workflow\Exceptions\VersionNotSupportedException; use Workflow\Models\StoredWorkflow; +use Workflow\Models\StoredWorkflowLog; use Workflow\Serializers\Serializer; use Workflow\WorkflowStub; @@ -121,29 +122,20 @@ public function testResolvesConflictingResultWithValidVersion(): void $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); - $mockLogs = Mockery::mock(\Illuminate\Database\Eloquent\Relations\HasMany::class) - ->shouldReceive('whereIndex') - ->andReturnSelf() - ->shouldReceive('first') + $mockStoredWorkflow = Mockery::spy($storedWorkflow); + $mockStoredWorkflow->shouldReceive('findLogByIndex') + ->once() + ->with(0) + ->andReturn(null); + $mockStoredWorkflow->shouldReceive('createLog') ->once() - ->andReturn(null) - ->shouldReceive('create') - ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Duplicate entry'))) - ->shouldReceive('first') + ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Duplicate entry'))); + $mockStoredWorkflow->shouldReceive('findLogByIndex') ->once() - ->andReturn((object) [ + ->with(0, true) + ->andReturn(new StoredWorkflowLog([ 'result' => Serializer::serialize(1), - ]) - ->getMock(); - - $mockStoredWorkflow = Mockery::spy($storedWorkflow); - - $mockStoredWorkflow->shouldReceive('logs') - ->andReturnUsing(static function () use ($mockLogs) { - return $mockLogs; - }); - - $mockStoredWorkflow->class = TestWorkflow::class; + ])); WorkflowStub::setContext([ 'storedWorkflow' => $mockStoredWorkflow, @@ -167,29 +159,20 @@ public function testResolvesConflictingResultThrowsWhenVersionNotSupported(): vo $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); - $mockLogs = Mockery::mock(\Illuminate\Database\Eloquent\Relations\HasMany::class) - ->shouldReceive('whereIndex') - ->andReturnSelf() - ->shouldReceive('first') + $mockStoredWorkflow = Mockery::spy($storedWorkflow); + $mockStoredWorkflow->shouldReceive('findLogByIndex') + ->once() + ->with(0) + ->andReturn(null); + $mockStoredWorkflow->shouldReceive('createLog') ->once() - ->andReturn(null) - ->shouldReceive('create') - ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Duplicate entry'))) - ->shouldReceive('first') + ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Duplicate entry'))); + $mockStoredWorkflow->shouldReceive('findLogByIndex') ->once() - ->andReturn((object) [ + ->with(0, true) + ->andReturn(new StoredWorkflowLog([ 'result' => Serializer::serialize(99), - ]) - ->getMock(); - - $mockStoredWorkflow = Mockery::spy($storedWorkflow); - - $mockStoredWorkflow->shouldReceive('logs') - ->andReturnUsing(static function () use ($mockLogs) { - return $mockLogs; - }); - - $mockStoredWorkflow->class = TestWorkflow::class; + ])); WorkflowStub::setContext([ 'storedWorkflow' => $mockStoredWorkflow, @@ -213,25 +196,18 @@ public function testThrowsQueryExceptionWhenNotDuplicateKey(): void $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); - $mockLogs = Mockery::mock(\Illuminate\Database\Eloquent\Relations\HasMany::class) - ->shouldReceive('whereIndex') - ->twice() - ->andReturnSelf() - ->shouldReceive('first') - ->twice() - ->andReturn(null) - ->shouldReceive('create') - ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Some other error'))) - ->getMock(); - $mockStoredWorkflow = Mockery::spy($storedWorkflow); - - $mockStoredWorkflow->shouldReceive('logs') - ->andReturnUsing(static function () use ($mockLogs) { - return $mockLogs; - }); - - $mockStoredWorkflow->class = TestWorkflow::class; + $mockStoredWorkflow->shouldReceive('findLogByIndex') + ->once() + ->with(0) + ->andReturn(null); + $mockStoredWorkflow->shouldReceive('createLog') + ->once() + ->andThrow(new \Illuminate\Database\QueryException('', '', [], new \Exception('Some other error'))); + $mockStoredWorkflow->shouldReceive('findLogByIndex') + ->once() + ->with(0, true) + ->andReturn(null); WorkflowStub::setContext([ 'storedWorkflow' => $mockStoredWorkflow, diff --git a/tests/Unit/WorkflowMetadataTest.php b/tests/Unit/WorkflowMetadataTest.php new file mode 100644 index 00000000..ac13cb2d --- /dev/null +++ b/tests/Unit/WorkflowMetadataTest.php @@ -0,0 +1,30 @@ +assertSame($metadata, $result); + } + + public function testFromSerializedArgumentsReturnsEmptyMetadataForInvalidPayload(): void + { + $result = WorkflowMetadata::fromSerializedArguments('not-an-array'); + + $this->assertSame([], $result->arguments); + $this->assertNull($result->options->connection); + $this->assertNull($result->options->queue); + } +} diff --git a/tests/Unit/WorkflowTest.php b/tests/Unit/WorkflowTest.php index a1bf5e96..6314ddcc 100644 --- a/tests/Unit/WorkflowTest.php +++ b/tests/Unit/WorkflowTest.php @@ -7,6 +7,7 @@ use BadMethodCallException; use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Event; +use Mockery; use Tests\Fixtures\TestActivity; use Tests\Fixtures\TestChildWorkflow; use Tests\Fixtures\TestContinueAsNewWorkflow; @@ -292,6 +293,22 @@ public function testParentPending(): void $this->assertSame('other', $childWorkflow->output()); } + public function testConstructorUsesQueuePropertyWhenEffectiveQueueIsNull(): void + { + $storedWorkflow = Mockery::mock(StoredWorkflow::class); + $storedWorkflow->shouldReceive('effectiveConnection') + ->once() + ->andReturn('sync'); + $storedWorkflow->shouldReceive('effectiveQueue') + ->once() + ->andReturn(null); + + $workflow = new Workflow($storedWorkflow); + + $this->assertSame('sync', $workflow->connection); + $this->assertNull($workflow->queue); + } + public function testThrowsWhenExecuteMethodIsMissing(): void { $stub = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); @@ -420,4 +437,37 @@ public function testContinueAsNewWithParentWorkflow(): void $this->assertSame(1, $storedWorkflow->continuedWorkflows()->count()); } + + public function testContinueAsNewCarriesWorkflowOptions(): void + { + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestContinueAsNewWorkflow::class, + 'arguments' => Serializer::serialize([ + 'arguments' => [0, 3], + 'options' => [ + 'connection' => 'sync', + 'queue' => 'default', + ], + ]), + 'status' => WorkflowPendingStatus::class, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now(), + 'class' => TestCountActivity::class, + 'result' => Serializer::serialize(0), + ]); + + $workflow = new TestContinueAsNewWorkflow($storedWorkflow); + $workflow->handle(); + + $continuedWorkflow = $storedWorkflow->continuedWorkflows() + ->first(); + + $this->assertNotNull($continuedWorkflow); + $this->assertSame('sync', $continuedWorkflow->workflowOptions()->connection); + $this->assertSame('default', $continuedWorkflow->workflowOptions()->queue); + } }