Centralize enableStreamableLogs resolution at deploy; fix ingestion-runner replacement #28741

Open
IceS2 wants to merge 5 commits into main from fix/streamable-logs-runner-driven

IceS2 commented Jun 5, 2026


What

Centralize how enableStreamableLogs is resolved for ingestion pipelines, and fix ingestion-runner relationship handling on services.

  • Single deploy-time hook. Route every deploy path — REST /deploy, AutoPilot governance (CreateIngestionPipelineImpl), app runs (RunAppImpl/AppResource), and the update hook — through IngestionPipelineRepository.deployIngestionPipeline, which calls a new overridable applyStreamableLogsConfig(pipeline). The default is a no-op (keeps the pipeline's own value); deployments override it to derive the flag.
  • Remove the global force. Drop the isS3LogStorageEnabled()-based setEnableStreamableLogs(true) in IngestionPipelineResource and the update-hook deploy path. That check was global (storage-level) and applied on only one deploy path, so the flag was set inconsistently across pipelines.
  • Fix runner replacement. ServiceEntityRepository.updateIngestionRunner added a new USES → ingestionRunner relationship without removing the previous one, leaving a service with multiple runners and making service.getIngestionRunner() resolution order-dependent. Delete the old relationship before adding the new one.
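
As an illustration of the single deploy-time hook described in the first bullet, here is a minimal, self-contained sketch of the pattern. The classes `PipelineSketch`, `RepositorySketch`, and `RunnerDrivenRepositorySketch`, and the `runnerSupportsStreaming` flag, are hypothetical stand-ins rather than the real OpenMetadata types; only the hook name `applyStreamableLogsConfig` and its no-op default come from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down stand-in for IngestionPipeline.
class PipelineSketch {
  final String name;
  Boolean enableStreamableLogs; // nullable, like a generated entity field

  PipelineSketch(String name, Boolean enableStreamableLogs) {
    this.name = name;
    this.enableStreamableLogs = enableStreamableLogs;
  }
}

// Hypothetical stand-in for IngestionPipelineRepository.
class RepositorySketch {
  final List<String> deployed = new ArrayList<>();

  // Every deploy path calls this one method, so the hook always runs first.
  final String deploy(PipelineSketch pipeline) {
    applyStreamableLogsConfig(pipeline);
    String record = pipeline.name + ":" + pipeline.enableStreamableLogs;
    deployed.add(record);
    return record;
  }

  // Default is a no-op: the pipeline keeps its own value.
  protected void applyStreamableLogsConfig(PipelineSketch pipeline) {}
}

// Hypothetical override that derives the flag from a runner capability.
class RunnerDrivenRepositorySketch extends RepositorySketch {
  private final boolean runnerSupportsStreaming;

  RunnerDrivenRepositorySketch(boolean runnerSupportsStreaming) {
    this.runnerSupportsStreaming = runnerSupportsStreaming;
  }

  @Override
  protected void applyStreamableLogsConfig(PipelineSketch pipeline) {
    pipeline.enableStreamableLogs = runnerSupportsStreaming;
  }
}
```

Because the hook mutates the pipeline before the deploy call, callers that want the stored entity to match what was deployed still need to persist the pipeline afterwards.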

Why

The streamable-logs flag was set by a global storage check on a single entry point rather than derived consistently, so the value sent to ingestion (write) and the value used to fetch logs (read) could disagree — pipelines that weren't redeployed through that path showed no logs, while pipelines forced on duplicated their log writes. Centralizing resolution behind one hook lets each deployment derive the flag consistently. The runner-relationship fix keeps runner resolution deterministic.
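
The runner-relationship fix can be sketched as follows. This is a minimal in-memory illustration, not the OpenMetadata relationship DAO: `RelationshipStoreSketch` and its methods are hypothetical, and the real code presumably scopes the delete to the ingestionRunner entity type rather than every USES row.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the relationship table.
class RelationshipStoreSketch {
  // Each row is {fromService, relation, toRunner}.
  private final List<String[]> rows = new ArrayList<>();

  void add(String from, String relation, String to) {
    rows.add(new String[] {from, relation, to});
  }

  void deleteAll(String from, String relation) {
    rows.removeIf(r -> r[0].equals(from) && r[1].equals(relation));
  }

  List<String> targets(String from, String relation) {
    List<String> result = new ArrayList<>();
    for (String[] r : rows) {
      if (r[0].equals(from) && r[1].equals(relation)) {
        result.add(r[2]);
      }
    }
    return result;
  }

  // Old behaviour: append only, so earlier runners stay attached.
  void appendRunner(String service, String runner) {
    add(service, "USES", runner);
  }

  // Fixed behaviour: drop the previous runner link, then add the new one.
  void replaceRunner(String service, String runner) {
    deleteAll(service, "USES");
    add(service, "USES", runner);
  }
}
```

With the append-only variant, a lookup that takes the first matching row depends on insertion or query order; the replace variant leaves at most one runner per service, so the lookup has only one possible answer.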

Fixes: https://github.com/open-metadata/openmetadata-collate/issues/4427

IceS2 added 3 commits June 4, 2026 12:41
…ory hook

Route every deploy path (REST, AutoPilot governance, app runs, update hook)
through IngestionPipelineRepository.deployIngestionPipeline, which applies an
overridable applyStreamableLogsConfig hook. Removes the global S3-storage force
in the Resource and update-hook paths so the flag is no longer forced true on
SaaS pipelines in clusters that have S3 log storage configured.
…hook

So enableStreamableLogs is derived from the app's runner for hybrid-runner apps,
consistent with service and test-suite pipelines.
…ting

updateIngestionRunner added a new USES->ingestionRunner relationship without
removing the previous one, leaving a service with multiple (hybrid) runners and
making service.getIngestionRunner() resolution order-dependent. Delete the old
runner relationship before adding the new one.
Copilot AI review requested due to automatic review settings June 5, 2026 08:25

github-actions Bot commented Jun 5, 2026

❌ PR checklist incomplete

This PR cannot be merged until the following are addressed on its linked issue:

  • No GitHub issue is linked. Add a closing reference such as Fixes #12345 to the PR description (accepted keywords: Fixes, Closes, Resolves).

The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically.

Maintainers can bypass this check by adding the skip-pr-checks label.

github-actions Bot added the Ingestion and safe to test labels Jun 5, 2026

Copilot AI left a comment

Pull request overview

This PR refactors ingestion-pipeline deployments to funnel multiple deploy entry points through IngestionPipelineRepository.deployIngestionPipeline(...) (introducing a deploy-time hook for enableStreamableLogs), and fixes service → ingestion-runner relationship replacement to avoid multiple runners being attached to a service.

Changes:

  • Route deploy calls from REST, apps, and governance workflow nodes through IngestionPipelineRepository.deployIngestionPipeline(...).
  • Add a single deploy-time hook (applyStreamableLogsConfig) and remove the prior global S3-based forcing of enableStreamableLogs.
  • Fix ServiceEntityRepository.updateIngestionRunner to remove the previous USES relationship before adding the new runner relationship.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java | Deploy endpoint now uses repository deploy wrapper (removes per-path streamable-logs forcing). |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java | App deploy flow now deploys ingestion pipeline via repository wrapper. |
| openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ServiceEntityRepository.java | Ensures ingestion-runner relationship is replaced (old runner link removed) rather than appended. |
| openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java | Removes update-hook forcing, adds deploy wrapper + deploy-time hook for streamable logs config. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java | App-run workflow deploy step now uses repository deploy wrapper. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java | Governance pipeline creation deploy now goes through repository deploy wrapper (drops direct client injection). |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineDelegate.java | Removes pipeline-service-client expression wiring for the create/deploy step. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateAndRunIngestionPipelineTask.java | Removes the PipelineServiceClient field extension from the create-ingestion-pipeline service task. |

Comment on lines 1314 to +1322
  public PipelineServiceClientResponse deployIngestionPipeline(
      IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
    applyStreamableLogsConfig(ingestionPipeline);
    return pipelineServiceClient.deployPipeline(ingestionPipeline, service);
  }

  // Single deploy-time hook for enableStreamableLogs, shared by every deploy path.
  // Default keeps the pipeline's own value; overridden to derive it from the ingestion runner.
  protected void applyStreamableLogsConfig(IngestionPipeline ingestionPipeline) {}
Comment on lines 186 to +192
      LOG.info(
          "[GovernanceWorkflows] Deploying '{}' for '{}'",
          ingestionPipeline.getDisplayName(),
          service.getName());
-     wasSuccessful = deployPipeline(pipelineServiceClient, ingestionPipeline, service);
+     IngestionPipelineRepository repository =
+         (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
+     wasSuccessful = deployPipeline(repository, ingestionPipeline, service);
Comment on lines 239 to 258
@@ -254,7 +253,7 @@ private IngestionPipeline deployIngestionPipeline(
      ingestionPipelineConfig.put("appConfig", app.getAppConfiguration());
      ingestionPipeline.getSourceConfig().setConfig(ingestionPipelineConfig);

-     pipelineServiceClient.deployPipeline(
+     repository.deployIngestionPipeline(
          ingestionPipeline,
          Entity.getEntity(ingestionPipeline.getService(), "ingestionRunner", Include.NON_DELETED));

RunAppImpl.deployIngestionPipeline deployed via the repository hook (which derives
enableStreamableLogs) but never persisted the pipeline, unlike the other deploy
paths, so the stored value could diverge from what was sent to ingestion. Persist
on a successful deploy.

            ingestionPipeline,
            Entity.getEntity(
                ingestionPipeline.getService(), "ingestionRunner", Include.NON_DELETED));
    if (status.getCode() == 200) {

💡 Quality: Magic number 200 for HTTP status code

if (status.getCode() == 200) hardcodes the HTTP success code as a magic number, which the project standards prohibit (use enums/constants). Use the JAX-RS Response.Status.OK.getStatusCode() constant instead so the intent is explicit and consistent with other deploy paths.

Note: getCode() returns an Integer. If the response ever carries a null code, the == 200 comparison will auto-unbox and throw a NullPointerException. Comparing against the constant via .equals or guarding for null avoids this.

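Replace the magic 200 with the Response.Status.OK constant (requires importing jakarta.ws.rs.core.Response). Note that an == comparison against the int constant still auto-unboxes getCode(), so this form alone does not cover the null case described above: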

if (Response.Status.OK.getStatusCode() == status.getCode()) {
  repository.createOrUpdate(null, ingestionPipeline, ingestionPipeline.getUpdatedBy());
}
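
To make the null-handling note concrete, here is a small sketch contrasting an unboxing comparison with a null-safe one. It assumes only that the status code is a nullable `Integer`, as the comment states. `StatusCheckSketch` is a hypothetical helper, and `java.net.HttpURLConnection.HTTP_OK` from the JDK stands in for `Response.Status.OK.getStatusCode()` so the example runs without JAX-RS on the classpath.

```java
import java.net.HttpURLConnection;
import java.util.Objects;

// Hypothetical helper; not part of the PR.
class StatusCheckSketch {
  // Integer == int unboxes the Integer, so a null code throws NullPointerException.
  static boolean isOkUnboxing(Integer code) {
    return code == HttpURLConnection.HTTP_OK;
  }

  // Objects.equals boxes the constant and tolerates a null code (returns false).
  static boolean isOkNullSafe(Integer code) {
    return Objects.equals(code, HttpURLConnection.HTTP_OK);
  }
}
```

In the deploy path, the null-safe form would treat a missing status code as "not successful" and skip the persist step instead of failing with an exception.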

IceS2 added the skip-pr-checks label Jun 5, 2026
Copilot AI review requested due to automatic review settings June 5, 2026 09:38

gitar-bot Bot commented Jun 5, 2026

Code Review 👍 Approved with suggestions 1 resolved / 3 findings

Centralizes streamable log resolution for ingestion pipelines and corrects runner relationship management to prevent ingestion-runner accumulation. Address the hardcoded 200 HTTP status code and ensure that mutated pipeline configurations are not persisted to the database unnecessarily.

💡 Quality: Magic number 200 for HTTP status code

📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java:262

if (status.getCode() == 200) hardcodes the HTTP success code as a magic number, which the project standards prohibit (use enums/constants). Use the JAX-RS Response.Status.OK.getStatusCode() constant instead so the intent is explicit and consistent with other deploy paths.

Note: getCode() returns an Integer. If the response ever carries a null code, the == 200 comparison will auto-unbox and throw a NullPointerException. Comparing against the constant via .equals or guarding for null avoids this.

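Replace the magic 200 with the Response.Status.OK constant (requires importing jakarta.ws.rs.core.Response). Note that an == comparison against the int constant still auto-unboxes getCode(), so this form alone does not cover the null case described above.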
if (Response.Status.OK.getStatusCode() == status.getCode()) {
  repository.createOrUpdate(null, ingestionPipeline, ingestionPipeline.getUpdatedBy());
}
💡 Edge Case: Persisting mutated pipeline writes injected appConfig to DB

📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java:252-264

Before calling repository.createOrUpdate, the pipeline's sourceConfig.config is mutated to inject appConfig (app.getAppConfiguration()) for the deploy call (lines 252-255). The new createOrUpdate on success now persists this mutated object, so the injected appConfig is written into the stored ingestion pipeline's sourceConfig — a behavior change versus the prior code which never persisted. The intent of this commit is to persist the derived enableStreamableLogs flag; persisting the injected appConfig is an incidental side effect. While openMetadataServerConnection is stripped by getFieldsStrippedFromStorageJson, appConfig is not, so app-specific configuration now leaks into the pipeline entity. Each subsequent run re-injects it, so it is largely self-correcting, but consider persisting only the deploy-derived fields (or re-reading the pipeline before persisting) to avoid storing transient deploy-time mutations.
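
One way to read the "re-read the pipeline before persisting" suggestion is sketched below. This is an illustration under stated assumptions, not the repository API: `StoredPipelineSketch`, `PersistDerivedFlagSketch`, and the map-backed store are hypothetical, and the real entity has many more fields.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified pipeline entity.
class StoredPipelineSketch {
  Boolean enableStreamableLogs;
  final Map<String, Object> sourceConfig = new HashMap<>();

  StoredPipelineSketch copy() {
    StoredPipelineSketch c = new StoredPipelineSketch();
    c.enableStreamableLogs = enableStreamableLogs;
    c.sourceConfig.putAll(sourceConfig);
    return c;
  }
}

class PersistDerivedFlagSketch {
  // Re-read the stored entity and copy over only the deploy-derived flag, so
  // transient deploy-time mutations (such as an injected appConfig) are not stored.
  static void persistDerivedFlagOnly(
      Map<String, StoredPipelineSketch> store, String id, StoredPipelineSketch deployed) {
    StoredPipelineSketch fresh = store.get(id).copy();
    fresh.enableStreamableLogs = deployed.enableStreamableLogs;
    store.put(id, fresh);
  }
}
```

The trade-off is an extra read per deploy in exchange for keeping the stored sourceConfig free of per-run app configuration.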

✅ 1 resolved
Bug: App-run deploy path never persists derived streamableLogs flag

📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/runApp/RunAppImpl.java:239-253
In RunAppImpl.deployIngestionPipeline(App) the pipeline is deployed via repository.deployIngestionPipeline(...) (line 256), which now mutates the pipeline through the new applyStreamableLogsConfig(pipeline) hook. However, unlike the other migrated deploy paths (CreateIngestionPipelineImpl and AppResource, which both call createOrUpdate(...) after a successful deploy), this method returns the in-memory pipeline without ever persisting it.

Consequence: the value the hook derives is sent to ingestion (the write side) but the stored IngestionPipeline entity keeps its old enableStreamableLogs value. When logs are later fetched (the read side), resolution uses the persisted entity, so write and read can disagree — the exact inconsistency this PR aims to eliminate. For app-run pipelines whose flag the deployment override flips, log writes/reads will be out of sync.

Suggested fix: persist the pipeline after a successful deploy in this path, mirroring the other call sites (e.g. repository.createOrUpdate(null, ingestionPipeline, ingestionPipeline.getUpdatedBy())).


Copilot AI left a comment

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.

Comment on lines +1320 to +1323
// Single deploy-time hook for enableStreamableLogs, shared by every deploy path.
// Default keeps the pipeline's own value; overrides resolve the pipeline's owning ingestion
// runner (service / test-suite / application) and derive the flag from it.
protected void applyStreamableLogsConfig(IngestionPipeline ingestionPipeline) {}

sonarqubecloud Bot commented Jun 5, 2026


github-actions Bot commented Jun 5, 2026

🟡 Playwright Results — all passed (13 flaky)

✅ 4265 passed · ❌ 0 failed · 🟡 13 flaky · ⏭️ 88 skipped

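| Shard | Passed | Failed | Flaky | Skipped |
| --- | --- | --- | --- | --- |
| ✅ Shard 1 | 299 | 0 | 0 | 4 |
| 🟡 Shard 2 | 803 | 0 | 1 | 9 |
| 🟡 Shard 3 | 801 | 0 | 4 | 8 |
| 🟡 Shard 4 | 853 | 0 | 2 | 12 |
| 🟡 Shard 5 | 718 | 0 | 3 | 47 |
| 🟡 Shard 6 | 791 | 0 | 3 | 8 |

🟡 13 flaky test(s) (passed on retry)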
  • Features/Glossary/GlossaryWorkflow.spec.ts › should display correct status badge color and icon (shard 2, 1 retry)
  • Features/QueryEntity.spec.ts › Query Entity (shard 3, 1 retry)
  • Features/RTL.spec.ts › Verify Following widget functionality (shard 3, 2 retries)
  • Features/Table.spec.ts › Table pagination with sorting should works (shard 3, 1 retry)
  • Flow/ExploreAggregationCountsMatching.spec.ts › should verify left panel counts and tab search results for normal search (shard 3, 1 retry)
  • Flow/ObservabilityAlerts.spec.ts › Table alert (shard 4, 2 retries)
  • Pages/Domains.spec.ts › Create DataProducts and add remove assets (shard 4, 1 retry)
  • Pages/EntityDataConsumer.spec.ts › Tier Add, Update and Remove (shard 5, 1 retry)
  • Pages/EntityDataSteward.spec.ts › Tier Add, Update and Remove (shard 5, 1 retry)
  • Pages/ExplorePageRightPanel_KnowledgeCenter.spec.ts › Should remove user owner for knowledgeCenter (shard 5, 1 retry)
  • Pages/Lineage/LineageFilters.spec.ts › Verify lineage service type filter selection (shard 6, 1 retry)
  • Pages/Lineage/LineageFilters.spec.ts › Verify lineage schema filter selection (shard 6, 1 retry)
  • Pages/ODCSImportExport.spec.ts › Multi-object ODCS contract - object selector shows all schema objects (shard 6, 1 retry)

📦 Download artifacts

How to debug locally
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip    # view trace
