Skip to content

feat(orchestrator): triggering kafka workflows with cloud events. backend only code#2512

Open
lholmquist wants to merge 8 commits intoredhat-developer:mainfrom
lholmquist:RHIDP-9143-trigger-workflows-ce-backend
Open

feat(orchestrator): triggering kafka workflows with cloud events. backend only code#2512
lholmquist wants to merge 8 commits intoredhat-developer:mainfrom
lholmquist:RHIDP-9143-trigger-workflows-ce-backend

Conversation

@lholmquist
Copy link
Member

@lholmquist lholmquist commented Mar 11, 2026

Hey, I just made a Pull Request!

This PR is for feature https://issues.redhat.com/browse/RHDHPLAN-291

This PR is specifically for the backend code: https://issues.redhat.com/browse/RHIDP-9143

This enables users with workflows that are triggered by kafka events(by sending a Cloud Events), to be triggered from the RHDH UI. Front end changes will come in a separate PR.

To test this i used the demo workflow from here: https://github.com/lholmquist/orchestrator-demo/tree/callback-flow-updates.

I start the kafka server using the instructions in the readme here: https://github.com/lholmquist/orchestrator-demo/tree/callback-flow-updates

Then i start the workflow, which is a quarkus application by running mvn clean quarkus:dev from the 08_kafka_events/callback-flow directory. Since this is pulling dependecies from a Red Hat maven registry, I had to make sure i added those registries to my .m2/settings.xml. This is mine: https://gist.github.com/lholmquist/281b139db727504e833c76e255e975cb

The quarkus app(the workflow) should be running on port 8080 once it is started.

The app-config of RHDH needs some additions/changes to connect to the workflow and to specify the kafka parameters. This is the app-config i use: https://gist.github.com/lholmquist/727190f1f8e91d217b4676310686c282

Biggest changes to the app-config are the addition of the orchestrator.kafka config and I also changed the orchestrator.sonataFlowService.port to 8080 and the orchestrator.dataIndexService.url to http://localhost:8080 which gives the RHDH UI access to the workflow.

Run the local RHDH by running yarn dev from the workspaces/orchestrator directory

If everything is setup correctly, you should see 1 workflow in the list under the orchestrator tab:

Screenshot 2026-03-11 at 1 46 05 PM

Since this is only the backend code, you can trigger the workflow directly by using this curl command:

curl --location 'http://localhost:7007/api/orchestrator/v2/workflows/lock-flow/execute' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer <AUTH_TOKEN_FROM_RHDH>' \
--data '{"inputData":{"isEvent":true},"authTokens":[]}'

The auth token can be found in the chrome dev tools network tab by looking at one of the requests

The plan for the front-end is to have a button that says something like "Run as event", which is where that inputData: {isEvent: true} from the data in the curl will come from.

There are two things that can happen during a successful run.

  1. Because of timing issues of sending an event to the kafka broker and it triggering the workflow, the workflow instance id might not be available yet. The code tries a few times to query the instances but if it can't find it, it is not necesarilly an error and the return value will be {"id": "kafkaEvent"} which will tell the UI to just return to the "All Runs" tab for the orchestrator workflows. Since this tab refreshes on a timer, the workflow will eventually show up.

  2. If the timing works out and the workflow shows up, then we will return the instanceId of that workflow run and the UI should just show the instance run detail page(which is what happens for a "normal" workflow)

✔️ Checklist

  • A changeset describing the change and affected packages. (more info)
  • Added or Updated documentation
  • Tests for new functionality and regression tests for bug fixes
  • Screenshots attached (for UI changes)

@rhdh-gh-app
Copy link

rhdh-gh-app bot commented Mar 11, 2026

Important

This PR includes changes that affect public-facing API. Please ensure you are adding/updating documentation for new features or behavior.

Changed Packages

Package Name Package Path Changeset Bump Current Version
@red-hat-developer-hub/backstage-plugin-orchestrator-backend workspaces/orchestrator/plugins/orchestrator-backend minor v8.7.2

