feat: implement collapsed forwarding for object and chunk requests#56
Open
sendya wants to merge 5 commits into
Open
feat: implement collapsed forwarding for object and chunk requests#56sendya wants to merge 5 commits into
sendya wants to merge 5 commits into
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR adds collapsed forwarding to the caching middleware so concurrent cache object and byte-range chunk requests can share upstream fetches instead of independently hitting origin.
Changes:
- Adds object-level and chunk-level flight groups using
io.Pipe/io.MultiWriter. - Wires flight groups into cache MISS and partial HIT/MISS request paths.
- Adds unit and E2E coverage for collapsed forwarding behavior and updates E2E request cloning.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
server/middleware/caching/object_flight.go |
Adds object-level collapsed forwarding implementation. |
server/middleware/caching/chunk_flight.go |
Adds chunk/range-level collapsed forwarding implementation. |
server/middleware/caching/caching.go |
Integrates flight groups and refactors cache HIT response handling. |
server/middleware/caching/internal.go |
Adds chunk flight group reference to Caching. |
server/middleware/caching/collapsed_forwarding_test.go |
Adds unit tests for flight group behavior. |
tests/all-features/caching/collapsed_forwarding_test.go |
Adds E2E tests for object and chunk collapsed forwarding. |
pkg/e2e/e2e.go |
Clones requests before per-call rewrites in E2E helper. |
Comments suppressed due to low confidence (3)
server/middleware/caching/caching.go:167
- This wraps
caching.doProxyin a new object flight whiledoProxystill callsproxyClient.Dowithc.opt.CollapsedRequestenabled. With the default config, a fast MISS now waits once inObjectFlightGroupand again in the proxy singleflight, adding up to roughly two collapsed-request wait windows to every miss and duplicating the collapse layer. The inner proxy collapse should be disabled or the wait should not be applied twice.
flightResp, _, flightErr := objectFlight.Do(caching.id.HashStr(), opts.CollapsedRequestWaitTimeout.AsDuration(), func() (*http.Response, error) {
r, e := caching.doProxy(req, false)
tests/all-features/caching/collapsed_forwarding_test.go:146
- This
require.NoErrorruns inside a worker goroutine. SincerequireusesFailNow, testify requires it to be called from the test goroutine; collect the error and assert afterwg.Waitto avoid unreliable concurrent-test failures.
require.NoError(t, err)
tests/all-features/caching/collapsed_forwarding_test.go:206
- This assertion is executed in a goroutine;
require.NoErrormay callFailNow, which is only safe from the test goroutine. Capture the per-caller error and assert after all goroutines complete so failures are reported deterministically.
require.NoError(t, err1, "caller %d: request should not error", idx)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+80
to
+81
| // fn owns resp.Body on error — it must close it before | ||
| // returning. We only guard against a nil body here. |
| // concurrent requests for the same cache object share one | ||
| // origin fetch (Squid-style collapsed_forwarding). | ||
| if opts.CollapsedRequest { | ||
| flightResp, _, flightErr := objectFlight.Do(caching.id.HashStr(), opts.CollapsedRequestWaitTimeout.AsDuration(), func() (*http.Response, error) { |
Comment on lines
+68
to
+73
| g.mu.Lock() | ||
| // Snapshot pipes and remove the key so no further callers | ||
| // register against this flight. | ||
| pipes := make([]*io.PipeWriter, len(c.pipes)) | ||
| copy(pipes, c.pipes) | ||
| delete(g.m, key) |
Comment on lines
+87
to
+89
| g.mu.Lock() | ||
| delete(g.m, key) | ||
|
|
Comment on lines
+121
to
+126
| mw := io.MultiWriter(writers...) | ||
|
|
||
| var copyErr error | ||
| if resp.Body != nil { | ||
| _, copyErr = io.Copy(mw, resp.Body) | ||
| _ = resp.Body.Close() |
Comment on lines
+47
to
+49
| // resp — a response carrying the leader's headers and a shared body | ||
| // shared — true if this caller joined an existing flight | ||
| // err — error from fn or from body copy |
Comment on lines
+101
to
+113
| // Sequential requests should not be collapsed. | ||
| for i := 0; i < N; i++ { | ||
| resp, err := case1.Do(func(r *http.Request) { | ||
| r.Header.Set("X-Request-Idx", strconv.Itoa(i)) | ||
| }) | ||
|
|
||
| require.NoError(t, err, "request %d should not error", i) | ||
|
|
||
| bodies[i] = e2e.HashBody(resp) | ||
| } | ||
|
|
||
| assert.Equal(t, int32(1), originCallCount.Load(), | ||
| "object flight should not collapse sequential requests") |
| rng, err := xhttp.SingleRange(req.Header.Get("Range"), c.md.Size) | ||
| if err != nil { | ||
| headers := make(http.Header) | ||
| xhttp.CopyHeader(c.md.Headers, headers) |
Comment on lines
+66
to
+68
| // Verify only one origin call — ObjectFlightGroup collapsed all 5. | ||
| assert.Equal(t, int32(1), originCallCount.Load(), | ||
| "object flight should collapse concurrent full-MISS requests") |
Comment on lines
+234
to
+236
| // The concurrent chunk fetch for the missing range must be collapsed. | ||
| assert.Equal(t, int32(1), originCallCount.Load(), | ||
| "chunk flight should collapse concurrent chunk fetches to 1 origin call") |
2be63eb to
b833273
Compare
…goroutine leaks - Wrap fn() with defer/recover in ObjectFlightGroup.Do and ChunkFlightGroup.Do to convert panics to errors, ensuring pipes are properly closed and map entries are cleaned up - Without this, a panic in fn() would leave waiters hanging indefinitely: pipes never closed, goroutines blocked forever on pipe reads - Add TestObjectFlight_PanicRecovery and TestChunkFlight_PanicRecovery tests Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
b833273 to
e7a2694
Compare
- Fix channel buffer size to prevent goroutine leaks in tests - Move originCallCount to per-subtest scope for isolation - Add response body hash verification in chunk flight tests - Track X-Cache, Content-Range, Content-Length in assertions - Add PURGE cleanup after each test group - Add e2e.PurgeMethod with directory purge support - Fix response body close leaks - Add debug logging for test diagnostics - Remove unused watchdog plugin from test config Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Change ChunkFlightGroup from range-prefixed keys
("<hash>:<fromByte>-<toByte>") to object-level keys ("<hash>")
with automatic range union. When concurrent requests for the
same object need different byte ranges, the leader computes
the union of all registered ranges and fetches a single range
that covers everyone. Each caller then trims the shared
response stream to their specific sub-range via iobuf.RangeReader.
Key changes:
- chunk_flight.go: new chunkRange type, extended chunkCall with
union tracking, WaitGroup-based union signaling, RangeReader
wrapping for all callers, async fn execution (preserves
header-building timing)
- caching.go: getUpstreamReader uses object-level key, fn
parameterized by union range
- tests: new RangeUnion unit test (3 different ranges → 1 call),
updated KeyIsolation for object-level collapsing, new E2E
RangeUnion test with body hash verification
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- Fix critical race in ChunkFlightGroup: move delete(g.m, objectKey) before c.wg.Done() to prevent late callers from joining a flight with stale union range (data corruption / goroutine leak) - Fix missing CachePartHit for chunk flight leader path: set cacheStatus unconditionally in getUpstreamReader so both leader and shared callers correctly report partial hit - Fix c.err data race in ChunkFlightGroup: protect reads and writes with c.mu to silence Go race detector - Restore three dead Prometheus metrics (cacheRequestTotal, cacheChunkWriteTotal, cacheFlushFailedTotal) that were silently dropped during the refactor - Fix PurgeMethod to always send explicit Purge-Type header (file,hard) for non-dir purges Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This pull request introduces advanced collapsed forwarding for the caching middleware, enabling concurrent requests for the same cache object or chunk to share a single upstream fetch. This mirrors Squid's
collapsed_forwardingfeature and improves efficiency under high concurrency. The implementation adds both object-level and chunk-level collapsed forwarding, refactors cache HIT handling, and includes comprehensive tests for these new mechanisms.Collapsed Forwarding Implementation:
ObjectFlightGroupandChunkFlightGroupto collapse concurrent full-object and chunk (byte-range) upstream requests, ensuring only one origin fetch per unique cache key or chunk, with all callers receiving a shared response. (server/middleware/caching/object_flight.go[1]server/middleware/caching/chunk_flight.go[2]ObjectFlightGroupandChunkFlightGroupinto the caching middleware, enabling object-level collapsed forwarding for full cache misses and chunk-level collapsed forwarding for partial (range) requests. (server/middleware/caching/caching.go[1] [2] [3] [4]server/middleware/caching/internal.go[5]Cache HIT Handling Refactor:
respondFromCachemethod, improving clarity and separation of concerns for serving cached responses and handling range requests. (server/middleware/caching/caching.go[1] [2]Testing:
ObjectFlightGroupandChunkFlightGroup, covering basic collapse, error propagation, key isolation, and high concurrency scenarios to ensure correctness and robustness. (server/middleware/caching/collapsed_forwarding_test.goserver/middleware/caching/collapsed_forwarding_test.goR1-R394)Other Improvements:
io.Pipeandio.MultiWriterin both flight group implementations. (server/middleware/caching/object_flight.go[1]server/middleware/caching/chunk_flight.go[2]pkg/e2e/e2e.gopkg/e2e/e2e.goL91-R108)