fix(governance): supersede stale approval tasks at task-creation, not trigger time #28762
yan-3005 wants to merge 1 commit into
… trigger time

Event-based approval workflows mishandled sequential change events for the same entity in two ways, both rooted in WorkflowHandler.terminateDuplicateInstances.

- Bug 1 (stale OM task on supersede): terminating a superseded run cleaned the Flowable side but never cancelled the backing task_entity, leaving an orphaned approval task.
- Bug 2 (no-op event kills a valid approval): terminateDuplicateInstances ran in FilterEntityImpl at trigger time, before the run evaluated checkChangeDescription, so a no-op event that passed the entity filter destroyed a legitimate pending approval.

Move the supersede from trigger time to task-creation time:

- FilterEntityImpl: drop the eager terminateDuplicateInstances call and its async FK-race workaround.
- CreateTask: when a new approval run materializes a task for an entity that already has an open Approval-category task from an earlier run of the same workflow, cancel the stale task synchronously, then asynchronously terminate the superseded Flowable process and mark its WorkflowInstance FAILED. Best-effort: cleanup failures never abort creation of the new task.
- WorkflowHandler: replace the 7-day-scan terminateDuplicateInstances (and its FK-race / PG-deadlock workarounds) with a targeted terminateWorkflowInstance.

Tests: unit coverage for the supersede predicate; integration tests for the no-op-survives and new-run-supersedes scenarios.
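The supersede condition described in the CreateTask item (same workflow, earlier run) can be sketched as a small predicate. This is only an illustration of the rule as the commit message states it, not the PR's actual isSupersedablePriorApprovalTask: the PriorTask class, the UUID id types, and the null handling are assumptions made for the example.

```java
import java.util.UUID;

// Hypothetical stand-in for the OpenMetadata task; it carries only the two ids
// the rule needs. The real Task type and its id types may differ.
final class PriorTask {
  final UUID workflowDefinitionId;
  final UUID workflowInstanceId;

  PriorTask(UUID workflowDefinitionId, UUID workflowInstanceId) {
    this.workflowDefinitionId = workflowDefinitionId;
    this.workflowInstanceId = workflowInstanceId;
  }
}

final class SupersedePredicateSketch {
  private SupersedePredicateSketch() {}

  /**
   * Returns true only when the prior task belongs to the same workflow
   * definition as the current run but to a different workflow instance.
   * Any missing id is treated as "do not supersede" (an assumption).
   */
  static boolean isSupersedable(
      PriorTask prior, UUID currentDefinitionId, UUID currentInstanceId) {
    if (prior == null
        || prior.workflowDefinitionId == null
        || prior.workflowInstanceId == null
        || currentDefinitionId == null
        || currentInstanceId == null) {
      return false;
    }
    return prior.workflowDefinitionId.equals(currentDefinitionId)
        && !prior.workflowInstanceId.equals(currentInstanceId);
  }
}
```

Treating missing ids as non-supersedable keeps the cleanup conservative: a task that cannot be attributed to a run is left alone rather than cancelled.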
❌ PR checklist incomplete
This PR cannot be merged until the following are addressed on its linked issue:
The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically. Maintainers can bypass this check by adding the |
    supersedePriorApprovalTask(
        delegateTask,
        taskRepository,
        entity,
        taskCategory,
        resolvedWorkflowDefinitionId,
        workflowInstanceId,
        updatedBy);

    // Use the repository to create (handles taskId generation, FQN, relationships)
    task = taskRepository.create(null, task);
⚠️ Bug: Async Flowable termination dispatched before task transaction commits
In CreateTask.supersedePriorApprovalTask (called from createOrUpdateTask), the prior approval task is closed via taskRepository.closeTask(prior, ...) and then dispatchPriorInstanceTermination fires a CompletableFuture.runAsync that deletes the prior run's Flowable process and marks its WorkflowInstance FAILED. This dispatch happens before the new task is created (taskRepository.create) and before CreateTask.notify finishes — and closeTask/create all run inside the same Flowable command/JDBI transaction.
If any step after closeTask throws (e.g. taskRepository.create, setCustomTaskId, or changeEventDAO.insert), notify rethrows as a BpmnError and the enclosing transaction rolls back, restoring the prior OM task to Open. However, the async termination runs in its own committed transaction and may have already deleted the prior run's Flowable process. The net result is exactly the Bug-1 condition this PR set out to fix: an Open OM approval task whose backing Flowable process no longer exists — an orphaned, unresolvable approval.
Consider performing the supersede (close + dispatch) only after the new task has been successfully created/committed, or capturing the prior task and deferring the async termination until the new task creation succeeds, so that a rollback of the new-task transaction cannot leave a terminated Flowable process behind a still-Open OM task.
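One way to picture the deferral suggested above is a small helper that queues side effects and runs them only after the main unit of work returns normally. This is a minimal sketch, not the PR's code: DeferredCleanupSketch, defer, and run are hypothetical names, and the helper only orders work within a single call. Guaranteeing "after commit" would additionally require hooking the real Flowable/JDBI transaction boundary, which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper: side effects queued here run only if the main work succeeds. */
final class DeferredCleanupSketch {
  private final List<Runnable> afterSuccess = new ArrayList<>();

  /** Queue an action (e.g. the async termination dispatch) for later. */
  void defer(Runnable action) {
    afterSuccess.add(action);
  }

  /**
   * Runs the main work. If it throws, queued actions are discarded and the
   * exception propagates. If it returns, queued actions run best-effort.
   */
  <T> T run(Supplier<T> work) {
    afterSuccess.clear();
    T result;
    try {
      result = work.get();
    } catch (RuntimeException e) {
      afterSuccess.clear(); // nothing irreversible has happened yet
      throw e;
    }
    List<Runnable> toRun = new ArrayList<>(afterSuccess);
    afterSuccess.clear();
    for (Runnable action : toRun) {
      try {
        action.run();
      } catch (RuntimeException e) {
        // Best-effort cleanup: report and continue, never fail the main work.
        System.err.println("deferred cleanup failed: " + e.getMessage());
      }
    }
    return result;
  }
}
```

In the scenario from the comment, the work passed to run would close the prior task, call defer with the termination dispatch, and then create the new task. A failure in the create step would then discard the queued termination instead of leaving a deleted Flowable process behind a task that the rollback reopened.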
      Task prior =
          taskCategory == TaskCategory.Approval
              ? taskRepository.findOpenTaskByEntityAndCategory(
                  entity.getFullyQualifiedName(), taskCategory)
              : null;
      if (isSupersedablePriorApprovalTask(
          prior, currentWorkflowDefinitionId, currentWorkflowInstanceId)) {
        cancelAndTerminatePriorApproval(delegateTask, taskRepository, prior, updatedBy);
      }
    } catch (Exception e) {
      LOG.warn(
          "[CreateTask] Failed to supersede prior approval task for entity '{}': {}",
          entity.getFullyQualifiedName(),
          e.getMessage());
    }
💡 Edge Case: Only one open approval task superseded per entity
supersedePriorApprovalTask resolves the prior task via taskRepository.findOpenTaskByEntityAndCategory(fqn, Approval), which returns a single Open approval task (the first the DAO finds). If an entity ever has more than one open Approval-category task — e.g. two distinct approval workflows targeting the same entity, or a leftover from a prior incomplete supersede — only the single returned task is evaluated. If that one happens to belong to a different workflow definition, isSupersedablePriorApprovalTask returns false and a genuinely stale task from the same workflow is silently left open, breaking the intended "exactly one live approval" invariant. Consider querying all open approval tasks for the entity and superseding every one that matches the current workflowDefinitionId (and differs in workflowInstanceId).
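The "query all, supersede every match" variant suggested above could look roughly like the following. It is a sketch under assumptions: OpenApprovalTask is a hypothetical stand-in for the real task type, the list of open tasks is assumed to come from some repository query that is not shown (the source only confirms a single-result findOpenTaskByEntityAndCategory), and the ids are assumed to be UUIDs.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

// Hypothetical stand-in for an open Approval-category task on one entity.
final class OpenApprovalTask {
  final String id;
  final UUID workflowDefinitionId;
  final UUID workflowInstanceId;

  OpenApprovalTask(String id, UUID workflowDefinitionId, UUID workflowInstanceId) {
    this.id = id;
    this.workflowDefinitionId = workflowDefinitionId;
    this.workflowInstanceId = workflowInstanceId;
  }
}

final class SupersedeAllSketch {
  private SupersedeAllSketch() {}

  /**
   * From all open approval tasks on the entity, keeps those created by the
   * same workflow definition in a different (earlier) run. Tasks from other
   * workflows, and the current run's own task, are left untouched.
   */
  static List<OpenApprovalTask> selectStale(
      List<OpenApprovalTask> openTasks, UUID currentDefinitionId, UUID currentInstanceId) {
    return openTasks.stream()
        .filter(t -> currentDefinitionId.equals(t.workflowDefinitionId))
        .filter(
            t -> t.workflowInstanceId != null && !t.workflowInstanceId.equals(currentInstanceId))
        .collect(Collectors.toList());
  }
}
```

Filtering the full list means the outcome no longer depends on which task the DAO happens to return first, which is the ordering sensitivity the comment points out.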
    public void terminateWorkflowInstance(
        UUID workflowInstanceId, String mainWorkflowName, String reason) {
      try {
        WorkflowInstanceRepository workflowInstanceRepository =
            (WorkflowInstanceRepository)
                Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);
        WorkflowInstanceStateRepository workflowInstanceStateRepository =
            (WorkflowInstanceStateRepository)
                Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);

        ListFilter filter = new ListFilter(null);
        filter.addQueryParam("entityLink", entityLink);

        long endTs = System.currentTimeMillis();
        long startTs = endTs - (7L * 24 * 60 * 60 * 1000);

        ResultList<WorkflowInstance> allInstances =
            workflowInstanceRepository.list(null, startTs, endTs, 100, filter, false);

        List<WorkflowInstance> candidateInstances =
            allInstances.getData().stream()
                .filter(
                    instance -> WorkflowInstance.WorkflowStatus.RUNNING.equals(instance.getStatus()))
                .filter(
                    instance -> {
                      try {
                        WorkflowDefinitionRepository repo =
                            (WorkflowDefinitionRepository)
                                Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION);
                        var def =
                            repo.get(
                                null,
                                instance.getWorkflowDefinitionId(),
                                EntityUtil.Fields.EMPTY_FIELDS);
                        return mainWorkflowDefinitionName.equals(def.getName());
                      } catch (Exception e) {
                        return false;
                      }
                    })
                .toList();

        RuntimeService runtimeService = getInstance().getRuntimeService();
        List<ProcessInstance> runningProcessInstances =
            runtimeService
        ProcessInstance mainInstance =
            getRuntimeService()
                .createProcessInstanceQuery()
                .processDefinitionKey(mainWorkflowDefinitionName)
                .list();

        List<WorkflowInstance> conflictingInstances =
            candidateInstances.stream()
                .filter(
                    instance -> {
                      return runningProcessInstances.stream()
                          .filter(pi -> !pi.getId().equals(currentProcessInstanceId))
                          .anyMatch(pi -> pi.getBusinessKey().equals(instance.getId().toString()));
                    })
                .toList();

        if (conflictingInstances.isEmpty()) {
          LOG.debug("No conflicting instances found to terminate for {}", mainWorkflowDefinitionName);
          return;
        }

        // Terminate Flowable process instances OUTSIDE any JDBI transaction.
        // Calling runtimeService.deleteProcessInstance() inside a JDBI transaction causes a race
        // condition: the uncommitted DELETE on ACT_RU_EXECUTION holds an X-lock, Flowable's async
        // job executor concurrently tries to INSERT a timer job referencing that execution (FK
        // S-lock wait), and when the JDBI tx commits the execution is gone, so the timer INSERT
        // fails with SQLIntegrityConstraintViolationException.
        for (WorkflowInstance instance : conflictingInstances) {
          ProcessInstance mainInstance =
              runningProcessInstances.stream()
                  .filter(
                      pi ->
                          pi.getBusinessKey() != null
                              && pi.getBusinessKey().equals(instance.getId().toString()))
                  .findFirst()
                  .orElse(null);

          if (mainInstance != null) {
            String processId = mainInstance.getId();
            long activeUserTasks =
                processEngine
                    .getTaskService()
                    .createTaskQuery()
                    .processInstanceId(processId)
                    .active()
                    .count();
            if (activeUserTasks == 0) {
              LOG.debug(
                  "Process instance {} has no active user tasks — it is auto-completing; skipping external deletion",
                  processId);
              continue;
            }
            LOG.info(
                "Terminating main workflow instance {} for conflicting instance {}",
                mainInstance.getId(),
                instance.getId());
            try {
              runtimeService.deleteProcessInstance(
                  processId, "Terminated due to conflicting workflow instance");
            } catch (FlowableObjectNotFoundException e) {
              LOG.debug(
                  "Process instance {} already completed before termination, skipping", processId);
            }
          }
                .processInstanceBusinessKey(workflowInstanceId.toString())
                .processDefinitionKey(mainWorkflowName)
                .singleResult();
        if (mainInstance != null) {
          deleteProcessInstanceQuietly(mainInstance.getId(), reason);
        }

        Entity.getJdbi()
            .inTransaction(
                TransactionIsolationLevel.READ_COMMITTED,
                handle -> {
                  try {
                    for (WorkflowInstance instance : conflictingInstances) {
                      workflowInstanceStateRepository.markInstanceStatesAsFailed(
                          instance.getId(), "Terminated due to conflicting workflow instance");
                      workflowInstanceRepository.markInstanceAsFailed(
                          instance.getId(), "Terminated due to conflicting workflow instance");
                    }
                    return null;
                  } catch (Exception e) {
                    LOG.error("Failed to update instance states in transaction: {}", e.getMessage());
                    throw e;
                  }
                });

        LOG.info(
            "Terminated {} conflicting instances of {} for entity {}",
            conflictingInstances.size(),
            mainWorkflowDefinitionName,
            entityLink);

        markWorkflowInstanceFailed(workflowInstanceId, reason);
      } catch (Exception e) {
        LOG.warn("Failed to terminate conflicting instances: {}", e.getMessage());
        LOG.warn(
💡 Quality: terminateWorkflowInstance can leave WorkflowInstance audit inconsistent
In WorkflowHandler.terminateWorkflowInstance, the Flowable process is deleted first via deleteProcessInstanceQuietly, then markWorkflowInstanceFailed updates the WorkflowInstance/state time-series. These two operations are not atomic and the surrounding try/catch swallows all exceptions. If markWorkflowInstanceFailed throws after the process was already deleted, the Flowable process is gone but the WorkflowInstance audit record stays in its prior (e.g. RUNNING) status, with only a LOG.warn recording the discrepancy. Given this is intentionally best-effort cleanup the impact is limited to audit accuracy, but consider marking the instance FAILED before/independently of the Flowable deletion, or logging which step failed so the inconsistency is diagnosable.
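The "log which step failed" suggestion can be illustrated with a helper that runs each cleanup step in its own try/catch and reports failures by name. This is a hedged sketch rather than the handler's real structure: BestEffortStepsSketch and runAll are hypothetical names, and the step labels in the usage note merely echo the method names mentioned in the comment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BestEffortStepsSketch {
  private BestEffortStepsSketch() {}

  /**
   * Runs every named step in insertion order. A failing step is logged with
   * its name and does not prevent later steps from running. Returns the names
   * of the steps that failed so the caller can report the inconsistency.
   */
  static List<String> runAll(Map<String, Runnable> steps) {
    List<String> failed = new ArrayList<>();
    for (Map.Entry<String, Runnable> step : steps.entrySet()) {
      try {
        step.getValue().run();
      } catch (RuntimeException e) {
        failed.add(step.getKey());
        System.err.println("cleanup step '" + step.getKey() + "' failed: " + e.getMessage());
      }
    }
    return failed;
  }
}
```

A caller could register "markWorkflowInstanceFailed" before "deleteProcessInstance" in a LinkedHashMap, so the audit record is updated first and a failure in either step is attributed to that step rather than hidden behind a single warning.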
Pull request overview
This PR fixes superseding behavior for event-based approval workflows so sequential change events on the same entity no longer (a) orphan OpenMetadata approval tasks when a prior Flowable process is deleted, and (b) invalidate a legitimate pending approval due to a later “no-op” event that creates no task.
Changes:
- Move duplicate-instance supersede behavior from trigger-time (FilterEntityImpl) to task-creation time (CreateTask) for Approval-category tasks.
- Replace broad, time-window duplicate termination logic in WorkflowHandler with a targeted terminateWorkflowInstance(...) best-effort cleanup method.
- Add unit tests for the supersede predicate and add integration tests covering “no-op event does not invalidate pending approval” and “new approval run supersedes stale pending task”.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTask.java | Supersedes prior Approval tasks on task materialization; dispatches async termination of the prior Flowable instance. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java | Removes bulk duplicate termination and adds targeted termination + audit-fail marking for a specific workflow instance. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java | Stops terminating duplicate instances at trigger-time to avoid killing valid pending approvals on no-op events. |
| openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTaskTest.java | Adds unit tests for isSupersedablePriorApprovalTask predicate. |
| openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java | Adds integration tests validating “no-op event doesn’t invalidate approval” and “new run supersedes stale approval”. |
    CompletableFuture.runAsync(
        () ->
            WorkflowHandler.getInstance()
                .terminateWorkflowInstance(
                    priorInstanceId, mainWorkflowName, SUPERSEDED_BY_NEWER_RUN))

    Task prior =
        taskCategory == TaskCategory.Approval
            ? taskRepository.findOpenTaskByEntityAndCategory(
                entity.getFullyQualifiedName(), taskCategory)
            : null;

      await("surviving task must resolve and drive the workflow to Approved")
          .atMost(Duration.ofSeconds(45))
          .pollInterval(Duration.ofSeconds(2))
          .until(() -> hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Approved));

      LOG.info("test_NoOpEventDoesNotInvalidatePendingApproval completed successfully");
      cleanupSupersedeFixtures(client, fx);
    }

      await("superseding task must resolve and drive the workflow to Approved")
          .atMost(Duration.ofSeconds(45))
          .pollInterval(Duration.ofSeconds(2))
          .until(() -> hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Approved));

      LOG.info("test_NewApprovalRunSupersedesStalePendingTask completed successfully");
      cleanupSupersedeFixtures(client, fx);
    }
🔴 Playwright Results: 1 failure(s), 12 flaky
✅ 4269 passed · ❌ 1 failed · 🟡 12 flaky · ⏭️ 88 skipped
Genuine Failures (failed on all attempts) ❌



Problem
Event-based approval workflows (e.g. glossary/databaseSchema approval) mishandled sequential change events for the same entity. Two bugs, both rooted in WorkflowHandler.terminateDuplicateInstances:
- Bug 1 (stale OM task on supersede): terminateDuplicateInstances deleted the Flowable process but never cancelled the OpenMetadata task_entity backing the approval. Result: an orphaned, unresolvable approval task pointing at a workflow that no longer exists in Flowable.
- Bug 2 (no-op event kills a valid approval): terminateDuplicateInstances ran inside FilterEntityImpl at trigger time, before the run evaluated checkChangeDescription. A no-op event that passed the entity-level filter but created no task would still async-kill the prior run's parked process, destroying a legitimate pending approval.

Root cause: the "last event wins" supersede decision was made too early (trigger time) and at the wrong layer, before the run could know whether it would even create a task.

Fix
Move the supersede from trigger time to task-creation time, where a run has genuinely produced a new approval task.
- FilterEntityImpl: remove the eager terminateDuplicateInstances call and its CompletableFuture.runAsync FK-race workaround. The filter now only decides passesFilter. (fixes Bug 2)
- CreateTask: when a new approval run materializes a task for an entity that already has an open Approval-category task from an earlier run of the same workflow (matched by workflowDefinitionId, different workflowInstanceId), cancel the stale OM task synchronously, then async-dispatch termination of the superseded Flowable process and mark its WorkflowInstance FAILED. The async dispatch keeps a Flowable-side failure from rolling back the new run's transaction; the whole supersede is best-effort so cleanup failures never abort creation of the new task. Gated on category == Approval, so DataAccess/Incident/Review/Custom tasks are untouched. (fixes Bug 1)
- WorkflowHandler: delete the 136-line terminateDuplicateInstances (7-day time-series scan + FK-race async workaround + PG-deadlock skip-guard, all now unnecessary) and replace it with a targeted terminateWorkflowInstance.

Tests
- Unit (CreateTaskTest): 6 cases on the isSupersedablePriorApprovalTask predicate (different/same instance, same/different workflow definition, missing instance/definition).
- Integration (WorkflowDefinitionResourceIT):
  - test_NoOpEventDoesNotInvalidatePendingApproval: CE1 creates an approval task; CE2 (a different trigger field that passes the entity filter but fails checkChangeDescription) leaves it untouched; the surviving task is then resolved to prove its Flowable process is still alive.
  - test_NewApprovalRunSupersedesStalePendingTask: CE1 parks a task; a second gated change supersedes it (prior task Cancelled, exactly one new open task), and the new task resolves to Approved.

Verification status
Compiles; spotless:check clean; 50/50 unit tests pass (6 new). The two new ITs compile but were not executed against a server carrying these changes; they require a redeploy and a live stack.

Notes