@lholmquist lholmquist force-pushed the RHIDP-9143-trigger-workflows-ce-backend branch from 84668ec to 7561c8e Compare March 11, 2026 14:37
@lholmquist lholmquist marked this pull request as ready for review March 11, 2026 14:45
@lholmquist lholmquist requested review from a team and lokanandaprabhu as code owners March 11, 2026 14:45
@rhdh-qodo-merge
Copy link

Review Summary by Qodo

Add Kafka-based workflow execution with CloudEvents support

✨ Enhancement

Grey Divider

Walkthroughs

Description
• Add Kafka-based workflow execution via CloudEvents
• Implement event-driven workflow triggering with correlation attributes
• Add Kafka configuration support with SSL and SASL authentication
• Extend workflow execution API to handle event-type workflows
Diagram
flowchart LR
  A["Workflow Execution Request"] -->|isEvent=true| B["Parse Workflow Definition"]
  B --> C["Extract Event Configuration"]
  C --> D["Create CloudEvent"]
  D --> E["Send via Kafka Producer"]
  E --> F["Return Correlation ID"]
  F --> G["Poll for Workflow Instance"]
  G --> H["Return Instance ID"]
  
  I["Kafka Config"] -->|brokers, SSL, SASL| E
Loading

Grey Divider

File Changes

1. workspaces/orchestrator/plugins/orchestrator-backend/config.d.ts ⚙️ Configuration changes +62/-0

Add Kafka configuration schema definitions

workspaces/orchestrator/plugins/orchestrator-backend/config.d.ts


2. workspaces/orchestrator/plugins/orchestrator-backend/src/types/kafka.ts ✨ Enhancement +44/-0

Define Kafka service options and authentication types

workspaces/orchestrator/plugins/orchestrator-backend/src/types/kafka.ts


3. workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts ✨ Enhancement +77/-1

Implement CloudEvent-based workflow execution via Kafka

workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts


View more (9)
4. workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.ts ✨ Enhancement +12/-0

Add executeWorkflowAsCloudEvent method wrapper

workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.ts


5. workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts ✨ Enhancement +137/-13

Route event-type workflows to CloudEvent execution path

workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts


6. workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts ✨ Enhancement +10/-1

Initialize Kafka service with configuration options

workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts


7. workspaces/orchestrator/plugins/orchestrator-backend/src/service/Helper.ts Miscellaneous +2/-1

Update copyright and formatting

workspaces/orchestrator/plugins/orchestrator-backend/src/service/Helper.ts


8. workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.test.ts 🧪 Tests +26/-0

Add tests for executeWorkflowAsCloudEvent method

workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.test.ts


9. workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/test-utils.ts 🧪 Tests +245/-0

Add test utilities for event-type workflow definitions

workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/test-utils.ts


10. workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.test.ts 🧪 Tests +228/-0

Add comprehensive tests for event-type workflow execution

workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.test.ts


11. workspaces/orchestrator/plugins/orchestrator-backend/package.json Dependencies +3/-1

Add kafkajs and js-yaml dependencies

workspaces/orchestrator/plugins/orchestrator-backend/package.json


12. workspaces/orchestrator/.changeset/twelve-insects-scream.md 📝 Documentation +5/-0

Document feature addition in changeset

workspaces/orchestrator/.changeset/twelve-insects-scream.md


Grey Divider

Qodo Logo

@rhdh-qodo-merge
Copy link

rhdh-qodo-merge bot commented Mar 11, 2026

Code Review by Qodo

🐞 Bugs (5) 📘 Rule violations (0) 📎 Requirement gaps (0)

Grey Divider


Action required

1. Kafka config always enabled🐞 Bug ⛯ Reliability
Description
router.ts passes {} when orchestrator.kafka is absent, but SonataFlowService treats any object
as enabled and constructs a Kafka client with missing required fields, risking startup/runtime
failures even when Kafka isn’t configured.
Code

workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts[R264-272]

