DS 2359 observation loop atomic pipeline output#21237
DS 2359 observation loop atomic pipeline output#21237
Conversation
There was a problem hiding this comment.
Pull request overview
This PR ensures pipeline streams are extracted atomically to prevent partial failures where some streams from a multi-stream pipeline have values while others don't. The change addresses a concurrency issue in the LLO observation loop where BridgeTask's use of overtimeContext could cause waiters to bail early with errors while the executor eventually succeeds.
Changes:
- Modified observation context to block waiters until pipeline execution completes, ensuring all goroutines receive identical results
- Added
removeIncompleteGroupsfunction to enforce all-or-nothing cache writes per pipeline group - Extracted
StreamValueCacheinterface to enable test verification of atomic behavior
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
core/services/llo/observation/observation_context.go |
Removed ctx.Done() handling from waiter path to prevent partial results |
core/services/llo/observation/data_source.go |
Added validation to drop incomplete pipeline groups before caching |
core/services/llo/observation/cache.go |
Extracted StreamValueCache interface for testability |
core/services/llo/observation/observation_context_test.go |
Added tests for partial extraction failures and concurrent atomic output |
core/services/llo/observation/data_source_test.go |
Added comprehensive tests for atomic cache writes and removeIncompleteGroups edge cases |
.changeset/warm-roses-argue.md |
Added changeset with minor formatting issue |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| "chainlink": patch | ||
| --- | ||
|
|
||
| #updated In llo observation loop ensure that the Pipeline streams are extracted atomically |
There was a problem hiding this comment.
The "#updated" tag appears to have a typo. Based on the existing changeset files in the codebase (e.g., minor-bump-1771355195.md), tags like "#internal" should be on their own line before the frontmatter, not in the summary. The current format "#updated In llo observation loop..." appears to be a combination of a tag and the description. Consider either:
- Moving "#updated" to its own line before the frontmatter if it's intended as a tag
- Removing "#updated" if it's not a valid tag and keeping only the description
If "#updated" is not a recognized tag, the line should simply read: "In llo observation loop ensure that the Pipeline streams are extracted atomically"
| #updated In llo observation loop ensure that the Pipeline streams are extracted atomically | |
| In llo observation loop ensure that the Pipeline streams are extracted atomically |
|
|
|
||
| mu.Lock() | ||
| observedValues[streamID] = val | ||
| successfulStreamIDs = append(successfulStreamIDs, streamID) |
There was a problem hiding this comment.
do we also want to also adjust the successful stream count based on the dropped streams from removeIncompleteGroups?
| } | ||
| delete(observedValues, sid) | ||
| } | ||
| lggr.Debugw("Discarding incomplete pipeline group", |
There was a problem hiding this comment.
do we know under what specific conditions this occurs and would it be indicative of issues with data freshness or otherwise? im wondering if we should add metrics/upgrade to a warn log to keep an eye on it
| var wg sync.WaitGroup | ||
| oc := NewObservationContext(lggr, d.registry, d.t) | ||
|
|
||
| for streamID := range osv.streamValues { |
There was a problem hiding this comment.
the skip logic still iterates over individual streams - to preserve atomicity, we should attempt to refresh/observe all the streams within a group if any value is expired?




Uh oh!
There was an error while loading. Please reload this page.