Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions example/submitqueue/gateway/server/queues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,3 @@ queues:
- name: test-queue
- name: e2e-test-queue
- name: e2e-cancel-queue
# Routes to an analyzer that always errors (conflictfake.FailAlways) so e2e can
# exercise the conflict-analysis error path. See newQueueRegistry in the
# orchestrator example server.
- name: e2e-conflict-error-queue
21 changes: 7 additions & 14 deletions example/submitqueue/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,10 +779,9 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err
//
// The scorer is wrapped by scorerfake so a change URI carrying
// "sq-fake=score-error" forces a scoring error end-to-end; it is a pure
// passthrough otherwise. The analyzer is wrapped by conflictfake with a nil
// predicate (passthrough) — swap the predicate (e.g. conflictfake.FailAlways)
// on a queue to exercise the analyzer error path, as e2e-conflict-error-queue
// below does.
// passthrough otherwise. The analyzer is wrapped by conflictfake so a change
// URI carrying "sq-fake=conflict-error" forces a conflict-analysis error;
// passthrough otherwise.
base := queueExtensions{
mergeChecker: mc,
changeProvider: cp,
Expand All @@ -794,7 +793,7 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err
)),
// TODO: replace the delegate with a real analyzer (e.g. Tango target
// analysis). "all" serializes the queue conservatively.
analyzer: conflictfake.New(all.New(), nil),
analyzer: conflictfake.New(all.New()),
}

// test-queue: bucketed heuristic scorer; conservative (serialized) conflicts
Expand All @@ -812,7 +811,7 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err

// e2e-test-queue: composite scorer; no conflicts (maximum parallelism).
e2eQueue := base
e2eQueue.analyzer = conflictfake.New(none.New(), nil)
e2eQueue.analyzer = conflictfake.New(none.New())
e2eQueue.scorer = scorerfake.New(composite.New(
map[string]scorer.Scorer{
"size": heuristic.New([]heuristic.Bucket{{Min: 0, Max: 1<<31 - 1, Score: 0.8}}, batchLines, scope),
Expand All @@ -821,17 +820,11 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err
composite.Avg, scope.SubScope("scorer.e2e-test-queue"),
))

// e2e-conflict-error-queue: every conflict analysis fails, exercising the
// analyzer error path. Scorer/edge integrations inherit the baseline.
conflictErrQueue := base
conflictErrQueue.analyzer = conflictfake.New(all.New(), conflictfake.FailAlways)

return queueRegistry{
def: base,
byQueue: map[string]queueExtensions{
"test-queue": testQueue,
"e2e-test-queue": e2eQueue,
"e2e-conflict-error-queue": conflictErrQueue,
"test-queue": testQueue,
"e2e-test-queue": e2eQueue,
},
}, nil
}
25 changes: 25 additions & 0 deletions submitqueue/core/batchchanges/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "batchchanges",
srcs = ["batchchanges.go"],
importpath = "github.com/uber/submitqueue/submitqueue/core/batchchanges",
visibility = ["//visibility:public"],
deps = [
"//submitqueue/entity",
"//submitqueue/extension/storage",
],
)

go_test(
name = "batchchanges_test",
srcs = ["batchchanges_test.go"],
embed = [":batchchanges"],
deps = [
"//submitqueue/entity",
"//submitqueue/extension/storage/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_uber_go_mock//gomock",
],
)
57 changes: 57 additions & 0 deletions submitqueue/core/batchchanges/batchchanges.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package batchchanges assembles the normalized, batch-level view of a batch's
// changes (entity.BatchChanges) from storage. A Batch references only request
// IDs, so resolving the actual change facts requires reading each request and
// its per-URI change records. Centralizing this here keeps that storage
// traversal out of the extensions (scorer, conflict analyzer), which consume
// entity.BatchChanges and must not touch storage themselves.
package batchchanges

import (
"context"
"fmt"

"github.com/uber/submitqueue/submitqueue/entity"
"github.com/uber/submitqueue/submitqueue/extension/storage"
)

// Collect assembles the normalized entity.BatchChanges for a batch by resolving
// each request and reading its change records per URI. For each URI it selects
// the record owned by the request (GetByURI returns rows for every request that
// ever claimed the URI) and appends its URI + provider-supplied details.
func Collect(ctx context.Context, store storage.Storage, batch entity.Batch) (entity.BatchChanges, error) {
changes := entity.BatchChanges{BatchID: batch.ID, Queue: batch.Queue}
for _, requestID := range batch.Contains {
request, err := store.GetRequestStore().Get(ctx, requestID)
if err != nil {
return entity.BatchChanges{}, fmt.Errorf("failed to get request %s: %w", requestID, err)
}
for _, uri := range request.Change.URIs {
records, err := store.GetChangeStore().GetByURI(ctx, batch.Queue, uri)
if err != nil {
return entity.BatchChanges{}, fmt.Errorf("failed to read change record for request %s uri=%s: %w", requestID, uri, err)
}
for _, rec := range records {
if rec.RequestID != requestID {
continue
}
changes.Changes = append(changes.Changes, entity.ChangeInfo{URI: rec.URI, Details: rec.Details})
break
}
}
}
return changes, nil
}
61 changes: 61 additions & 0 deletions submitqueue/core/batchchanges/batchchanges_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchchanges

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/submitqueue/submitqueue/entity"
storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock"
"go.uber.org/mock/gomock"
)

