diff --git a/core/Controller/TaskProcessingApiController.php b/core/Controller/TaskProcessingApiController.php index fd22d485af380..7b52c543d91a0 100644 --- a/core/Controller/TaskProcessingApiController.php +++ b/core/Controller/TaskProcessingApiController.php @@ -645,6 +645,37 @@ public function setResult(int $taskId, ?array $output = null, ?string $errorMess } } + /** + * Sets the task intermediate result while it is running + * + * @param int $taskId The id of the task + * @param array|null $output The intermediate task output, files are represented by their IDs + * @return DataResponse|DataResponse + * + * 200: Result updated successfully + * 404: Task not found + */ + #[ExAppRequired] + #[ApiRoute(verb: 'POST', url: '/tasks_provider/{taskId}/stream-result', root: '/taskprocessing')] + public function setIntermediateResult(int $taskId, array $output): DataResponse { + try { + // set result + $this->taskProcessingManager->setTaskIntermediateOutput($taskId, $output); + $task = $this->taskProcessingManager->getTask($taskId); + + /** @var CoreTaskProcessingTask $json */ + $json = $task->jsonSerialize(); + + return new DataResponse([ + 'task' => $json, + ]); + } catch (NotFoundException) { + return new DataResponse(['message' => $this->l->t('Not found')], Http::STATUS_NOT_FOUND); + } catch (Exception) { + return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR); + } + } + /** * @return DataResponse|DataResponse */ diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 6821aee65a002..59d0c5dfca880 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -59,6 +59,7 @@ use OCP\TaskProcessing\IInternalTaskType; use OCP\TaskProcessing\IManager; use OCP\TaskProcessing\IProvider; +use OCP\TaskProcessing\ISynchronousProgressiveProvider; use OCP\TaskProcessing\ISynchronousProvider; use OCP\TaskProcessing\ISynchronousWatermarkingProvider; use OCP\TaskProcessing\ITaskType; @@ -1135,6 +1136,13 @@ public function processTask(Task $task, ISynchronousProvider $provider): bool { $this->setTaskStatus($task, Task::STATUS_RUNNING); if ($provider instanceof ISynchronousWatermarkingProvider) { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), $task->getIncludeWatermark()); + } elseif ($provider instanceof ISynchronousProgressiveProvider) { + $output = $provider->process( + $task->getUserId(), + $input, + fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), + fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output) + ); } else { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress)); } @@ -1216,6 +1224,40 @@ public function setTaskProgress(int $id, float $progress): bool { return true; } + public function setTaskIntermediateOutput(int $id, array $output): bool { + // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently + $task = $this->getTask($id); + if ($task->getStatus() !== Task::STATUS_RUNNING) { + return false; + } + $userId = $task->getUserId(); + if ($userId !== null && $userId !== '' && $this->appManager->isEnabledForAnyone('notify_push')) { + try { + // $this->appManager->loadApp('notify_push'); + $queue = Server::get(\OCA\NotifyPush\Queue\IQueue::class); + // $queue = $this->serverContainer->get(\OCA\NotifyPush\Queue\IQueue::class); + $queue->push('notify_custom', [ + 'user' => $userId, + 'message' => 'task_' . $task->getId(), + 'body' => $output, + ]); + error_log('sending to queue!!!!!!'); + } catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) { + $this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue'); + error_log('NOT sending to queue!!!!!! ' . $e->getMessage()); + } + } + // no output shape validation for now + $task->setOutput($output); + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + try { + $this->taskMapper->update($taskEntity); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } + return true; + } + #[\Override] public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void { // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently diff --git a/lib/public/TaskProcessing/IManager.php b/lib/public/TaskProcessing/IManager.php index 2cd0244b52e8d..f919410d7c4fe 100644 --- a/lib/public/TaskProcessing/IManager.php +++ b/lib/public/TaskProcessing/IManager.php @@ -143,6 +143,16 @@ public function cancelTask(int $id): void; */ public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void; + /** + * @param int $id The id of the task + * @param array $output + * @return bool `true` if the task should still be running; `false` if the task has been cancelled in the meantime + * @throws Exception If the query failed + * @throws NotFoundException If the task could not be found + * @since 34.0.0 + */ + public function setTaskIntermediateOutput(int $id, array $output): bool; + /** * @param int $id * @param float $progress diff --git a/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php b/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php new file mode 100644 index 0000000000000..102062d8d1ab3 --- /dev/null +++ b/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php @@ -0,0 +1,36 @@ +|numeric|string|File> $input The task input + * @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped. + * @param null|callable(array):bool $reportOutput Set the task intermediate output + * @psalm-return array|numeric|string> + * @throws ProcessingException + * @since 33.0.0 + */ + #[\Override] + public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null): array; +}