diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bc8f7c..7ce7ca1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# Version 5.5.0 +* Added cleanup commands +* Updated for PHP 8.2 + # Version 5.4.1 * Fix File Exceptions integration diff --git a/README.md b/README.md index 84638b5..5c251a7 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ providing an easy way to create import / export dataflow. | Dataflow | Symfony | Support | |----------|--------------------------|---------| -| 5.x | 7.x | yes | +| 5.x | ^7.3 | yes | | 4.x | 3.4 \| 4.x \| 5.x \| 6.x | yes | | 3.x | 3.4 \| 4.x \| 5.x | no | | 2.x | 3.4 \| 4.x | no | @@ -243,7 +243,6 @@ implementing `DataflowTypeInterface`. Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration: -```yaml ```yaml CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType: tags: @@ -598,6 +597,10 @@ the messenger component instead. `code-rhapsodie:dataflow:dump-schema` Generates schema create / update SQL queries +`code-rhapsodie:dataflow:job:set-crashed` Jobs that have been in the "running" status for too long will be set in the "crashed" status. + +`code-rhapsodie:dataflow:job:cleanup` Remove old completed or crashed jobs + ### Work with many databases All commands have a `--connection` option to define what Doctrine DBAL connection to use during execution. diff --git a/Tests/DataflowType/AbstractDataflowTypeTest.php b/Tests/DataflowType/AbstractDataflowTypeTest.php index ee843ae..ca4a121 100644 --- a/Tests/DataflowType/AbstractDataflowTypeTest.php +++ b/Tests/DataflowType/AbstractDataflowTypeTest.php @@ -18,7 +18,7 @@ public function testProcess() $dataflowType = new class($label, $options, $values, $testCase) extends AbstractDataflowType { - public function __construct(private string $label, private array $options, private array $values, private TestCase $testCase) + public function __construct(private readonly string $label, private readonly array $options, private readonly array $values, private readonly TestCase $testCase) { } diff --git a/composer.json b/composer.json index 393f4b0..e60f0e0 100644 --- a/composer.json +++ b/composer.json @@ -41,33 +41,35 @@ } }, "require": { - "php": "^8.0", + "php": "^8.2", "ext-json": "*", "doctrine/dbal": "^3.0||^4.0", "doctrine/doctrine-bundle": "^2.0", "monolog/monolog": "^2.0||^3.0", "psr/log": "^1.1||^2.0||^3.0", - "symfony/config": "^7.0", - "symfony/console": "^7.0", - "symfony/dependency-injection": "^7.0", - "symfony/event-dispatcher": "^7.0", - "symfony/http-kernel": "^7.0", - "symfony/lock": "^7.0", - "symfony/monolog-bridge": "^7.0", - "symfony/options-resolver": "^7.0", - "symfony/validator": "^7.0", - "symfony/yaml": "^7.0" + "symfony/config": "^7.3", + "symfony/console": "^7.3", + "symfony/dependency-injection": "^7.3", + "symfony/event-dispatcher": "^7.3", + "symfony/http-kernel": "^7.3", + "symfony/lock": "^7.3", + "symfony/monolog-bridge": "^7.3", + "symfony/options-resolver": "^7.3", + "symfony/validator": "^7.3", + "symfony/yaml": "^7.3" }, "require-dev": { "amphp/amp": "^2.5", + "ekino/phpstan-banned-code": "^3.2", "friendsofphp/php-cs-fixer": "^3.75", "phpunit/phpunit": "^11", "portphp/portphp": "^1.9", "rector/rector": "^2.0", - "symfony/messenger": "^7.0" + "symfony/messenger": "^7.3" }, "suggest": { "amphp/amp": "Provide asynchronous steps for your dataflows", + "league/flysystem": "Allows Dataflow file exception mode, i.e. saving job exceptions outside the database", "portphp/portphp": "Provides generic readers, steps and writers for your dataflows.", "symfony/messenger": "Allows messenger mode, i.e. letting workers run jobs" }, diff --git a/phpstan.neon.dist b/phpstan.neon.dist new file mode 100644 index 0000000..abcd60b --- /dev/null +++ b/phpstan.neon.dist @@ -0,0 +1,6 @@ +includes: + - vendor/ekino/phpstan-banned-code/extension.neon +parameters: + level: 5 + paths: + - src/ diff --git a/rector.php b/rector.php index 92cd240..5beb0f7 100644 --- a/rector.php +++ b/rector.php @@ -2,24 +2,20 @@ declare(strict_types=1); -use Rector\CodeQuality\Rector\Class_\InlineConstructorDefaultToPropertyRector; use Rector\Config\RectorConfig; use Rector\Set\ValueObject\LevelSetList; use Rector\Symfony\Set\SymfonySetList; -return static function (RectorConfig $rectorConfig): void { - $rectorConfig->paths([ - __DIR__ . '/src', - __DIR__ . '/Tests', - ]); - - // register a single rule - $rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class); - - $rectorConfig->sets([ - SymfonySetList::SYMFONY_70, +return RectorConfig::configure() + ->withPaths([ + __DIR__.'/src', + __DIR__.'/Tests', + ]) + ->withComposerBased(doctrine: true, symfony: true) + ->withSets([ SymfonySetList::SYMFONY_CODE_QUALITY, SymfonySetList::SYMFONY_CONSTRUCTOR_INJECTION, - LevelSetList::UP_TO_PHP_80, - ]); -}; + SymfonySetList::SYMFONY_73, + LevelSetList::UP_TO_PHP_82, + ]) +; diff --git a/src/Command/AddScheduledDataflowCommand.php b/src/Command/AddScheduledDataflowCommand.php index 7a895ca..fe20282 100644 --- a/src/Command/AddScheduledDataflowCommand.php +++ b/src/Command/AddScheduledDataflowCommand.php @@ -9,46 +9,34 @@ use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface; use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository; use Symfony\Component\Console\Attribute\AsCommand; -use Symfony\Component\Console\Command\Command; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; -use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Attribute\Option; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\Validator\Validator\ValidatorInterface; /** * @codeCoverageIgnore */ -#[AsCommand('code-rhapsodie:dataflow:schedule:add', 'Create a scheduled dataflow')] -class AddScheduledDataflowCommand extends Command +#[AsCommand('code-rhapsodie:dataflow:schedule:add', 'Create a scheduled dataflow', help: <<<'TXT' +The %command.name% allows you to create a new scheduled dataflow. +TXT)] +final readonly class AddScheduledDataflowCommand { public function __construct(private DataflowTypeRegistryInterface $registry, private ScheduledDataflowRepository $scheduledDataflowRepository, private ValidatorInterface $validator, private ConnectionFactory $connectionFactory) { - parent::__construct(); } - protected function configure(): void - { - $this - ->setHelp('The %command.name% allows you to create a new scheduled dataflow.') - ->addOption('label', null, InputOption::VALUE_REQUIRED, 'Label of the scheduled dataflow') - ->addOption('type', null, InputOption::VALUE_REQUIRED, 'Type of the scheduled dataflow (FQCN)') - ->addOption( - 'options', - null, - InputOption::VALUE_OPTIONAL, - 'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})' - ) - ->addOption('frequency', null, InputOption::VALUE_REQUIRED, 'Frequency of the scheduled dataflow') - ->addOption('first_run', null, InputOption::VALUE_REQUIRED, 'Date for the first run of the scheduled dataflow (Y-m-d H:i:s)') - ->addOption('enabled', null, InputOption::VALUE_REQUIRED, 'State of the scheduled dataflow') - ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); - } - - protected function execute(InputInterface $input, OutputInterface $output): int - { - if ($input->getOption('connection') !== null) { - $this->connectionFactory->setConnectionName($input->getOption('connection')); + public function __invoke( + SymfonyStyle $io, + #[Option('Label of the scheduled dataflow')] ?string $label = null, + #[Option('Type of the scheduled dataflow (FQCN)')] ?string $type = null, + #[Option('Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})')] ?string $options = null, + #[Option('Frequency of the scheduled dataflow')] ?string $frequency = null, + #[Option('Date for the first run of the scheduled dataflow (Y-m-d H:i:s)')] ?string $firstRun = null, + #[Option('State of the scheduled dataflow')] ?bool $enabled = null, + #[Option('Define the DBAL connection to use')] ?string $connection = null, + ): int { + if ($connection !== null) { + $this->connectionFactory->setConnectionName($connection); } $choices = []; $typeMapping = []; @@ -57,36 +45,29 @@ protected function execute(InputInterface $input, OutputInterface $output): int $typeMapping[$dataflowType->getLabel()] = $fqcn; } - $io = new SymfonyStyle($input, $output); - $label = $input->getOption('label'); if (!$label) { $label = $io->ask('What is the scheduled dataflow label?'); } - $type = $input->getOption('type'); if (!$type) { $selectedType = $io->choice('What is the scheduled dataflow type?', $choices); $type = $typeMapping[$selectedType]; } - $options = $input->getOption('options'); if (!$options) { $options = $io->ask( 'What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})', json_encode([]) ); } - $frequency = $input->getOption('frequency'); if (!$frequency) { $frequency = $io->choice( 'What is the frequency for the scheduled dataflow?', ScheduledDataflow::AVAILABLE_FREQUENCIES ); } - $firstRun = $input->getOption('first_run'); if (!$firstRun) { $firstRun = $io->ask('When is the first execution of the scheduled dataflow (format: Y-m-d H:i:s)?'); } - $enabled = $input->getOption('enabled'); - if (!$enabled) { + if ($enabled === null) { $enabled = $io->confirm('Enable the scheduled dataflow?'); } diff --git a/src/Command/ChangeScheduleStatusCommand.php b/src/Command/ChangeScheduleStatusCommand.php index 35735fc..2e665b4 100644 --- a/src/Command/ChangeScheduleStatusCommand.php +++ b/src/Command/ChangeScheduleStatusCommand.php @@ -7,63 +7,58 @@ use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow; use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory; use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository; +use Symfony\Component\Console\Attribute\Argument; use Symfony\Component\Console\Attribute\AsCommand; -use Symfony\Component\Console\Command\Command; -use Symfony\Component\Console\Input\InputArgument; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; -use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Attribute\Option; use Symfony\Component\Console\Style\SymfonyStyle; /** * @codeCoverageIgnore */ -#[AsCommand('code-rhapsodie:dataflow:schedule:change-status', 'Change schedule status')] -class ChangeScheduleStatusCommand extends Command +#[AsCommand('code-rhapsodie:dataflow:schedule:change-status', 'Change schedule status', help: <<<'TXT' +The %command.name% command able you to change schedule status. +TXT)] +final readonly class ChangeScheduleStatusCommand { public function __construct(private ScheduledDataflowRepository $scheduledDataflowRepository, private ConnectionFactory $connectionFactory) { - parent::__construct(); } - protected function configure(): void - { - $this - ->setHelp('The %command.name% command able you to change schedule status.') - ->addArgument('schedule-id', InputArgument::REQUIRED, 'Id of the schedule') - ->addOption('enable', null, InputOption::VALUE_NONE, 'Enable the schedule') - ->addOption('disable', null, InputOption::VALUE_NONE, 'Disable the schedule') - ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); - } - - protected function execute(InputInterface $input, OutputInterface $output): int - { - if ($input->getOption('connection') !== null) { - $this->connectionFactory->setConnectionName($input->getOption('connection')); + public function __invoke( + SymfonyStyle $io, + #[Argument('Id of the schedule')] int $scheduleId, + #[Option('Enable the schedule')] ?bool $enable = null, + #[Option('Disable the schedule')] ?bool $disable = null, + #[Option('Define the DBAL connection to use')] ?string $connection = null, + ): int { + if ($connection !== null) { + $this->connectionFactory->setConnectionName($connection); } - $io = new SymfonyStyle($input, $output); + /** @var ScheduledDataflow|null $schedule */ - $schedule = $this->scheduledDataflowRepository->find((int) $input->getArgument('schedule-id')); + $schedule = $this->scheduledDataflowRepository->find($scheduleId); if (!$schedule) { - $io->error(\sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id'))); + $io->error(\sprintf('Cannot find scheduled dataflow with id "%d".', $scheduleId)); return 1; } - if ($input->getOption('enable') && $input->getOption('disable')) { + if ($enable !== null && $disable !== null) { $io->error('You cannot pass enable and disable options in the same time.'); return 2; } - if (!$input->getOption('enable') && !$input->getOption('disable')) { + if ($enable === null && $disable === null) { $io->error('You must pass enable or disable option.'); return 3; } + $enable = $enable ?? !$disable; + try { - $schedule->setEnabled($input->getOption('enable')); + $schedule->setEnabled($enable); $this->scheduledDataflowRepository->save($schedule); $io->success(\sprintf('Schedule with id "%s" has been successfully updated.', $schedule->getId())); } catch (\Exception $e) { diff --git a/src/Command/DatabaseSchemaCommand.php b/src/Command/DatabaseSchemaCommand.php index 464df39..1721596 100644 --- a/src/Command/DatabaseSchemaCommand.php +++ b/src/Command/DatabaseSchemaCommand.php @@ -12,36 +12,28 @@ use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\Table; use Symfony\Component\Console\Attribute\AsCommand; +use Symfony\Component\Console\Attribute\Option; use Symfony\Component\Console\Command\Command; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; -use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Question\ConfirmationQuestion; use Symfony\Component\Console\Style\SymfonyStyle; -#[AsCommand(name: 'code-rhapsodie:dataflow:database-schema', description: 'Generates schema create / update SQL queries')] -class DatabaseSchemaCommand extends Command +#[AsCommand(name: 'code-rhapsodie:dataflow:database-schema', description: 'Generates schema create / update SQL queries', help: <<<'TXT' +The %command.name% help you to generate SQL Query to create or update your database schema for this bundle +TXT)] +final readonly class DatabaseSchemaCommand { public function __construct(private ConnectionFactory $connectionFactory) { - parent::__construct(); } - protected function configure(): void - { - $this - ->setHelp('The %command.name% help you to generate SQL Query to create or update your database schema for this bundle') - ->addOption('dump-sql', null, InputOption::VALUE_NONE, 'Dump only the update SQL queries.') - ->addOption('update', null, InputOption::VALUE_NONE, 'Dump/execute only the update SQL queries.') - ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); - } - - protected function execute(InputInterface $input, OutputInterface $output): int - { - $io = new SymfonyStyle($input, $output); - - if ($input->getOption('connection') !== null) { - $this->connectionFactory->setConnectionName($input->getOption('connection')); + public function __invoke( + SymfonyStyle $io, + #[Option('Dump only the update SQL queries.')] bool $dump = false, + #[Option('Dump/execute only the update SQL queries.')] bool $update = false, + #[Option('Define the DBAL connection to use')] ?string $connection = null, + ): int { + if ($connection !== null) { + $this->connectionFactory->setConnectionName($connection); } $connection = $this->connectionFactory->getConnection(); @@ -51,7 +43,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $sqls = $schema->toSql($connection->getDatabasePlatform()); - if ($input->getOption('update')) { + if ($update) { $sm = $connection->createSchemaManager(); $tableArray = [JobRepository::TABLE_NAME, ScheduledDataflowRepository::TABLE_NAME]; @@ -84,7 +76,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int } } - if ($input->getOption('dump-sql')) { + if ($dump) { $io->text('Execute these SQL Queries on your database:'); foreach ($sqls as $sql) { $io->text($sql.';'); @@ -105,6 +97,6 @@ protected function execute(InputInterface $input, OutputInterface $output): int $io->success(\sprintf('%d queries executed.', \count($sqls))); - return parent::SUCCESS; + return Command::SUCCESS; } } diff --git a/src/Command/ExecuteDataflowCommand.php b/src/Command/ExecuteDataflowCommand.php index fa99cd5..3a9c38c 100644 --- a/src/Command/ExecuteDataflowCommand.php +++ b/src/Command/ExecuteDataflowCommand.php @@ -10,12 +10,9 @@ use CodeRhapsodie\DataflowBundle\Repository\JobRepository; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareTrait; +use Symfony\Component\Console\Attribute\Argument; use Symfony\Component\Console\Attribute\AsCommand; -use Symfony\Component\Console\Command\Command; -use Symfony\Component\Console\Input\InputArgument; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; -use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Attribute\Option; use Symfony\Component\Console\Style\SymfonyStyle; /** @@ -23,41 +20,32 @@ * * @codeCoverageIgnore */ -#[AsCommand('code-rhapsodie:dataflow:execute', 'Runs one dataflow type with provided options')] -class ExecuteDataflowCommand extends Command implements LoggerAwareInterface +#[AsCommand('code-rhapsodie:dataflow:execute', 'Runs one dataflow type with provided options', help: <<<'TXT' +The %command.name% command runs one dataflow with the provided options. + + php %command.full_name% App\Dataflow\MyDataflow '{"option1": "value1", "option2": "value2"}' +TXT)] +final class ExecuteDataflowCommand implements LoggerAwareInterface { use LoggerAwareTrait; - public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory, private JobRepository $jobRepository) + public function __construct(private readonly DataflowTypeRegistryInterface $registry, private readonly ConnectionFactory $connectionFactory, private readonly JobRepository $jobRepository) { - parent::__construct(); } - protected function configure(): void - { - $this - ->setHelp( - <<<'EOF' -The %command.name% command runs one dataflow with the provided options. - - php %command.full_name% App\Dataflow\MyDataflow '{"option1": "value1", "option2": "value2"}' -EOF - ) - ->addArgument('fqcn', InputArgument::REQUIRED, 'FQCN or alias of the dataflow type') - ->addArgument('options', InputArgument::OPTIONAL, 'Options for the dataflow type as a json string', '[]') - ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); - } - - protected function execute(InputInterface $input, OutputInterface $output): int - { - if ($input->getOption('connection') !== null) { - $this->connectionFactory->setConnectionName($input->getOption('connection')); + public function __invoke( + SymfonyStyle $io, + #[Argument('FQCN or alias of the dataflow type')] string $fqcn, + #[Argument('Options for the dataflow type as a json string')] string $options = '[]', + #[Option('Define the DBAL connection to use')] ?string $connection = null, + ): int { + if ($connection !== null) { + $this->connectionFactory->setConnectionName($connection); } - $fqcnOrAlias = $input->getArgument('fqcn'); - $options = json_decode($input->getArgument('options'), true, 512, \JSON_THROW_ON_ERROR); - $io = new SymfonyStyle($input, $output); - $dataflowType = $this->registry->getDataflowType($fqcnOrAlias); + $options = json_decode($options, true, 512, \JSON_THROW_ON_ERROR); + + $dataflowType = $this->registry->getDataflowType($fqcn); if ($dataflowType instanceof AutoUpdateCountInterface) { $dataflowType->setRepository($this->jobRepository); } diff --git a/src/Command/JobCleanupCommand.php b/src/Command/JobCleanupCommand.php new file mode 100644 index 0000000..c90187f --- /dev/null +++ b/src/Command/JobCleanupCommand.php @@ -0,0 +1,33 @@ +jobRepository->deleteOld($this->retention); + foreach ($removedIds as $jobId) { + $this->exceptionHandler->delete($jobId); + } + + return Command::SUCCESS; + } +} diff --git a/src/Command/JobShowCommand.php b/src/Command/JobShowCommand.php index 2863b94..6d83201 100644 --- a/src/Command/JobShowCommand.php +++ b/src/Command/JobShowCommand.php @@ -8,49 +8,40 @@ use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory; use CodeRhapsodie\DataflowBundle\Gateway\JobGateway; use Symfony\Component\Console\Attribute\AsCommand; -use Symfony\Component\Console\Command\Command; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; -use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Attribute\Option; use Symfony\Component\Console\Style\SymfonyStyle; /** * @codeCoverageIgnore */ -#[AsCommand('code-rhapsodie:dataflow:job:show', 'Display job details for schedule or specific job')] -class JobShowCommand extends Command +#[AsCommand('code-rhapsodie:dataflow:job:show', 'Display job details for schedule or specific job', help: <<<'TXT' +The %command.name% display job details for schedule or specific job. +TXT)] +final readonly class JobShowCommand { private const STATUS_MAPPING = [ Job::STATUS_PENDING => 'Pending', Job::STATUS_RUNNING => 'Running', Job::STATUS_COMPLETED => 'Completed', + Job::STATUS_QUEUED => 'Queued', + Job::STATUS_CRASHED => 'Crashed', ]; public function __construct(private JobGateway $jobGateway, private ConnectionFactory $connectionFactory) { - parent::__construct(); } - protected function configure(): void - { - $this - ->setHelp('The %command.name% display job details for schedule or specific job.') - ->addOption('job-id', null, InputOption::VALUE_REQUIRED, 'Id of the job to get details') - ->addOption('schedule-id', null, InputOption::VALUE_REQUIRED, 'Id of schedule for last execution details') - ->addOption('details', null, InputOption::VALUE_NONE, 'Display full details') - ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); - } - - protected function execute(InputInterface $input, OutputInterface $output): int - { - if ($input->getOption('connection') !== null) { - $this->connectionFactory->setConnectionName($input->getOption('connection')); + public function __invoke( + SymfonyStyle $io, + #[Option('Id of the job to get details')] ?int $jobId = null, + #[Option('Id of schedule for last execution details')] ?int $scheduleId = null, + #[Option('Display full details')] bool $details = false, + #[Option('Define the DBAL connection to use')] ?string $connection = null, + ): int { + if ($connection !== null) { + $this->connectionFactory->setConnectionName($connection); } - $io = new SymfonyStyle($input, $output); - - $jobId = (int) $input->getOption('job-id'); - $scheduleId = (int) $input->getOption('schedule-id'); if ($jobId && $scheduleId) { $io->error('You must use `job-id` OR `schedule-id` option, not the 2 in the same time.'); @@ -84,16 +75,16 @@ protected function execute(InputInterface $input, OutputInterface $output): int ['Errors', \count((array) $job->getExceptions())], ['Status', $this->translateStatus($job->getStatus())], ]; - if ($input->getOption('details')) { + if ($details) { $display[] = ['Type', $job->getDataflowType()]; $display[] = ['Options', json_encode($job->getOptions(), \JSON_THROW_ON_ERROR)]; $io->section('Summary'); } $io->table(['Field', 'Value'], $display); - if ($input->getOption('details')) { + if ($details) { $io->section('Exceptions'); - $exceptions = array_map(fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions()); + $exceptions = array_map(static fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions()); $io->write($exceptions); } diff --git a/src/Command/RunPendingDataflowsCommand.php b/src/Command/RunPendingDataflowsCommand.php index a08cae5..0d35ced 100644 --- a/src/Command/RunPendingDataflowsCommand.php +++ b/src/Command/RunPendingDataflowsCommand.php @@ -8,48 +8,38 @@ use CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface; use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface; use Symfony\Component\Console\Attribute\AsCommand; -use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Attribute\Option; use Symfony\Component\Console\Command\LockableTrait; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; -use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Style\SymfonyStyle; /** * Runs dataflows according to user-defined schedule. * * @codeCoverageIgnore */ -#[AsCommand('code-rhapsodie:dataflow:run-pending', 'Runs dataflows based on the scheduled defined in the UI.')] -class RunPendingDataflowsCommand extends Command +#[AsCommand('code-rhapsodie:dataflow:run-pending', 'Runs dataflows based on the scheduled defined in the UI.', help: <<<'TXT' +The %command.name% command runs dataflows according to the schedule defined in the UI by the user. +TXT)] +final class RunPendingDataflowsCommand { use LockableTrait; - public function __construct(private ScheduledDataflowManagerInterface $manager, private PendingDataflowRunnerInterface $runner, private ConnectionFactory $connectionFactory) - { - parent::__construct(); - } - - protected function configure(): void + public function __construct(private readonly ScheduledDataflowManagerInterface $manager, private readonly PendingDataflowRunnerInterface $runner, private readonly ConnectionFactory $connectionFactory) { - $this - ->setHelp( - <<<'EOF' -The %command.name% command runs dataflows according to the schedule defined in the UI by the user. -EOF - ) - ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); } - protected function execute(InputInterface $input, OutputInterface $output): int - { + public function __invoke( + SymfonyStyle $io, + #[Option('Define the DBAL connection to use')] ?string $connection = null, + ): int { if (!$this->lock()) { - $output->writeln('The command is already running in another process.'); + $io->writeln('The command is already running in another process.'); return 0; } - if ($input->getOption('connection') !== null) { - $this->connectionFactory->setConnectionName($input->getOption('connection')); + if ($connection !== null) { + $this->connectionFactory->setConnectionName($connection); } $this->manager->createJobsFromScheduledDataflows(); diff --git a/src/Command/ScheduleListCommand.php b/src/Command/ScheduleListCommand.php index d4a187c..6f7b2b0 100644 --- a/src/Command/ScheduleListCommand.php +++ b/src/Command/ScheduleListCommand.php @@ -7,36 +7,29 @@ use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory; use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository; use Symfony\Component\Console\Attribute\AsCommand; -use Symfony\Component\Console\Command\Command; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; -use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Attribute\Option; use Symfony\Component\Console\Style\SymfonyStyle; /** * @codeCoverageIgnore */ -#[AsCommand('code-rhapsodie:dataflow:schedule:list', 'List scheduled dataflows')] -class ScheduleListCommand extends Command +#[AsCommand('code-rhapsodie:dataflow:schedule:list', 'List scheduled dataflows', help: <<<'TXT' +The %command.name% lists all scheduled dataflows. +TXT)] +final readonly class ScheduleListCommand { public function __construct(private ScheduledDataflowRepository $scheduledDataflowRepository, private ConnectionFactory $connectionFactory) { - parent::__construct(); } - protected function configure(): void - { - $this - ->setHelp('The %command.name% lists all scheduled dataflows.') - ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); - } - - protected function execute(InputInterface $input, OutputInterface $output): int - { - if ($input->getOption('connection') !== null) { - $this->connectionFactory->setConnectionName($input->getOption('connection')); + public function __invoke( + SymfonyStyle $io, + #[Option('Define the DBAL connection to use')] ?string $connection = null, + ): int { + if ($connection !== null) { + $this->connectionFactory->setConnectionName($connection); } - $io = new SymfonyStyle($input, $output); + $display = []; $schedules = $this->scheduledDataflowRepository->listAllOrderedByLabel(); foreach ($schedules as $schedule) { diff --git a/src/Command/SchemaCommand.php b/src/Command/SchemaCommand.php index 237724c..cbe643d 100644 --- a/src/Command/SchemaCommand.php +++ b/src/Command/SchemaCommand.php @@ -17,13 +17,14 @@ * * @deprecated This command is deprecated and will be removed in 6.0, use this command "code-rhapsodie:dataflow:database-schema" instead. */ -#[AsCommand('code-rhapsodie:dataflow:dump-schema', 'Generates schema create / update SQL queries')] +#[AsCommand('code-rhapsodie:dataflow:dump-schema', 'Generates schema create / update SQL queries', help: <<<'TXT' +The %command.name% help you to generate SQL Query to create or update your database schema for this bundle +TXT)] class SchemaCommand extends Command { protected function configure(): void { $this - ->setHelp('The %command.name% help you to generate SQL Query to create or update your database schema for this bundle') ->addOption('update', null, InputOption::VALUE_NONE, 'Dump only the update SQL queries.') ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use') ; @@ -38,7 +39,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int // add -- before each keys $options = array_combine( - array_map(fn ($key) => '--'.$key, array_keys($options)), + array_map(static fn ($key) => '--'.$key, array_keys($options)), array_values($options) ); diff --git a/src/Command/SetCrashedCommand.php b/src/Command/SetCrashedCommand.php new file mode 100644 index 0000000..6493292 --- /dev/null +++ b/src/Command/SetCrashedCommand.php @@ -0,0 +1,26 @@ +jobRepository->crashLongRunning($this->crashedDelay); + + return Command::SUCCESS; + } +} diff --git a/src/DataflowType/Dataflow/AMPAsyncDataflow.php b/src/DataflowType/Dataflow/AMPAsyncDataflow.php index 1ee099b..8d64929 100644 --- a/src/DataflowType/Dataflow/AMPAsyncDataflow.php +++ b/src/DataflowType/Dataflow/AMPAsyncDataflow.php @@ -23,7 +23,7 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface { use LoggerAwareTrait; - /** @var callable[] */ + /** @var array> */ private array $steps = []; /** @var WriterInterface[] */ diff --git a/src/DataflowType/Dataflow/Dataflow.php b/src/DataflowType/Dataflow/Dataflow.php index dbbb761..81b2d99 100644 --- a/src/DataflowType/Dataflow/Dataflow.php +++ b/src/DataflowType/Dataflow/Dataflow.php @@ -21,8 +21,6 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface private ?\Closure $customExceptionIndex = null; - private ?\DateTimeInterface $dateTime = null; - /** * @var \Closure[] */ @@ -67,7 +65,7 @@ public function setCustomExceptionIndex(callable $callable): self */ public function setAfterItemProcessors(array $processors): self { - $this->afterItemProcessors = array_map(fn (callable $callable) => \Closure::fromCallable($callable), $processors); + $this->afterItemProcessors = array_map(\Closure::fromCallable(...), $processors); return $this; } diff --git a/src/DataflowType/Result.php b/src/DataflowType/Result.php index 5c8716b..f7cf5fe 100644 --- a/src/DataflowType/Result.php +++ b/src/DataflowType/Result.php @@ -9,15 +9,15 @@ */ class Result { - private \DateInterval $elapsed; + private readonly \DateInterval $elapsed; private int $errorCount = 0; private int $successCount = 0; - private array $exceptions; + private readonly array $exceptions; - public function __construct(private string $name, private \DateTimeInterface $startTime, private \DateTimeInterface $endTime, private int $totalProcessedCount, array $exceptions) + public function __construct(private readonly string $name, private readonly \DateTimeInterface $startTime, private readonly \DateTimeInterface $endTime, private readonly int $totalProcessedCount, array $exceptions) { $this->elapsed = $startTime->diff($endTime); $this->errorCount = \count($exceptions); diff --git a/src/DataflowType/Writer/CollectionWriter.php b/src/DataflowType/Writer/CollectionWriter.php index 704e675..7dee448 100644 --- a/src/DataflowType/Writer/CollectionWriter.php +++ b/src/DataflowType/Writer/CollectionWriter.php @@ -14,7 +14,7 @@ class CollectionWriter implements DelegateWriterInterface /** * CollectionWriter constructor. */ - public function __construct(private WriterInterface $writer) + public function __construct(private readonly WriterInterface $writer) { } diff --git a/src/DataflowType/Writer/PortWriterAdapter.php b/src/DataflowType/Writer/PortWriterAdapter.php index 7d624ed..32bfb58 100644 --- a/src/DataflowType/Writer/PortWriterAdapter.php +++ b/src/DataflowType/Writer/PortWriterAdapter.php @@ -6,7 +6,7 @@ class PortWriterAdapter implements WriterInterface { - public function __construct(private \Port\Writer $writer) + public function __construct(private readonly \Port\Writer $writer) { } diff --git a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php index ca4bb9c..b09e69c 100644 --- a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php +++ b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php @@ -30,6 +30,8 @@ public function load(array $configs, ContainerBuilder $container): void $container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']); $container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']); $container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']); + $container->setParameter('coderhapsodie.dataflow.job_history.retention', $config['job_history']['retention']); + $container->setParameter('coderhapsodie.dataflow.job_history.crashed_delay', $config['job_history']['crashed_delay']); if ($config['exceptions_mode']['type'] === 'file') { $container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']); diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index 12a6df0..a4be510 100644 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -52,6 +52,20 @@ public function getConfigTreeBuilder(): TreeBuilder ->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.') ->end() ->end() + ->arrayNode('job_history') + ->addDefaultsIfNotSet() + ->children() + ->integerNode('retention') + ->defaultValue(30) + ->min(0) + ->info('How many days completed and crashed jobs are kept when running the cleanup command.') + ->end() + ->integerNode('crashed_delay') + ->defaultValue(24) + ->min(24) + ->info('Jobs running for more than this many hours will be set as crashed when running the cleanup command.') + ->end() + ->end() ->end() ; diff --git a/src/Entity/Job.php b/src/Entity/Job.php index cd931a6..020f839 100644 --- a/src/Entity/Job.php +++ b/src/Entity/Job.php @@ -17,6 +17,7 @@ class Job public const STATUS_RUNNING = 1; public const STATUS_COMPLETED = 2; public const STATUS_QUEUED = 3; + public const STATUS_CRASHED = 4; private const KEYS = [ 'id', @@ -63,7 +64,7 @@ class Job public static function createFromScheduledDataflow(ScheduledDataflow $scheduled): self { - return (new static()) + return (new self()) ->setStatus(static::STATUS_PENDING) ->setDataflowType($scheduled->getDataflowType()) ->setOptions($scheduled->getOptions()) @@ -74,7 +75,7 @@ public static function createFromScheduledDataflow(ScheduledDataflow $scheduled) public static function createFromArray(array $datas) { - $lost = array_diff(static::KEYS, array_keys($datas)); + $lost = array_diff(self::KEYS, array_keys($datas)); if (\count($lost) > 0) { throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"'); } diff --git a/src/Entity/ScheduledDataflow.php b/src/Entity/ScheduledDataflow.php index 3f138cd..161c806 100644 --- a/src/Entity/ScheduledDataflow.php +++ b/src/Entity/ScheduledDataflow.php @@ -49,7 +49,7 @@ class ScheduledDataflow public static function createFromArray(array $datas) { - $lost = array_diff(static::KEYS, array_keys($datas)); + $lost = array_diff(self::KEYS, array_keys($datas)); if (\count($lost) > 0) { throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"'); } diff --git a/src/Event/ProcessingEvent.php b/src/Event/ProcessingEvent.php index 4c0e4e7..fa56cdb 100644 --- a/src/Event/ProcessingEvent.php +++ b/src/Event/ProcessingEvent.php @@ -16,7 +16,7 @@ class ProcessingEvent extends CrEvent /** * ProcessingEvent constructor. */ - public function __construct(private Job $job) + public function __construct(private readonly Job $job) { } diff --git a/src/ExceptionsHandler/ExceptionHandlerInterface.php b/src/ExceptionsHandler/ExceptionHandlerInterface.php index a950c85..bb1bfcc 100644 --- a/src/ExceptionsHandler/ExceptionHandlerInterface.php +++ b/src/ExceptionsHandler/ExceptionHandlerInterface.php @@ -9,4 +9,6 @@ interface ExceptionHandlerInterface public function save(?int $jobId, ?array $exceptions): void; public function find(int $jobId): ?array; + + public function delete(int $jobId): void; } diff --git a/src/ExceptionsHandler/FilesystemExceptionHandler.php b/src/ExceptionsHandler/FilesystemExceptionHandler.php index 9176cf7..20ec200 100644 --- a/src/ExceptionsHandler/FilesystemExceptionHandler.php +++ b/src/ExceptionsHandler/FilesystemExceptionHandler.php @@ -9,7 +9,7 @@ class FilesystemExceptionHandler implements ExceptionHandlerInterface { - public function __construct(private Filesystem $filesystem) + public function __construct(private readonly Filesystem $filesystem) { } @@ -34,4 +34,9 @@ public function find(int $jobId): ?array return []; } } + + public function delete(int $jobId): void + { + $this->filesystem->delete(\sprintf('dataflow-job-%s.log', $jobId)); + } } diff --git a/src/ExceptionsHandler/NullExceptionHandler.php b/src/ExceptionsHandler/NullExceptionHandler.php index c3a0d2d..bc398e5 100644 --- a/src/ExceptionsHandler/NullExceptionHandler.php +++ b/src/ExceptionsHandler/NullExceptionHandler.php @@ -8,10 +8,16 @@ class NullExceptionHandler implements ExceptionHandlerInterface { public function save(?int $jobId, ?array $exceptions): void { + // Nothing to do } public function find(int $jobId): ?array { return null; } + + public function delete(int $jobId): void + { + // Nothing to do + } } diff --git a/src/Factory/ConnectionFactory.php b/src/Factory/ConnectionFactory.php index 04da339..6daae2b 100644 --- a/src/Factory/ConnectionFactory.php +++ b/src/Factory/ConnectionFactory.php @@ -13,7 +13,7 @@ */ class ConnectionFactory { - public function __construct(private Container $container, private string $connectionName) + public function __construct(private readonly Container $container, private string $connectionName) { } diff --git a/src/Gateway/JobGateway.php b/src/Gateway/JobGateway.php index fa24e41..55ba631 100644 --- a/src/Gateway/JobGateway.php +++ b/src/Gateway/JobGateway.php @@ -11,7 +11,7 @@ class JobGateway { - public function __construct(private JobRepository $repository, private ExceptionHandlerInterface $exceptionHandler) + public function __construct(private readonly JobRepository $repository, private readonly ExceptionHandlerInterface $exceptionHandler) { } diff --git a/src/Manager/ScheduledDataflowManager.php b/src/Manager/ScheduledDataflowManager.php index cbb0f23..3b5baa1 100644 --- a/src/Manager/ScheduledDataflowManager.php +++ b/src/Manager/ScheduledDataflowManager.php @@ -15,7 +15,7 @@ */ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface { - public function __construct(private Connection $connection, private ScheduledDataflowRepository $scheduledDataflowRepository, private JobRepository $jobRepository) + public function __construct(private readonly Connection $connection, private readonly ScheduledDataflowRepository $scheduledDataflowRepository, private readonly JobRepository $jobRepository) { } diff --git a/src/MessengerMode/JobMessage.php b/src/MessengerMode/JobMessage.php index f4361f7..5bb51a3 100644 --- a/src/MessengerMode/JobMessage.php +++ b/src/MessengerMode/JobMessage.php @@ -6,7 +6,7 @@ class JobMessage { - public function __construct(private int $jobId) + public function __construct(private readonly int $jobId) { } diff --git a/src/MessengerMode/JobMessageHandler.php b/src/MessengerMode/JobMessageHandler.php index 401dbd7..44d2a2a 100644 --- a/src/MessengerMode/JobMessageHandler.php +++ b/src/MessengerMode/JobMessageHandler.php @@ -11,7 +11,7 @@ #[AsMessageHandler] class JobMessageHandler { - public function __construct(private JobRepository $repository, private JobProcessorInterface $processor) + public function __construct(private readonly JobRepository $repository, private readonly JobProcessorInterface $processor) { } diff --git a/src/Repository/JobRepository.php b/src/Repository/JobRepository.php index 92d36a6..1683243 100644 --- a/src/Repository/JobRepository.php +++ b/src/Repository/JobRepository.php @@ -151,6 +151,48 @@ public function createQueryBuilder($alias = null): QueryBuilder return $qb; } + public function crashLongRunning(int $hours): void + { + $qb = $this->connection->createQueryBuilder(); + $qb->update(static::TABLE_NAME.' j') + ->set('j.status', ':new_status') + ->set('j.end_time', ':now') + ->andWhere('j.status = :status') + ->andWhere('j.start_time < :date') + ->setParameter('status', Job::STATUS_RUNNING) + ->setParameter('date', new \DateTime("- {$hours} hours"), 'datetime') + ->setParameter('new_status', Job::STATUS_CRASHED) + ->setParameter('now', new \DateTime(), 'datetime') + ->executeStatement() + ; + } + + /** + * @return int[] Removed job ids + */ + public function deleteOld(int $days): array + { + $qb = $this->connection->createQueryBuilder(); + $ids = $qb->select('j.id') + ->from(static::TABLE_NAME, 'j') + ->andWhere($qb->expr()->in('j.status', [Job::STATUS_COMPLETED, Job::STATUS_CRASHED])) + ->andWhere('j.end_time < :date') + ->setParameter('date', new \DateTime("- {$days} days"), 'datetime') + ->executeQuery() + ->fetchFirstColumn() + ; + + $qb = $this->connection->createQueryBuilder(); + $qb->delete(static::TABLE_NAME.' j') + ->andWhere($qb->expr()->in('j.status', [Job::STATUS_COMPLETED, Job::STATUS_CRASHED])) + ->andWhere('j.end_time < :date') + ->setParameter('date', new \DateTime("- {$days} days"), 'datetime') + ->executeStatement() + ; + + return $ids; + } + private function returnFirstOrNull(QueryBuilder $qb): ?Job { $stmt = $qb->executeQuery(); diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index dce5b62..83af479 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -3,7 +3,7 @@ services: public: false CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry' - CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry: + CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry: ~ CodeRhapsodie\DataflowBundle\Command\AddScheduledDataflowCommand: arguments: @@ -95,8 +95,21 @@ services: $dispatcher: '@event_dispatcher' $jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway' CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler' - CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler: + CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler: ~ CodeRhapsodie\DataflowBundle\Gateway\JobGateway: arguments: $repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' $exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface' + + CodeRhapsodie\DataflowBundle\Command\JobCleanupCommand: + arguments: + $jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' + $exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface' + $retention: '%coderhapsodie.dataflow.job_history.retention%' + tags: ['console.command'] + + CodeRhapsodie\DataflowBundle\Command\SetCrashedCommand: + arguments: + $jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' + $crashedDelay: '%coderhapsodie.dataflow.job_history.crashed_delay%' + tags: ['console.command'] diff --git a/src/Runner/MessengerDataflowRunner.php b/src/Runner/MessengerDataflowRunner.php index 8f01f66..86c859b 100644 --- a/src/Runner/MessengerDataflowRunner.php +++ b/src/Runner/MessengerDataflowRunner.php @@ -11,7 +11,7 @@ class MessengerDataflowRunner implements PendingDataflowRunnerInterface { - public function __construct(private JobRepository $repository, private MessageBusInterface $bus) + public function __construct(private readonly JobRepository $repository, private readonly MessageBusInterface $bus) { } diff --git a/src/Runner/PendingDataflowRunner.php b/src/Runner/PendingDataflowRunner.php index 36ff37e..0c4a8e0 100644 --- a/src/Runner/PendingDataflowRunner.php +++ b/src/Runner/PendingDataflowRunner.php @@ -9,7 +9,7 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface { - public function __construct(private JobRepository $repository, private JobProcessorInterface $processor) + public function __construct(private readonly JobRepository $repository, private readonly JobProcessorInterface $processor) { }