diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index 12cf41bc..8f9befb1 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -14,6 +14,8 @@ go_library( "//core/consumer", "//core/httpclient", "//entity", + "//extension/build", + "//extension/build/noop", "//extension/changeprovider", "//extension/changeprovider/github", "//extension/changestore", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index d903741e..8793a3bb 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -33,6 +33,8 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/httpclient" "github.com/uber/submitqueue/entity" + buildext "github.com/uber/submitqueue/extension/build" + buildnoop "github.com/uber/submitqueue/extension/build/noop" "github.com/uber/submitqueue/extension/changeprovider" githubprovider "github.com/uber/submitqueue/extension/changeprovider/github" "github.com/uber/submitqueue/extension/changestore" @@ -216,8 +218,12 @@ func run() error { return fmt.Errorf("failed to create pusher: %w", err) } + // Create build manager. The noop manager is the pass-through default + // (every build immediately succeeds) until a real provider is wired in. + bm := buildnoop.New() + // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store, changeStore); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, bm, cnt, store, changeStore); err != nil { return err } @@ -402,7 +408,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // │ │ │ // └────────┴────────────────────────┘ -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, bm buildext.BuildManager, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { requestController := start.NewController( logger, scope, @@ -488,6 +494,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, store, + bm, registry, consumer.TopicKeyBuild, "orchestrator-build", diff --git a/orchestrator/controller/build/BUILD.bazel b/orchestrator/controller/build/BUILD.bazel index 50c0f3e4..57145a7c 100644 --- a/orchestrator/controller/build/BUILD.bazel +++ b/orchestrator/controller/build/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//core/metrics", "//entity", "//entity/queue", + "//extension/build", "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", @@ -25,6 +26,9 @@ go_test( "//core/errs", "//entity", "//entity/queue", + "//extension/build", + "//extension/build/mock", + "//extension/build/noop", "//extension/queue/mock", "//extension/storage/mock", "@com_github_stretchr_testify//assert", diff --git a/orchestrator/controller/build/build.go b/orchestrator/controller/build/build.go index 0a450872..ec04094f 100644 --- a/orchestrator/controller/build/build.go +++ b/orchestrator/controller/build/build.go @@ -23,6 +23,7 @@ import ( "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/build" "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -34,6 +35,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage + buildManager build.BuildManager registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -47,6 +49,7 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, + buildManager build.BuildManager, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -55,6 +58,7 @@ func NewController( logger: logger.Named("build_controller"), metricsScope: scope.SubScope("build_controller"), store: store, + buildManager: buildManager, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -95,14 +99,31 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "partition_key", msg.PartitionKey, ) - // TODO: Add build logic - // - Trigger CI build - // - Track build status + // Assemble the changes to build from the batch's requests. + changes := make([]entity.BuildChange, 0, len(batch.Contains)) + for _, reqID := range batch.Contains { + req, err := c.store.GetRequestStore().Get(ctx, reqID) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get request %s for batch %s: %w", reqID, batch.ID, err) + } + changes = append(changes, entity.BuildChange{ + Change: req.Change, + Action: entity.ChangeActionValidate, + }) + } + + // Trigger the build with the configured build manager. + buildID, status, err := c.buildManager.Trigger(ctx, batch.Queue, changes) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "trigger_errors", 1) + return fmt.Errorf("failed to trigger build for batch %s: %w", batch.ID, err) + } build := entity.Build{ - ID: batch.ID, + ID: buildID, BatchID: batch.ID, - Status: entity.BuildStatusAccepted, + Status: status, } // Publish build to build signal topic @@ -114,6 +135,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r c.logger.Infow("published build to buildsignal", "batch_id", batch.ID, "build_id", build.ID, + "status", string(build.Status), "topic_key", consumer.TopicKeyBuildSignal, ) diff --git a/orchestrator/controller/build/build_test.go b/orchestrator/controller/build/build_test.go index 0e595195..18739798 100644 --- a/orchestrator/controller/build/build_test.go +++ b/orchestrator/controller/build/build_test.go @@ -26,6 +26,9 @@ import ( "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/build" + buildmock "github.com/uber/submitqueue/extension/build/mock" + buildnoop "github.com/uber/submitqueue/extension/build/noop" queuemock "github.com/uber/submitqueue/extension/queue/mock" storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" @@ -59,8 +62,9 @@ func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.Mo return store } -// newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { +// newTestController creates a controller with test dependencies. bm is the +// build manager to inject; pass buildnoop.New() for the pass-through default. +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, bm build.BuildManager, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -79,14 +83,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock ) require.NoError(t, err) - return NewController(logger, scope, store, registry, consumer.TopicKeyBuild, "orchestrator-build") + return NewController(logger, scope, store, bm, registry, consumer.TopicKeyBuild, "orchestrator-build") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) + controller := newTestController(t, ctrl, store, buildnoop.New(), nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyBuild, controller.TopicKey()) @@ -99,7 +103,7 @@ func TestController_Process_Success(t *testing.T) { batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) + controller := newTestController(t, ctrl, store, buildnoop.New(), nil) msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -110,6 +114,97 @@ func TestController_Process_Success(t *testing.T) { require.NoError(t, err) } +// TestController_Process_TriggersBuildWithChanges verifies the controller +// assembles one BuildChange per request in the batch, triggers the build, and +// publishes a Build carrying the manager's returned ID and status. +func TestController_Process_TriggersBuildWithChanges(t *testing.T) { + ctrl := gomock.NewController(t) + + batch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + Contains: []string{"test-queue/1", "test-queue/2"}, + } + req1 := entity.Request{ID: "test-queue/1", Change: entity.Change{URIs: []string{"github://o/r/pull/1/aaa"}}} + req2 := entity.Request{ID: "test-queue/2", Change: entity.Change{URIs: []string{"github://o/r/pull/2/bbb"}}} + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockRequestStore.EXPECT().Get(gomock.Any(), req1.ID).Return(req1, nil) + mockRequestStore.EXPECT().Get(gomock.Any(), req2.ID).Return(req2, nil) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + + bm := buildmock.NewMockBuildManager(ctrl) + wantChanges := []entity.BuildChange{ + {Change: req1.Change, Action: entity.ChangeActionValidate}, + {Change: req2.Change, Action: entity.ChangeActionValidate}, + } + bm.EXPECT().Trigger(gomock.Any(), batch.Queue, wantChanges).Return("build-xyz", entity.BuildStatusRunning, nil) + + var published entity.Build + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, msg queue.Message) error { + b, err := entity.BuildFromBytes(msg.Payload) + require.NoError(t, err) + published = b + return nil + }, + ) + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, bm, registry, consumer.TopicKeyBuild, "orchestrator-build") + + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.NoError(t, controller.Process(context.Background(), delivery)) + assert.Equal(t, "build-xyz", published.ID) + assert.Equal(t, batch.ID, published.BatchID) + assert.Equal(t, entity.BuildStatusRunning, published.Status) +} + +// TestController_Process_TriggerFailure verifies a build-manager failure is +// surfaced as an error (nack) and nothing is published. +func TestController_Process_TriggerFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + batch := testBatch() + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + bm := buildmock.NewMockBuildManager(ctrl) + bm.EXPECT().Trigger(gomock.Any(), batch.Queue, gomock.Any()). + Return("", entity.BuildStatusUnknown, fmt.Errorf("provider down")) + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: queuemock.NewMockQueue(ctrl)}}, + ) + require.NoError(t, err) + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, bm, registry, consumer.TopicKeyBuild, "orchestrator-build") + + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.Error(t, controller.Process(context.Background(), delivery)) +} + func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) @@ -118,7 +213,7 @@ func TestController_Process_StorageFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - controller := newTestController(t, ctrl, store, nil) + controller := newTestController(t, ctrl, store, buildnoop.New(), nil) msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -135,7 +230,7 @@ func TestController_Process_PublishFailure(t *testing.T) { batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, store, buildnoop.New(), fmt.Errorf("publish failed")) msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -150,7 +245,7 @@ func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) batch := testBatch() store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) + controller := newTestController(t, ctrl, store, buildnoop.New(), nil) var _ consumer.Controller = controller }