From 8213ccc68c3c8ca8abe95143bcda42ba6c75a60c Mon Sep 17 00:00:00 2001 From: Jamy Timmermans Date: Wed, 3 Jun 2026 20:57:23 -0700 Subject: [PATCH 1/8] feat(buildrunner): add Buildkite BuildRunner implementation Implements buildrunner.BuildRunner and buildrunner.Factory backed by the Buildkite CI platform, under submitqueue/extension/buildrunner/buildkite. Design: - Trigger/Cancel return promptly (per the async/sync contract in the build-runner RFC): both enqueue work on a buffered channel and a background worker contacts Buildkite, keeping the orchestrator's queue loops decoupled from Buildkite availability. - The in-memory build-ID -> Buildkite-ref map is a pure latency cache, not the source of truth. The SQ build ID is stamped into the Buildkite build's metadata at create time; Status and Cancel re-derive the ref via a metadata-filtered build lookup on cache miss, so they remain correct after a process restart that empties the cache (no orphaned Accepted-forever builds). - The background worker retries transient create/cancel failures with backoff. If a submission exhausts its retries, the build is recorded as a submission failure and Status reports terminal Failed (with the reason in BuildMetadata["error"]) rather than polling Accepted forever. - mapState collapses Buildkite states into BuildStatus; unrecognised states map to the non-terminal Unknown (not Failed), so an unknown state does not terminally fail a batch. - Each job is dispatched to its own goroutine so a retrying submit does not head-of-line block other builds. Errors are returned plain (classification left to the controller, per core/errs). Includes config knobs (queue sizes, submit timeout, retry attempts/backoff, test overrides) and unit tests covering trigger/status/cancel, cache-miss recovery, retry-then-succeed, retry-exhaustion-fails, and state mapping. 
Known limitation: a process restart while a trigger job is still buffered (never submitted to Buildkite) leaves the build Accepted with nothing to recover; catching that belongs to an orchestrator-level Accepted deadline, which is out of scope for this change. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../buildrunner/buildkite/BUILD.bazel | 29 + .../buildrunner/buildkite/buildkite.go | 501 +++++++++++++++++ .../buildrunner/buildkite/buildkite_test.go | 528 ++++++++++++++++++ .../extension/buildrunner/buildkite/client.go | 162 ++++++ .../extension/buildrunner/buildkite/config.go | 73 +++ .../buildrunner/buildkite/factory.go | 78 +++ 6 files changed, 1371 insertions(+) create mode 100644 submitqueue/extension/buildrunner/buildkite/BUILD.bazel create mode 100644 submitqueue/extension/buildrunner/buildkite/buildkite.go create mode 100644 submitqueue/extension/buildrunner/buildkite/buildkite_test.go create mode 100644 submitqueue/extension/buildrunner/buildkite/client.go create mode 100644 submitqueue/extension/buildrunner/buildkite/config.go create mode 100644 submitqueue/extension/buildrunner/buildkite/factory.go diff --git a/submitqueue/extension/buildrunner/buildkite/BUILD.bazel b/submitqueue/extension/buildrunner/buildkite/BUILD.bazel new file mode 100644 index 00000000..0bfbdabc --- /dev/null +++ b/submitqueue/extension/buildrunner/buildkite/BUILD.bazel @@ -0,0 +1,29 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "buildkite", + srcs = [ + "buildkite.go", + "client.go", + "config.go", + "factory.go", + ], + importpath = "github.com/uber/submitqueue/submitqueue/extension/buildrunner/buildkite", + visibility = ["//visibility:public"], + deps = [ + "//submitqueue/entity", + "//submitqueue/extension/buildrunner", + ], +) + +go_test( + name = "buildkite_test", + srcs = ["buildkite_test.go"], + embed = [":buildkite"], + deps = [ + "//submitqueue/entity", + "//submitqueue/extension/buildrunner", + 
"@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite.go b/submitqueue/extension/buildrunner/buildkite/buildkite.go new file mode 100644 index 00000000..09c740be --- /dev/null +++ b/submitqueue/extension/buildrunner/buildkite/buildkite.go @@ -0,0 +1,501 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package buildkite implements buildrunner.BuildRunner backed by the Buildkite +// CI platform. +// +// Trigger is non-blocking: it generates a build ID, enqueues the job on a +// buffered channel, and returns immediately. This keeps the orchestrator's +// queue loop decoupled from Buildkite availability — provider-side work +// happens asynchronously, per the BuildRunner contract. A background worker +// drains the channel, submits the build to Buildkite (retrying transient +// failures with backoff), and stamps the SQ build ID into the build's +// metadata. If submission fails after all retries, the build is recorded as a +// submission failure and Status reports it as terminal Failed. Cancel is +// similarly async. +// +// The in-memory map from SQ build ID to Buildkite reference is a pure latency +// cache, not the source of truth. 
Because every build carries its SQ build ID +// in Buildkite metadata, Status and Cancel re-derive the reference with a +// metadata-filtered build lookup whenever the cache misses — including after a +// process restart that empties the map. Nothing about a build's identity lives +// only in memory. +// +// The Buildkite build receives base and head change URIs as JSON-encoded +// environment variables (SQ_BASE_URIS, SQ_HEAD_URIS, SQ_QUEUE). The pipeline +// script fetches each PR's diff with the GitHub API, applies them with +// `git apply -3`, produces one commit per layer (base, head), then runs CI. +package buildkite + +import ( + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/buildrunner" +) + +// Env var keys set on every triggered Buildkite build. +const ( + // EnvKeyBaseURIs carries the JSON-encoded ordered list of change URIs from + // the dependency batches. The pipeline script applies these first and + // commits the result as the "base" layer. + EnvKeyBaseURIs = "SQ_BASE_URIS" + + // EnvKeyHeadURIs carries the JSON-encoded ordered list of change URIs from + // the batch under test. Applied on top of the base layer, committed + // separately. + EnvKeyHeadURIs = "SQ_HEAD_URIS" + + // EnvKeyQueue carries the SQ queue name so the pipeline script can select + // queue-specific test targets. + EnvKeyQueue = "SQ_QUEUE" +) + +// metaKeyBuildID is the Buildkite build-metadata key under which the SQ build +// ID is stored at create time. Status and Cancel filter builds by this key to +// recover the Buildkite reference when the in-memory cache misses (e.g. after +// a restart), which keeps that cache from being a durable source of truth. 
+const metaKeyBuildID = "sq_build_id"
+
+const (
+	defaultTriggerQueueSize  = 256
+	defaultCancelQueueSize   = 256
+	defaultSubmitTimeout     = 30 * time.Second
+	defaultMaxSubmitAttempts = 5
+	defaultSubmitBackoff     = 1 * time.Second
+)
+
+// triggerJob carries everything the background worker needs to submit one build
+// to Buildkite. It is enqueued by Trigger and consumed by processTrigger.
+type triggerJob struct {
+	buildID  string
+	baseURIs []string
+	headURIs []string
+}
+
+// cancelJob carries the build ID the background worker should cancel in
+// Buildkite. It is enqueued by Cancel and consumed by processCancel.
+type cancelJob struct {
+	buildID string
+}
+
+// runner implements buildrunner.BuildRunner. A background goroutine drains
+// triggerCh and cancelCh for the lifetime of the runner, dispatching each job
+// to its own goroutine so that a slow submit (retry/backoff) never head-of-line
+// blocks other builds.
+type runner struct {
+	cfg    Config
+	client *client
+
+	// mu protects refs and submitFailures.
+	mu sync.RWMutex
+	// refs maps our internal build ID to the encoded Buildkite reference
+	// ("{org}/{pipeline}/{number}"). It is a pure latency cache: the durable
+	// record is the SQ build ID stamped into the Buildkite build's metadata,
+	// so a missing entry is recovered via the client's metadata lookup rather
+	// than treated as authoritative.
+	refs map[string]string
+	// submitFailures records build IDs whose Buildkite submission permanently
+	// failed (all retries exhausted), mapped to a short reason. Status reports
+	// these as terminal Failed so the build does not poll Accepted forever.
+	// This is in-memory only: a restart that loses it leaves the build Accepted,
+	// which the orchestrator's (out-of-scope) Accepted deadline must catch.
+	submitFailures map[string]string
+
+	// triggerCh queues pending build-creation jobs from Trigger.
+	triggerCh chan triggerJob
+	// cancelCh queues pending build-cancellation jobs from Cancel.
+	cancelCh chan cancelJob
+}
+
+var _ buildrunner.BuildRunner = (*runner)(nil)
+
+// New constructs a Buildkite-backed BuildRunner bound to a single pipeline and
+// starts its background worker goroutine, which runs for the life of the process.
+// An unset BaseURL or HTTPClient and non-positive queue sizes fall back to defaults.
+//
+// Returns an error if APIToken, OrgSlug, PipelineSlug, or Branch are empty.
+// Prefer Factory.For over calling New directly; New is exported for testing.
+func New(cfg Config) (buildrunner.BuildRunner, error) {
+	if cfg.APIToken == "" {
+		return nil, fmt.Errorf("buildkite: APIToken is required")
+	}
+	if cfg.OrgSlug == "" {
+		return nil, fmt.Errorf("buildkite: OrgSlug is required")
+	}
+	if cfg.PipelineSlug == "" {
+		return nil, fmt.Errorf("buildkite: PipelineSlug is required")
+	}
+	if cfg.Branch == "" {
+		return nil, fmt.Errorf("buildkite: Branch is required")
+	}
+
+	baseURL := cfg.BaseURL
+	if baseURL == "" {
+		baseURL = "https://api.buildkite.com/v2"
+	}
+	httpClient := cfg.HTTPClient
+	if httpClient == nil {
+		httpClient = http.DefaultClient
+	}
+	triggerSize := cfg.TriggerQueueSize
+	if triggerSize <= 0 { // a negative size would panic in make(chan ...)
+		triggerSize = defaultTriggerQueueSize
+	}
+	cancelSize := cfg.CancelQueueSize
+	if cancelSize <= 0 { // a negative size would panic in make(chan ...)
+		cancelSize = defaultCancelQueueSize
+	}
+
+	r := newRunner(cfg, &client{
+		token:      cfg.APIToken,
+		httpClient: httpClient,
+		baseURL:    baseURL,
+	}, triggerSize, cancelSize)
+	go r.work()
+	return r, nil
+}
+
+// newRunner constructs a runner without starting the background goroutine.
+// Used by New and by tests (which drive job processing via drainTrigger /
+// drainCancel to avoid timing races).
+func newRunner(cfg Config, c *client, triggerSize, cancelSize int) *runner {
+	return &runner{
+		cfg:            cfg,
+		client:         c,
+		refs:           make(map[string]string),
+		submitFailures: make(map[string]string),
+		triggerCh:      make(chan triggerJob, triggerSize),
+		cancelCh:       make(chan cancelJob, cancelSize),
+	}
+}
+
+// Trigger generates a unique build ID, enqueues an async job to submit the
+// build to Buildkite, and returns immediately without contacting Buildkite.
+// Until the background worker creates the build, Status reports it as Accepted.
+// The context and metadata arguments are ignored.
+//
+// Returns an error (retryable by the controller) if the trigger queue is full.
+func (r *runner) Trigger(_ context.Context, base, head []entity.Change, _ entity.BuildMetadata) (entity.BuildID, error) {
+	id := newBuildID()
+	job := triggerJob{
+		buildID:  id,
+		baseURIs: flattenURIs(base),
+		headURIs: flattenURIs(head),
+	}
+	select {
+	case r.triggerCh <- job:
+		return entity.BuildID{ID: id}, nil
+	default: // buffer full: fail fast instead of blocking the caller
+		return entity.BuildID{}, fmt.Errorf("buildkite: trigger queue full; backpressure from Buildkite API")
+	}
+}
+
+// Status returns the current build status. While the async submission is still
+// in flight (no Buildkite build carries this build ID yet) it returns Accepted.
+// Once the build exists, Status fetches the live state from Buildkite and
+// returns it with the build URL in BuildMetadata["url"].
+//
+// If the submission permanently failed (all retries exhausted), Status reports
+// terminal Failed with the reason in BuildMetadata["error"].
+//
+// On a cache miss Status re-derives the Buildkite reference by filtering builds
+// on the stamped SQ build ID, so it works after a restart that emptied the
+// in-memory cache.
+func (r *runner) Status(ctx context.Context, buildID entity.BuildID) (entity.BuildStatus, entity.BuildMetadata, error) {
+	ref, cached := r.lookupRef(buildID.ID)
+
+	var resp buildResponse
+	switch {
+	case cached: // fast path: the ref is known, fetch the build directly
+		org, pipeline, number, err := parseBuildRef(ref)
+		if err != nil {
+			return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: malformed build ref: %w", err)
+		}
+		resp, err = r.client.getBuild(ctx, org, pipeline, number)
+		if err != nil {
+			return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: get build: %w", err)
+		}
+	default: // cache miss: check for a recorded failure, then look the build up by metadata
+		if reason, failed := r.lookupSubmitFailure(buildID.ID); failed {
+			// The async submission exhausted its retries; nothing exists in
+			// Buildkite to poll. Report terminal Failed so the build does not
+			// loop in Accepted forever.
+			return entity.BuildStatusFailed, entity.BuildMetadata{"error": reason}, nil
+		}
+		found, exists, err := r.client.findBuildByMeta(ctx, r.cfg.OrgSlug, r.cfg.PipelineSlug, metaKeyBuildID, buildID.ID)
+		if err != nil {
+			return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: find build: %w", err)
+		}
+		if !exists {
+			// No build carries this ID: the submission is still in flight or
+			// queued for retry, or the ID was never triggered on this pipeline.
+			// Report Accepted so the buildsignal poll loop keeps waiting.
+			return entity.BuildStatusAccepted, nil, nil
+		}
+		ref = encodeBuildRef(r.cfg.OrgSlug, r.cfg.PipelineSlug, found.Number)
+		r.storeRef(buildID.ID, ref)
+		resp = found
+	}
+
+	return mapState(resp.State), entity.BuildMetadata{"url": resp.WebURL}, nil
+}
+
+// Cancel enqueues an async cancellation and returns immediately, keeping the
+// caller's queue loop decoupled from Buildkite availability. The background
+// worker delivers the cancel to Buildkite, recovering the build reference from
+// the stamped metadata if it is not cached (e.g. after a restart). If the build
+// was never submitted, the cancel is a no-op.
+//
+// Returns an error if the cancel queue is full; the caller should retry.
+func (r *runner) Cancel(_ context.Context, buildID entity.BuildID) error {
+	select {
+	case r.cancelCh <- cancelJob{buildID: buildID.ID}:
+		return nil
+	default: // buffer full: fail fast instead of blocking the caller
+		return fmt.Errorf("buildkite: cancel queue full; try again later")
+	}
+}
+
+// work is the background consumer loop. It hands each trigger and cancel job
+// to its own goroutine so that one job's retry/backoff does not block the
+// others; processTrigger and processCancel guard shared state with r.mu. The
+// loop has no exit path, so the goroutine lives until the process ends.
+func (r *runner) work() {
+	for {
+		select {
+		case job := <-r.triggerCh:
+			go r.processTrigger(job)
+		case job := <-r.cancelCh:
+			go r.processCancel(job)
+		}
+	}
+}
+
+// processTrigger submits one build to Buildkite, retrying transient failures
+// with backoff. On success it caches the Buildkite reference so subsequent
+// Status calls skip the metadata lookup. If every attempt fails the build was
+// never created, so it is recorded as a submission failure and Status reports
+// it as terminal Failed rather than polling Accepted forever.
+func (r *runner) processTrigger(job triggerJob) {
+	baseJSON, _ := json.Marshal(job.baseURIs) // marshalling a []string cannot fail
+	headJSON, _ := json.Marshal(job.headURIs) // marshalling a []string cannot fail
+
+	req := createBuildRequest{
+		Branch:  r.cfg.Branch,
+		Message: "submitqueue speculative build",
+		Env: map[string]string{
+			EnvKeyBaseURIs: string(baseJSON),
+			EnvKeyHeadURIs: string(headJSON),
+			EnvKeyQueue:    r.cfg.QueueName,
+		},
+		MetaData: map[string]string{
+			metaKeyBuildID: job.buildID,
+		},
+	}
+
+	var resp buildResponse
+	err := r.withRetry(func() error {
+		ctx, cancel := r.opCtx()
+		defer cancel()
+		var e error
+		resp, e = r.client.createBuild(ctx, r.cfg.OrgSlug, r.cfg.PipelineSlug, req)
+		return e
+	})
+	if err != nil {
+		r.markSubmitFailed(job.buildID, fmt.Sprintf("buildkite submission failed after retries: %v", err))
+		return
+	}
+
+	r.storeRef(job.buildID, encodeBuildRef(r.cfg.OrgSlug, r.cfg.PipelineSlug, resp.Number))
+}
+
+// processCancel cancels the Buildkite build, recovering its reference from the
+// stamped metadata when the cache misses. No-ops when no build carries this
+// build ID yet (trigger not yet processed, or submission failed).
+func (r *runner) processCancel(job cancelJob) {
+	ref, cached := r.lookupRef(job.buildID)
+	if !cached {
+		ctx, cancel := r.opCtx()
+		found, exists, err := r.client.findBuildByMeta(ctx, r.cfg.OrgSlug, r.cfg.PipelineSlug, metaKeyBuildID, job.buildID)
+		cancel()
+		if err != nil || !exists {
+			// Nothing to cancel yet, or the lookup failed. The lookup is not
+			// retried and the job is dropped; the caller may re-issue Cancel.
+			return
+		}
+		ref = encodeBuildRef(r.cfg.OrgSlug, r.cfg.PipelineSlug, found.Number)
+		r.storeRef(job.buildID, ref)
+	}
+
+	org, pipeline, number, err := parseBuildRef(ref)
+	if err != nil {
+		return
+	}
+
+	_ = r.withRetry(func() error { // best effort: no caller is left to report a failure to
+		ctx, cancel := r.opCtx()
+		defer cancel()
+		return r.client.cancelBuild(ctx, org, pipeline, number)
+	})
+}
+
+// withRetry runs fn up to MaxSubmitAttempts times with linear backoff, returning
+// the last error. Used for the background submit and cancel API calls so a
+// transient Buildkite failure does not abandon the work.
+func (r *runner) withRetry(fn func() error) error {
+	attempts := r.cfg.MaxSubmitAttempts
+	if attempts <= 0 {
+		attempts = defaultMaxSubmitAttempts
+	}
+	backoff := r.cfg.SubmitBackoff
+	if backoff <= 0 {
+		backoff = defaultSubmitBackoff
+	}
+
+	var err error
+	for attempt := 1; attempt <= attempts; attempt++ {
+		if err = fn(); err == nil {
+			return nil
+		}
+		if attempt < attempts {
+			time.Sleep(backoff * time.Duration(attempt))
+		}
+	}
+	return err
+}
+
+// opCtx returns a context bounded by SubmitTimeout (defaultSubmitTimeout when unset
+// or non-positive) for one background API call. The caller must invoke the CancelFunc.
+func (r *runner) opCtx() (context.Context, context.CancelFunc) {
+	timeout := r.cfg.SubmitTimeout
+	if timeout <= 0 { // a negative timeout would yield an already-expired context
+		timeout = defaultSubmitTimeout
+	}
+	return context.WithTimeout(context.Background(), timeout)
+}
+
+// lookupRef returns the cached Buildkite reference for a build ID.
+func (r *runner) lookupRef(buildID string) (string, bool) {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	ref, ok := r.refs[buildID]
+	return ref, ok
+}
+
+// storeRef caches the Buildkite reference for a build ID.
+func (r *runner) storeRef(buildID, ref string) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.refs[buildID] = ref
+}
+
+// markSubmitFailed records that a build's Buildkite submission permanently
+// failed, so Status reports it as terminal Failed.
+func (r *runner) markSubmitFailed(buildID, reason string) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.submitFailures[buildID] = reason
+}
+
+// lookupSubmitFailure reports whether a build's submission permanently failed,
+// returning the recorded reason.
+func (r *runner) lookupSubmitFailure(buildID string) (string, bool) {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	reason, ok := r.submitFailures[buildID]
+	return reason, ok
+}
+
+// newBuildID returns a cryptographically random hex string prefixed with "bk-"
+// that uniquely identifies a build within this runner implementation.
+func newBuildID() string {
+	b := make([]byte, 16)
+	if _, err := rand.Read(b); err != nil {
+		// crypto/rand.Read only fails when the OS entropy source is broken.
+		panic(fmt.Sprintf("buildkite: crypto/rand.Read failed: %v", err))
+	}
+	return "bk-" + hex.EncodeToString(b)
+}
+
+// flattenURIs concatenates the URI lists from all changes into a single slice.
+func flattenURIs(changes []entity.Change) []string {
+	uris := make([]string, 0, len(changes))
+	for _, c := range changes {
+		uris = append(uris, c.URIs...)
+	}
+	return uris
+}
+
+// encodeBuildRef encodes org, pipeline, and build number into the internal
+// reference string stored in r.refs.
+// Format: "{org}/{pipeline}/{number}". Buildkite slugs are [a-z0-9-] so "/"
+// is unambiguous as a separator.
+func encodeBuildRef(org, pipeline string, number int) string {
+	return fmt.Sprintf("%s/%s/%d", org, pipeline, number)
+}
+
+// parseBuildRef is the inverse of encodeBuildRef; it rejects refs with an empty or missing segment.
+func parseBuildRef(ref string) (org, pipeline string, number int, err error) {
+	last := strings.LastIndex(ref, "/")
+	if last < 1 {
+		return "", "", 0, fmt.Errorf("invalid build ref %q", ref)
+	}
+	number, err = strconv.Atoi(ref[last+1:])
+	if err != nil {
+		return "", "", 0, fmt.Errorf("invalid build ref %q: non-numeric number segment", ref)
+	}
+	prefix := ref[:last]
+	first := strings.Index(prefix, "/")
+	if first < 1 || first == len(prefix)-1 { // missing org, or empty pipeline ("org//7")
+		return "", "", 0, fmt.Errorf("invalid build ref %q", ref)
+	}
+	return prefix[:first], prefix[first+1:], number, nil
+}
+
+// mapState maps a Buildkite build state string to a BuildStatus.
+//
+// Buildkite states: creating, scheduled, running, failing, blocked, passed,
+// failed, canceling, canceled, skipped, not_run.
+func mapState(state string) entity.BuildStatus {
+	switch state {
+	case "creating", "scheduled":
+		return entity.BuildStatusAccepted
+	case "running", "failing", "blocked":
+		// failing = a job failed but the build is still live; blocked = waiting on a block step.
+		return entity.BuildStatusRunning
+	case "passed":
+		return entity.BuildStatusSucceeded
+	case "failed", "not_run", "skipped":
+		// not_run/skipped never produced a passing result; treat them as
+		// terminal failure so the batch is not merged on a non-success verdict.
+		return entity.BuildStatusFailed
+	case "canceling", "canceled":
+		return entity.BuildStatusCancelled
+	default:
+		// Unrecognised Buildkite state. Do NOT assume terminal: Unknown is
+		// non-terminal, so the buildsignal poll loop keeps waiting rather than
+		// failing the batch on a state this code does not understand.
+		return entity.BuildStatusUnknown
+	}
+}
diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go
new file mode 100644
index 00000000..920d72ee
--- /dev/null
+++ b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go
@@ -0,0 +1,528 @@
+// Copyright (c) 2025 Uber Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buildkite
+
+import (
+	"context"
+	"encoding/json"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/uber/submitqueue/submitqueue/entity"
+	"github.com/uber/submitqueue/submitqueue/extension/buildrunner"
+)
+
+// newTestRunner creates a runner backed by a test HTTP server (closed via
+// t.Cleanup) without starting the background goroutine. Tests drive job
+// processing synchronously via drainTrigger and drainCancel to avoid timing
+// races. The retry backoff is one millisecond so retry paths run fast.
+func newTestRunner(t *testing.T, handler http.Handler) *runner {
+	t.Helper()
+	srv := httptest.NewServer(handler)
+	t.Cleanup(srv.Close)
+	return newRunner(
+		Config{
+			APIToken:          "test-token",
+			OrgSlug:           "test-org",
+			PipelineSlug:      "my-pipeline",
+			QueueName:         "my-queue",
+			Branch:            "main",
+			SubmitTimeout:     5 * time.Second,
+			MaxSubmitAttempts: 3,
+			SubmitBackoff:     time.Millisecond,
+		},
+		&client{
+			token:      "test-token",
+			httpClient: http.DefaultClient,
+			baseURL:    srv.URL,
+		},
+		16, // triggerSize
+		16, // cancelSize
+	)
+}
+
+// drainTrigger synchronously processes the next pending trigger job.
+// Use after Trigger() to simulate the background worker; fails the test if no job is queued.
+func drainTrigger(t *testing.T, r *runner) {
+	t.Helper()
+	select {
+	case job := <-r.triggerCh:
+		r.processTrigger(job)
+	default:
+		t.Fatal("drainTrigger: no pending trigger job in channel")
+	}
+}
+
+// drainCancel synchronously processes the next pending cancel job, failing the test if none is queued.
+func drainCancel(t *testing.T, r *runner) {
+	t.Helper()
+	select {
+	case job := <-r.cancelCh:
+		r.processCancel(job)
+	default:
+		t.Fatal("drainCancel: no pending cancel job in channel")
+	}
+}
+
+// buildJSON encodes fields into a minimal Buildkite build JSON response.
+func buildJSON(number int, state, webURL string) []byte { + b, _ := json.Marshal(buildResponse{Number: number, State: state, WebURL: webURL}) + return b +} + +// emptyListHandler responds to a metadata-filtered list query with an empty +// array (no build matches yet), failing the test on any other request. +func emptyListHandler(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodGet { + t.Fatalf("unexpected %s request; expected only a metadata list lookup", req.Method) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte("[]")) + } +} + +// --- Interface / constructor --- + +func TestNew_ImplementsInterface(t *testing.T) { + r, err := New(Config{ + APIToken: "tok", + OrgSlug: "org", + PipelineSlug: "pipeline", + Branch: "main", + }) + require.NoError(t, err) + var _ buildrunner.BuildRunner = r +} + +func TestNew_Validation(t *testing.T) { + tests := []struct { + name string + cfg Config + }{ + {"missing token", Config{OrgSlug: "org", PipelineSlug: "p", Branch: "main"}}, + {"missing org", Config{APIToken: "tok", PipelineSlug: "p", Branch: "main"}}, + {"missing pipeline", Config{APIToken: "tok", OrgSlug: "org", Branch: "main"}}, + {"missing branch", Config{APIToken: "tok", OrgSlug: "org", PipelineSlug: "p"}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := New(tt.cfg) + require.Error(t, err) + }) + } +} + +// --- Factory --- + +func TestFactory_ImplementsInterface(t *testing.T) { + var _ buildrunner.Factory = Factory{} +} + +func TestFactory_For_Success(t *testing.T) { + f := Factory{ + APIToken: "tok", + OrgSlug: "org", + Branch: "main", + Pipelines: map[string]string{"my-queue": "my-pipeline"}, + } + r, err := f.For(buildrunner.Config{QueueName: "my-queue"}) + require.NoError(t, err) + require.NotNil(t, r) +} + +func TestFactory_For_UnknownQueue(t *testing.T) { + f := Factory{ + APIToken: "tok", + OrgSlug: "org", + Branch: "main", + 
Pipelines: map[string]string{"my-queue": "my-pipeline"}, + } + _, err := f.For(buildrunner.Config{QueueName: "unknown"}) + require.Error(t, err) +} + +// --- Trigger --- + +func TestTrigger_EnqueuesJobAndReturnsID(t *testing.T) { + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) // should not be reached before drain + })) + + id, err := r.Trigger(context.Background(), nil, nil, nil) + require.NoError(t, err) + assert.NotEmpty(t, id.ID) + + // Exactly one job should be in the channel. + assert.Len(t, r.triggerCh, 1) +} + +func TestTrigger_StatusIsAcceptedBeforeWorkerRuns(t *testing.T) { + // Before the worker submits, no build carries this ID in Buildkite, so the + // metadata lookup returns empty and Status reports Accepted. + r := newTestRunner(t, emptyListHandler(t)) + + id, err := r.Trigger(context.Background(), nil, nil, nil) + require.NoError(t, err) + + status, _, err := r.Status(context.Background(), id) + require.NoError(t, err) + assert.Equal(t, entity.BuildStatusAccepted, status) +} + +func TestTrigger_SubmitsCorrectPayloadToBuildkite(t *testing.T) { + var capturedMethod, capturedAuth string + var capturedBody []byte + + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + capturedMethod = req.Method + capturedAuth = req.Header.Get("Authorization") + capturedBody, _ = io.ReadAll(req.Body) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(buildJSON(42, "scheduled", "https://buildkite.com/test-org/my-pipeline/builds/42")) + })) + + base := []entity.Change{{URIs: []string{"github://org/repo/pull/1/aaa111"}}} + head := []entity.Change{{URIs: []string{"github://org/repo/pull/2/bbb222"}}} + + id, err := r.Trigger(context.Background(), base, head, nil) + require.NoError(t, err) + + drainTrigger(t, r) + + assert.Equal(t, http.MethodPost, capturedMethod) + assert.Equal(t, "Bearer test-token", capturedAuth) + + var req createBuildRequest + 
require.NoError(t, json.Unmarshal(capturedBody, &req)) + assert.Equal(t, "main", req.Branch) + assert.Equal(t, `["github://org/repo/pull/1/aaa111"]`, req.Env[EnvKeyBaseURIs]) + assert.Equal(t, `["github://org/repo/pull/2/bbb222"]`, req.Env[EnvKeyHeadURIs]) + assert.Equal(t, "my-queue", req.Env[EnvKeyQueue]) + // The SQ build ID is stamped into metadata so Status/Cancel can recover the + // build after a cache loss. + assert.Equal(t, id.ID, req.MetaData[metaKeyBuildID]) + + // After a successful submit the ref is cached, so Status uses getBuild. + ref, ok := r.lookupRef(id.ID) + require.True(t, ok) + assert.Equal(t, encodeBuildRef("test-org", "my-pipeline", 42), ref) +} + +func TestTrigger_EmptyBase_ProducesJSONArray(t *testing.T) { + var capturedBody []byte + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + capturedBody, _ = io.ReadAll(req.Body) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(buildJSON(1, "scheduled", "")) + })) + + _, err := r.Trigger(context.Background(), nil, []entity.Change{{URIs: []string{"u"}}}, nil) + require.NoError(t, err) + drainTrigger(t, r) + + var req createBuildRequest + require.NoError(t, json.Unmarshal(capturedBody, &req)) + // nil base must produce [] in JSON, not null. 
+ assert.Equal(t, "[]", req.Env[EnvKeyBaseURIs]) +} + +func TestTrigger_MultipleChangesFlattened(t *testing.T) { + var capturedBody []byte + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + capturedBody, _ = io.ReadAll(req.Body) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(buildJSON(2, "scheduled", "")) + })) + + head := []entity.Change{ + {URIs: []string{"github://org/repo/pull/1/aaa"}}, + {URIs: []string{"github://org/repo/pull/2/bbb", "github://org/repo/pull/3/ccc"}}, + } + _, err := r.Trigger(context.Background(), nil, head, nil) + require.NoError(t, err) + drainTrigger(t, r) + + var req createBuildRequest + require.NoError(t, json.Unmarshal(capturedBody, &req)) + assert.Equal(t, + `["github://org/repo/pull/1/aaa","github://org/repo/pull/2/bbb","github://org/repo/pull/3/ccc"]`, + req.Env[EnvKeyHeadURIs], + ) +} + +func TestTrigger_QueueFull_ReturnsError(t *testing.T) { + r := newRunner( + Config{APIToken: "tok", OrgSlug: "org", PipelineSlug: "p", Branch: "main"}, + &client{token: "tok", httpClient: http.DefaultClient, baseURL: "http://unused"}, + 1, 1, + ) + // Fill the channel. + _, err := r.Trigger(context.Background(), nil, nil, nil) + require.NoError(t, err) + // Second call must fail. 
+ _, err = r.Trigger(context.Background(), nil, nil, nil) + require.Error(t, err) +} + +func TestProcessTrigger_RetriesTransientFailureThenSucceeds(t *testing.T) { + var posts int + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + require.Equal(t, http.MethodPost, req.Method) + posts++ + if posts < 2 { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(buildJSON(7, "scheduled", "")) + })) + + id, err := r.Trigger(context.Background(), nil, nil, nil) + require.NoError(t, err) + drainTrigger(t, r) + + assert.Equal(t, 2, posts, "submit should retry after a transient failure") + ref, ok := r.lookupRef(id.ID) + require.True(t, ok) + assert.Equal(t, encodeBuildRef("test-org", "my-pipeline", 7), ref) +} + +func TestTrigger_SubmitExhaustsRetries_BuildFails(t *testing.T) { + // create (POST) always fails; the worker exhausts its retries and records a + // submission failure. + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.Method == http.MethodPost { + w.WriteHeader(http.StatusInternalServerError) + return + } + // Nothing was created, so any metadata lookup finds nothing. + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte("[]")) + })) + + id, err := r.Trigger(context.Background(), nil, nil, nil) + require.NoError(t, err) + drainTrigger(t, r) + + // With submission permanently failed, Status reports terminal Failed (with a + // reason) rather than polling Accepted forever. 
+ status, meta, err := r.Status(context.Background(), id) + require.NoError(t, err) + assert.Equal(t, entity.BuildStatusFailed, status) + assert.NotEmpty(t, meta["error"]) +} + +// --- Status --- + +func TestStatus_StateMapping(t *testing.T) { + tests := []struct { + bkState string + want entity.BuildStatus + }{ + {"creating", entity.BuildStatusAccepted}, + {"scheduled", entity.BuildStatusAccepted}, + {"running", entity.BuildStatusRunning}, + {"blocked", entity.BuildStatusRunning}, + {"passed", entity.BuildStatusSucceeded}, + {"failed", entity.BuildStatusFailed}, + {"not_run", entity.BuildStatusFailed}, + {"skipped", entity.BuildStatusFailed}, + {"canceling", entity.BuildStatusCancelled}, + {"canceled", entity.BuildStatusCancelled}, + // Unrecognised states map to the non-terminal Unknown, not Failed, so a + // state this code doesn't know about doesn't terminally fail a batch. + {"some_future_state", entity.BuildStatusUnknown}, + {"", entity.BuildStatusUnknown}, + } + for _, tt := range tests { + t.Run(tt.bkState, func(t *testing.T) { + assert.Equal(t, tt.want, mapState(tt.bkState)) + }) + } +} + +func TestStatus_ReturnsLiveBuildkiteState(t *testing.T) { + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(buildJSON(7, "running", "https://buildkite.com/test-org/my-pipeline/builds/7")) + })) + + // Inject ref directly (simulates successful processTrigger). 
+ r.storeRef("some-id", encodeBuildRef("test-org", "my-pipeline", 7)) + + status, meta, err := r.Status(context.Background(), entity.BuildID{ID: "some-id"}) + require.NoError(t, err) + assert.Equal(t, entity.BuildStatusRunning, status) + assert.Equal(t, "https://buildkite.com/test-org/my-pipeline/builds/7", meta["url"]) +} + +func TestStatus_RecoversRefAfterCacheMiss(t *testing.T) { + var listed bool + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + // Cache miss path: Status lists builds filtered by the stamped metadata. + require.Equal(t, http.MethodGet, req.Method) + assert.Contains(t, req.URL.RawQuery, "meta_data") + listed = true + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`[{"number":7,"state":"running","web_url":"https://bk/7"}]`)) + })) + + // refs is empty (e.g. after a restart); Status must recover the ref. + status, meta, err := r.Status(context.Background(), entity.BuildID{ID: "bk-lost"}) + require.NoError(t, err) + assert.True(t, listed) + assert.Equal(t, entity.BuildStatusRunning, status) + assert.Equal(t, "https://bk/7", meta["url"]) + + // The recovered ref is now cached for subsequent calls. 
+ ref, ok := r.lookupRef("bk-lost") + require.True(t, ok) + assert.Equal(t, encodeBuildRef("test-org", "my-pipeline", 7), ref) +} + +func TestStatus_BuildkiteNotFound(t *testing.T) { + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNotFound) + })) + r.storeRef("some-id", encodeBuildRef("test-org", "my-pipeline", 99)) + + _, _, err := r.Status(context.Background(), entity.BuildID{ID: "some-id"}) + require.Error(t, err) +} + +// --- Cancel --- + +func TestCancel_EnqueuesJobAndReturnsImmediately(t *testing.T) { + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + t.Fatal("Buildkite API called before cancel drain") + })) + + require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) + assert.Len(t, r.cancelCh, 1) +} + +func TestCancel_CallsBuildkiteWhenRefKnown(t *testing.T) { + var cancelCalled bool + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + cancelCalled = true + w.WriteHeader(http.StatusOK) + _, _ = w.Write(buildJSON(5, "canceled", "")) + })) + r.storeRef("some-id", encodeBuildRef("test-org", "my-pipeline", 5)) + + require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) + drainCancel(t, r) + assert.True(t, cancelCalled) +} + +func TestCancel_RecoversRefAfterCacheMiss(t *testing.T) { + var listed, cancelled bool + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodGet: + listed = true + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`[{"number":5,"state":"running","web_url":""}]`)) + case http.MethodPut: + cancelled = true + w.WriteHeader(http.StatusOK) + default: + t.Fatalf("unexpected %s request", req.Method) + } + })) + + // refs is empty; processCancel must recover the ref from metadata first. 
+ require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "bk-lost"})) + drainCancel(t, r) + assert.True(t, listed, "cancel should look up the build by metadata on cache miss") + assert.True(t, cancelled, "cancel should reach Buildkite after recovering the ref") +} + +func TestCancel_NoopWhenBuildNotYetSubmitted(t *testing.T) { + // The metadata lookup finds nothing, so there is nothing to cancel; the + // handler fails the test if a cancel (PUT) is attempted. + r := newTestRunner(t, emptyListHandler(t)) + + require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) + drainCancel(t, r) +} + +func TestCancel_AlreadyTerminal_Noop(t *testing.T) { + // Buildkite returns 422 when the build cannot be cancelled (already terminal). + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusUnprocessableEntity) + })) + r.storeRef("some-id", encodeBuildRef("test-org", "my-pipeline", 5)) + + require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) + drainCancel(t, r) // must not panic or error +} + +func TestCancel_QueueFull_ReturnsError(t *testing.T) { + r := newRunner( + Config{APIToken: "tok", OrgSlug: "org", PipelineSlug: "p", Branch: "main"}, + &client{token: "tok", httpClient: http.DefaultClient, baseURL: "http://unused"}, + 1, 1, + ) + require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "a"})) + require.Error(t, r.Cancel(context.Background(), entity.BuildID{ID: "b"})) +} + +// --- Internal helpers --- + +func TestEncodeParseBuildRef_RoundTrip(t *testing.T) { + tests := []struct { + org string + pipeline string + number int + }{ + {"myorg", "my-pipeline", 1}, + {"uber", "submit-queue-ci", 9999}, + {"a", "b", 0}, + } + for _, tt := range tests { + ref := encodeBuildRef(tt.org, tt.pipeline, tt.number) + org, pipeline, number, err := parseBuildRef(ref) + require.NoError(t, err) + assert.Equal(t, tt.org, org) + assert.Equal(t, tt.pipeline, 
pipeline) + assert.Equal(t, tt.number, number) + } +} + +func TestParseBuildRef_Invalid(t *testing.T) { + for _, ref := range []string{"", "noslash", "only/one", "org/pipeline/notanumber"} { + t.Run(ref, func(t *testing.T) { + _, _, _, err := parseBuildRef(ref) + require.Error(t, err) + }) + } +} + +func TestNewBuildID_Unique(t *testing.T) { + seen := make(map[string]bool) + for i := 0; i < 100; i++ { + id := newBuildID() + assert.NotEmpty(t, id) + assert.False(t, seen[id], "duplicate build ID: %s", id) + seen[id] = true + } +} diff --git a/submitqueue/extension/buildrunner/buildkite/client.go b/submitqueue/extension/buildrunner/buildkite/client.go new file mode 100644 index 00000000..0480804c --- /dev/null +++ b/submitqueue/extension/buildrunner/buildkite/client.go @@ -0,0 +1,162 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buildkite + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" +) + +// client is a thin wrapper around the Buildkite REST endpoints that BuildRunner +// needs: create, get, list-by-metadata, and cancel a build. 
+type client struct { + token string + httpClient *http.Client + baseURL string +} + +type createBuildRequest struct { + Branch string `json:"branch"` + Message string `json:"message"` + Env map[string]string `json:"env"` + MetaData map[string]string `json:"meta_data,omitempty"` +} + +// buildResponse is the subset of fields the runner needs from a Buildkite +// build object. +type buildResponse struct { + Number int `json:"number"` + State string `json:"state"` + WebURL string `json:"web_url"` +} + +func (c *client) createBuild(ctx context.Context, org, pipeline string, req createBuildRequest) (buildResponse, error) { + body, err := json.Marshal(req) + if err != nil { + return buildResponse{}, fmt.Errorf("marshal request: %w", err) + } + u := fmt.Sprintf("%s/organizations/%s/pipelines/%s/builds", c.baseURL, org, pipeline) + var build buildResponse + if err := c.do(ctx, http.MethodPost, u, body, &build); err != nil { + return buildResponse{}, err + } + return build, nil +} + +func (c *client) getBuild(ctx context.Context, org, pipeline string, number int) (buildResponse, error) { + u := fmt.Sprintf("%s/organizations/%s/pipelines/%s/builds/%d", c.baseURL, org, pipeline, number) + var build buildResponse + if err := c.do(ctx, http.MethodGet, u, nil, &build); err != nil { + return buildResponse{}, err + } + return build, nil +} + +// findBuildByMeta returns the build in the pipeline whose meta_data[key] equals +// value. ok is false when no such build exists yet. This lets Status and Cancel +// recover the Buildkite reference from Buildkite itself (the source of truth) +// when the in-memory cache misses, e.g. after a process restart. 
+func (c *client) findBuildByMeta(ctx context.Context, org, pipeline, key, value string) (build buildResponse, ok bool, err error) { + u := fmt.Sprintf("%s/organizations/%s/pipelines/%s/builds?meta_data[%s]=%s", + c.baseURL, org, pipeline, url.QueryEscape(key), url.QueryEscape(value)) + var builds []buildResponse + if err := c.do(ctx, http.MethodGet, u, nil, &builds); err != nil { + return buildResponse{}, false, err + } + if len(builds) == 0 { + return buildResponse{}, false, nil + } + return builds[0], true, nil +} + +// cancelBuild requests cancellation. Returns nil when the build is already +// terminal (HTTP 422) — the Buildkite API uses that status to indicate a +// non-cancellable build, which the BuildRunner contract treats as a no-op. +func (c *client) cancelBuild(ctx context.Context, org, pipeline string, number int) error { + u := fmt.Sprintf("%s/organizations/%s/pipelines/%s/builds/%d/cancel", c.baseURL, org, pipeline, number) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, nil) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + c.setHeaders(req) + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("send request: %w", err) + } + defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) + + switch resp.StatusCode { + case http.StatusOK: + return nil + case http.StatusUnprocessableEntity: + // Already terminal — no-op per BuildRunner.Cancel contract. + return nil + default: + return fmt.Errorf("unexpected status %d from cancel", resp.StatusCode) + } +} + +// do sends an HTTP request with the standard Buildkite headers and, on a 2xx +// response, decodes the body into out (when non-nil). A 404 is reported as a +// "build not found" error per the BuildRunner contract. 
+func (c *client) do(ctx context.Context, method, rawURL string, body []byte, out any) error { + var bodyReader io.Reader + if body != nil { + bodyReader = bytes.NewReader(body) + } + + req, err := http.NewRequestWithContext(ctx, method, rawURL, bodyReader) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + c.setHeaders(req) + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("send request: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("read response: %w", err) + } + + if resp.StatusCode == http.StatusNotFound { + return fmt.Errorf("build not found") + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("API returned status %d: %s", resp.StatusCode, respBody) + } + + if out != nil { + if err := json.Unmarshal(respBody, out); err != nil { + return fmt.Errorf("unmarshal response: %w", err) + } + } + return nil +} + +func (c *client) setHeaders(req *http.Request) { + req.Header.Set("Authorization", "Bearer "+c.token) + req.Header.Set("Content-Type", "application/json") +} diff --git a/submitqueue/extension/buildrunner/buildkite/config.go b/submitqueue/extension/buildrunner/buildkite/config.go new file mode 100644 index 00000000..1c1ada41 --- /dev/null +++ b/submitqueue/extension/buildrunner/buildkite/config.go @@ -0,0 +1,73 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package buildkite + +import ( + "net/http" + "time" +) + +// Config holds the per-queue settings for a Buildkite BuildRunner. It is +// constructed by Factory.For for each queue and bound to a single pipeline. +type Config struct { + // APIToken is the Buildkite personal or agent API token. Required. + APIToken string + + // OrgSlug is the Buildkite organisation slug (URL segment after + // buildkite.com/). Required. + OrgSlug string + + // PipelineSlug is the Buildkite pipeline that runs builds for this queue. + // Resolved by Factory from its Pipelines map. Required. + PipelineSlug string + + // QueueName is the SQ queue this runner serves. Passed as SQ_QUEUE in + // the build environment so the pipeline script can select queue-specific + // test targets. + QueueName string + + // Branch is the target branch builds run against (e.g. "main"). Required. + Branch string + + // TriggerQueueSize is the buffer capacity of the async trigger channel. + // Trigger returns an error when the channel is full (the build controller + // will nack and retry). Defaults to 256. + TriggerQueueSize int + + // CancelQueueSize is the buffer capacity of the async cancel channel. + // Defaults to 256. + CancelQueueSize int + + // SubmitTimeout is the per-call HTTP timeout used by the background worker + // when contacting the Buildkite API. Defaults to 30s. + SubmitTimeout time.Duration + + // MaxSubmitAttempts is the number of times the background worker tries a + // Buildkite create/cancel call before giving up, to ride out transient + // failures. Defaults to 5. + MaxSubmitAttempts int + + // SubmitBackoff is the base delay between background-worker retries; the + // delay grows linearly with the attempt number. Defaults to 1s. + SubmitBackoff time.Duration + + // HTTPClient overrides the HTTP client used for Buildkite API calls. + // If nil, http.DefaultClient is used. 
Intended for testing. + HTTPClient *http.Client + + // BaseURL overrides the Buildkite API base URL + // (default: "https://api.buildkite.com/v2"). Intended for testing. + BaseURL string +} diff --git a/submitqueue/extension/buildrunner/buildkite/factory.go b/submitqueue/extension/buildrunner/buildkite/factory.go new file mode 100644 index 00000000..28e1afd3 --- /dev/null +++ b/submitqueue/extension/buildrunner/buildkite/factory.go @@ -0,0 +1,78 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buildkite + +import ( + "fmt" + "net/http" + + "github.com/uber/submitqueue/submitqueue/extension/buildrunner" +) + +// Factory implements buildrunner.Factory. It holds credentials shared across +// all queues and resolves the per-queue pipeline slug at For time. +// +// Typical wiring in an orchestrator server: +// +// buildkite.Factory{ +// APIToken: os.Getenv("BUILDKITE_API_TOKEN"), +// OrgSlug: "my-org", +// Branch: "main", +// Pipelines: map[string]string{ +// "my-queue": "my-pipeline-slug", +// }, +// } +type Factory struct { + // APIToken is the Buildkite personal or agent API token. Required. + APIToken string + + // OrgSlug is the Buildkite organisation slug. Required. + OrgSlug string + + // Branch is the target branch builds run against (e.g. "main"). Required. + Branch string + + // Pipelines maps SQ queue names to Buildkite pipeline slugs. 
+ // For returns an error for any queue not present in this map. + Pipelines map[string]string + + // HTTPClient overrides the HTTP client for all runners this factory + // produces. If nil, http.DefaultClient is used. Intended for testing. + HTTPClient *http.Client + + // BaseURL overrides the Buildkite API base URL for all produced runners. + // Intended for testing. + BaseURL string +} + +var _ buildrunner.Factory = Factory{} + +// For returns a BuildRunner bound to the Buildkite pipeline configured for +// cfg.QueueName. Returns an error if no pipeline is configured for the queue. +func (f Factory) For(cfg buildrunner.Config) (buildrunner.BuildRunner, error) { + pipeline, ok := f.Pipelines[cfg.QueueName] + if !ok { + return nil, fmt.Errorf("buildkite: no pipeline configured for queue %q", cfg.QueueName) + } + return New(Config{ + APIToken: f.APIToken, + OrgSlug: f.OrgSlug, + PipelineSlug: pipeline, + QueueName: cfg.QueueName, + Branch: f.Branch, + HTTPClient: f.HTTPClient, + BaseURL: f.BaseURL, + }) +} From 85e6f48394708487ce9ceea9fc1e726c1b942c6d Mon Sep 17 00:00:00 2001 From: Jamy Timmermans Date: Thu, 4 Jun 2026 11:42:17 -0700 Subject: [PATCH 2/8] refactor(buildrunner): adopt Params pattern; inject auth and base URL via HTTP transport - Replace New(Config) with NewBuildRunner(Params) following the MergeChecker Params pattern: Config holds queue-specific settings, HTTPClient is injected by the caller with auth and base URL already configured via transports - Remove APIToken from Config; auth is now the caller's concern via a transport layer (e.g. 
Authorization-header RoundTripper) - Remove BaseURL from Config and client; client now uses relative paths so BaseURLTransport on the injected client resolves them to absolute URLs - Delete factory.go; the Factory belongs in the example server (per-queue wiring is an application-level concern, not an extension concern) --- .../buildrunner/buildkite/buildkite.go | 35 +++++---- .../buildrunner/buildkite/buildkite_test.go | 69 ++++------------ .../extension/buildrunner/buildkite/client.go | 13 ++-- .../extension/buildrunner/buildkite/config.go | 21 +---- .../buildrunner/buildkite/factory.go | 78 ------------------- 5 files changed, 44 insertions(+), 172 deletions(-) delete mode 100644 submitqueue/extension/buildrunner/buildkite/factory.go diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite.go b/submitqueue/extension/buildrunner/buildkite/buildkite.go index 09c740be..8b2deb92 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite.go @@ -130,16 +130,25 @@ type runner struct { var _ buildrunner.BuildRunner = (*runner)(nil) -// New constructs a Buildkite-backed BuildRunner bound to a single pipeline and -// starts its background worker goroutine. The goroutine runs for the lifetime -// of the process. +// Params holds the dependencies for a Buildkite BuildRunner. The caller is +// responsible for configuring HTTPClient with the base URL (via +// httpclient.BaseURLTransport) and auth (via an Authorization-header transport). +type Params struct { + // Config holds Buildkite-specific configuration for a single queue. + Config Config + // HTTPClient is a pre-configured HTTP client. The caller is responsible + // for the base URL (via httpclient.BaseURLTransport) and auth (via a + // transport layer). If nil, http.DefaultClient is used. 
+ HTTPClient *http.Client +} + +// NewBuildRunner constructs a Buildkite-backed BuildRunner bound to a single +// pipeline and starts its background worker goroutine. The goroutine runs for +// the lifetime of the process. // -// Returns an error if APIToken, OrgSlug, PipelineSlug, or Branch are empty. -// Prefer Factory.For over calling New directly; New is exported for testing. -func New(cfg Config) (buildrunner.BuildRunner, error) { - if cfg.APIToken == "" { - return nil, fmt.Errorf("buildkite: APIToken is required") - } +// Returns an error if OrgSlug, PipelineSlug, or Branch are empty. +func NewBuildRunner(params Params) (buildrunner.BuildRunner, error) { + cfg := params.Config if cfg.OrgSlug == "" { return nil, fmt.Errorf("buildkite: OrgSlug is required") } @@ -150,11 +159,7 @@ func New(cfg Config) (buildrunner.BuildRunner, error) { return nil, fmt.Errorf("buildkite: Branch is required") } - baseURL := cfg.BaseURL - if baseURL == "" { - baseURL = "https://api.buildkite.com/v2" - } - httpClient := cfg.HTTPClient + httpClient := params.HTTPClient if httpClient == nil { httpClient = http.DefaultClient } @@ -168,9 +173,7 @@ func New(cfg Config) (buildrunner.BuildRunner, error) { } r := newRunner(cfg, &client{ - token: cfg.APIToken, httpClient: httpClient, - baseURL: baseURL, }, triggerSize, cancelSize) go r.work() return r, nil diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go index 920d72ee..73731869 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/core/httpclient" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" ) @@ -37,9 +38,10 @@ func newTestRunner(t *testing.T, handler http.Handler) 
*runner { t.Helper() srv := httptest.NewServer(handler) t.Cleanup(srv.Close) + c, err := httpclient.NewClient(srv.URL) + require.NoError(t, err) return newRunner( Config{ - APIToken: "test-token", OrgSlug: "test-org", PipelineSlug: "my-pipeline", QueueName: "my-queue", @@ -48,11 +50,7 @@ func newTestRunner(t *testing.T, handler http.Handler) *runner { MaxSubmitAttempts: 3, SubmitBackoff: time.Millisecond, }, - &client{ - token: "test-token", - httpClient: http.DefaultClient, - baseURL: srv.URL, - }, + &client{httpClient: c}, 16, // triggerSize 16, // cancelSize ) @@ -102,63 +100,32 @@ func emptyListHandler(t *testing.T) http.HandlerFunc { // --- Interface / constructor --- func TestNew_ImplementsInterface(t *testing.T) { - r, err := New(Config{ - APIToken: "tok", + r, err := NewBuildRunner(Params{Config: Config{ OrgSlug: "org", PipelineSlug: "pipeline", Branch: "main", - }) + }}) require.NoError(t, err) var _ buildrunner.BuildRunner = r } func TestNew_Validation(t *testing.T) { tests := []struct { - name string - cfg Config + name string + params Params }{ - {"missing token", Config{OrgSlug: "org", PipelineSlug: "p", Branch: "main"}}, - {"missing org", Config{APIToken: "tok", PipelineSlug: "p", Branch: "main"}}, - {"missing pipeline", Config{APIToken: "tok", OrgSlug: "org", Branch: "main"}}, - {"missing branch", Config{APIToken: "tok", OrgSlug: "org", PipelineSlug: "p"}}, + {"missing org", Params{Config: Config{PipelineSlug: "p", Branch: "main"}}}, + {"missing pipeline", Params{Config: Config{OrgSlug: "org", Branch: "main"}}}, + {"missing branch", Params{Config: Config{OrgSlug: "org", PipelineSlug: "p"}}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := New(tt.cfg) + _, err := NewBuildRunner(tt.params) require.Error(t, err) }) } } -// --- Factory --- - -func TestFactory_ImplementsInterface(t *testing.T) { - var _ buildrunner.Factory = Factory{} -} - -func TestFactory_For_Success(t *testing.T) { - f := Factory{ - APIToken: "tok", - 
OrgSlug: "org", - Branch: "main", - Pipelines: map[string]string{"my-queue": "my-pipeline"}, - } - r, err := f.For(buildrunner.Config{QueueName: "my-queue"}) - require.NoError(t, err) - require.NotNil(t, r) -} - -func TestFactory_For_UnknownQueue(t *testing.T) { - f := Factory{ - APIToken: "tok", - OrgSlug: "org", - Branch: "main", - Pipelines: map[string]string{"my-queue": "my-pipeline"}, - } - _, err := f.For(buildrunner.Config{QueueName: "unknown"}) - require.Error(t, err) -} - // --- Trigger --- func TestTrigger_EnqueuesJobAndReturnsID(t *testing.T) { @@ -188,12 +155,11 @@ func TestTrigger_StatusIsAcceptedBeforeWorkerRuns(t *testing.T) { } func TestTrigger_SubmitsCorrectPayloadToBuildkite(t *testing.T) { - var capturedMethod, capturedAuth string + var capturedMethod string var capturedBody []byte r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { capturedMethod = req.Method - capturedAuth = req.Header.Get("Authorization") capturedBody, _ = io.ReadAll(req.Body) w.Header().Set("Content-Type", "application/json") _, _ = w.Write(buildJSON(42, "scheduled", "https://buildkite.com/test-org/my-pipeline/builds/42")) @@ -208,7 +174,6 @@ func TestTrigger_SubmitsCorrectPayloadToBuildkite(t *testing.T) { drainTrigger(t, r) assert.Equal(t, http.MethodPost, capturedMethod) - assert.Equal(t, "Bearer test-token", capturedAuth) var req createBuildRequest require.NoError(t, json.Unmarshal(capturedBody, &req)) @@ -270,8 +235,8 @@ func TestTrigger_MultipleChangesFlattened(t *testing.T) { func TestTrigger_QueueFull_ReturnsError(t *testing.T) { r := newRunner( - Config{APIToken: "tok", OrgSlug: "org", PipelineSlug: "p", Branch: "main"}, - &client{token: "tok", httpClient: http.DefaultClient, baseURL: "http://unused"}, + Config{OrgSlug: "org", PipelineSlug: "p", Branch: "main"}, + &client{httpClient: http.DefaultClient}, 1, 1, ) // Fill the channel. 
@@ -478,8 +443,8 @@ func TestCancel_AlreadyTerminal_Noop(t *testing.T) { func TestCancel_QueueFull_ReturnsError(t *testing.T) { r := newRunner( - Config{APIToken: "tok", OrgSlug: "org", PipelineSlug: "p", Branch: "main"}, - &client{token: "tok", httpClient: http.DefaultClient, baseURL: "http://unused"}, + Config{OrgSlug: "org", PipelineSlug: "p", Branch: "main"}, + &client{httpClient: http.DefaultClient}, 1, 1, ) require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "a"})) diff --git a/submitqueue/extension/buildrunner/buildkite/client.go b/submitqueue/extension/buildrunner/buildkite/client.go index 0480804c..119753ff 100644 --- a/submitqueue/extension/buildrunner/buildkite/client.go +++ b/submitqueue/extension/buildrunner/buildkite/client.go @@ -27,9 +27,7 @@ import ( // client is a thin wrapper around the Buildkite REST endpoints that BuildRunner // needs: create, get, list-by-metadata, and cancel a build. type client struct { - token string httpClient *http.Client - baseURL string } type createBuildRequest struct { @@ -52,7 +50,7 @@ func (c *client) createBuild(ctx context.Context, org, pipeline string, req crea if err != nil { return buildResponse{}, fmt.Errorf("marshal request: %w", err) } - u := fmt.Sprintf("%s/organizations/%s/pipelines/%s/builds", c.baseURL, org, pipeline) + u := fmt.Sprintf("/organizations/%s/pipelines/%s/builds", org, pipeline) var build buildResponse if err := c.do(ctx, http.MethodPost, u, body, &build); err != nil { return buildResponse{}, err @@ -61,7 +59,7 @@ func (c *client) createBuild(ctx context.Context, org, pipeline string, req crea } func (c *client) getBuild(ctx context.Context, org, pipeline string, number int) (buildResponse, error) { - u := fmt.Sprintf("%s/organizations/%s/pipelines/%s/builds/%d", c.baseURL, org, pipeline, number) + u := fmt.Sprintf("/organizations/%s/pipelines/%s/builds/%d", org, pipeline, number) var build buildResponse if err := c.do(ctx, http.MethodGet, u, nil, &build); err != nil { 
return buildResponse{}, err @@ -74,8 +72,8 @@ func (c *client) getBuild(ctx context.Context, org, pipeline string, number int) // recover the Buildkite reference from Buildkite itself (the source of truth) // when the in-memory cache misses, e.g. after a process restart. func (c *client) findBuildByMeta(ctx context.Context, org, pipeline, key, value string) (build buildResponse, ok bool, err error) { - u := fmt.Sprintf("%s/organizations/%s/pipelines/%s/builds?meta_data[%s]=%s", - c.baseURL, org, pipeline, url.QueryEscape(key), url.QueryEscape(value)) + u := fmt.Sprintf("/organizations/%s/pipelines/%s/builds?meta_data[%s]=%s", + org, pipeline, url.QueryEscape(key), url.QueryEscape(value)) var builds []buildResponse if err := c.do(ctx, http.MethodGet, u, nil, &builds); err != nil { return buildResponse{}, false, err @@ -90,7 +88,7 @@ func (c *client) findBuildByMeta(ctx context.Context, org, pipeline, key, value // terminal (HTTP 422) — the Buildkite API uses that status to indicate a // non-cancellable build, which the BuildRunner contract treats as a no-op. 
func (c *client) cancelBuild(ctx context.Context, org, pipeline string, number int) error { - u := fmt.Sprintf("%s/organizations/%s/pipelines/%s/builds/%d/cancel", c.baseURL, org, pipeline, number) + u := fmt.Sprintf("/organizations/%s/pipelines/%s/builds/%d/cancel", org, pipeline, number) req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, nil) if err != nil { return fmt.Errorf("create request: %w", err) @@ -157,6 +155,5 @@ func (c *client) do(ctx context.Context, method, rawURL string, body []byte, out } func (c *client) setHeaders(req *http.Request) { - req.Header.Set("Authorization", "Bearer "+c.token) req.Header.Set("Content-Type", "application/json") } diff --git a/submitqueue/extension/buildrunner/buildkite/config.go b/submitqueue/extension/buildrunner/buildkite/config.go index 1c1ada41..b4e193d8 100644 --- a/submitqueue/extension/buildrunner/buildkite/config.go +++ b/submitqueue/extension/buildrunner/buildkite/config.go @@ -14,23 +14,16 @@ package buildkite -import ( - "net/http" - "time" -) +import "time" -// Config holds the per-queue settings for a Buildkite BuildRunner. It is -// constructed by Factory.For for each queue and bound to a single pipeline. +// Config holds the per-queue settings for a Buildkite BuildRunner. type Config struct { - // APIToken is the Buildkite personal or agent API token. Required. - APIToken string - // OrgSlug is the Buildkite organisation slug (URL segment after // buildkite.com/). Required. OrgSlug string // PipelineSlug is the Buildkite pipeline that runs builds for this queue. - // Resolved by Factory from its Pipelines map. Required. + // Required. PipelineSlug string // QueueName is the SQ queue this runner serves. Passed as SQ_QUEUE in @@ -62,12 +55,4 @@ type Config struct { // SubmitBackoff is the base delay between background-worker retries; the // delay grows linearly with the attempt number. Defaults to 1s. 
SubmitBackoff time.Duration - - // HTTPClient overrides the HTTP client used for Buildkite API calls. - // If nil, http.DefaultClient is used. Intended for testing. - HTTPClient *http.Client - - // BaseURL overrides the Buildkite API base URL - // (default: "https://api.buildkite.com/v2"). Intended for testing. - BaseURL string } diff --git a/submitqueue/extension/buildrunner/buildkite/factory.go b/submitqueue/extension/buildrunner/buildkite/factory.go deleted file mode 100644 index 28e1afd3..00000000 --- a/submitqueue/extension/buildrunner/buildkite/factory.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package buildkite - -import ( - "fmt" - "net/http" - - "github.com/uber/submitqueue/submitqueue/extension/buildrunner" -) - -// Factory implements buildrunner.Factory. It holds credentials shared across -// all queues and resolves the per-queue pipeline slug at For time. -// -// Typical wiring in an orchestrator server: -// -// buildkite.Factory{ -// APIToken: os.Getenv("BUILDKITE_API_TOKEN"), -// OrgSlug: "my-org", -// Branch: "main", -// Pipelines: map[string]string{ -// "my-queue": "my-pipeline-slug", -// }, -// } -type Factory struct { - // APIToken is the Buildkite personal or agent API token. Required. - APIToken string - - // OrgSlug is the Buildkite organisation slug. Required. - OrgSlug string - - // Branch is the target branch builds run against (e.g. "main"). 
Required. - Branch string - - // Pipelines maps SQ queue names to Buildkite pipeline slugs. - // For returns an error for any queue not present in this map. - Pipelines map[string]string - - // HTTPClient overrides the HTTP client for all runners this factory - // produces. If nil, http.DefaultClient is used. Intended for testing. - HTTPClient *http.Client - - // BaseURL overrides the Buildkite API base URL for all produced runners. - // Intended for testing. - BaseURL string -} - -var _ buildrunner.Factory = Factory{} - -// For returns a BuildRunner bound to the Buildkite pipeline configured for -// cfg.QueueName. Returns an error if no pipeline is configured for the queue. -func (f Factory) For(cfg buildrunner.Config) (buildrunner.BuildRunner, error) { - pipeline, ok := f.Pipelines[cfg.QueueName] - if !ok { - return nil, fmt.Errorf("buildkite: no pipeline configured for queue %q", cfg.QueueName) - } - return New(Config{ - APIToken: f.APIToken, - OrgSlug: f.OrgSlug, - PipelineSlug: pipeline, - QueueName: cfg.QueueName, - Branch: f.Branch, - HTTPClient: f.HTTPClient, - BaseURL: f.BaseURL, - }) -} From a8a06784d92380d268eebd04c4e06d2a71ab721b Mon Sep 17 00:00:00 2001 From: Jamy Timmermans Date: Thu, 4 Jun 2026 13:27:44 -0700 Subject: [PATCH 3/8] fixup! 
move some more variables into the httpClient --- .../buildrunner/buildkite/BUILD.bazel | 2 +- .../buildrunner/buildkite/buildkite.go | 63 ++++++----------- .../buildrunner/buildkite/buildkite_test.go | 68 +++++-------------- .../extension/buildrunner/buildkite/client.go | 19 +++--- .../extension/buildrunner/buildkite/config.go | 11 --- 5 files changed, 46 insertions(+), 117 deletions(-) diff --git a/submitqueue/extension/buildrunner/buildkite/BUILD.bazel b/submitqueue/extension/buildrunner/buildkite/BUILD.bazel index 0bfbdabc..1b44e422 100644 --- a/submitqueue/extension/buildrunner/buildkite/BUILD.bazel +++ b/submitqueue/extension/buildrunner/buildkite/BUILD.bazel @@ -6,7 +6,6 @@ go_library( "buildkite.go", "client.go", "config.go", - "factory.go", ], importpath = "github.com/uber/submitqueue/submitqueue/extension/buildrunner/buildkite", visibility = ["//visibility:public"], @@ -21,6 +20,7 @@ go_test( srcs = ["buildkite_test.go"], embed = [":buildkite"], deps = [ + "//core/httpclient", "//submitqueue/entity", "//submitqueue/extension/buildrunner", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite.go b/submitqueue/extension/buildrunner/buildkite/buildkite.go index 8b2deb92..05b2c8ab 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite.go @@ -46,7 +46,6 @@ import ( "fmt" "net/http" "strconv" - "strings" "sync" "time" @@ -146,19 +145,11 @@ type Params struct { // pipeline and starts its background worker goroutine. The goroutine runs for // the lifetime of the process. // -// Returns an error if OrgSlug, PipelineSlug, or Branch are empty. +// The HTTPClient must have BaseURLTransport configured to the pipeline's API +// root (e.g. "https://api.buildkite.com/v2/organizations/{org}/pipelines/{slug}"), +// and an auth transport that injects the Authorization header. 
func NewBuildRunner(params Params) (buildrunner.BuildRunner, error) { cfg := params.Config - if cfg.OrgSlug == "" { - return nil, fmt.Errorf("buildkite: OrgSlug is required") - } - if cfg.PipelineSlug == "" { - return nil, fmt.Errorf("buildkite: PipelineSlug is required") - } - if cfg.Branch == "" { - return nil, fmt.Errorf("buildkite: Branch is required") - } - httpClient := params.HTTPClient if httpClient == nil { httpClient = http.DefaultClient @@ -231,11 +222,11 @@ func (r *runner) Status(ctx context.Context, buildID entity.BuildID) (entity.Bui var resp buildResponse switch { case cached: - org, pipeline, number, err := parseBuildRef(ref) + number, err := parseBuildRef(ref) if err != nil { return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: malformed build ref: %w", err) } - resp, err = r.client.getBuild(ctx, org, pipeline, number) + resp, err = r.client.getBuild(ctx, number) if err != nil { return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: get build: %w", err) } @@ -246,7 +237,7 @@ func (r *runner) Status(ctx context.Context, buildID entity.BuildID) (entity.Bui // loop in Accepted forever. return entity.BuildStatusFailed, entity.BuildMetadata{"error": reason}, nil } - found, exists, err := r.client.findBuildByMeta(ctx, r.cfg.OrgSlug, r.cfg.PipelineSlug, metaKeyBuildID, buildID.ID) + found, exists, err := r.client.findBuildByMeta(ctx, metaKeyBuildID, buildID.ID) if err != nil { return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: find build: %w", err) } @@ -256,7 +247,7 @@ func (r *runner) Status(ctx context.Context, buildID entity.BuildID) (entity.Bui // loop keeps waiting without treating this as terminal. 
return entity.BuildStatusAccepted, nil, nil } - ref = encodeBuildRef(r.cfg.OrgSlug, r.cfg.PipelineSlug, found.Number) + ref = encodeBuildRef(found.Number) r.storeRef(buildID.ID, ref) resp = found } @@ -305,7 +296,6 @@ func (r *runner) processTrigger(job triggerJob) { headJSON, _ := json.Marshal(job.headURIs) req := createBuildRequest{ - Branch: r.cfg.Branch, Message: "submitqueue speculative build", Env: map[string]string{ EnvKeyBaseURIs: string(baseJSON), @@ -322,7 +312,7 @@ func (r *runner) processTrigger(job triggerJob) { ctx, cancel := r.opCtx() defer cancel() var e error - resp, e = r.client.createBuild(ctx, r.cfg.OrgSlug, r.cfg.PipelineSlug, req) + resp, e = r.client.createBuild(ctx, req) return e }) if err != nil { @@ -330,7 +320,7 @@ func (r *runner) processTrigger(job triggerJob) { return } - r.storeRef(job.buildID, encodeBuildRef(r.cfg.OrgSlug, r.cfg.PipelineSlug, resp.Number)) + r.storeRef(job.buildID, encodeBuildRef(resp.Number)) } // processCancel cancels the Buildkite build, recovering its reference from the @@ -340,18 +330,18 @@ func (r *runner) processCancel(job cancelJob) { ref, cached := r.lookupRef(job.buildID) if !cached { ctx, cancel := r.opCtx() - found, exists, err := r.client.findBuildByMeta(ctx, r.cfg.OrgSlug, r.cfg.PipelineSlug, metaKeyBuildID, job.buildID) + found, exists, err := r.client.findBuildByMeta(ctx, metaKeyBuildID, job.buildID) cancel() if err != nil || !exists { // Nothing to cancel (not yet submitted) or a transient lookup // failure; the caller may re-issue Cancel. 
return } - ref = encodeBuildRef(r.cfg.OrgSlug, r.cfg.PipelineSlug, found.Number) + ref = encodeBuildRef(found.Number) r.storeRef(job.buildID, ref) } - org, pipeline, number, err := parseBuildRef(ref) + number, err := parseBuildRef(ref) if err != nil { return } @@ -359,7 +349,7 @@ func (r *runner) processCancel(job cancelJob) { _ = r.withRetry(func() error { ctx, cancel := r.opCtx() defer cancel() - return r.client.cancelBuild(ctx, org, pipeline, number) + return r.client.cancelBuild(ctx, number) }) } @@ -450,30 +440,19 @@ func flattenURIs(changes []entity.Change) []string { return uris } -// encodeBuildRef encodes org, pipeline, and build number into the internal -// reference string stored in r.refs. -// Format: "{org}/{pipeline}/{number}". Buildkite slugs are [a-z0-9-] so "/" -// is unambiguous as a separator. -func encodeBuildRef(org, pipeline string, number int) string { - return fmt.Sprintf("%s/%s/%d", org, pipeline, number) +// encodeBuildRef encodes a Buildkite build number as the internal reference +// string stored in r.refs. +func encodeBuildRef(number int) string { + return strconv.Itoa(number) } // parseBuildRef is the inverse of encodeBuildRef. -func parseBuildRef(ref string) (org, pipeline string, number int, err error) { - last := strings.LastIndex(ref, "/") - if last < 1 { - return "", "", 0, fmt.Errorf("invalid build ref %q", ref) - } - number, err = strconv.Atoi(ref[last+1:]) +func parseBuildRef(ref string) (int, error) { + n, err := strconv.Atoi(ref) if err != nil { - return "", "", 0, fmt.Errorf("invalid build ref %q: non-numeric number segment", ref) - } - prefix := ref[:last] - first := strings.Index(prefix, "/") - if first < 1 { - return "", "", 0, fmt.Errorf("invalid build ref %q", ref) + return 0, fmt.Errorf("invalid build ref %q", ref) } - return prefix[:first], prefix[first+1:], number, nil + return n, nil } // mapState maps a Buildkite build state string to a BuildStatus. 
diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go index 73731869..8b908784 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go @@ -42,10 +42,7 @@ func newTestRunner(t *testing.T, handler http.Handler) *runner { require.NoError(t, err) return newRunner( Config{ - OrgSlug: "test-org", - PipelineSlug: "my-pipeline", QueueName: "my-queue", - Branch: "main", SubmitTimeout: 5 * time.Second, MaxSubmitAttempts: 3, SubmitBackoff: time.Millisecond, @@ -100,32 +97,11 @@ func emptyListHandler(t *testing.T) http.HandlerFunc { // --- Interface / constructor --- func TestNew_ImplementsInterface(t *testing.T) { - r, err := NewBuildRunner(Params{Config: Config{ - OrgSlug: "org", - PipelineSlug: "pipeline", - Branch: "main", - }}) + r, err := NewBuildRunner(Params{}) require.NoError(t, err) var _ buildrunner.BuildRunner = r } -func TestNew_Validation(t *testing.T) { - tests := []struct { - name string - params Params - }{ - {"missing org", Params{Config: Config{PipelineSlug: "p", Branch: "main"}}}, - {"missing pipeline", Params{Config: Config{OrgSlug: "org", Branch: "main"}}}, - {"missing branch", Params{Config: Config{OrgSlug: "org", PipelineSlug: "p"}}}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, err := NewBuildRunner(tt.params) - require.Error(t, err) - }) - } -} - // --- Trigger --- func TestTrigger_EnqueuesJobAndReturnsID(t *testing.T) { @@ -177,7 +153,6 @@ func TestTrigger_SubmitsCorrectPayloadToBuildkite(t *testing.T) { var req createBuildRequest require.NoError(t, json.Unmarshal(capturedBody, &req)) - assert.Equal(t, "main", req.Branch) assert.Equal(t, `["github://org/repo/pull/1/aaa111"]`, req.Env[EnvKeyBaseURIs]) assert.Equal(t, `["github://org/repo/pull/2/bbb222"]`, req.Env[EnvKeyHeadURIs]) assert.Equal(t, "my-queue", req.Env[EnvKeyQueue]) @@ -188,7 +163,7 @@ 
func TestTrigger_SubmitsCorrectPayloadToBuildkite(t *testing.T) { // After a successful submit the ref is cached, so Status uses getBuild. ref, ok := r.lookupRef(id.ID) require.True(t, ok) - assert.Equal(t, encodeBuildRef("test-org", "my-pipeline", 42), ref) + assert.Equal(t, encodeBuildRef(42), ref) } func TestTrigger_EmptyBase_ProducesJSONArray(t *testing.T) { @@ -235,7 +210,7 @@ func TestTrigger_MultipleChangesFlattened(t *testing.T) { func TestTrigger_QueueFull_ReturnsError(t *testing.T) { r := newRunner( - Config{OrgSlug: "org", PipelineSlug: "p", Branch: "main"}, + Config{}, &client{httpClient: http.DefaultClient}, 1, 1, ) @@ -267,7 +242,7 @@ func TestProcessTrigger_RetriesTransientFailureThenSucceeds(t *testing.T) { assert.Equal(t, 2, posts, "submit should retry after a transient failure") ref, ok := r.lookupRef(id.ID) require.True(t, ok) - assert.Equal(t, encodeBuildRef("test-org", "my-pipeline", 7), ref) + assert.Equal(t, encodeBuildRef(7), ref) } func TestTrigger_SubmitExhaustsRetries_BuildFails(t *testing.T) { @@ -331,7 +306,7 @@ func TestStatus_ReturnsLiveBuildkiteState(t *testing.T) { })) // Inject ref directly (simulates successful processTrigger). - r.storeRef("some-id", encodeBuildRef("test-org", "my-pipeline", 7)) + r.storeRef("some-id", encodeBuildRef(7)) status, meta, err := r.Status(context.Background(), entity.BuildID{ID: "some-id"}) require.NoError(t, err) @@ -360,14 +335,14 @@ func TestStatus_RecoversRefAfterCacheMiss(t *testing.T) { // The recovered ref is now cached for subsequent calls. 
ref, ok := r.lookupRef("bk-lost") require.True(t, ok) - assert.Equal(t, encodeBuildRef("test-org", "my-pipeline", 7), ref) + assert.Equal(t, encodeBuildRef(7), ref) } func TestStatus_BuildkiteNotFound(t *testing.T) { r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotFound) })) - r.storeRef("some-id", encodeBuildRef("test-org", "my-pipeline", 99)) + r.storeRef("some-id", encodeBuildRef(99)) _, _, err := r.Status(context.Background(), entity.BuildID{ID: "some-id"}) require.Error(t, err) @@ -391,7 +366,7 @@ func TestCancel_CallsBuildkiteWhenRefKnown(t *testing.T) { w.WriteHeader(http.StatusOK) _, _ = w.Write(buildJSON(5, "canceled", "")) })) - r.storeRef("some-id", encodeBuildRef("test-org", "my-pipeline", 5)) + r.storeRef("some-id", encodeBuildRef(5)) require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) drainCancel(t, r) @@ -435,7 +410,7 @@ func TestCancel_AlreadyTerminal_Noop(t *testing.T) { r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusUnprocessableEntity) })) - r.storeRef("some-id", encodeBuildRef("test-org", "my-pipeline", 5)) + r.storeRef("some-id", encodeBuildRef(5)) require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) drainCancel(t, r) // must not panic or error @@ -443,7 +418,7 @@ func TestCancel_AlreadyTerminal_Noop(t *testing.T) { func TestCancel_QueueFull_ReturnsError(t *testing.T) { r := newRunner( - Config{OrgSlug: "org", PipelineSlug: "p", Branch: "main"}, + Config{}, &client{httpClient: http.DefaultClient}, 1, 1, ) @@ -454,29 +429,18 @@ func TestCancel_QueueFull_ReturnsError(t *testing.T) { // --- Internal helpers --- func TestEncodeParseBuildRef_RoundTrip(t *testing.T) { - tests := []struct { - org string - pipeline string - number int - }{ - {"myorg", "my-pipeline", 1}, - {"uber", "submit-queue-ci", 9999}, - {"a", "b", 0}, - } - for _, tt := range tests { - ref := 
encodeBuildRef(tt.org, tt.pipeline, tt.number) - org, pipeline, number, err := parseBuildRef(ref) + for _, n := range []int{1, 9999, 0} { + ref := encodeBuildRef(n) + got, err := parseBuildRef(ref) require.NoError(t, err) - assert.Equal(t, tt.org, org) - assert.Equal(t, tt.pipeline, pipeline) - assert.Equal(t, tt.number, number) + assert.Equal(t, n, got) } } func TestParseBuildRef_Invalid(t *testing.T) { - for _, ref := range []string{"", "noslash", "only/one", "org/pipeline/notanumber"} { + for _, ref := range []string{"", "notanumber", "org/pipeline/1"} { t.Run(ref, func(t *testing.T) { - _, _, _, err := parseBuildRef(ref) + _, err := parseBuildRef(ref) require.Error(t, err) }) } diff --git a/submitqueue/extension/buildrunner/buildkite/client.go b/submitqueue/extension/buildrunner/buildkite/client.go index 119753ff..e7258222 100644 --- a/submitqueue/extension/buildrunner/buildkite/client.go +++ b/submitqueue/extension/buildrunner/buildkite/client.go @@ -31,7 +31,6 @@ type client struct { } type createBuildRequest struct { - Branch string `json:"branch"` Message string `json:"message"` Env map[string]string `json:"env"` MetaData map[string]string `json:"meta_data,omitempty"` @@ -45,21 +44,20 @@ type buildResponse struct { WebURL string `json:"web_url"` } -func (c *client) createBuild(ctx context.Context, org, pipeline string, req createBuildRequest) (buildResponse, error) { +func (c *client) createBuild(ctx context.Context, req createBuildRequest) (buildResponse, error) { body, err := json.Marshal(req) if err != nil { return buildResponse{}, fmt.Errorf("marshal request: %w", err) } - u := fmt.Sprintf("/organizations/%s/pipelines/%s/builds", org, pipeline) var build buildResponse - if err := c.do(ctx, http.MethodPost, u, body, &build); err != nil { + if err := c.do(ctx, http.MethodPost, "/builds", body, &build); err != nil { return buildResponse{}, err } return build, nil } -func (c *client) getBuild(ctx context.Context, org, pipeline string, number int) 
(buildResponse, error) { - u := fmt.Sprintf("/organizations/%s/pipelines/%s/builds/%d", org, pipeline, number) +func (c *client) getBuild(ctx context.Context, number int) (buildResponse, error) { + u := fmt.Sprintf("/builds/%d", number) var build buildResponse if err := c.do(ctx, http.MethodGet, u, nil, &build); err != nil { return buildResponse{}, err @@ -71,9 +69,8 @@ func (c *client) getBuild(ctx context.Context, org, pipeline string, number int) // value. ok is false when no such build exists yet. This lets Status and Cancel // recover the Buildkite reference from Buildkite itself (the source of truth) // when the in-memory cache misses, e.g. after a process restart. -func (c *client) findBuildByMeta(ctx context.Context, org, pipeline, key, value string) (build buildResponse, ok bool, err error) { - u := fmt.Sprintf("/organizations/%s/pipelines/%s/builds?meta_data[%s]=%s", - org, pipeline, url.QueryEscape(key), url.QueryEscape(value)) +func (c *client) findBuildByMeta(ctx context.Context, key, value string) (build buildResponse, ok bool, err error) { + u := fmt.Sprintf("/builds?meta_data[%s]=%s", url.QueryEscape(key), url.QueryEscape(value)) var builds []buildResponse if err := c.do(ctx, http.MethodGet, u, nil, &builds); err != nil { return buildResponse{}, false, err @@ -87,8 +84,8 @@ func (c *client) findBuildByMeta(ctx context.Context, org, pipeline, key, value // cancelBuild requests cancellation. Returns nil when the build is already // terminal (HTTP 422) — the Buildkite API uses that status to indicate a // non-cancellable build, which the BuildRunner contract treats as a no-op. 
-func (c *client) cancelBuild(ctx context.Context, org, pipeline string, number int) error { - u := fmt.Sprintf("/organizations/%s/pipelines/%s/builds/%d/cancel", org, pipeline, number) +func (c *client) cancelBuild(ctx context.Context, number int) error { + u := fmt.Sprintf("/builds/%d/cancel", number) req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, nil) if err != nil { return fmt.Errorf("create request: %w", err) diff --git a/submitqueue/extension/buildrunner/buildkite/config.go b/submitqueue/extension/buildrunner/buildkite/config.go index b4e193d8..bb4b9b36 100644 --- a/submitqueue/extension/buildrunner/buildkite/config.go +++ b/submitqueue/extension/buildrunner/buildkite/config.go @@ -18,22 +18,11 @@ import "time" // Config holds the per-queue settings for a Buildkite BuildRunner. type Config struct { - // OrgSlug is the Buildkite organisation slug (URL segment after - // buildkite.com/). Required. - OrgSlug string - - // PipelineSlug is the Buildkite pipeline that runs builds for this queue. - // Required. - PipelineSlug string - // QueueName is the SQ queue this runner serves. Passed as SQ_QUEUE in // the build environment so the pipeline script can select queue-specific // test targets. QueueName string - // Branch is the target branch builds run against (e.g. "main"). Required. - Branch string - // TriggerQueueSize is the buffer capacity of the async trigger channel. // Trigger returns an error when the channel is full (the build controller // will nack and retry). Defaults to 256. From 290ead58a3d42b4b657f36ad438dbec198a404ab Mon Sep 17 00:00:00 2001 From: Jamy Timmermans Date: Thu, 4 Jun 2026 16:27:30 -0700 Subject: [PATCH 4/8] refactor(buildrunner): call Buildkite directly instead of via channels Replace the channel-based async machinery (triggerCh, cancelCh, background worker goroutine, retry loop, submitFailures map) with direct synchronous Buildkite API calls in Trigger and Cancel. 
Errors propagate to the caller so the queue consumer can nack and retry through the normal path. Config is simplified to QueueName only; the channel buffer sizes, submit timeout, max attempts, and backoff fields are removed. Tests drop the drainTrigger/drainCancel helpers and async-specific cases in favour of straightforward call-and-assert patterns. --- .../buildrunner/buildkite/buildkite.go | 318 ++++-------------- .../buildrunner/buildkite/buildkite_test.go | 164 +-------- .../extension/buildrunner/buildkite/config.go | 24 -- 3 files changed, 78 insertions(+), 428 deletions(-) diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite.go b/submitqueue/extension/buildrunner/buildkite/buildkite.go index 05b2c8ab..0001210d 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite.go @@ -15,15 +15,10 @@ // Package buildkite implements buildrunner.BuildRunner backed by the Buildkite // CI platform. // -// Trigger is non-blocking: it generates a build ID, enqueues the job on a -// buffered channel, and returns immediately. This keeps the orchestrator's -// queue loop decoupled from Buildkite availability — provider-side work -// happens asynchronously, per the BuildRunner contract. A background worker -// drains the channel, submits the build to Buildkite (retrying transient -// failures with backoff), and stamps the SQ build ID into the build's -// metadata. If submission fails after all retries, the build is recorded as a -// submission failure and Status reports it as terminal Failed. Cancel is -// similarly async. +// Trigger calls the Buildkite API directly: it generates a build ID, stamps it +// into the build's metadata, and returns the ID on success. Cancel calls the +// Buildkite API directly as well. Both propagate errors to the caller, which can +// nack and retry via the normal queue consumer path. 
// // The in-memory map from SQ build ID to Buildkite reference is a pure latency // cache, not the source of truth. Because every build carries its SQ build ID @@ -47,7 +42,6 @@ import ( "net/http" "strconv" "sync" - "time" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" @@ -76,55 +70,19 @@ const ( // a restart), which keeps that cache from being a durable source of truth. const metaKeyBuildID = "sq_build_id" -const ( - defaultTriggerQueueSize = 256 - defaultCancelQueueSize = 256 - defaultSubmitTimeout = 30 * time.Second - defaultMaxSubmitAttempts = 5 - defaultSubmitBackoff = 1 * time.Second -) - -// triggerJob carries everything the background worker needs to submit one build -// to Buildkite. It is enqueued by Trigger and consumed by processTrigger. -type triggerJob struct { - buildID string - baseURIs []string - headURIs []string -} - -// cancelJob carries the build ID the background worker should cancel in -// Buildkite. It is enqueued by Cancel and consumed by processCancel. -type cancelJob struct { - buildID string -} - -// runner implements buildrunner.BuildRunner. A background goroutine drains -// triggerCh and cancelCh for the lifetime of the runner, dispatching each job -// to its own goroutine so that a slow submit (retry/backoff) never head-of-line -// blocks other builds. +// runner implements buildrunner.BuildRunner. type runner struct { cfg Config client *client - // mu protects refs and submitFailures. + // mu protects refs. mu sync.RWMutex // refs maps our internal build ID to the encoded Buildkite reference - // ("{org}/{pipeline}/{number}"). It is a pure latency cache: the durable - // record is the SQ build ID stamped into the Buildkite build's metadata, - // so a missing entry is recovered via the client's metadata lookup rather - // than treated as authoritative. + // ("{number}"). 
It is a pure latency cache: the durable record is the SQ + // build ID stamped into the Buildkite build's metadata, so a missing entry + // is recovered via the client's metadata lookup rather than treated as + // authoritative. refs map[string]string - // submitFailures records build IDs whose Buildkite submission permanently - // failed (all retries exhausted), mapped to a short reason. Status reports - // these as terminal Failed so the build does not poll Accepted forever. - // This is in-memory only: a restart that loses it leaves the build Accepted, - // which the orchestrator's (out-of-scope) Accepted deadline must catch. - submitFailures map[string]string - - // triggerCh queues pending build-creation jobs from Trigger. - triggerCh chan triggerJob - // cancelCh queues pending build-cancellation jobs from Cancel. - cancelCh chan cancelJob } var _ buildrunner.BuildRunner = (*runner)(nil) @@ -142,80 +100,64 @@ type Params struct { } // NewBuildRunner constructs a Buildkite-backed BuildRunner bound to a single -// pipeline and starts its background worker goroutine. The goroutine runs for -// the lifetime of the process. +// pipeline. // // The HTTPClient must have BaseURLTransport configured to the pipeline's API // root (e.g. "https://api.buildkite.com/v2/organizations/{org}/pipelines/{slug}"), // and an auth transport that injects the Authorization header. 
func NewBuildRunner(params Params) (buildrunner.BuildRunner, error) { - cfg := params.Config httpClient := params.HTTPClient if httpClient == nil { httpClient = http.DefaultClient } - triggerSize := cfg.TriggerQueueSize - if triggerSize == 0 { - triggerSize = defaultTriggerQueueSize - } - cancelSize := cfg.CancelQueueSize - if cancelSize == 0 { - cancelSize = defaultCancelQueueSize - } - - r := newRunner(cfg, &client{ - httpClient: httpClient, - }, triggerSize, cancelSize) - go r.work() - return r, nil + return newRunner(params.Config, &client{httpClient: httpClient}), nil } -// newRunner constructs a runner without starting the background goroutine. -// Used by New and by tests (which drive the goroutine via drainTrigger / -// drainCancel to avoid timing races). -func newRunner(cfg Config, c *client, triggerSize, cancelSize int) *runner { +// newRunner constructs a runner. Used by NewBuildRunner and by tests. +func newRunner(cfg Config, c *client) *runner { return &runner{ - cfg: cfg, - client: c, - refs: make(map[string]string), - submitFailures: make(map[string]string), - triggerCh: make(chan triggerJob, triggerSize), - cancelCh: make(chan cancelJob, cancelSize), + cfg: cfg, + client: c, + refs: make(map[string]string), } } -// Trigger generates a unique build ID, enqueues an async job to submit the -// build to Buildkite, and returns immediately. The build is visible to Status -// as Accepted until the background worker contacts Buildkite and the build -// becomes discoverable by its stamped metadata. -// -// Returns an error (retryable by the controller) if the trigger queue is full. -func (r *runner) Trigger(_ context.Context, base, head []entity.Change, _ entity.BuildMetadata) (entity.BuildID, error) { +// Trigger generates a unique build ID, calls the Buildkite API to create the +// build, caches the Buildkite reference, and returns the build ID. Errors are +// propagated to the caller so the queue consumer can nack and retry. 
+func (r *runner) Trigger(ctx context.Context, base, head []entity.Change, _ entity.BuildMetadata) (entity.BuildID, error) { id := newBuildID() - job := triggerJob{ - buildID: id, - baseURIs: flattenURIs(base), - headURIs: flattenURIs(head), + baseJSON, _ := json.Marshal(flattenURIs(base)) + headJSON, _ := json.Marshal(flattenURIs(head)) + + req := createBuildRequest{ + Message: "submitqueue speculative build", + Env: map[string]string{ + EnvKeyBaseURIs: string(baseJSON), + EnvKeyHeadURIs: string(headJSON), + EnvKeyQueue: r.cfg.QueueName, + }, + MetaData: map[string]string{ + metaKeyBuildID: id, + }, } - select { - case r.triggerCh <- job: - return entity.BuildID{ID: id}, nil - default: - return entity.BuildID{}, fmt.Errorf("buildkite: trigger queue full; backpressure from Buildkite API") + + resp, err := r.client.createBuild(ctx, req) + if err != nil { + return entity.BuildID{}, fmt.Errorf("buildkite: create build: %w", err) } + + r.storeRef(id, encodeBuildRef(resp.Number)) + return entity.BuildID{ID: id}, nil } -// Status returns the current build status. While the async submission is still -// in flight (no Buildkite build carries this build ID yet) it returns Accepted. -// Once the build exists, Status fetches the live state from Buildkite and -// returns it with the build URL in BuildMetadata["url"]. +// Status returns the current build status. On a cache miss it recovers the +// Buildkite reference by filtering builds on the stamped SQ build ID, so it +// works after a restart that emptied the in-memory cache. Returns Accepted when +// the build is not yet visible in Buildkite. // -// If the submission permanently failed (all retries exhausted), Status reports -// terminal Failed with the reason in BuildMetadata["error"]. -// -// On a cache miss Status re-derives the Buildkite reference by filtering builds -// on the stamped SQ build ID, so it works after a restart that emptied the -// in-memory cache. 
+// On a cache hit, Status fetches the live state and returns it with the build +// URL in BuildMetadata["url"]. func (r *runner) Status(ctx context.Context, buildID entity.BuildID) (entity.BuildStatus, entity.BuildMetadata, error) { ref, cached := r.lookupRef(buildID.ID) @@ -231,20 +173,14 @@ func (r *runner) Status(ctx context.Context, buildID entity.BuildID) (entity.Bui return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: get build: %w", err) } default: - if reason, failed := r.lookupSubmitFailure(buildID.ID); failed { - // The async submission exhausted its retries; nothing exists in - // Buildkite to poll. Report terminal Failed so the build does not - // loop in Accepted forever. - return entity.BuildStatusFailed, entity.BuildMetadata{"error": reason}, nil - } found, exists, err := r.client.findBuildByMeta(ctx, metaKeyBuildID, buildID.ID) if err != nil { return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: find build: %w", err) } if !exists { - // Not yet visible in Buildkite — the submission is still in flight - // (or queued for retry). Report Accepted so the buildsignal poll - // loop keeps waiting without treating this as terminal. + // Not yet visible in Buildkite — e.g. after a restart where the cache + // was lost but the build was created before. Report Accepted so the + // buildsignal poll loop keeps waiting. return entity.BuildStatusAccepted, nil, nil } ref = encodeBuildRef(found.Number) @@ -255,137 +191,32 @@ func (r *runner) Status(ctx context.Context, buildID entity.BuildID) (entity.Bui return mapState(resp.State), entity.BuildMetadata{"url": resp.WebURL}, nil } -// Cancel enqueues an async cancellation and returns immediately, keeping the -// caller's queue loop decoupled from Buildkite availability. The background -// worker delivers the cancel to Buildkite, recovering the build reference from -// the stamped metadata if it is not cached (e.g. after a restart). If the build -// was never submitted, the cancel is a no-op. 
-// -// Returns an error if the cancel queue is full; the caller should retry. -func (r *runner) Cancel(_ context.Context, buildID entity.BuildID) error { - select { - case r.cancelCh <- cancelJob{buildID: buildID.ID}: - return nil - default: - return fmt.Errorf("buildkite: cancel queue full; try again later") - } -} - -// work is the background consumer goroutine. It dispatches each trigger and -// cancel job to its own goroutine so that a job's retry/backoff does not block -// the others. processTrigger and processCancel guard shared state with r.mu and -// are safe to run concurrently. -func (r *runner) work() { - for { - select { - case job := <-r.triggerCh: - go r.processTrigger(job) - case job := <-r.cancelCh: - go r.processCancel(job) - } - } -} - -// processTrigger submits one build to Buildkite, retrying transient failures -// with backoff. On success it caches the Buildkite reference so subsequent -// Status calls skip the metadata lookup. If every attempt fails the build was -// never created, so it is recorded as a submission failure and Status reports -// it as terminal Failed rather than polling Accepted forever. 
-func (r *runner) processTrigger(job triggerJob) { - baseJSON, _ := json.Marshal(job.baseURIs) - headJSON, _ := json.Marshal(job.headURIs) - - req := createBuildRequest{ - Message: "submitqueue speculative build", - Env: map[string]string{ - EnvKeyBaseURIs: string(baseJSON), - EnvKeyHeadURIs: string(headJSON), - EnvKeyQueue: r.cfg.QueueName, - }, - MetaData: map[string]string{ - metaKeyBuildID: job.buildID, - }, - } - - var resp buildResponse - err := r.withRetry(func() error { - ctx, cancel := r.opCtx() - defer cancel() - var e error - resp, e = r.client.createBuild(ctx, req) - return e - }) - if err != nil { - r.markSubmitFailed(job.buildID, fmt.Sprintf("buildkite submission failed after retries: %v", err)) - return - } - - r.storeRef(job.buildID, encodeBuildRef(resp.Number)) -} - -// processCancel cancels the Buildkite build, recovering its reference from the -// stamped metadata when the cache misses. No-ops when no build carries this -// build ID yet (trigger not yet processed, or submission failed). -func (r *runner) processCancel(job cancelJob) { - ref, cached := r.lookupRef(job.buildID) +// Cancel calls the Buildkite API to cancel the build. On a cache miss it +// recovers the reference from the stamped metadata. If no build carries this +// build ID (not yet submitted), Cancel is a no-op. +func (r *runner) Cancel(ctx context.Context, buildID entity.BuildID) error { + ref, cached := r.lookupRef(buildID.ID) if !cached { - ctx, cancel := r.opCtx() - found, exists, err := r.client.findBuildByMeta(ctx, metaKeyBuildID, job.buildID) - cancel() - if err != nil || !exists { - // Nothing to cancel (not yet submitted) or a transient lookup - // failure; the caller may re-issue Cancel. 
- return + found, exists, err := r.client.findBuildByMeta(ctx, metaKeyBuildID, buildID.ID) + if err != nil { + return fmt.Errorf("buildkite: find build for cancel: %w", err) + } + if !exists { + return nil } ref = encodeBuildRef(found.Number) - r.storeRef(job.buildID, ref) + r.storeRef(buildID.ID, ref) } number, err := parseBuildRef(ref) if err != nil { - return - } - - _ = r.withRetry(func() error { - ctx, cancel := r.opCtx() - defer cancel() - return r.client.cancelBuild(ctx, number) - }) -} - -// withRetry runs fn up to MaxSubmitAttempts times with linear backoff, returning -// the last error. Used for the background submit and cancel API calls so a -// transient Buildkite failure does not abandon the work. -func (r *runner) withRetry(fn func() error) error { - attempts := r.cfg.MaxSubmitAttempts - if attempts <= 0 { - attempts = defaultMaxSubmitAttempts - } - backoff := r.cfg.SubmitBackoff - if backoff <= 0 { - backoff = defaultSubmitBackoff - } - - var err error - for attempt := 1; attempt <= attempts; attempt++ { - if err = fn(); err == nil { - return nil - } - if attempt < attempts { - time.Sleep(backoff * time.Duration(attempt)) - } + return fmt.Errorf("buildkite: malformed build ref: %w", err) } - return err -} -// opCtx returns a context bounded by SubmitTimeout for a single background API -// call. The caller must invoke the returned CancelFunc. -func (r *runner) opCtx() (context.Context, context.CancelFunc) { - timeout := r.cfg.SubmitTimeout - if timeout == 0 { - timeout = defaultSubmitTimeout + if err := r.client.cancelBuild(ctx, number); err != nil { + return fmt.Errorf("buildkite: cancel build: %w", err) } - return context.WithTimeout(context.Background(), timeout) + return nil } // lookupRef returns the cached Buildkite reference for a build ID. 
@@ -403,23 +234,6 @@ func (r *runner) storeRef(buildID, ref string) { r.refs[buildID] = ref } -// markSubmitFailed records that a build's Buildkite submission permanently -// failed, so Status reports it as terminal Failed. -func (r *runner) markSubmitFailed(buildID, reason string) { - r.mu.Lock() - defer r.mu.Unlock() - r.submitFailures[buildID] = reason -} - -// lookupSubmitFailure reports whether a build's submission permanently failed, -// returning the recorded reason. -func (r *runner) lookupSubmitFailure(buildID string) (string, bool) { - r.mu.RLock() - defer r.mu.RUnlock() - reason, ok := r.submitFailures[buildID] - return reason, ok -} - // newBuildID returns a cryptographically random hex string prefixed with "bk-" // that uniquely identifies a build within this runner implementation. func newBuildID() string { diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go index 8b908784..bcb621ca 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go @@ -21,7 +21,6 @@ import ( "net/http" "net/http/httptest" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -30,10 +29,7 @@ import ( "github.com/uber/submitqueue/submitqueue/extension/buildrunner" ) -// newTestRunner creates a runner backed by a test HTTP server without starting -// the background goroutine. Tests drive job processing synchronously via -// drainTrigger and drainCancel to avoid goroutine timing races. The retry -// backoff is set to a millisecond so retry paths run fast. +// newTestRunner creates a runner backed by a test HTTP server. 
func newTestRunner(t *testing.T, handler http.Handler) *runner { t.Helper() srv := httptest.NewServer(handler) @@ -41,41 +37,11 @@ func newTestRunner(t *testing.T, handler http.Handler) *runner { c, err := httpclient.NewClient(srv.URL) require.NoError(t, err) return newRunner( - Config{ - QueueName: "my-queue", - SubmitTimeout: 5 * time.Second, - MaxSubmitAttempts: 3, - SubmitBackoff: time.Millisecond, - }, + Config{QueueName: "my-queue"}, &client{httpClient: c}, - 16, // triggerSize - 16, // cancelSize ) } -// drainTrigger synchronously processes the next pending trigger job. -// Use after Trigger() to simulate the background worker in tests. -func drainTrigger(t *testing.T, r *runner) { - t.Helper() - select { - case job := <-r.triggerCh: - r.processTrigger(job) - default: - t.Fatal("drainTrigger: no pending trigger job in channel") - } -} - -// drainCancel synchronously processes the next pending cancel job. -func drainCancel(t *testing.T, r *runner) { - t.Helper() - select { - case job := <-r.cancelCh: - r.processCancel(job) - default: - t.Fatal("drainCancel: no pending cancel job in channel") - } -} - // buildJSON encodes fields into a minimal Buildkite build JSON response. func buildJSON(number int, state, webURL string) []byte { b, _ := json.Marshal(buildResponse{Number: number, State: state, WebURL: webURL}) @@ -104,32 +70,6 @@ func TestNew_ImplementsInterface(t *testing.T) { // --- Trigger --- -func TestTrigger_EnqueuesJobAndReturnsID(t *testing.T) { - r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusOK) // should not be reached before drain - })) - - id, err := r.Trigger(context.Background(), nil, nil, nil) - require.NoError(t, err) - assert.NotEmpty(t, id.ID) - - // Exactly one job should be in the channel. 
- assert.Len(t, r.triggerCh, 1) -} - -func TestTrigger_StatusIsAcceptedBeforeWorkerRuns(t *testing.T) { - // Before the worker submits, no build carries this ID in Buildkite, so the - // metadata lookup returns empty and Status reports Accepted. - r := newTestRunner(t, emptyListHandler(t)) - - id, err := r.Trigger(context.Background(), nil, nil, nil) - require.NoError(t, err) - - status, _, err := r.Status(context.Background(), id) - require.NoError(t, err) - assert.Equal(t, entity.BuildStatusAccepted, status) -} - func TestTrigger_SubmitsCorrectPayloadToBuildkite(t *testing.T) { var capturedMethod string var capturedBody []byte @@ -146,8 +86,7 @@ func TestTrigger_SubmitsCorrectPayloadToBuildkite(t *testing.T) { id, err := r.Trigger(context.Background(), base, head, nil) require.NoError(t, err) - - drainTrigger(t, r) + assert.NotEmpty(t, id.ID) assert.Equal(t, http.MethodPost, capturedMethod) @@ -176,7 +115,6 @@ func TestTrigger_EmptyBase_ProducesJSONArray(t *testing.T) { _, err := r.Trigger(context.Background(), nil, []entity.Change{{URIs: []string{"u"}}}, nil) require.NoError(t, err) - drainTrigger(t, r) var req createBuildRequest require.NoError(t, json.Unmarshal(capturedBody, &req)) @@ -198,7 +136,6 @@ func TestTrigger_MultipleChangesFlattened(t *testing.T) { } _, err := r.Trigger(context.Background(), nil, head, nil) require.NoError(t, err) - drainTrigger(t, r) var req createBuildRequest require.NoError(t, json.Unmarshal(capturedBody, &req)) @@ -208,66 +145,13 @@ func TestTrigger_MultipleChangesFlattened(t *testing.T) { ) } -func TestTrigger_QueueFull_ReturnsError(t *testing.T) { - r := newRunner( - Config{}, - &client{httpClient: http.DefaultClient}, - 1, 1, - ) - // Fill the channel. - _, err := r.Trigger(context.Background(), nil, nil, nil) - require.NoError(t, err) - // Second call must fail. 
- _, err = r.Trigger(context.Background(), nil, nil, nil) - require.Error(t, err) -} - -func TestProcessTrigger_RetriesTransientFailureThenSucceeds(t *testing.T) { - var posts int - r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - require.Equal(t, http.MethodPost, req.Method) - posts++ - if posts < 2 { - w.WriteHeader(http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write(buildJSON(7, "scheduled", "")) - })) - - id, err := r.Trigger(context.Background(), nil, nil, nil) - require.NoError(t, err) - drainTrigger(t, r) - - assert.Equal(t, 2, posts, "submit should retry after a transient failure") - ref, ok := r.lookupRef(id.ID) - require.True(t, ok) - assert.Equal(t, encodeBuildRef(7), ref) -} - -func TestTrigger_SubmitExhaustsRetries_BuildFails(t *testing.T) { - // create (POST) always fails; the worker exhausts its retries and records a - // submission failure. - r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if req.Method == http.MethodPost { - w.WriteHeader(http.StatusInternalServerError) - return - } - // Nothing was created, so any metadata lookup finds nothing. - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte("[]")) +func TestTrigger_BuildkiteError_ReturnsError(t *testing.T) { + r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) })) - id, err := r.Trigger(context.Background(), nil, nil, nil) - require.NoError(t, err) - drainTrigger(t, r) - - // With submission permanently failed, Status reports terminal Failed (with a - // reason) rather than polling Accepted forever. 
- status, meta, err := r.Status(context.Background(), id) - require.NoError(t, err) - assert.Equal(t, entity.BuildStatusFailed, status) - assert.NotEmpty(t, meta["error"]) + _, err := r.Trigger(context.Background(), nil, nil, nil) + require.Error(t, err) } // --- Status --- @@ -305,7 +189,7 @@ func TestStatus_ReturnsLiveBuildkiteState(t *testing.T) { _, _ = w.Write(buildJSON(7, "running", "https://buildkite.com/test-org/my-pipeline/builds/7")) })) - // Inject ref directly (simulates successful processTrigger). + // Inject ref directly (simulates successful Trigger). r.storeRef("some-id", encodeBuildRef(7)) status, meta, err := r.Status(context.Background(), entity.BuildID{ID: "some-id"}) @@ -350,15 +234,6 @@ func TestStatus_BuildkiteNotFound(t *testing.T) { // --- Cancel --- -func TestCancel_EnqueuesJobAndReturnsImmediately(t *testing.T) { - r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - t.Fatal("Buildkite API called before cancel drain") - })) - - require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) - assert.Len(t, r.cancelCh, 1) -} - func TestCancel_CallsBuildkiteWhenRefKnown(t *testing.T) { var cancelCalled bool r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { @@ -369,7 +244,6 @@ func TestCancel_CallsBuildkiteWhenRefKnown(t *testing.T) { r.storeRef("some-id", encodeBuildRef(5)) require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) - drainCancel(t, r) assert.True(t, cancelCalled) } @@ -389,20 +263,17 @@ func TestCancel_RecoversRefAfterCacheMiss(t *testing.T) { } })) - // refs is empty; processCancel must recover the ref from metadata first. + // refs is empty; Cancel must recover the ref from metadata first. 
require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "bk-lost"})) - drainCancel(t, r) assert.True(t, listed, "cancel should look up the build by metadata on cache miss") assert.True(t, cancelled, "cancel should reach Buildkite after recovering the ref") } -func TestCancel_NoopWhenBuildNotYetSubmitted(t *testing.T) { - // The metadata lookup finds nothing, so there is nothing to cancel; the - // handler fails the test if a cancel (PUT) is attempted. +func TestCancel_NoopWhenBuildNotFound(t *testing.T) { + // The metadata lookup finds nothing, so there is nothing to cancel. r := newTestRunner(t, emptyListHandler(t)) require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) - drainCancel(t, r) } func TestCancel_AlreadyTerminal_Noop(t *testing.T) { @@ -413,17 +284,6 @@ func TestCancel_AlreadyTerminal_Noop(t *testing.T) { r.storeRef("some-id", encodeBuildRef(5)) require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) - drainCancel(t, r) // must not panic or error -} - -func TestCancel_QueueFull_ReturnsError(t *testing.T) { - r := newRunner( - Config{}, - &client{httpClient: http.DefaultClient}, - 1, 1, - ) - require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "a"})) - require.Error(t, r.Cancel(context.Background(), entity.BuildID{ID: "b"})) } // --- Internal helpers --- diff --git a/submitqueue/extension/buildrunner/buildkite/config.go b/submitqueue/extension/buildrunner/buildkite/config.go index bb4b9b36..d95f4742 100644 --- a/submitqueue/extension/buildrunner/buildkite/config.go +++ b/submitqueue/extension/buildrunner/buildkite/config.go @@ -14,34 +14,10 @@ package buildkite -import "time" - // Config holds the per-queue settings for a Buildkite BuildRunner. type Config struct { // QueueName is the SQ queue this runner serves. Passed as SQ_QUEUE in // the build environment so the pipeline script can select queue-specific // test targets. 
QueueName string - - // TriggerQueueSize is the buffer capacity of the async trigger channel. - // Trigger returns an error when the channel is full (the build controller - // will nack and retry). Defaults to 256. - TriggerQueueSize int - - // CancelQueueSize is the buffer capacity of the async cancel channel. - // Defaults to 256. - CancelQueueSize int - - // SubmitTimeout is the per-call HTTP timeout used by the background worker - // when contacting the Buildkite API. Defaults to 30s. - SubmitTimeout time.Duration - - // MaxSubmitAttempts is the number of times the background worker tries a - // Buildkite create/cancel call before giving up, to ride out transient - // failures. Defaults to 5. - MaxSubmitAttempts int - - // SubmitBackoff is the base delay between background-worker retries; the - // delay grows linearly with the attempt number. Defaults to 1s. - SubmitBackoff time.Duration } From 6e8c498ef4224df5f59b31e709a8e557da8472d5 Mon Sep 17 00:00:00 2001 From: Jamy Timmermans Date: Thu, 4 Jun 2026 20:24:51 -0700 Subject: [PATCH 5/8] fixup! reduce complexity --- .../buildrunner/buildkite/buildkite.go | 171 +++++------------- .../buildrunner/buildkite/buildkite_test.go | 124 ++----------- .../extension/buildrunner/buildkite/client.go | 24 +-- .../extension/buildrunner/buildkite/config.go | 8 - 4 files changed, 65 insertions(+), 262 deletions(-) diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite.go b/submitqueue/extension/buildrunner/buildkite/buildkite.go index 0001210d..e8af5c53 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite.go @@ -15,17 +15,9 @@ // Package buildkite implements buildrunner.BuildRunner backed by the Buildkite // CI platform. // -// Trigger calls the Buildkite API directly: it generates a build ID, stamps it -// into the build's metadata, and returns the ID on success. Cancel calls the -// Buildkite API directly as well. 
Both propagate errors to the caller, which can -// nack and retry via the normal queue consumer path. -// -// The in-memory map from SQ build ID to Buildkite reference is a pure latency -// cache, not the source of truth. Because every build carries its SQ build ID -// in Buildkite metadata, Status and Cancel re-derive the reference with a -// metadata-filtered build lookup whenever the cache misses — including after a -// process restart that empties the map. Nothing about a build's identity lives -// only in memory. +// Trigger calls the Buildkite API to create the build and returns the Buildkite +// build number as the build ID. Status and Cancel parse the number directly +// from the build ID — no local state is required. // // The Buildkite build receives base and head change URIs as JSON-encoded // environment variables (SQ_BASE_URIS, SQ_HEAD_URIS, SQ_QUEUE). The pipeline @@ -35,13 +27,12 @@ package buildkite import ( "context" - "crypto/rand" - "encoding/hex" "encoding/json" "fmt" "net/http" "strconv" - "sync" + + "go.uber.org/zap" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/buildrunner" @@ -64,25 +55,11 @@ const ( EnvKeyQueue = "SQ_QUEUE" ) -// metaKeyBuildID is the Buildkite build-metadata key under which the SQ build -// ID is stored at create time. Status and Cancel filter builds by this key to -// recover the Buildkite reference when the in-memory cache misses (e.g. after -// a restart), which keeps that cache from being a durable source of truth. -const metaKeyBuildID = "sq_build_id" - // runner implements buildrunner.BuildRunner. type runner struct { - cfg Config + cfg buildrunner.Config client *client - - // mu protects refs. - mu sync.RWMutex - // refs maps our internal build ID to the encoded Buildkite reference - // ("{number}"). 
It is a pure latency cache: the durable record is the SQ - // build ID stamped into the Buildkite build's metadata, so a missing entry - // is recovered via the client's metadata lookup rather than treated as - // authoritative. - refs map[string]string + logger *zap.SugaredLogger } var _ buildrunner.BuildRunner = (*runner)(nil) @@ -91,12 +68,14 @@ var _ buildrunner.BuildRunner = (*runner)(nil) // responsible for configuring HTTPClient with the base URL (via // httpclient.BaseURLTransport) and auth (via an Authorization-header transport). type Params struct { - // Config holds Buildkite-specific configuration for a single queue. - Config Config + // Config holds the per-queue identity for this BuildRunner. + Config buildrunner.Config // HTTPClient is a pre-configured HTTP client. The caller is responsible // for the base URL (via httpclient.BaseURLTransport) and auth (via a // transport layer). If nil, http.DefaultClient is used. HTTPClient *http.Client + // Logger is the structured logger. + Logger *zap.SugaredLogger } // NewBuildRunner constructs a Buildkite-backed BuildRunner bound to a single @@ -110,23 +89,22 @@ func NewBuildRunner(params Params) (buildrunner.BuildRunner, error) { if httpClient == nil { httpClient = http.DefaultClient } - return newRunner(params.Config, &client{httpClient: httpClient}), nil + return newRunner(params.Config, &client{httpClient: httpClient}, params.Logger.Named("buildkite_buildrunner")), nil } // newRunner constructs a runner. Used by NewBuildRunner and by tests. -func newRunner(cfg Config, c *client) *runner { +func newRunner(cfg buildrunner.Config, c *client, logger *zap.SugaredLogger) *runner { return &runner{ cfg: cfg, client: c, - refs: make(map[string]string), + logger: logger, } } -// Trigger generates a unique build ID, calls the Buildkite API to create the -// build, caches the Buildkite reference, and returns the build ID. Errors are -// propagated to the caller so the queue consumer can nack and retry. 
+// Trigger calls the Buildkite API to create the build and returns the Buildkite +// build number as the build ID. Errors are propagated to the caller so the +// queue consumer can nack and retry. func (r *runner) Trigger(ctx context.Context, base, head []entity.Change, _ entity.BuildMetadata) (entity.BuildID, error) { - id := newBuildID() baseJSON, _ := json.Marshal(flattenURIs(base)) headJSON, _ := json.Marshal(flattenURIs(head)) @@ -137,9 +115,6 @@ func (r *runner) Trigger(ctx context.Context, base, head []entity.Change, _ enti EnvKeyHeadURIs: string(headJSON), EnvKeyQueue: r.cfg.QueueName, }, - MetaData: map[string]string{ - metaKeyBuildID: id, - }, } resp, err := r.client.createBuild(ctx, req) @@ -147,104 +122,45 @@ func (r *runner) Trigger(ctx context.Context, base, head []entity.Change, _ enti return entity.BuildID{}, fmt.Errorf("buildkite: create build: %w", err) } - r.storeRef(id, encodeBuildRef(resp.Number)) - return entity.BuildID{ID: id}, nil + r.logger.Debugw("triggered Buildkite build", + "buildkite_number", resp.Number, + ) + return entity.BuildID{ID: encodeBuildNumber(resp.Number)}, nil } -// Status returns the current build status. On a cache miss it recovers the -// Buildkite reference by filtering builds on the stamped SQ build ID, so it -// works after a restart that emptied the in-memory cache. Returns Accepted when -// the build is not yet visible in Buildkite. -// -// On a cache hit, Status fetches the live state and returns it with the build -// URL in BuildMetadata["url"]. +// Status fetches the current state of the build from Buildkite and returns it +// with the build URL in BuildMetadata["url"]. 
func (r *runner) Status(ctx context.Context, buildID entity.BuildID) (entity.BuildStatus, entity.BuildMetadata, error) { - ref, cached := r.lookupRef(buildID.ID) + number, err := parseBuildNumber(buildID.ID) + if err != nil { + return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: malformed build ID: %w", err) + } - var resp buildResponse - switch { - case cached: - number, err := parseBuildRef(ref) - if err != nil { - return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: malformed build ref: %w", err) - } - resp, err = r.client.getBuild(ctx, number) - if err != nil { - return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: get build: %w", err) - } - default: - found, exists, err := r.client.findBuildByMeta(ctx, metaKeyBuildID, buildID.ID) - if err != nil { - return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: find build: %w", err) - } - if !exists { - // Not yet visible in Buildkite — e.g. after a restart where the cache - // was lost but the build was created before. Report Accepted so the - // buildsignal poll loop keeps waiting. - return entity.BuildStatusAccepted, nil, nil - } - ref = encodeBuildRef(found.Number) - r.storeRef(buildID.ID, ref) - resp = found + resp, err := r.client.getBuild(ctx, number) + if err != nil { + return entity.BuildStatusUnknown, nil, fmt.Errorf("buildkite: get build: %w", err) } return mapState(resp.State), entity.BuildMetadata{"url": resp.WebURL}, nil } -// Cancel calls the Buildkite API to cancel the build. On a cache miss it -// recovers the reference from the stamped metadata. If no build carries this -// build ID (not yet submitted), Cancel is a no-op. +// Cancel calls the Buildkite API to cancel the build. A no-op on already-terminal +// builds (Buildkite returns 422 for those). 
func (r *runner) Cancel(ctx context.Context, buildID entity.BuildID) error { - ref, cached := r.lookupRef(buildID.ID) - if !cached { - found, exists, err := r.client.findBuildByMeta(ctx, metaKeyBuildID, buildID.ID) - if err != nil { - return fmt.Errorf("buildkite: find build for cancel: %w", err) - } - if !exists { - return nil - } - ref = encodeBuildRef(found.Number) - r.storeRef(buildID.ID, ref) - } - - number, err := parseBuildRef(ref) + number, err := parseBuildNumber(buildID.ID) if err != nil { - return fmt.Errorf("buildkite: malformed build ref: %w", err) + return fmt.Errorf("buildkite: malformed build ID: %w", err) } if err := r.client.cancelBuild(ctx, number); err != nil { return fmt.Errorf("buildkite: cancel build: %w", err) } + r.logger.Debugw("cancelled Buildkite build", + "buildkite_number", number, + ) return nil } -// lookupRef returns the cached Buildkite reference for a build ID. -func (r *runner) lookupRef(buildID string) (string, bool) { - r.mu.RLock() - defer r.mu.RUnlock() - ref, ok := r.refs[buildID] - return ref, ok -} - -// storeRef caches the Buildkite reference for a build ID. -func (r *runner) storeRef(buildID, ref string) { - r.mu.Lock() - defer r.mu.Unlock() - r.refs[buildID] = ref -} - -// newBuildID returns a cryptographically random hex string prefixed with "bk-" -// that uniquely identifies a build within this runner implementation. -func newBuildID() string { - b := make([]byte, 16) - if _, err := rand.Read(b); err != nil { - // crypto/rand.Read only fails when the OS entropy source is broken. - panic(fmt.Sprintf("buildkite: crypto/rand.Read failed: %v", err)) - } - return "bk-" + hex.EncodeToString(b) -} - // flattenURIs concatenates the URI lists from all changes into a single slice. 
func flattenURIs(changes []entity.Change) []string { uris := make([]string, 0, len(changes)) @@ -254,17 +170,16 @@ func flattenURIs(changes []entity.Change) []string { return uris } -// encodeBuildRef encodes a Buildkite build number as the internal reference -// string stored in r.refs. -func encodeBuildRef(number int) string { +// encodeBuildNumber encodes a Buildkite build number as the SQ build ID. +func encodeBuildNumber(number int) string { return strconv.Itoa(number) } -// parseBuildRef is the inverse of encodeBuildRef. -func parseBuildRef(ref string) (int, error) { - n, err := strconv.Atoi(ref) +// parseBuildNumber is the inverse of encodeBuildNumber. +func parseBuildNumber(id string) (int, error) { + n, err := strconv.Atoi(id) if err != nil { - return 0, fmt.Errorf("invalid build ref %q", ref) + return 0, fmt.Errorf("invalid build ID %q", id) } return n, nil } diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go index bcb621ca..abd244c0 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go @@ -22,6 +22,8 @@ import ( "net/http/httptest" "testing" + "go.uber.org/zap" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber/submitqueue/core/httpclient" @@ -37,8 +39,9 @@ func newTestRunner(t *testing.T, handler http.Handler) *runner { c, err := httpclient.NewClient(srv.URL) require.NoError(t, err) return newRunner( - Config{QueueName: "my-queue"}, + buildrunner.Config{QueueName: "my-queue"}, &client{httpClient: c}, + zap.NewNop().Sugar(), ) } @@ -48,29 +51,17 @@ func buildJSON(number int, state, webURL string) []byte { return b } -// emptyListHandler responds to a metadata-filtered list query with an empty -// array (no build matches yet), failing the test on any other request. 
-func emptyListHandler(t *testing.T) http.HandlerFunc { - return func(w http.ResponseWriter, req *http.Request) { - if req.Method != http.MethodGet { - t.Fatalf("unexpected %s request; expected only a metadata list lookup", req.Method) - } - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte("[]")) - } -} - // --- Interface / constructor --- func TestNew_ImplementsInterface(t *testing.T) { - r, err := NewBuildRunner(Params{}) + r, err := NewBuildRunner(Params{Logger: zap.NewNop().Sugar()}) require.NoError(t, err) var _ buildrunner.BuildRunner = r } // --- Trigger --- -func TestTrigger_SubmitsCorrectPayloadToBuildkite(t *testing.T) { +func TestTrigger_SubmitsCorrectPayloadAndReturnsBuildkiteNumber(t *testing.T) { var capturedMethod string var capturedBody []byte @@ -86,7 +77,7 @@ func TestTrigger_SubmitsCorrectPayloadToBuildkite(t *testing.T) { id, err := r.Trigger(context.Background(), base, head, nil) require.NoError(t, err) - assert.NotEmpty(t, id.ID) + assert.Equal(t, encodeBuildNumber(42), id.ID) assert.Equal(t, http.MethodPost, capturedMethod) @@ -95,14 +86,6 @@ func TestTrigger_SubmitsCorrectPayloadToBuildkite(t *testing.T) { assert.Equal(t, `["github://org/repo/pull/1/aaa111"]`, req.Env[EnvKeyBaseURIs]) assert.Equal(t, `["github://org/repo/pull/2/bbb222"]`, req.Env[EnvKeyHeadURIs]) assert.Equal(t, "my-queue", req.Env[EnvKeyQueue]) - // The SQ build ID is stamped into metadata so Status/Cancel can recover the - // build after a cache loss. - assert.Equal(t, id.ID, req.MetaData[metaKeyBuildID]) - - // After a successful submit the ref is cached, so Status uses getBuild. 
- ref, ok := r.lookupRef(id.ID) - require.True(t, ok) - assert.Equal(t, encodeBuildRef(42), ref) } func TestTrigger_EmptyBase_ProducesJSONArray(t *testing.T) { @@ -189,129 +172,60 @@ func TestStatus_ReturnsLiveBuildkiteState(t *testing.T) { _, _ = w.Write(buildJSON(7, "running", "https://buildkite.com/test-org/my-pipeline/builds/7")) })) - // Inject ref directly (simulates successful Trigger). - r.storeRef("some-id", encodeBuildRef(7)) - - status, meta, err := r.Status(context.Background(), entity.BuildID{ID: "some-id"}) + status, meta, err := r.Status(context.Background(), entity.BuildID{ID: encodeBuildNumber(7)}) require.NoError(t, err) assert.Equal(t, entity.BuildStatusRunning, status) assert.Equal(t, "https://buildkite.com/test-org/my-pipeline/builds/7", meta["url"]) } -func TestStatus_RecoversRefAfterCacheMiss(t *testing.T) { - var listed bool - r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - // Cache miss path: Status lists builds filtered by the stamped metadata. - require.Equal(t, http.MethodGet, req.Method) - assert.Contains(t, req.URL.RawQuery, "meta_data") - listed = true - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`[{"number":7,"state":"running","web_url":"https://bk/7"}]`)) - })) - - // refs is empty (e.g. after a restart); Status must recover the ref. - status, meta, err := r.Status(context.Background(), entity.BuildID{ID: "bk-lost"}) - require.NoError(t, err) - assert.True(t, listed) - assert.Equal(t, entity.BuildStatusRunning, status) - assert.Equal(t, "https://bk/7", meta["url"]) - - // The recovered ref is now cached for subsequent calls. 
- ref, ok := r.lookupRef("bk-lost") - require.True(t, ok) - assert.Equal(t, encodeBuildRef(7), ref) -} - func TestStatus_BuildkiteNotFound(t *testing.T) { r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotFound) })) - r.storeRef("some-id", encodeBuildRef(99)) - _, _, err := r.Status(context.Background(), entity.BuildID{ID: "some-id"}) + _, _, err := r.Status(context.Background(), entity.BuildID{ID: encodeBuildNumber(99)}) require.Error(t, err) } // --- Cancel --- -func TestCancel_CallsBuildkiteWhenRefKnown(t *testing.T) { +func TestCancel_CallsBuildkite(t *testing.T) { var cancelCalled bool r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { cancelCalled = true w.WriteHeader(http.StatusOK) _, _ = w.Write(buildJSON(5, "canceled", "")) })) - r.storeRef("some-id", encodeBuildRef(5)) - require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) + require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: encodeBuildNumber(5)})) assert.True(t, cancelCalled) } -func TestCancel_RecoversRefAfterCacheMiss(t *testing.T) { - var listed, cancelled bool - r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - switch req.Method { - case http.MethodGet: - listed = true - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`[{"number":5,"state":"running","web_url":""}]`)) - case http.MethodPut: - cancelled = true - w.WriteHeader(http.StatusOK) - default: - t.Fatalf("unexpected %s request", req.Method) - } - })) - - // refs is empty; Cancel must recover the ref from metadata first. 
- require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "bk-lost"})) - assert.True(t, listed, "cancel should look up the build by metadata on cache miss") - assert.True(t, cancelled, "cancel should reach Buildkite after recovering the ref") -} - -func TestCancel_NoopWhenBuildNotFound(t *testing.T) { - // The metadata lookup finds nothing, so there is nothing to cancel. - r := newTestRunner(t, emptyListHandler(t)) - - require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) -} - func TestCancel_AlreadyTerminal_Noop(t *testing.T) { // Buildkite returns 422 when the build cannot be cancelled (already terminal). r := newTestRunner(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusUnprocessableEntity) })) - r.storeRef("some-id", encodeBuildRef(5)) - require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: "some-id"})) + require.NoError(t, r.Cancel(context.Background(), entity.BuildID{ID: encodeBuildNumber(5)})) } // --- Internal helpers --- -func TestEncodeParseBuildRef_RoundTrip(t *testing.T) { +func TestEncodeParseBuildNumber_RoundTrip(t *testing.T) { for _, n := range []int{1, 9999, 0} { - ref := encodeBuildRef(n) - got, err := parseBuildRef(ref) + id := encodeBuildNumber(n) + got, err := parseBuildNumber(id) require.NoError(t, err) assert.Equal(t, n, got) } } -func TestParseBuildRef_Invalid(t *testing.T) { - for _, ref := range []string{"", "notanumber", "org/pipeline/1"} { - t.Run(ref, func(t *testing.T) { - _, err := parseBuildRef(ref) +func TestParseBuildNumber_Invalid(t *testing.T) { + for _, id := range []string{"", "notanumber", "org/pipeline/1"} { + t.Run(id, func(t *testing.T) { + _, err := parseBuildNumber(id) require.Error(t, err) }) } } - -func TestNewBuildID_Unique(t *testing.T) { - seen := make(map[string]bool) - for i := 0; i < 100; i++ { - id := newBuildID() - assert.NotEmpty(t, id) - assert.False(t, seen[id], "duplicate build ID: %s", id) - seen[id] = true - 
} -} diff --git a/submitqueue/extension/buildrunner/buildkite/client.go b/submitqueue/extension/buildrunner/buildkite/client.go index e7258222..38a217f6 100644 --- a/submitqueue/extension/buildrunner/buildkite/client.go +++ b/submitqueue/extension/buildrunner/buildkite/client.go @@ -21,19 +21,17 @@ import ( "fmt" "io" "net/http" - "net/url" ) // client is a thin wrapper around the Buildkite REST endpoints that BuildRunner -// needs: create, get, list-by-metadata, and cancel a build. +// needs: create, get, and cancel a build. type client struct { httpClient *http.Client } type createBuildRequest struct { - Message string `json:"message"` - Env map[string]string `json:"env"` - MetaData map[string]string `json:"meta_data,omitempty"` + Message string `json:"message"` + Env map[string]string `json:"env"` } // buildResponse is the subset of fields the runner needs from a Buildkite @@ -65,22 +63,6 @@ func (c *client) getBuild(ctx context.Context, number int) (buildResponse, error return build, nil } -// findBuildByMeta returns the build in the pipeline whose meta_data[key] equals -// value. ok is false when no such build exists yet. This lets Status and Cancel -// recover the Buildkite reference from Buildkite itself (the source of truth) -// when the in-memory cache misses, e.g. after a process restart. -func (c *client) findBuildByMeta(ctx context.Context, key, value string) (build buildResponse, ok bool, err error) { - u := fmt.Sprintf("/builds?meta_data[%s]=%s", url.QueryEscape(key), url.QueryEscape(value)) - var builds []buildResponse - if err := c.do(ctx, http.MethodGet, u, nil, &builds); err != nil { - return buildResponse{}, false, err - } - if len(builds) == 0 { - return buildResponse{}, false, nil - } - return builds[0], true, nil -} - // cancelBuild requests cancellation. Returns nil when the build is already // terminal (HTTP 422) — the Buildkite API uses that status to indicate a // non-cancellable build, which the BuildRunner contract treats as a no-op. 
diff --git a/submitqueue/extension/buildrunner/buildkite/config.go b/submitqueue/extension/buildrunner/buildkite/config.go index d95f4742..0a1a557e 100644 --- a/submitqueue/extension/buildrunner/buildkite/config.go +++ b/submitqueue/extension/buildrunner/buildkite/config.go @@ -13,11 +13,3 @@ // limitations under the License. package buildkite - -// Config holds the per-queue settings for a Buildkite BuildRunner. -type Config struct { - // QueueName is the SQ queue this runner serves. Passed as SQ_QUEUE in - // the build environment so the pipeline script can select queue-specific - // test targets. - QueueName string -} From 5ce5e8cb8f24b986c0e60b6a6aab9a02e9c929d5 Mon Sep 17 00:00:00 2001 From: Jamy Timmermans Date: Thu, 4 Jun 2026 20:31:31 -0700 Subject: [PATCH 6/8] fixup! gazelle --- submitqueue/extension/buildrunner/buildkite/BUILD.bazel | 2 ++ 1 file changed, 2 insertions(+) diff --git a/submitqueue/extension/buildrunner/buildkite/BUILD.bazel b/submitqueue/extension/buildrunner/buildkite/BUILD.bazel index 1b44e422..51026509 100644 --- a/submitqueue/extension/buildrunner/buildkite/BUILD.bazel +++ b/submitqueue/extension/buildrunner/buildkite/BUILD.bazel @@ -12,6 +12,7 @@ go_library( deps = [ "//submitqueue/entity", "//submitqueue/extension/buildrunner", + "@org_uber_go_zap//:zap", ], ) @@ -25,5 +26,6 @@ go_test( "//submitqueue/extension/buildrunner", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@org_uber_go_zap//:zap", ], ) From 4228731751ff3bfc03c56aa989123e6f09cf2cbb Mon Sep 17 00:00:00 2001 From: Jamy Timmermans Date: Thu, 4 Jun 2026 20:36:34 -0700 Subject: [PATCH 7/8] remove config --- .../extension/buildrunner/buildkite/BUILD.bazel | 1 - .../extension/buildrunner/buildkite/config.go | 15 --------------- 2 files changed, 16 deletions(-) delete mode 100644 submitqueue/extension/buildrunner/buildkite/config.go diff --git a/submitqueue/extension/buildrunner/buildkite/BUILD.bazel 
b/submitqueue/extension/buildrunner/buildkite/BUILD.bazel index 51026509..06a0d4d0 100644 --- a/submitqueue/extension/buildrunner/buildkite/BUILD.bazel +++ b/submitqueue/extension/buildrunner/buildkite/BUILD.bazel @@ -5,7 +5,6 @@ go_library( srcs = [ "buildkite.go", "client.go", - "config.go", ], importpath = "github.com/uber/submitqueue/submitqueue/extension/buildrunner/buildkite", visibility = ["//visibility:public"], diff --git a/submitqueue/extension/buildrunner/buildkite/config.go b/submitqueue/extension/buildrunner/buildkite/config.go deleted file mode 100644 index 0a1a557e..00000000 --- a/submitqueue/extension/buildrunner/buildkite/config.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package buildkite From 67c40d4600ae5ef2057a0485ad441399ce15f529 Mon Sep 17 00:00:00 2001 From: Jamy Timmermans Date: Thu, 4 Jun 2026 21:59:51 -0700 Subject: [PATCH 8/8] fixup! 
error on no client or config --- .../extension/buildrunner/buildkite/buildkite.go | 10 ++++++---- .../extension/buildrunner/buildkite/buildkite_test.go | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite.go b/submitqueue/extension/buildrunner/buildkite/buildkite.go index e8af5c53..eccd046b 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite.go @@ -85,11 +85,13 @@ type Params struct { // root (e.g. "https://api.buildkite.com/v2/organizations/{org}/pipelines/{slug}"), // and an auth transport that injects the Authorization header. func NewBuildRunner(params Params) (buildrunner.BuildRunner, error) { - httpClient := params.HTTPClient - if httpClient == nil { - httpClient = http.DefaultClient + if params.HTTPClient == nil { + return nil, fmt.Errorf("http client is required") } - return newRunner(params.Config, &client{httpClient: httpClient}, params.Logger.Named("buildkite_buildrunner")), nil + if params.Logger == nil { + return nil, fmt.Errorf("logger is required") + } + return newRunner(params.Config, &client{httpClient: params.HTTPClient}, params.Logger.Named("buildkite_buildrunner")), nil } // newRunner constructs a runner. Used by NewBuildRunner and by tests. diff --git a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go index abd244c0..5f8c83c0 100644 --- a/submitqueue/extension/buildrunner/buildkite/buildkite_test.go +++ b/submitqueue/extension/buildrunner/buildkite/buildkite_test.go @@ -55,7 +55,7 @@ func buildJSON(number int, state, webURL string) []byte { func TestNew_ImplementsInterface(t *testing.T) { r, err := NewBuildRunner(Params{Logger: zap.NewNop().Sugar()}) - require.NoError(t, err) + require.Error(t, err, "http client is required") var _ buildrunner.BuildRunner = r }