
DS 2359 observation loop atomic pipeline output #21237

Open
akuzni2 wants to merge 10 commits into develop from DS-2359-observation-loop-atomic-pipeline-output

@akuzni2 akuzni2 commented Feb 19, 2026

  • ensure pipeline streams are extracted atomically
  • changeset

trunk-io bot commented Feb 19, 2026

| Failed Test | Failure Summary |
| --- | --- |
| Test_PromoteCandidate | The test 'Test_PromoteCandidate' failed during execution. |
| Test_PromoteCandidate/MCMS_disabled | The test failed because the network address 127.0.0.1:14511 was already in use, preventing the peer from starting. |

@akuzni2 akuzni2 marked this pull request as ready for review February 20, 2026 05:06
@akuzni2 akuzni2 requested review from a team as code owners February 20, 2026 05:06
Copilot AI review requested due to automatic review settings February 20, 2026 05:06
@akuzni2 akuzni2 requested review from brunotm and calvwang9 and removed request for dav009 and justinkaseman February 20, 2026 05:06
Copilot AI left a comment

Pull request overview

This PR ensures pipeline streams are extracted atomically to prevent partial failures where some streams from a multi-stream pipeline have values while others don't. The change addresses a concurrency issue in the LLO observation loop where BridgeTask's use of overtimeContext could cause waiters to bail early with errors while the executor eventually succeeds.
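The waiter behaviour described in this overview can be illustrated with a minimal Go sketch. It is not the PR's code: `observationContext`, `execution`, `result`, and `run` are hypothetical names chosen for illustration, and the real implementation lives in `observation_context.go`. The point it shows is that waiters block only on the executor's completion signal, not on their own context, so every goroutine sees the same result.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// result holds the outcome of one pipeline execution (hypothetical shape).
type result struct {
	values map[uint32]int // stream ID -> extracted value
	err    error
}

// execution tracks a single in-flight pipeline run.
type execution struct {
	done chan struct{} // closed by the executor once res is set
	res  result
}

// observationContext is a hypothetical stand-in, not the real type.
type observationContext struct {
	mu         sync.Mutex
	executions map[string]*execution
}

func newObservationContext() *observationContext {
	return &observationContext{executions: make(map[string]*execution)}
}

// run executes fn at most once per pipelineID. The first caller becomes the
// executor; later callers are waiters that block on done only. There is no
// select on a caller context here, so a waiter cannot bail out early with an
// error while the executor goes on to succeed.
func (oc *observationContext) run(pipelineID string, fn func() result) result {
	oc.mu.Lock()
	ex, ok := oc.executions[pipelineID]
	if !ok {
		ex = &execution{done: make(chan struct{})}
		oc.executions[pipelineID] = ex
		oc.mu.Unlock()
		ex.res = fn()
		close(ex.done) // publishes ex.res to all waiters
		return ex.res
	}
	oc.mu.Unlock()
	<-ex.done
	return ex.res
}

func main() {
	oc := newObservationContext()
	results := make([]result, 4)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = oc.run("pipeline-a", func() result {
				time.Sleep(10 * time.Millisecond) // simulated slow bridge call
				return result{values: map[uint32]int{1: 100, 2: 200}}
			})
		}(i)
	}
	wg.Wait()
	for i, r := range results {
		fmt.Println(i, len(r.values), r.err)
	}
}
```

A design like this relies on the executor itself being bounded by a timeout. In this sketch, if `fn` never returns or panics, the waiters block indefinitely.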

Changes:

  • Modified observation context to block waiters until pipeline execution completes, ensuring all goroutines receive identical results
  • Added removeIncompleteGroups function to enforce all-or-nothing cache writes per pipeline group
  • Extracted StreamValueCache interface to enable test verification of atomic behavior
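The all-or-nothing rule in the second bullet can be sketched as follows. This is an illustration under assumptions rather than the PR's implementation: the signature of `removeIncompleteGroups`, the `streamID` alias, the `float64` value type, and the `groups` map are all hypothetical, and each stream is assumed to belong to exactly one pipeline group.

```go
package main

import (
	"fmt"
	"sort"
)

// streamID is a hypothetical alias used only in this sketch.
type streamID = uint32

// removeIncompleteGroups deletes every observed value that belongs to a group
// with at least one missing stream, and returns the names of the dropped
// groups in sorted order. Assumes each stream belongs to exactly one group.
func removeIncompleteGroups(observed map[streamID]float64, groups map[string][]streamID) []string {
	var dropped []string
	for name, ids := range groups {
		complete := true
		for _, id := range ids {
			if _, ok := observed[id]; !ok {
				complete = false
				break
			}
		}
		if complete {
			continue
		}
		// Drop the whole group so the cache never sees a partial write.
		for _, id := range ids {
			delete(observed, id)
		}
		dropped = append(dropped, name)
	}
	sort.Strings(dropped)
	return dropped
}

func main() {
	observed := map[streamID]float64{1: 1.5, 2: 2.5, 3: 3.5}
	groups := map[string][]streamID{
		"a": {1, 2}, // complete: both streams observed
		"b": {3, 4}, // incomplete: stream 4 is missing
	}
	dropped := removeIncompleteGroups(observed, groups)
	fmt.Println(dropped, len(observed))
}
```

In this example, group "b" is dropped because stream 4 has no value, so stream 3 is removed as well and only the two streams of group "a" remain eligible for caching.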

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| core/services/llo/observation/observation_context.go | Removed ctx.Done() handling from waiter path to prevent partial results |
| core/services/llo/observation/data_source.go | Added validation to drop incomplete pipeline groups before caching |
| core/services/llo/observation/cache.go | Extracted StreamValueCache interface for testability |
| core/services/llo/observation/observation_context_test.go | Added tests for partial extraction failures and concurrent atomic output |
| core/services/llo/observation/data_source_test.go | Added comprehensive tests for atomic cache writes and removeIncompleteGroups edge cases |
| .changeset/warm-roses-argue.md | Added changeset with minor formatting issue |

"chainlink": patch
---

#updated In llo observation loop ensure that the Pipeline streams are extracted atomically
Copilot AI Feb 20, 2026

The "#updated" tag appears to have a typo. Based on the existing changeset files in the codebase (e.g., minor-bump-1771355195.md), tags like "#internal" should be on their own line before the frontmatter, not in the summary. The current format "#updated In llo observation loop..." appears to be a combination of a tag and the description. Consider either:

  1. Moving "#updated" to its own line before the frontmatter if it's intended as a tag
  2. Removing "#updated" if it's not a valid tag and keeping only the description

If "#updated" is not a recognized tag, the line should simply read: "In llo observation loop ensure that the Pipeline streams are extracted atomically"

Suggested change
- #updated In llo observation loop ensure that the Pipeline streams are extracted atomically
+ In llo observation loop ensure that the Pipeline streams are extracted atomically


mu.Lock()
observedValues[streamID] = val
successfulStreamIDs = append(successfulStreamIDs, streamID)

Do we also want to adjust the successful stream count based on the streams dropped by removeIncompleteGroups?
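One way the adjustment asked about here could look, as a hedged sketch: after incomplete groups are removed from `observedValues`, the list of successful IDs is filtered down to the streams that survived. `filterSuccessful` is a hypothetical helper, and the `float64` value type is an assumption; neither appears in the PR.

```go
package main

import "fmt"

// filterSuccessful keeps only the stream IDs that still have a value in
// observed, preserving their original order. Hypothetical helper.
func filterSuccessful(successful []uint32, observed map[uint32]float64) []uint32 {
	kept := make([]uint32, 0, len(successful))
	for _, id := range successful {
		if _, ok := observed[id]; ok {
			kept = append(kept, id)
		}
	}
	return kept
}

func main() {
	// Stream 3 was observed successfully but later dropped with its group.
	successfulStreamIDs := []uint32{1, 2, 3}
	observedValues := map[uint32]float64{1: 1.5, 2: 2.5}

	successfulStreamIDs = filterSuccessful(successfulStreamIDs, observedValues)
	fmt.Println(len(successfulStreamIDs), successfulStreamIDs)
}
```

With this filter applied, any count or log line derived from `successfulStreamIDs` reflects what is actually written to the cache rather than what was individually extracted.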

}
delete(observedValues, sid)
}
lggr.Debugw("Discarding incomplete pipeline group",

Do we know under what specific conditions this occurs, and would it indicate issues with data freshness or something else? I'm wondering if we should add metrics or upgrade this to a warn log to keep an eye on it.

var wg sync.WaitGroup
oc := NewObservationContext(lggr, d.registry, d.t)

for streamID := range osv.streamValues {

The skip logic still iterates over individual streams. To preserve atomicity, should we attempt to refresh/observe all the streams within a group if any value is expired?
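The group-level skip logic suggested in this comment might be sketched like this. It is an illustration only: `streamsToObserve`, the `groups` map, and the `isFresh` predicate are hypothetical and stand in for whatever expiry check the real cache exposes.

```go
package main

import (
	"fmt"
	"sort"
)

// streamsToObserve returns, in ascending order, the IDs of every stream that
// belongs to a group with at least one stale member. A group is skipped only
// when all of its streams are fresh, so a refresh never covers part of a group.
func streamsToObserve(groups map[string][]uint32, isFresh func(uint32) bool) []uint32 {
	var out []uint32
	for _, ids := range groups {
		stale := false
		for _, id := range ids {
			if !isFresh(id) {
				stale = true
				break
			}
		}
		if stale {
			out = append(out, ids...) // refresh the whole group together
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	// Stream 2 has expired; streams 1, 3, and 4 are still fresh.
	fresh := map[uint32]bool{1: true, 2: false, 3: true, 4: true}
	groups := map[string][]uint32{"a": {1, 2}, "b": {3, 4}}

	ids := streamsToObserve(groups, func(id uint32) bool { return fresh[id] })
	fmt.Println(ids) // prints [1 2]
}
```

Stream 1 is re-observed even though its cached value is still fresh, because it shares a group with the expired stream 2. Group "b" is skipped entirely.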
