From 30458c9a09425cf6f6653f31d54b4fd17ce925be Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Tue, 2 Jun 2026 20:48:17 -0700 Subject: [PATCH] feat(buildrunner): drop per-call queueName, add Factory, wire per-queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary ### Why? `BuildRunner.Trigger` took a `queueName` argument that selected the runner-specific job configuration on every call. That put queue routing on the hot path and on a single verb, leaving `Status` and `Cancel` to rediscover the queue from the build ID. The queue belongs at construction time, not per call. ### What? - Drop `queueName` from `BuildRunner.Trigger`; the verbs speak only in builds and changes. - Add a `Factory` interface (`New(cfg Config) (BuildRunner, error)`) and a `Config` struct carrying `QueueID`. A runner is bound to its Config at construction; backends extend `Config` with their own settings. - noop: add `NewFactory()`; keep `New()`. - Wire the factory end to end: the build and buildsignal controllers hold a `buildrunner.Factory` and build a runner per queue via `New(Config{QueueID: batch.Queue})`. buildsignal loads the batch to resolve the queue (TODO: denormalize queue onto the Build). The example orchestrator wires `buildnoop.NewFactory()`. - Docs: BuildRunner is no longer described as a "singleton" but must stay safe for concurrent use. Updated the RFC (Construction section) and README. Caching of runners per queue is intentionally omitted for now. ## Test Plan ✅ `make mocks`, `make gazelle`, `make tidy`, `make fmt` ✅ `bazel build //...` ✅ `make test` (unit), incl. buildrunner + build + buildsignal --- doc/rfc/build-runner.md | 19 +++++++- example/server/orchestrator/main.go | 15 +++--- extension/buildrunner/README.md | 2 +- extension/buildrunner/build_runner.go | 30 ++++++++++-- extension/buildrunner/mock/BUILD.bazel | 1 + .../buildrunner/mock/build_runner_mock.go | 48 +++++++++++++++++-- extension/buildrunner/noop/noop.go | 15 +++++- extension/buildrunner/noop/noop_test.go | 11 ++++- orchestrator/controller/build/build.go | 20 +++++--- orchestrator/controller/build/build_test.go | 23 ++++++--- .../controller/buildsignal/BUILD.bazel | 1 + .../controller/buildsignal/buildsignal.go | 25 ++++++++-- .../buildsignal/buildsignal_test.go | 19 +++++++- 13 files changed, 189 insertions(+), 40 deletions(-) diff --git a/doc/rfc/build-runner.md b/doc/rfc/build-runner.md index 8cc3c815..3d79a99f 100644 --- a/doc/rfc/build-runner.md +++ b/doc/rfc/build-runner.md @@ -38,12 +38,27 @@ The build stage needs a vendor-agnostic abstraction for talking to a Build Runne `BuildRunner` exposes three verbs, all keyed by a build identifier (`entity.BuildID`): -- **`Trigger`** — submit a build for a queue, given the ordered `base` and `head` change sets plus a free-form metadata map; returns the new build's ID. Runner-side work is asynchronous. +- **`Trigger`** — submit a build given the ordered `base` and `head` change sets plus a free-form metadata map; returns the new build's ID. Runner-side work is asynchronous. - **`Status`** — fetch the current `BuildStatus` and runner-defined metadata for a build; MAY round-trip to the runner. - **`Cancel`** — request cancellation; returns once the request reaches the runner, not once the build stops. See `extension/buildrunner/build_runner.go` for the exact Go signatures. The sections below record why the contract is shaped this way. +### Construction: a Factory, queue bound at build time + +A `BuildRunner` does not take a queue selector on any verb. The queue whose job configuration a runner uses is fixed when the runner is constructed, and runners are constructed by a `Factory`. + +- **`Factory`** — produces `BuildRunner` instances from a `Config`. A controller that drives builds for several queues holds one `Factory` and obtains one `BuildRunner` per queue. +- **`Config`** — the configuration the factory binds in: a `QueueID` selecting the queue whose job definition the runner builds against, plus any backend-specific settings (endpoints, credentials, defaults) a concrete implementation adds. + +Why bind the queue at construction rather than pass it per call: + +- A runner's connection pool, caches, and job defaults are all keyed to one queue's configuration. Passing the queue per call would force every implementation to re-resolve that configuration on the hot path, or to maintain an internal queue→config map the factory already expresses cleanly. +- It keeps the per-call verbs (`Trigger`, `Status`, `Cancel`) free of routing concerns — they speak only in builds and changes. +- It matches the rest of the extension family, whose implementations are bound to their configuration at construction. + +Rejected: a `queueName` argument on `Trigger`. It put routing on the hot path and on a single verb, leaving `Status` and `Cancel` to rediscover the queue from the build ID. Carrying the selection in `Config` keeps each runner bound to a single queue. + ### Trigger: base + head `Trigger` takes two ordered lists of changes and a free-form metadata map: @@ -125,7 +140,7 @@ Rejected: long-polling on `Status`. Not every backend supports efficient server- ### Lifecycle -Implementations are long-lived singletons bound to provider config at construction. Every method is concurrent-safe; connection pools and caches live inside the manager; anything that must survive a restart belongs in persistent storage, not the manager. +Implementations are constructed by a `Factory` and bound to one queue's provider config at construction (see *Construction* above). They may be shared and called concurrently, so every method must be concurrent-safe; connection pools and caches live inside the manager; anything that must survive a restart belongs in persistent storage, not the manager. ### Transient failures diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 48ec4994..88a84829 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -218,12 +218,13 @@ func run() error { return fmt.Errorf("failed to create pusher: %w", err) } - // Create build runner. The noop runner is the pass-through default - // (every build immediately succeeds) until a real backend is wired in. - br := buildnoop.New() + // Create the build runner factory. The noop factory is the pass-through + // default (every build immediately succeeds) until a real backend is + // wired in; controllers build a runner per queue from it. + runnerFactory := buildnoop.NewFactory() // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, br, cnt, store, changeStore); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, runnerFactory, cnt, store, changeStore); err != nil { return err } @@ -408,7 +409,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // │ │ │ // └────────┴───────────────────────┘ -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, runnerFactory buildrunner.Factory, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { requestController := start.NewController( logger, scope, @@ -494,7 +495,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, store, - br, + runnerFactory, registry, consumer.TopicKeyBuild, "orchestrator-build", @@ -507,7 +508,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, store, - br, + runnerFactory, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal", diff --git a/extension/buildrunner/README.md b/extension/buildrunner/README.md index 6e56ac01..99ad09cb 100644 --- a/extension/buildrunner/README.md +++ b/extension/buildrunner/README.md @@ -6,7 +6,7 @@ See [`doc/rfc/build-runner.md`](../../doc/rfc/build-runner.md) for the contract ## Adding a new backend -1. Create `extension/buildrunner/{backend}/` with a `BuildRunner` implementation bound to its runner configuration at construction. +1. Create `extension/buildrunner/{backend}/` with a `Factory` whose `New` returns a `BuildRunner` bound to one queue's job configuration. The runner verbs carry no queue selector — that selection lives in the `Config` passed to the factory. 2. Map the `base` and `head` change slices onto the backend's build primitives (apply `base`, apply `head`, validate the result). 3. Map the runner's lifecycle states down to the `BuildStatus` values: `Accepted` (accepted for execution), `Running` (executing), and the terminal `Succeeded` / `Failed` / `Cancelled`. 4. Implement internal reconnect / retry so transient failures surface as plain errors without blocking the caller. diff --git a/extension/buildrunner/build_runner.go b/extension/buildrunner/build_runner.go index eb262426..f7f46f6d 100644 --- a/extension/buildrunner/build_runner.go +++ b/extension/buildrunner/build_runner.go @@ -22,10 +22,34 @@ import ( "github.com/uber/submitqueue/entity" ) +// Config carries the configuration a Factory binds into a BuildRunner: the +// queue whose job definition the runner builds against, plus any +// backend-specific settings (endpoints, credentials, defaults) a concrete +// implementation adds. +type Config struct { + // QueueID identifies the queue whose job configuration this runner + // builds against. + QueueID string +} + +// Factory constructs BuildRunner instances from a Config. A BuildRunner is +// bound to its Config at construction; the per-build verbs take no queue +// selector. A controller that serves multiple queues holds a Factory and +// calls New with each batch's queue (see Config.QueueID). +// +// Implementations must be safe for concurrent use by multiple goroutines. +type Factory interface { + // New returns a BuildRunner for cfg, ready to trigger builds. Returns an + // error if a runner cannot be constructed from cfg (e.g. invalid + // configuration or an unreachable backend). + New(cfg Config) (BuildRunner, error) +} + // BuildRunner triggers builds against an external Build Runner, queries -// their status, and cancels them. +// their status, and cancels them. A BuildRunner is bound to its Config at +// construction (see Factory); the verbs below take no queue selector. // -// Implementations are long-lived singletons and must: +// Implementations may be shared and called concurrently, and must: // - make every method safe for concurrent use by multiple goroutines; // - recover from transient connectivity failures internally, returning // plain errors during the recovery window rather than blocking the @@ -55,11 +79,9 @@ type BuildRunner interface { // asynchronously. Callers learn the build's progress via Status, not // via Trigger. // - // queueName selects the runner-specific job configuration. // Returns an error if the request is invalid. Trigger( ctx context.Context, - queueName string, base []entity.Change, head []entity.Change, metadata entity.BuildMetadata, diff --git a/extension/buildrunner/mock/BUILD.bazel b/extension/buildrunner/mock/BUILD.bazel index e24e6c3b..d1ca1e48 100644 --- a/extension/buildrunner/mock/BUILD.bazel +++ b/extension/buildrunner/mock/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//entity", + "//extension/buildrunner", "@org_uber_go_mock//gomock", ], ) diff --git a/extension/buildrunner/mock/build_runner_mock.go b/extension/buildrunner/mock/build_runner_mock.go index 9fae6097..fa5bc4c3 100644 --- a/extension/buildrunner/mock/build_runner_mock.go +++ b/extension/buildrunner/mock/build_runner_mock.go @@ -14,9 +14,49 @@ import ( reflect "reflect" entity "github.com/uber/submitqueue/entity" + buildrunner "github.com/uber/submitqueue/extension/buildrunner" gomock "go.uber.org/mock/gomock" ) +// MockFactory is a mock of Factory interface. +type MockFactory struct { + ctrl *gomock.Controller + recorder *MockFactoryMockRecorder + isgomock struct{} +} + +// MockFactoryMockRecorder is the mock recorder for MockFactory. +type MockFactoryMockRecorder struct { + mock *MockFactory +} + +// NewMockFactory creates a new mock instance. +func NewMockFactory(ctrl *gomock.Controller) *MockFactory { + mock := &MockFactory{ctrl: ctrl} + mock.recorder = &MockFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFactory) EXPECT() *MockFactoryMockRecorder { + return m.recorder +} + +// New mocks base method. +func (m *MockFactory) New(cfg buildrunner.Config) (buildrunner.BuildRunner, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "New", cfg) + ret0, _ := ret[0].(buildrunner.BuildRunner) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// New indicates an expected call of New. +func (mr *MockFactoryMockRecorder) New(cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "New", reflect.TypeOf((*MockFactory)(nil).New), cfg) +} + // MockBuildRunner is a mock of BuildRunner interface. type MockBuildRunner struct { ctrl *gomock.Controller @@ -72,16 +112,16 @@ func (mr *MockBuildRunnerMockRecorder) Status(ctx, buildID any) *gomock.Call { } // Trigger mocks base method. -func (m *MockBuildRunner) Trigger(ctx context.Context, queueName string, base, head []entity.Change, metadata entity.BuildMetadata) (entity.BuildID, error) { +func (m *MockBuildRunner) Trigger(ctx context.Context, base, head []entity.Change, metadata entity.BuildMetadata) (entity.BuildID, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Trigger", ctx, queueName, base, head, metadata) + ret := m.ctrl.Call(m, "Trigger", ctx, base, head, metadata) ret0, _ := ret[0].(entity.BuildID) ret1, _ := ret[1].(error) return ret0, ret1 } // Trigger indicates an expected call of Trigger. -func (mr *MockBuildRunnerMockRecorder) Trigger(ctx, queueName, base, head, metadata any) *gomock.Call { +func (mr *MockBuildRunnerMockRecorder) Trigger(ctx, base, head, metadata any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trigger", reflect.TypeOf((*MockBuildRunner)(nil).Trigger), ctx, queueName, base, head, metadata) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trigger", reflect.TypeOf((*MockBuildRunner)(nil).Trigger), ctx, base, head, metadata) } diff --git a/extension/buildrunner/noop/noop.go b/extension/buildrunner/noop/noop.go index 61637434..a919b4d4 100644 --- a/extension/buildrunner/noop/noop.go +++ b/extension/buildrunner/noop/noop.go @@ -34,6 +34,19 @@ type runner struct { counter atomic.Uint64 } +// factory builds no-op runners. It ignores the supplied Config. +type factory struct{} + +// NewFactory returns a buildrunner.Factory that produces no-op runners. +func NewFactory() buildrunner.Factory { + return factory{} +} + +// New returns a no-op buildrunner.BuildRunner. The Config is ignored. +func (factory) New(_ buildrunner.Config) (buildrunner.BuildRunner, error) { + return New(), nil +} + // New returns a buildrunner.BuildRunner that performs no real work. func New() buildrunner.BuildRunner { return &runner{} @@ -41,7 +54,7 @@ func New() buildrunner.BuildRunner { // Trigger returns a unique build ID without contacting any runner. // Inputs are ignored. -func (r *runner) Trigger(_ context.Context, _ string, _ []entity.Change, _ []entity.Change, _ entity.BuildMetadata) (entity.BuildID, error) { +func (r *runner) Trigger(_ context.Context, _ []entity.Change, _ []entity.Change, _ entity.BuildMetadata) (entity.BuildID, error) { return entity.BuildID{ID: fmt.Sprintf("noop-%d", r.counter.Add(1))}, nil } diff --git a/extension/buildrunner/noop/noop_test.go b/extension/buildrunner/noop/noop_test.go index 436725cd..55866759 100644 --- a/extension/buildrunner/noop/noop_test.go +++ b/extension/buildrunner/noop/noop_test.go @@ -28,11 +28,18 @@ func TestNew_ImplementsInterface(t *testing.T) { var _ buildrunner.BuildRunner = New() } +func TestNewFactory_ImplementsInterface(t *testing.T) { + var f buildrunner.Factory = NewFactory() + r, err := f.New(buildrunner.Config{QueueID: "queueA"}) + require.NoError(t, err) + assert.NotNil(t, r) +} + func TestRunner_Trigger(t *testing.T) { r := New() ctx := context.Background() - id1, err := r.Trigger(ctx, "queueA", + id1, err := r.Trigger(ctx, []entity.Change{{URIs: []string{"github://owner/repo/pull/1"}}}, []entity.Change{{URIs: []string{"github://owner/repo/pull/2"}}}, entity.BuildMetadata{"requester": "alice"}, @@ -41,7 +48,7 @@ func TestRunner_Trigger(t *testing.T) { assert.NotEmpty(t, id1.ID) // IDs are unique across calls, even with empty inputs. - id2, err := r.Trigger(ctx, "queueA", nil, nil, nil) + id2, err := r.Trigger(ctx, nil, nil, nil) require.NoError(t, err) assert.NotEqual(t, id1, id2) } diff --git a/orchestrator/controller/build/build.go b/orchestrator/controller/build/build.go index de61374b..993a8c98 100644 --- a/orchestrator/controller/build/build.go +++ b/orchestrator/controller/build/build.go @@ -36,7 +36,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage - buildRunner buildrunner.BuildRunner + runnerFactory buildrunner.Factory registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -50,7 +50,7 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, - buildRunner buildrunner.BuildRunner, + runnerFactory buildrunner.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -59,7 +59,7 @@ func NewController( logger: logger.Named("build_controller"), metricsScope: scope.SubScope("build_controller"), store: store, - buildRunner: buildRunner, + runnerFactory: runnerFactory, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -112,10 +112,16 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to assemble head changes for batch %s: %w", batch.ID, err) } - // Trigger the build with the configured build manager. metadata is nil - // until a caller-supplied source materializes (e.g. requester / ticket - // pulled off the originating LandRequest). - buildID, err := c.buildRunner.Trigger(ctx, batch.Queue, base, head, nil) + // Resolve the BuildRunner for this batch's queue and trigger the build. + // metadata is nil until a caller-supplied source materializes (e.g. + // requester / ticket pulled off the originating LandRequest). + runner, err := c.runnerFactory.New(buildrunner.Config{QueueID: batch.Queue}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "runner_errors", 1) + return fmt.Errorf("failed to create build runner for queue %s: %w", batch.Queue, err) + } + + buildID, err := runner.Trigger(ctx, base, head, nil) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "trigger_errors", 1) return fmt.Errorf("failed to trigger build for batch %s: %w", batch.ID, err) diff --git a/orchestrator/controller/build/build_test.go b/orchestrator/controller/build/build_test.go index a015907a..3dda274d 100644 --- a/orchestrator/controller/build/build_test.go +++ b/orchestrator/controller/build/build_test.go @@ -36,6 +36,15 @@ import ( "go.uber.org/zap/zaptest" ) +// staticFactory returns the same BuildRunner for every Config. Tests inject +// it so a single mock (or noop) runner satisfies the controller's Factory +// dependency. +type staticFactory struct{ runner buildrunner.BuildRunner } + +func (s staticFactory) New(buildrunner.Config) (buildrunner.BuildRunner, error) { + return s.runner, nil +} + // batchIDPayload serializes a BatchID to JSON bytes for test message payloads. func batchIDPayload(t *testing.T, id string) []byte { payload, err := entity.BatchID{ID: id}.ToBytes() @@ -96,7 +105,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock ) require.NoError(t, err) - return NewController(logger, scope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + return NewController(logger, scope, store, staticFactory{br}, registry, consumer.TopicKeyBuild, "orchestrator-build") } func TestNewController(t *testing.T) { @@ -176,7 +185,7 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { br := buildrunnermock.NewMockBuildRunner(ctrl) wantBase := []entity.Change{depReq.Change} wantHead := []entity.Change{head1.Change, head2.Change} - br.EXPECT().Trigger(gomock.Any(), headBatch.Queue, wantBase, wantHead, gomock.Nil()).Return(entity.BuildID{ID: "build-xyz"}, nil) + br.EXPECT().Trigger(gomock.Any(), wantBase, wantHead, gomock.Nil()).Return(entity.BuildID{ID: "build-xyz"}, nil) var publishedTopic string var published entity.BuildID @@ -197,7 +206,7 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticFactory{br}, registry, consumer.TopicKeyBuild, "orchestrator-build") msg := queue.NewMessage(headBatch.ID, batchIDPayload(t, headBatch.ID), headBatch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -239,7 +248,7 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes() br := buildrunnermock.NewMockBuildRunner(ctrl) - br.EXPECT().Trigger(gomock.Any(), batch.Queue, gomock.Any(), gomock.Any(), gomock.Any()).Return(entity.BuildID{ID: "build-dup"}, nil) + br.EXPECT().Trigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(entity.BuildID{ID: "build-dup"}, nil) publishCalled := false mockPub := queuemock.NewMockPublisher(ctrl) @@ -255,7 +264,7 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticFactory{br}, registry, consumer.TopicKeyBuild, "orchestrator-build") msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -280,14 +289,14 @@ func TestController_Process_TriggerFailure(t *testing.T) { // No build store expectation: Trigger failure must short-circuit before Create. br := buildrunnermock.NewMockBuildRunner(ctrl) - br.EXPECT().Trigger(gomock.Any(), batch.Queue, gomock.Any(), gomock.Any(), gomock.Any()). + br.EXPECT().Trigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(entity.BuildID{}, fmt.Errorf("provider down")) registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: queuemock.NewMockQueue(ctrl)}}, ) require.NoError(t, err) - controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, br, registry, consumer.TopicKeyBuild, "orchestrator-build") + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticFactory{br}, registry, consumer.TopicKeyBuild, "orchestrator-build") msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) diff --git a/orchestrator/controller/buildsignal/BUILD.bazel b/orchestrator/controller/buildsignal/BUILD.bazel index 4c8d48e8..f2690ed3 100644 --- a/orchestrator/controller/buildsignal/BUILD.bazel +++ b/orchestrator/controller/buildsignal/BUILD.bazel @@ -27,6 +27,7 @@ go_test( "//core/errs", "//entity", "//entity/queue", + "//extension/buildrunner", "//extension/buildrunner/mock", "//extension/queue/mock", "//extension/storage/mock", diff --git a/orchestrator/controller/buildsignal/buildsignal.go b/orchestrator/controller/buildsignal/buildsignal.go index 05ac22de..276078b6 100644 --- a/orchestrator/controller/buildsignal/buildsignal.go +++ b/orchestrator/controller/buildsignal/buildsignal.go @@ -58,7 +58,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage - buildRunner buildrunner.BuildRunner + runnerFactory buildrunner.Factory registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -72,7 +72,7 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, - buildRunner buildrunner.BuildRunner, + runnerFactory buildrunner.Factory, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -81,7 +81,7 @@ func NewController( logger: logger.Named("buildsignal_controller"), metricsScope: scope.SubScope("buildsignal_controller"), store: store, - buildRunner: buildRunner, + runnerFactory: runnerFactory, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -132,7 +132,24 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "partition_key", msg.PartitionKey, ) - status, _, err := c.buildRunner.Status(ctx, buildID) + // Resolve the BuildRunner for this build's queue. The queue lives on the + // Batch, not the Build, so load the Batch to look it up. + // + // TODO: denormalize the queue onto the Build (or carry it on the message) + // so the poll loop does not read the Batch on every poll. + batch, err := c.store.GetBatchStore().Get(ctx, build.BatchID) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get batch %s for build %s: %w", build.BatchID, build.ID, err) + } + + runner, err := c.runnerFactory.New(buildrunner.Config{QueueID: batch.Queue}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "runner_errors", 1) + return fmt.Errorf("failed to create build runner for queue %s: %w", batch.Queue, err) + } + + status, _, err := runner.Status(ctx, buildID) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "status_errors", 1) return fmt.Errorf("failed to get status for build %s: %w", buildID.ID, err) diff --git a/orchestrator/controller/buildsignal/buildsignal_test.go b/orchestrator/controller/buildsignal/buildsignal_test.go index 08498120..69e04631 100644 --- a/orchestrator/controller/buildsignal/buildsignal_test.go +++ b/orchestrator/controller/buildsignal/buildsignal_test.go @@ -26,6 +26,7 @@ import ( "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/buildrunner" buildrunnermock "github.com/uber/submitqueue/extension/buildrunner/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" storagemock "github.com/uber/submitqueue/extension/storage/mock" @@ -40,10 +41,19 @@ type testHarness struct { controller *Controller br *buildrunnermock.MockBuildRunner buildStore *storagemock.MockBuildStore + batchStore *storagemock.MockBatchStore signalPub *queuemock.MockPublisher speculatePub *queuemock.MockPublisher } +// staticFactory returns the same BuildRunner for every Config, so a single +// mock runner satisfies the controller's Factory dependency. +type staticFactory struct{ runner buildrunner.BuildRunner } + +func (s staticFactory) New(buildrunner.Config) (buildrunner.BuildRunner, error) { + return s.runner, nil +} + func newTestHarness(t *testing.T, ctrl *gomock.Controller) *testHarness { br := buildrunnermock.NewMockBuildRunner(ctrl) @@ -62,14 +72,20 @@ func newTestHarness(t *testing.T, ctrl *gomock.Controller) *testHarness { require.NoError(t, err) buildStore := storagemock.NewMockBuildStore(ctrl) + // The poll loop loads the Batch to resolve the build's queue. Default to a + // batch with a queue; tests that exercise this read override it. + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), gomock.Any()). + Return(entity.Batch{ID: "batch-1", Queue: "test-queue"}, nil).AnyTimes() store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() c := NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, - br, + staticFactory{br}, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal", @@ -78,6 +94,7 @@ func newTestHarness(t *testing.T, ctrl *gomock.Controller) *testHarness { controller: c, br: br, buildStore: buildStore, + batchStore: batchStore, signalPub: signalPub, speculatePub: speculatePub, }