func TestCollect(t *testing.T) {
ctrl := gomock.NewController(t)

batch := entity.Batch{ID: "q/batch/1", Queue: "q", Contains: []string{"q/1", "q/2"}}
req1 := entity.Request{ID: "q/1", Change: entity.Change{URIs: []string{"github://o/r/pull/1/a"}}}
req2 := entity.Request{ID: "q/2", Change: entity.Change{URIs: []string{"github://o/r/pull/2/b"}}}
rec1 := entity.ChangeRecord{Queue: "q", URI: "github://o/r/pull/1/a", RequestID: "q/1",
Details: entity.ChangeDetails{ChangedFiles: []entity.ChangedFile{{Path: "f1", LinesAdded: 3}}}}
// A stray record for the same URI owned by a different request must be skipped.
recOther := entity.ChangeRecord{Queue: "q", URI: "github://o/r/pull/1/a", RequestID: "q/999"}
rec2 := entity.ChangeRecord{Queue: "q", URI: "github://o/r/pull/2/b", RequestID: "q/2",
Details: entity.ChangeDetails{ChangedFiles: []entity.ChangedFile{{Path: "f2", LinesAdded: 5}}}}

reqStore := storagemock.NewMockRequestStore(ctrl)
reqStore.EXPECT().Get(gomock.Any(), "q/1").Return(req1, nil)
reqStore.EXPECT().Get(gomock.Any(), "q/2").Return(req2, nil)

changeStore := storagemock.NewMockChangeStore(ctrl)
changeStore.EXPECT().GetByURI(gomock.Any(), "q", "github://o/r/pull/1/a").Return([]entity.ChangeRecord{recOther, rec1}, nil)
changeStore.EXPECT().GetByURI(gomock.Any(), "q", "github://o/r/pull/2/b").Return([]entity.ChangeRecord{rec2}, nil)

store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes()
store.EXPECT().GetChangeStore().Return(changeStore).AnyTimes()

got, err := Collect(context.Background(), store, batch)
require.NoError(t, err)
assert.Equal(t, "q/batch/1", got.BatchID)
assert.Equal(t, "q", got.Queue)
require.Len(t, got.Changes, 2)
assert.Equal(t, "github://o/r/pull/1/a", got.Changes[0].URI)
assert.Equal(t, "github://o/r/pull/2/b", got.Changes[1].URI)
assert.Equal(t, 8, got.TotalLinesChanged())
}
40 changes: 24 additions & 16 deletions submitqueue/extension/conflict/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,29 @@ and the batches already in flight.

## Interface

`Analyzer` exposes a single `Analyze` method that takes the candidate batch
and the list of in-flight batches it might conflict with. It returns the
subset of in-flight batches that conflict with the candidate, each paired
with a `ConflictType` describing the kind of conflict. An empty result means
`Analyzer` exposes a single `Analyze` method that takes the candidate batch's
changes and the list of in-flight batches' changes it might conflict with.
Inputs are `entity.BatchChanges` — each batch's flattened change facts (URIs +
provider details), assembled by the caller — so the analyzer sees the actual
changes rather than just batch IDs, and can reason about changed targets/files
without reading storage itself. It returns the subset of in-flight batches that
conflict with the candidate, each paired with a `ConflictType` describing the
kind of conflict (referenced by `BatchChanges.BatchID`). An empty result means
the candidate is free to advance independently.

Callers are responsible for filtering out the candidate itself and any
terminal batches from the in-flight list before invoking the analyzer. The
analyzer itself stays free of lifecycle knowledge. A non-nil error reports
an infrastructure failure of the analysis and should be treated as
retryable by the caller.
Callers are responsible for assembling `entity.BatchChanges` (see
`submitqueue/core/batchchanges`), and for filtering out the candidate itself and
any terminal batches from the in-flight list before invoking the analyzer. The
analyzer itself stays free of lifecycle knowledge. A non-nil error reports an
infrastructure failure of the analysis and should be treated as retryable by the
caller.

The analyzer is intentionally pure with respect to batch state: it does not
mutate inputs, does not read storage, and may be called concurrently. Real
implementations are expected to resolve the batch contents (e.g. changed
build targets, modified files) via whichever upstream system they depend
on, and to return as much classification detail as that system supports.
The analyzer is intentionally pure with respect to storage: it does not mutate
inputs, does not read storage, and may be called concurrently. A real
implementation (e.g. one backed by uber/tango) derives changed build
targets/edges from the change set — `Changes[].URI` carries the repo + head
commit SHA, and the target branch is injected per queue at construction — and
returns as much classification detail as that system supports.

## Implementations

Expand All @@ -35,8 +41,10 @@ on, and to return as much classification detail as that system supports.
## Adding a new backend

