feat: implement collapsed forwarding for object and chunk requests#56

Open
sendya wants to merge 5 commits into main from feat/collapsed_forwarding_chunk
Conversation

@sendya sendya commented May 14, 2026

Member

This pull request introduces advanced collapsed forwarding for the caching middleware, enabling concurrent requests for the same cache object or chunk to share a single upstream fetch. This mirrors Squid's collapsed_forwarding feature and improves efficiency under high concurrency. The implementation adds both object-level and chunk-level collapsed forwarding, refactors cache HIT handling, and includes comprehensive tests for these new mechanisms.

Collapsed Forwarding Implementation:

  • Introduced ObjectFlightGroup and ChunkFlightGroup to collapse concurrent full-object and chunk (byte-range) upstream requests, ensuring only one origin fetch per unique cache key or chunk, with all callers receiving a shared response. (server/middleware/caching/object_flight.go, server/middleware/caching/chunk_flight.go)
  • Wired ObjectFlightGroup and ChunkFlightGroup into the caching middleware, enabling object-level collapsed forwarding for full cache misses and chunk-level collapsed forwarding for partial (range) requests. (server/middleware/caching/caching.go, server/middleware/caching/internal.go)
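The collapsing behaviour described in this list can be illustrated with a small sketch. It is not the code from this PR: `FlightGroup`, `call`, `waiting`, and `collapseDemo` are hypothetical names, and the sketch buffers the body in memory instead of streaming it, purely to keep the example short. It only shows the leader/waiter idea, where the first caller for a key runs the fetch and later callers wait for its result.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// call tracks one in-flight fetch and its eventual result.
type call struct {
	done    chan struct{} // closed once the leader has finished
	waiters int           // callers that joined this flight (guarded by FlightGroup.mu)
	body    []byte
	err     error
}

// FlightGroup is a hypothetical, simplified stand-in for a flight group.
type FlightGroup struct {
	mu sync.Mutex
	m  map[string]*call
}

// Do runs fn once per key among concurrent callers. The bool result reports
// whether this caller joined an existing flight instead of leading one.
func (g *FlightGroup) Do(key string, fn func() ([]byte, error)) ([]byte, bool, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.waiters++
		g.mu.Unlock()
		<-c.done // wait for the leader's result
		return c.body, true, c.err
	}
	c := &call{done: make(chan struct{})}
	g.m[key] = c
	g.mu.Unlock()

	c.body, c.err = fn()

	// Remove the key before releasing waiters so late callers start a new flight.
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()
	close(c.done)
	return c.body, false, c.err
}

// waiting reports how many callers have joined the flight for key (demo hook).
func (g *FlightGroup) waiting(key string) int {
	g.mu.Lock()
	defer g.mu.Unlock()
	if c, ok := g.m[key]; ok {
		return c.waiters
	}
	return 0
}

// collapseDemo issues n concurrent requests for one key and returns the
// number of origin fetches plus the body each caller received.
func collapseDemo(n int) (int32, []string) {
	var g FlightGroup
	var originCalls atomic.Int32
	release := make(chan struct{})
	fetch := func() ([]byte, error) {
		originCalls.Add(1)
		<-release // hold the flight open so the other callers can join
		return []byte("payload"), nil
	}
	bodies := make([]string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			b, _, _ := g.Do("obj", fetch)
			bodies[i] = string(b)
		}(i)
	}
	for g.waiting("obj") < n-1 { // wait until everyone but the leader has joined
		runtime.Gosched()
	}
	close(release)
	wg.Wait()
	return originCalls.Load(), bodies
}

func main() {
	calls, bodies := collapseDemo(3)
	fmt.Println("origin calls:", calls, "bodies:", bodies)
}
```

A streaming variant would hand each caller a reader instead of a byte slice, which is where the pipe-based fan-out mentioned elsewhere in this description comes in.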

Cache HIT Handling Refactor:

  • Refactored cache HIT logic into a new respondFromCache method, improving clarity and separation of concerns for serving cached responses and handling range requests. (server/middleware/caching/caching.go)

Testing:

  • Added tests for both ObjectFlightGroup and ChunkFlightGroup, covering basic collapse, error propagation, key isolation, and high-concurrency scenarios. (server/middleware/caching/collapsed_forwarding_test.go, R1-R394)

Other Improvements:

  • Ensured thread safety and correct fan-out of response bodies to all waiting callers using io.Pipe and io.MultiWriter in both flight group implementations. (server/middleware/caching/object_flight.go, server/middleware/caching/chunk_flight.go)
  • Updated the E2E test helper to clone and rewrite requests safely, preventing mutation of the original request object. (pkg/e2e/e2e.go, L91-R108)
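As a rough illustration of the io.Pipe and io.MultiWriter fan-out mentioned above, the sketch below copies one source body to several readers in a single pass. `fanOut` and `demo` are hypothetical helpers written for this example, not functions from the PR, and error handling is reduced to the minimum.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// fanOut streams src once and delivers a full copy to n concurrent readers.
// Hypothetical helper; the real flight groups manage pipes per caller.
func fanOut(src io.Reader, n int) []string {
	readers := make([]*io.PipeReader, n)
	pipeWriters := make([]*io.PipeWriter, n)
	writers := make([]io.Writer, n)
	for i := range readers {
		readers[i], pipeWriters[i] = io.Pipe()
		writers[i] = pipeWriters[i]
	}

	results := make([]string, n)
	var wg sync.WaitGroup
	for i, r := range readers {
		wg.Add(1)
		go func(i int, r *io.PipeReader) {
			defer wg.Done()
			b, _ := io.ReadAll(r) // each caller drains its own pipe
			results[i] = string(b)
		}(i, r)
	}

	// One copy from the source feeds every pipe through the MultiWriter.
	_, err := io.Copy(io.MultiWriter(writers...), src)
	for _, pw := range pipeWriters {
		// A nil err makes readers see io.EOF; otherwise they see the copy error.
		pw.CloseWithError(err)
	}
	wg.Wait()
	return results
}

// demo fans a fixed body out to three readers.
func demo() []string {
	return fanOut(strings.NewReader("shared body"), 3)
}

func main() {
	for i, body := range demo() {
		fmt.Printf("caller %d got %q\n", i, body)
	}
}
```

Because io.Pipe is unbuffered, each write blocks until the corresponding reader consumes it, so in this arrangement the slowest reader sets the pace for all of them.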

Copilot AI review requested due to automatic review settings May 14, 2026 08:12

Copilot AI left a comment

Contributor

Pull request overview

This PR adds collapsed forwarding to the caching middleware so concurrent cache object and byte-range chunk requests can share upstream fetches instead of independently hitting origin.

Changes:

  • Adds object-level and chunk-level flight groups using io.Pipe/io.MultiWriter.
  • Wires flight groups into cache MISS and partial HIT/MISS request paths.
  • Adds unit and E2E coverage for collapsed forwarding behavior and updates E2E request cloning.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 14 comments.

| File | Description |
| --- | --- |
| server/middleware/caching/object_flight.go | Adds object-level collapsed forwarding implementation. |
| server/middleware/caching/chunk_flight.go | Adds chunk/range-level collapsed forwarding implementation. |
| server/middleware/caching/caching.go | Integrates flight groups and refactors cache HIT response handling. |
| server/middleware/caching/internal.go | Adds chunk flight group reference to Caching. |
| server/middleware/caching/collapsed_forwarding_test.go | Adds unit tests for flight group behavior. |
| tests/all-features/caching/collapsed_forwarding_test.go | Adds E2E tests for object and chunk collapsed forwarding. |
| pkg/e2e/e2e.go | Clones requests before per-call rewrites in E2E helper. |

Comments suppressed due to low confidence (3)

server/middleware/caching/caching.go:167

  • This wraps caching.doProxy in a new object flight while doProxy still calls proxyClient.Do with c.opt.CollapsedRequest enabled. With the default config, a fast MISS now waits once in ObjectFlightGroup and again in the proxy singleflight, adding up to roughly two collapsed-request wait windows to every miss and duplicating the collapse layer. The inner proxy collapse should be disabled or the wait should not be applied twice.
				flightResp, _, flightErr := objectFlight.Do(caching.id.HashStr(), opts.CollapsedRequestWaitTimeout.AsDuration(), func() (*http.Response, error) {
					r, e := caching.doProxy(req, false)

tests/all-features/caching/collapsed_forwarding_test.go:146

  • This require.NoError runs inside a worker goroutine. Since require uses FailNow, testify requires it to be called from the test goroutine; collect the error and assert after wg.Wait to avoid unreliable concurrent-test failures.
				require.NoError(t, err)

tests/all-features/caching/collapsed_forwarding_test.go:206

  • This assertion is executed in a goroutine; require.NoError may call FailNow, which is only safe from the test goroutine. Capture the per-caller error and assert after all goroutines complete so failures are reported deterministically.
				require.NoError(t, err1, "caller %d: request should not error", idx)
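The pattern suggested in the two comments above can be sketched as follows: worker goroutines only record their error, and the assertions run after wg.Wait on the calling goroutine. `runConcurrently`, `failOn`, and `errDemo` are hypothetical names for illustration; in the actual tests the final loop would be where require.NoError is called.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errDemo = errors.New("demo failure")

// runConcurrently calls do(idx) from n goroutines and returns the errors
// indexed by caller, so the caller can assert after all workers finish.
func runConcurrently(n int, do func(idx int) error) []error {
	errs := make([]error, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			errs[idx] = do(idx) // record only; nothing here calls FailNow
		}(i)
	}
	wg.Wait()
	return errs
}

// failOn returns a worker function that fails only for caller k.
func failOn(k int) func(int) error {
	return func(idx int) error {
		if idx == k {
			return errDemo
		}
		return nil
	}
}

func main() {
	errs := runConcurrently(4, failOn(2))
	for idx, err := range errs {
		// In a test this is where the assertion would go, e.g.
		// require.NoError(t, err, "caller %d: request should not error", idx)
		fmt.Printf("caller %d: %v\n", idx, err)
	}
}
```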

Comment on lines +80 to +81
// fn owns resp.Body on error — it must close it before
// returning. We only guard against a nil body here.
// concurrent requests for the same cache object share one
// origin fetch (Squid-style collapsed_forwarding).
if opts.CollapsedRequest {
flightResp, _, flightErr := objectFlight.Do(caching.id.HashStr(), opts.CollapsedRequestWaitTimeout.AsDuration(), func() (*http.Response, error) {
Comment on lines +68 to +73
g.mu.Lock()
// Snapshot pipes and remove the key so no further callers
// register against this flight.
pipes := make([]*io.PipeWriter, len(c.pipes))
copy(pipes, c.pipes)
delete(g.m, key)
Comment on lines +87 to +89
g.mu.Lock()
delete(g.m, key)

Comment on lines +121 to +126
mw := io.MultiWriter(writers...)

var copyErr error
if resp.Body != nil {
_, copyErr = io.Copy(mw, resp.Body)
_ = resp.Body.Close()
Comment on lines +47 to +49
// resp — a response carrying the leader's headers and a shared body
// shared — true if this caller joined an existing flight
// err — error from fn or from body copy
Comment on lines +101 to +113
// Sequential requests should not be collapsed.
for i := 0; i < N; i++ {
resp, err := case1.Do(func(r *http.Request) {
r.Header.Set("X-Request-Idx", strconv.Itoa(i))
})

require.NoError(t, err, "request %d should not error", i)

bodies[i] = e2e.HashBody(resp)
}

assert.Equal(t, int32(1), originCallCount.Load(),
"object flight should not collapse sequential requests")
rng, err := xhttp.SingleRange(req.Header.Get("Range"), c.md.Size)
if err != nil {
headers := make(http.Header)
xhttp.CopyHeader(c.md.Headers, headers)
Comment on lines +66 to +68
// Verify only one origin call — ObjectFlightGroup collapsed all 5.
assert.Equal(t, int32(1), originCallCount.Load(),
"object flight should collapse concurrent full-MISS requests")
Comment on lines +234 to +236
// The concurrent chunk fetch for the missing range must be collapsed.
assert.Equal(t, int32(1), originCallCount.Load(),
"chunk flight should collapse concurrent chunk fetches to 1 origin call")
@sendya sendya force-pushed the feat/collapsed_forwarding_chunk branch from 2be63eb to b833273 Compare June 10, 2026 03:33
sendya and others added 2 commits June 10, 2026 11:53
…goroutine leaks

- Wrap fn() with defer/recover in ObjectFlightGroup.Do and ChunkFlightGroup.Do
  to convert panics to errors, ensuring pipes are properly closed and map
  entries are cleaned up
- Without this, a panic in fn() would leave waiters hanging indefinitely:
  pipes never closed, goroutines blocked forever on pipe reads
- Add TestObjectFlight_PanicRecovery and TestChunkFlight_PanicRecovery tests

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
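A minimal sketch of the panic handling this commit describes, assuming a simplified leader that signals waiters through a channel. `safeCall` and `lead` are hypothetical helpers, not the functions in ObjectFlightGroup.Do or ChunkFlightGroup.Do; the point is only that a deferred recover turns the panic into an error and a deferred close still releases anyone waiting.

```go
package main

import "fmt"

// safeCall runs fn and converts a panic into an ordinary error.
func safeCall(fn func() (string, error)) (res string, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("flight fn panicked: %v", r)
		}
	}()
	return fn()
}

// lead runs fn as the flight leader. The deferred close runs on every
// return path, so waiters blocked on done are always released.
func lead(done chan struct{}, fn func() (string, error)) (string, error) {
	defer close(done)
	return safeCall(fn)
}

func main() {
	done := make(chan struct{})
	_, err := lead(done, func() (string, error) { panic("origin exploded") })
	<-done // a waiter would unblock here instead of hanging
	fmt.Println("waiter released, err:", err)
}
```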
@sendya sendya force-pushed the feat/collapsed_forwarding_chunk branch from b833273 to e7a2694 Compare June 10, 2026 03:53
sendya and others added 3 commits June 11, 2026 20:15
- Fix channel buffer size to prevent goroutine leaks in tests
- Move originCallCount to per-subtest scope for isolation
- Add response body hash verification in chunk flight tests
- Track X-Cache, Content-Range, Content-Length in assertions
- Add PURGE cleanup after each test group
- Add e2e.PurgeMethod with directory purge support
- Fix response body close leaks
- Add debug logging for test diagnostics
- Remove unused watchdog plugin from test config

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Change ChunkFlightGroup from range-prefixed keys
("<hash>:<fromByte>-<toByte>") to object-level keys ("<hash>")
with automatic range union. When concurrent requests for the
same object need different byte ranges, the leader computes
the union of all registered ranges and fetches a single range
that covers everyone. Each caller then trims the shared
response stream to their specific sub-range via iobuf.RangeReader.

Key changes:
- chunk_flight.go: new chunkRange type, extended chunkCall with
  union tracking, WaitGroup-based union signaling, RangeReader
  wrapping for all callers, async fn execution (preserves
  header-building timing)
- caching.go: getUpstreamReader uses object-level key, fn
  parameterized by union range
- tests: new RangeUnion unit test (3 different ranges → 1 call),
  updated KeyIsolation for object-level collapsing, new E2E
  RangeUnion test with body hash verification

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
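The range union described in this commit can be illustrated with a short sketch. `byteRange`, `union`, `trim`, and `demo` are hypothetical names; the real code uses a chunkRange type and iobuf.RangeReader, whose APIs are not shown in this PR page, so the trimming here is approximated with io.CopyN and io.LimitReader from the standard library.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// byteRange is an inclusive byte range (hypothetical stand-in for chunkRange).
type byteRange struct{ from, to int64 }

// union returns the smallest single range covering all inputs.
// ranges must be non-empty.
func union(ranges []byteRange) byteRange {
	u := ranges[0]
	for _, r := range ranges[1:] {
		if r.from < u.from {
			u.from = r.from
		}
		if r.to > u.to {
			u.to = r.to
		}
	}
	return u
}

// trim extracts want from a stream that carries exactly the bytes of u.
func trim(stream io.Reader, u, want byteRange) (string, error) {
	// Skip the bytes between the start of the union and the wanted range.
	if _, err := io.CopyN(io.Discard, stream, want.from-u.from); err != nil {
		return "", err
	}
	b, err := io.ReadAll(io.LimitReader(stream, want.to-want.from+1))
	return string(b), err
}

// demo computes the union of three ranges over a sample object and trims
// the "fetched" union back down to each caller's own range.
func demo() (byteRange, []string) {
	object := "0123456789abcdef"
	wants := []byteRange{{2, 5}, {4, 9}, {8, 11}}
	u := union(wants)
	fetched := object[u.from : u.to+1] // stands in for the single origin fetch
	parts := make([]string, len(wants))
	for i, w := range wants {
		parts[i], _ = trim(strings.NewReader(fetched), u, w)
	}
	return u, parts
}

func main() {
	u, parts := demo()
	fmt.Println("union:", u, "parts:", parts)
}
```

Note that a union of non-adjacent ranges also covers the gap between them, so a single covering fetch may transfer bytes that no caller asked for; the sketch does not try to model that trade-off.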
- Fix critical race in ChunkFlightGroup: move delete(g.m, objectKey)
  before c.wg.Done() to prevent late callers from joining a flight
  with stale union range (data corruption / goroutine leak)

- Fix missing CachePartHit for chunk flight leader path: set
  cacheStatus unconditionally in getUpstreamReader so both leader
  and shared callers correctly report partial hit

- Fix c.err data race in ChunkFlightGroup: protect reads and writes
  with c.mu to silence Go race detector

- Restore three dead Prometheus metrics (cacheRequestTotal,
  cacheChunkWriteTotal, cacheFlushFailedTotal) that were silently
  dropped during the refactor

- Fix PurgeMethod to always send explicit Purge-Type header
  (file,hard) for non-dir purges

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

2 participants