+  const orchestratorKafka: OrchestratorKafkaServiceOptions =
+    config.getOptional('orchestrator.kafka') ??
+    ({} as OrchestratorKafkaServiceOptions);
const dataIndexService = new DataIndexService(dataIndexUrl, logger);
-  const sonataFlowService = new SonataFlowService(dataIndexService, logger);
+  const sonataFlowService = new SonataFlowService(
+    dataIndexService,
+    logger,
+    orchestratorKafka,
+  );
Evidence
initPublicServices always supplies an object for kafka options (falls back to {}), and
SonataFlowService enables Kafka based solely on truthiness. The options type itself requires
clientId/brokers/logLevel, so {} is an invalid configuration shape that can’t be safely used to
construct a client.

workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts[256-272]
workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts[42-56]
workspaces/orchestrator/plugins/orchestrator-backend/src/types/kafka.ts[19-25]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Kafka is effectively enabled even when `orchestrator.kafka` is not configured because router.ts falls back to `{}` and SonataFlowService checks only truthiness. This can lead to invalid Kafka client construction and startup/runtime failures.
### Issue Context
`orchestrator.kafka` is optional in configuration, so missing config must not break backend startup.
### Fix Focus Areas
- workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts[262-272]
- workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts[44-56]
- workspaces/orchestrator/plugins/orchestrator-backend/src/types/kafka.ts[19-25]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. Kafka send can silently noop🐞 Bug ✓ Correctness
Description
executeWorkflowAsCloudEvent uses optional chaining for producer creation/connect/send; if Kafka
isn’t initialized, no message is sent but the method still returns an id, causing the API to report
success incorrectly.
Code

workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts[R152-179]

+    const kfk = this.orchestratorKafkaImpl;
+    const producer = kfk?.producer();
+    try {
+      // Connect the producer
+      await producer?.connect();
+
+      // Send the message
+      await producer?.send({
+        topic: args.workflowEventType,
+        messages: [messageEvent],
+      });
+    } catch (error) {
+      this.logger.error(
+        `Error with Kafka client connection. Options: ${JSON.stringify(this.kafkaServiceOptions)}`,
+      );
+      throw new Error(
+        `Error with Kafka client with connection options: ${JSON.stringify(this.kafkaServiceOptions)}`,
+      );
+    } finally {
+      // Disconnect the producer
+      await producer?.disconnect();
+    }
+
+    // Since sending to kafka doesn't return anything, send back the contextAttributeId here
+    // Then we will query the workflow instances to see if it showed up yet
+    return {
+      id: contextAttributeId,
+    };
Evidence
When this.orchestratorKafkaImpl is undefined, producer is undefined and
connect/send/disconnect become no-ops due to optional chaining, but the method always returns a
correlation id. This creates false positives and silent data loss.

workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts[152-179]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
The CloudEvent execution path can return success without sending anything because Kafka producer operations are guarded by optional chaining.
### Issue Context
If Kafka is not configured, `this.orchestratorKafkaImpl` is undefined and `producer?.connect()` / `producer?.send()` do nothing.
### Fix Focus Areas
- workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts[152-179]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


3. Kafka secrets logged🐞 Bug ⛨ Security
Description
Kafka options are stringified into logs (debug/error). Config schema marks sasl.password and ssl.key
as secrets, so current logging can leak credentials into log storage.
Code

workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts[R164-169]

+      this.logger.error(
+        `Error with Kafka client connection. Options: ${JSON.stringify(this.kafkaServiceOptions)}`,
+      );
+      throw new Error(
+        `Error with Kafka client with connection options: ${JSON.stringify(this.kafkaServiceOptions)}`,
+      );
Evidence
SonataFlowService logs the entire kafka options object via JSON.stringify. The config schema
explicitly marks password/key as secret, so logging the raw object violates secret-handling
expectations and can expose credentials.

workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts[163-169]
workspaces/orchestrator/plugins/orchestrator-backend/config.d.ts[42-59]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Current logging stringifies Kafka options and can leak `sasl.password` / `ssl.key` into logs.
### Issue Context
The config schema marks these fields as secret.
### Fix Focus Areas
- workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts[49-56]
- workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts[163-169]
- workspaces/orchestrator/plugins/orchestrator-backend/config.d.ts[42-59]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Remediation recommended