1. Create `extension/conflict/{backend}/` with an `Analyzer` implementation.
2. Resolve each `entity.Batch` into whatever signal the backend needs
(e.g. changed build targets, files touched, dependency graphs).
2. Derive whatever signal the backend needs from each `entity.BatchChanges`
(e.g. changed build targets, files touched, dependency graphs) — the change
URIs and provider details are in hand; resolve the rest via your upstream
system.
3. Emit one `Conflict` per (in-flight batch, detected conflict type). Pick
the most specific `ConflictType` your backend can determine; use
`ConflictTypeConservative` only when the backend cannot prove the absence
Expand Down
4 changes: 2 additions & 2 deletions submitqueue/extension/conflict/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ func New() conflict.Analyzer {

// Analyze returns one ConflictTypeConservative Conflict per in-flight batch,
// preserving the input order. Returns an empty slice when inFlight is empty.
func (analyzer) Analyze(_ context.Context, _ entity.Batch, inFlight []entity.Batch) ([]conflict.Conflict, error) {
func (analyzer) Analyze(_ context.Context, _ entity.BatchChanges, inFlight []entity.BatchChanges) ([]conflict.Conflict, error) {
if len(inFlight) == 0 {
return nil, nil
}
conflicts := make([]conflict.Conflict, len(inFlight))
for i, b := range inFlight {
conflicts[i] = conflict.Conflict{
BatchID: b.ID,
BatchID: b.BatchID,
Type: conflict.ConflictTypeConservative,
}
}
Expand Down
16 changes: 8 additions & 8 deletions submitqueue/extension/conflict/all/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
)

func TestAnalyze(t *testing.T) {
batch := entity.Batch{ID: "queueA/batch/10"}
candidate := entity.BatchChanges{BatchID: "queueA/batch/10"}

tests := []struct {
name string
inFlight []entity.Batch
inFlight []entity.BatchChanges
want []conflict.Conflict
}{
{
Expand All @@ -39,15 +39,15 @@ func TestAnalyze(t *testing.T) {
},
{
name: "empty in-flight slice yields no conflicts",
inFlight: []entity.Batch{},
inFlight: []entity.BatchChanges{},
want: nil,
},
{
name: "every in-flight batch is reported in input order",
inFlight: []entity.Batch{
{ID: "queueA/batch/1"},
{ID: "queueA/batch/2"},
{ID: "queueA/batch/3"},
inFlight: []entity.BatchChanges{
{BatchID: "queueA/batch/1"},
{BatchID: "queueA/batch/2"},
{BatchID: "queueA/batch/3"},
},
want: []conflict.Conflict{
{BatchID: "queueA/batch/1", Type: conflict.ConflictTypeConservative},
Expand All @@ -60,7 +60,7 @@ func TestAnalyze(t *testing.T) {
a := New()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := a.Analyze(context.Background(), batch, tt.inFlight)
got, err := a.Analyze(context.Background(), candidate, tt.inFlight)
require.NoError(t, err)
assert.Equal(t, tt.want, got)
})
Expand Down
22 changes: 14 additions & 8 deletions submitqueue/extension/conflict/conflict.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,21 @@ type Conflict struct {
// already in flight, so the speculation layer can decide which batches can
// safely advance in parallel.
type Analyzer interface {
// Analyze returns the subset of inFlight batches that conflict with
// batch, each paired with the type of conflict detected. An empty
// result means batch does not conflict with any in-flight batch.
// Analyze returns the subset of inFlight batches that conflict with the
// candidate, each paired with the type of conflict detected. An empty
// result means the candidate does not conflict with any in-flight batch.
//
// Callers should not include batch itself in inFlight; terminal batches
// should be filtered out before calling. A non-nil error indicates the
// analysis itself failed (infrastructure issue) and should be treated as
// retryable by the caller.
Analyze(ctx context.Context, batch entity.Batch, inFlight []entity.Batch) ([]Conflict, error)
// Inputs are entity.BatchChanges — each batch's flattened change facts
// (URIs + provider details), assembled by the caller — so the analyzer
// sees the actual changes rather than just batch IDs, and can reason about
// changed targets/files without touching storage. A conflict references the
// in-flight batch by its BatchChanges.BatchID.
//
// Callers should not include the candidate itself in inFlight; terminal
// batches should be filtered out before calling. A non-nil error indicates
// the analysis itself failed (infrastructure issue) and should be treated
// as retryable by the caller.
Analyze(ctx context.Context, candidate entity.BatchChanges, inFlight []entity.BatchChanges) ([]Conflict, error)
}

// Config carries the per-queue identity handed to a Factory. The system knows
Expand Down
1 change: 1 addition & 0 deletions submitqueue/extension/conflict/fake/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/uber/submitqueue/submitqueue/extension/conflict/fake",
visibility = ["//visibility:public"],
deps = [
"//submitqueue/core/fakemarker",
"//submitqueue/entity",
"//submitqueue/extension/conflict",
],
Expand Down
Loading
Loading