feat(orchestrator): triggering kafka workflows with cloud events. backend only code#2512
Conversation
|
Important This PR includes changes that affect public-facing API. Please ensure you are adding/updating documentation for new features or behavior. Changed Packages
|
84668ec to
7561c8e
Compare
Review Summary by QodoAdd Kafka-based workflow execution with CloudEvents support
WalkthroughsDescription• Add Kafka-based workflow execution via CloudEvents • Implement event-driven workflow triggering with correlation attributes • Add Kafka configuration support with SSL and SASL authentication • Extend workflow execution API to handle event-type workflows Diagramflowchart LR
A["Workflow Execution Request"] -->|isEvent=true| B["Parse Workflow Definition"]
B --> C["Extract Event Configuration"]
C --> D["Create CloudEvent"]
D --> E["Send via Kafka Producer"]
E --> F["Return Correlation ID"]
F --> G["Poll for Workflow Instance"]
G --> H["Return Instance ID"]
I["Kafka Config"] -->|brokers, SSL, SASL| E
File Changes1. workspaces/orchestrator/plugins/orchestrator-backend/config.d.ts
|
Code Review by Qodo
1.
|
workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts
Outdated
Show resolved
Hide resolved
workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts
Show resolved
Hide resolved
workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts
Show resolved
Hide resolved
3f813bd to
ae11629
Compare
c246576 to
1bd2a17
Compare
|



Hey, I just made a Pull Request!
This PR is for feature https://issues.redhat.com/browse/RHDHPLAN-291
This PR is specifically for the backend code: https://issues.redhat.com/browse/RHIDP-9143
This enables users with workflows that are triggered by kafka events(by sending a Cloud Events), to be triggered from the RHDH UI. Front end changes will come in a separate PR.
To test this i used the demo workflow from here: https://github.com/lholmquist/orchestrator-demo/tree/callback-flow-updates.
I start the kafka server using the instructions in the readme here: https://github.com/lholmquist/orchestrator-demo/tree/callback-flow-updates
Then i start the workflow, which is a quarkus application by running
mvn clean quarkus:devfrom the08_kafka_events/callback-flowdirectory. Since this is pulling dependecies from a Red Hat maven registry, I had to make sure i added those registries to my.m2/settings.xml. This is mine: https://gist.github.com/lholmquist/281b139db727504e833c76e255e975cbThe quarkus app(the workflow) should be running on port 8080 once it is started.
The app-config of RHDH needs some additions/changes to connect to the workflow and to specify the kafka parameters. This is the app-config i use: https://gist.github.com/lholmquist/727190f1f8e91d217b4676310686c282
Biggest changes to the app-config are the addition of the
orchestrator.kafkaconfig and I also changed theorchestrator.sonataFlowService.portto8080and theorchestrator.dataIndexService.urltohttp://localhost:8080which gives the RHDH UI access to the workflow.Run the local RHDH by running
yarn devfrom theworkspaces/orchestratordirectoryIf everything is setup correctly, you should see 1 workflow in the list under the orchestrator tab:
Since this is only the backend code, you can trigger the workflow directly by using this curl command:
The auth token can be found in the chrome dev tools network tab by looking at one of the requests
The plan for the front-end is to have a button that says something like "Run as event", which is where that
inputData: {isEvent: true}from the data in the curl will come from.There are two things that can happen during a successful run.
Because of timing issues of sending an event to the kafka broker and it triggering the workflow, the workflow instance id might not be available yet. The code tries a few times to query the instances but if it can't find it, it is not necesarilly an error and the return value will be
{"id": "kafkaEvent"}which will tell the UI to just return to the "All Runs" tab for the orchestrator workflows. Since this tab refreshes on a timer, the workflow will eventually show up.If the timing works out and the workflow shows up, then we will return the instanceId of that workflow run and the UI should just show the instance run detail page(which is what happens for a "normal" workflow)
✔️ Checklist