4. Busy-loop instance polling 🐞 Bug ➹ Performance
Description
The event-trigger executeWorkflow branch polls Data Index up to 10 times with no delay, potentially
hammering GraphQL under load and still likely missing the instance due to eventual consistency.
Code

workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts[R290-309]

+      let currentInstanceToReturn: string | any[] = [];
+      for (let i = 0; i < FETCH_INSTANCE_MAX_ATTEMPTS; i++) {
+        const response = await this.orchestratorService.fetchInstances({
+          workflowIds: [workflowId],
+        });
+
+        // Find the correct workflow by using the correlation context attribute(CCA) value
+        // id returned from the execution response will be the CCA value
+        // eslint-disable-next-line no-loop-func
+        currentInstanceToReturn = response.filter((val: any) => {
+          return (
+            val.variables.workflowdata[correlationContextAttributeName] ===
+            executionResponse?.id
+          );
+        });
+
+        if (currentInstanceToReturn.length > 0) {
+          break;
+        }
+      }
Evidence
The code loops FETCH_INSTANCE_MAX_ATTEMPTS times calling fetchInstances with no wait between
iterations. The same file already defines a retry delay constant and uses retryAsyncFunction with a
delay for the non-event path, showing the intended pattern exists but wasn’t applied here.

workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts[50-52]
workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts[290-309]
workspaces/orchestrator/plugins/orchestrator-backend/src/service/Helper.ts[24-38]
workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts[348-356]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Event-trigger execution polls fetchInstances in a tight loop with no delay.
### Issue Context
This can create unnecessary load on Data Index GraphQL and still not improve correctness due to eventual consistency.
### Fix Focus Areas
- workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts[290-309]
- workspaces/orchestrator/plugins/orchestrator-backend/src/service/Helper.ts[24-38]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


5. Event YAML parsing unsafe 🐞 Bug ⛯ Reliability
Description
When inputData.isEvent is true, executeWorkflow parses definition.source and immediately accesses
states/events without validating presence/types. Since WorkflowInfo.source is optional, this can
throw runtime TypeErrors for workflows without source or malformed definitions.
Code

workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts[R216-226]

+    if (isEventType) {
+      // definition.source will be the yaml
+      const parsedDefinitionSource: any = load(definition.source as string);
+
+      // Parse the definition to find the start param then determine things from there
+      // All workflows should have this start param
+      const start = parsedDefinitionSource.start;
+
+      // Find the start state from the list of states
+      const startState = parsedDefinitionSource.states.filter(
+        (val: { name: any }) => {
Evidence
WorkflowInfo.source is an optional field, but the event path casts it to string and assumes parsed
YAML has states/events arrays (calling .filter). If source is undefined or YAML doesn’t match
the expected shape, parsedDefinitionSource or parsedDefinitionSource.states may be undefined,
causing a crash before the code’s explicit error checks run.

workspaces/orchestrator/plugins/orchestrator-common/src/types.ts[109-123]
workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts[216-229]
workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts[247-252]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Event-trigger code assumes `definition.source`, `states`, and `events` exist and are arrays; this can throw runtime TypeErrors.
### Issue Context
`WorkflowInfo.source` is optional in the shared model.
### Fix Focus Areas
- workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts[216-229]
- workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts[247-252]
- workspaces/orchestrator/plugins/orchestrator-common/src/types.ts[109-123]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

ⓘ The new review experience is currently in Beta. Learn more

Grey Divider

Qodo Logo

@lholmquist lholmquist force-pushed the RHIDP-9143-trigger-workflows-ce-backend branch from 3f813bd to ae11629 Compare March 11, 2026 19:11
@lholmquist lholmquist changed the title WIP: feat: triggering kafka workflows with cloud events. backend only code feat(orchestrator): triggering kafka workflows with cloud events. backend only code Mar 11, 2026
@lholmquist lholmquist force-pushed the RHIDP-9143-trigger-workflows-ce-backend branch from c246576 to 1bd2a17 Compare March 12, 2026 14:35
@sonarqubecloud
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant