Centralize enableStreamableLogs resolution at deploy; fix ingestion-runner replacement#28741
Centralize enableStreamableLogs resolution at deploy; fix ingestion-runner replacement#28741IceS2 wants to merge 5 commits into
Conversation
…ory hook Route every deploy path (REST, AutoPilot governance, app runs, update hook) through IngestionPipelineRepository.deployIngestionPipeline, which applies an overridable applyStreamableLogsConfig hook. Removes the global S3-storage force in the Resource and update-hook paths so the flag is no longer forced true on SaaS pipelines in clusters that have S3 log storage configured.
…hook So enableStreamableLogs is derived from the app's runner for hybrid-runner apps, consistent with service and test-suite pipelines.
…ting updateIngestionRunner added a new USES->ingestionRunner relationship without removing the previous one, leaving a service with multiple (hybrid) runners and making service.getIngestionRunner() resolution order-dependent. Delete the old runner relationship before adding the new one.
❌ PR checklist incompleteThis PR cannot be merged until the following are addressed on its linked issue:
The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically. Maintainers can bypass this check by adding the |
There was a problem hiding this comment.
Pull request overview
This PR refactors ingestion-pipeline deployments to funnel multiple deploy entry points through IngestionPipelineRepository.deployIngestionPipeline(...) (introducing a deploy-time hook for enableStreamableLogs), and fixes service → ingestion-runner relationship replacement to avoid multiple runners being attached to a service.
Changes:
- Route deploy calls from REST, apps, and governance workflow nodes through
IngestionPipelineRepository.deployIngestionPipeline(...). - Add a single deploy-time hook (
applyStreamableLogsConfig) and remove the prior global S3-based forcing ofenableStreamableLogs. - Fix
ServiceEntityRepository.updateIngestionRunnerto remove the previousUSESrelationship before adding the new runner relationship.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java | Deploy endpoint now uses repository deploy wrapper (removes per-path streamable-logs forcing). |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java | App deploy flow now deploys ingestion pipeline via repository wrapper. |
| openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ServiceEntityRepository.java | Ensures ingestion-runner relationship is replaced (old runner link removed) rather than appended. |
| openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java | Removes update-hook forcing, adds deploy wrapper + deploy-time hook for streamable logs config. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java | App-run workflow deploy step now uses repository deploy wrapper. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java | Governance pipeline creation deploy now goes through repository deploy wrapper (drops direct client injection). |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineDelegate.java | Removes pipeline-service-client expression wiring for the create/deploy step. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateAndRunIngestionPipelineTask.java | Removes the PipelineServiceClient field extension from the create-ingestion-pipeline service task. |
| public PipelineServiceClientResponse deployIngestionPipeline( | ||
| IngestionPipeline ingestionPipeline, ServiceEntityInterface service) { | ||
| applyStreamableLogsConfig(ingestionPipeline); | ||
| return pipelineServiceClient.deployPipeline(ingestionPipeline, service); | ||
| } | ||
|
|
||
| // Single deploy-time hook for enableStreamableLogs, shared by every deploy path. | ||
| // Default keeps the pipeline's own value; overridden to derive it from the ingestion runner. | ||
| protected void applyStreamableLogsConfig(IngestionPipeline ingestionPipeline) {} |
| LOG.info( | ||
| "[GovernanceWorkflows] Deploying '{}' for '{}'", | ||
| ingestionPipeline.getDisplayName(), | ||
| service.getName()); | ||
| wasSuccessful = deployPipeline(pipelineServiceClient, ingestionPipeline, service); | ||
| IngestionPipelineRepository repository = | ||
| (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); | ||
| wasSuccessful = deployPipeline(repository, ingestionPipeline, service); |
| @@ -254,7 +253,7 @@ private IngestionPipeline deployIngestionPipeline( | |||
| ingestionPipelineConfig.put("appConfig", app.getAppConfiguration()); | |||
| ingestionPipeline.getSourceConfig().setConfig(ingestionPipelineConfig); | |||
|
|
|||
| pipelineServiceClient.deployPipeline( | |||
| repository.deployIngestionPipeline( | |||
| ingestionPipeline, | |||
| Entity.getEntity(ingestionPipeline.getService(), "ingestionRunner", Include.NON_DELETED)); | |||
RunAppImpl.deployIngestionPipeline deployed via the repository hook (which derives enableStreamableLogs) but never persisted the pipeline, unlike the other deploy paths, so the stored value could diverge from what was sent to ingestion. Persist on a successful deploy.
| ingestionPipeline, | ||
| Entity.getEntity( | ||
| ingestionPipeline.getService(), "ingestionRunner", Include.NON_DELETED)); | ||
| if (status.getCode() == 200) { |
There was a problem hiding this comment.
💡 Quality: Magic number 200 for HTTP status code
if (status.getCode() == 200) hardcodes the HTTP success code as a magic number, which the project standards prohibit (use enums/constants). Use the JAX-RS Response.Status.OK.getStatusCode() constant instead so the intent is explicit and consistent with other deploy paths.
Note: getCode() returns an Integer. If the response ever carries a null code, the == 200 comparison will auto-unbox and throw a NullPointerException. Comparing against the constant via .equals or guarding for null avoids this.
Replace the magic 200 with the Response.Status.OK constant (requires importing jakarta.ws.rs.core.Response).:
if (Response.Status.OK.getStatusCode() == status.getCode()) {
repository.createOrUpdate(null, ingestionPipeline, ingestionPipeline.getUpdatedBy());
}
- Apply fix
Check the box to apply the fix or reply for a change | Was this helpful? React with 👍 / 👎
Code Review 👍 Approved with suggestions 1 resolved / 3 findingsCentralizes streamable log resolution for ingestion pipelines and corrects runner relationship management to prevent ingestion-runner accumulation. Address the hardcoded 200 HTTP status code and ensure that mutated pipeline configurations are not persisted to the database unnecessarily. 💡 Quality: Magic number 200 for HTTP status code
Note: Replace the magic 200 with the Response.Status.OK constant (requires importing jakarta.ws.rs.core.Response).💡 Edge Case: Persisting mutated pipeline writes injected appConfig to DBBefore calling ✅ 1 resolved✅ Bug: App-run deploy path never persists derived streamableLogs flag
🤖 Prompt for agentsOptionsDisplay: compact → Showing less information. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
| // Single deploy-time hook for enableStreamableLogs, shared by every deploy path. | ||
| // Default keeps the pipeline's own value; overrides resolve the pipeline's owning ingestion | ||
| // runner (service / test-suite / application) and derive the flag from it. | ||
| protected void applyStreamableLogsConfig(IngestionPipeline ingestionPipeline) {} |
|
🟡 Playwright Results — all passed (13 flaky)✅ 4265 passed · ❌ 0 failed · 🟡 13 flaky · ⏭️ 88 skipped
🟡 13 flaky test(s) (passed on retry)
How to debug locally# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace |



What
Centralize how
enableStreamableLogsis resolved for ingestion pipelines, and fix ingestion-runner relationship handling on services./deploy, AutoPilot governance (CreateIngestionPipelineImpl), app runs (RunAppImpl/AppResource), and the update hook — throughIngestionPipelineRepository.deployIngestionPipeline, which calls a new overridableapplyStreamableLogsConfig(pipeline). The default is a no-op (keeps the pipeline's own value); deployments override it to derive the flag.isS3LogStorageEnabled()-basedsetEnableStreamableLogs(true)inIngestionPipelineResourceand the update-hook deploy path. That check was global (storage-level) and applied on only one deploy path, so the flag was set inconsistently across pipelines.ServiceEntityRepository.updateIngestionRunneradded a newUSES → ingestionRunnerrelationship without removing the previous one, leaving a service with multiple runners and makingservice.getIngestionRunner()resolution order-dependent. Delete the old relationship before adding the new one.Why
The streamable-logs flag was set by a global storage check on a single entry point rather than derived consistently, so the value sent to ingestion (write) and the value used to fetch logs (read) could disagree — pipelines that weren't redeployed through that path showed no logs, while pipelines forced on duplicated their log writes. Centralizing resolution behind one hook lets each deployment derive the flag consistently. The runner-relationship fix keeps runner resolution deterministic.
Fixes: https://github.com/open-metadata/openmetadata-collate/issues/4427