Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ local-stovepipe-gateway-start: build-stovepipe-gateway-linux ## Start Stovepipe

mocks: ## Generate mock files using mockgen
@echo "Generating mocks..."
@$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changestore/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/...
@$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./submitqueue/extension/changestore/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/...
@echo "Mocks generated successfully!"

proto: ## Generate protobuf files from .proto definitions
Expand Down
1 change: 1 addition & 0 deletions example/submitqueue/gateway/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//extension/messagequeue/mysql",
"//submitqueue/core/consumer",
"//submitqueue/extension/queueconfig/yaml",
"//submitqueue/extension/storage",
"//submitqueue/extension/storage/mysql",
"//submitqueue/gateway/controller",
"//submitqueue/gateway/protopb",
Expand Down
14 changes: 11 additions & 3 deletions example/submitqueue/gateway/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
"github.com/uber/submitqueue/submitqueue/core/consumer"
yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml"
"github.com/uber/submitqueue/submitqueue/extension/storage"
mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql"
"github.com/uber/submitqueue/submitqueue/gateway/controller"
pb "github.com/uber/submitqueue/submitqueue/gateway/protopb"
Expand Down Expand Up @@ -199,8 +200,15 @@ func run() error {
},
))

// Initialize request log store from shared app database connection
requestLogStore := mysqlstorage.NewRequestLogStore(appDB, scope.SubScope("request_log_store"))
// Initialize storage from the shared app database connection. The land
// controller takes a storage.Factory (static: every queue → this store);
// cancel/status use the request log store directly.
store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage"))
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
stores := storage.NewStaticFactory(store)
requestLogStore := store.GetRequestLogStore()

// Load queue configurations from YAML. Path is required so the gateway
// can reject requests for unknown queues at the edge.
Expand All @@ -215,7 +223,7 @@ func run() error {

// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger.Sugar(), scope, cnt, requestLogStore, queueConfigs, registry)
landController := controller.NewLandController(logger.Sugar(), scope, cnt, stores, queueConfigs, registry)
cancelController := controller.NewCancelController(logger.Sugar(), scope, requestLogStore, registry)
statusController := controller.NewStatusController(logger.Sugar(), scope, requestLogStore)
gatewayServer := &GatewayServer{
Expand Down
30 changes: 4 additions & 26 deletions example/submitqueue/gateway/server/queues.yaml
Original file line number Diff line number Diff line change
@@ -1,31 +1,9 @@
# Example queue configurations consumed by the gateway YAML store.
# The gateway loads this file at startup; QUEUE_CONFIG_PATH must point
# at it. Each entry maps a queue name to the VCS repository + target
# branch and selects the extension implementations used downstream.
# The gateway loads this file at startup; QUEUE_CONFIG_PATH must point at it.
# QueueConfig is just the registry of valid queue names — all behavioral and
# VCS configuration lives in the extension factory implementations wired in the
# server, not here.
queues:
- name: test-queue
vcs_type: git
vcs_address: git@github.com:uber/submitqueue.git
target: main
build_runner: buildkite.com/uber/submitqueue-ci
change_provider: github
merge_checker: github
land_provider: github

- name: e2e-test-queue
vcs_type: git
vcs_address: git@github.com:uber/submitqueue.git
target: main
build_runner: buildkite.com/uber/submitqueue-ci
change_provider: github
merge_checker: github
land_provider: github

- name: e2e-cancel-queue
vcs_type: git
vcs_address: git@github.com:uber/submitqueue.git
target: main
build_runner: buildkite.com/uber/submitqueue-ci
change_provider: github
merge_checker: github
land_provider: github
2 changes: 2 additions & 0 deletions example/submitqueue/orchestrator/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ go_library(
"//submitqueue/extension/changeprovider/github",
"//submitqueue/extension/changestore",
"//submitqueue/extension/changestore/mysql",
"//submitqueue/extension/conflict",
"//submitqueue/extension/conflict/all",
"//submitqueue/extension/mergechecker",
"//submitqueue/extension/mergechecker/github",
"//submitqueue/extension/pusher",
"//submitqueue/extension/pusher/git",
"//submitqueue/extension/scorer",
"//submitqueue/extension/scorer/heuristic",
"//submitqueue/extension/storage",
"//submitqueue/extension/storage/mysql",
Expand Down
83 changes: 59 additions & 24 deletions example/submitqueue/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ import (
githubprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider/github"
"github.com/uber/submitqueue/submitqueue/extension/changestore"
mysqlchangestore "github.com/uber/submitqueue/submitqueue/extension/changestore/mysql"
"github.com/uber/submitqueue/submitqueue/extension/conflict"
"github.com/uber/submitqueue/submitqueue/extension/conflict/all"
"github.com/uber/submitqueue/submitqueue/extension/mergechecker"
githubchecker "github.com/uber/submitqueue/submitqueue/extension/mergechecker/github"
"github.com/uber/submitqueue/submitqueue/extension/pusher"
gitpusher "github.com/uber/submitqueue/submitqueue/extension/pusher/git"
"github.com/uber/submitqueue/submitqueue/extension/scorer"
"github.com/uber/submitqueue/submitqueue/extension/scorer/heuristic"
"github.com/uber/submitqueue/submitqueue/extension/storage"
mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql"
Expand Down Expand Up @@ -425,11 +427,48 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
// │ │ │
// └────────┴───────────────────────┘

// Static per-extension factories for the example server: every queue resolves
// to the single configured implementation. A real deployment would vary the
// returned implementation by cfg.QueueName (and inject per-queue config).
type changeProviderFactory struct{ impl changeprovider.ChangeProvider }

func (f changeProviderFactory) For(changeprovider.Config) (changeprovider.ChangeProvider, error) {
return f.impl, nil
}

type mergeCheckerFactory struct{ impl mergechecker.MergeChecker }

func (f mergeCheckerFactory) For(mergechecker.Config) (mergechecker.MergeChecker, error) {
return f.impl, nil
}

type pusherFactory struct{ impl pusher.Pusher }

func (f pusherFactory) For(pusher.Config) (pusher.Pusher, error) { return f.impl, nil }

type buildRunnerFactory struct{ impl buildrunner.BuildRunner }

func (f buildRunnerFactory) For(buildrunner.Config) (buildrunner.BuildRunner, error) {
return f.impl, nil
}

type scorerFactory struct{ impl scorer.Scorer }

func (f scorerFactory) For(scorer.Config) (scorer.Scorer, error) { return f.impl, nil }

type conflictFactory struct{ impl conflict.Analyzer }

func (f conflictFactory) For(conflict.Config) (conflict.Analyzer, error) { return f.impl, nil }

func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
// Static storage factory: every queue resolves to the one configured store.
// The factory is the injection point for future per-queue backends.
stores := storage.NewStaticFactory(store)

requestController := start.NewController(
logger,
scope,
store,
stores,
changeStore,
registry,
consumer.TopicKeyStart,
Expand All @@ -442,7 +481,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
cancelController := cancel.NewController(
logger,
scope,
store,
stores,
registry,
consumer.TopicKeyCancel,
"orchestrator-cancel",
Expand All @@ -454,11 +493,11 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
validateController := validate.NewController(
logger,
scope,
store,
stores,
changeStore,
registry,
mc,
cp,
mergeCheckerFactory{impl: mc},
changeProviderFactory{impl: cp},
consumer.TopicKeyValidate,
"orchestrator-validate",
)
Expand All @@ -471,10 +510,10 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
scope,
registry,
cnt,
store,
stores,
// TODO: replace with a real conflict analyzer (e.g. one backed by
// Tango target analysis). The "all" stub serializes the queue.
all.New(),
conflictFactory{impl: all.New()},
consumer.TopicKeyBatch,
"orchestrator-batch",
)
Expand All @@ -485,9 +524,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
scoreController := score.NewController(
logger,
scope,
store,
stores,
// TODO: replace with a real scorer
heuristic.New(
scorerFactory{impl: heuristic.New(
[]heuristic.Bucket{
{Min: 0, Max: 1, Score: 0.95},
{Min: 2, Max: 5, Score: 0.80},
Expand All @@ -498,7 +537,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
return len(change.URIs), nil
},
scope.SubScope("scorer"),
),
)},
registry,
consumer.TopicKeyScore,
"orchestrator-score",
Expand All @@ -510,7 +549,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
speculateController := speculate.NewController(
logger,
scope,
store,
stores,
registry,
consumer.TopicKeySpeculate,
"orchestrator-speculate",
Expand All @@ -522,8 +561,8 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
buildController := build.NewController(
logger,
scope,
store,
br,
stores,
buildRunnerFactory{impl: br},
registry,
consumer.TopicKeyBuild,
"orchestrator-build",
Expand All @@ -535,8 +574,8 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
buildsignalController := buildsignal.NewController(
logger,
scope,
store,
br,
stores,
buildRunnerFactory{impl: br},
registry,
consumer.TopicKeyBuildSignal,
"orchestrator-buildsignal",
Expand All @@ -548,9 +587,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
mergeController := merge.NewController(
logger,
scope,
store,
stores,
registry,
psh,
pusherFactory{impl: psh},
consumer.TopicKeyMerge,
"orchestrator-merge",
)
Expand All @@ -561,7 +600,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
concludeController := conclude.NewController(
logger,
scope,
store,
stores,
registry,
consumer.TopicKeyConclude,
"orchestrator-conclude",
Expand All @@ -573,7 +612,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
logController := logctrl.NewController(
logger,
scope,
store,
stores,
consumer.TopicKeyLog,
"orchestrator-log",
)
Expand Down Expand Up @@ -619,14 +658,10 @@ func newMergeChecker(logger *zap.Logger, scope tally.Scope) (mergechecker.MergeC

client.Timeout = parseTimeout(os.Getenv("GITHUB_TIMEOUT"), 30*time.Second)

github := githubchecker.NewMergeChecker(githubchecker.Params{
return githubchecker.NewMergeChecker(githubchecker.Params{
HTTPClient: client,
Logger: logger.Sugar(),
MetricsScope: scope.SubScope("mergechecker"),
})

return mergechecker.NewMultiChecker(map[string]mergechecker.MergeChecker{
"github": github,
}), nil
}

Expand Down
49 changes: 5 additions & 44 deletions submitqueue/entity/queue_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,13 @@

package entity

// QueueConfig holds the configuration for a single submit queue.
// Each queue maps a VCS repository + target to a processing pipeline.
// A repository can have multiple queues, but each queue has exactly one target.
// Immutable after creation.
// QueueConfig identifies a single submit queue. It is the registry of valid
// queue names; the gateway validates that a land request targets a known queue.
// All behavioral and VCS configuration lives in the extension factory
// implementations, which are constructed per integrator deployment — the system
// hands a factory only the queue name. Immutable after creation.
type QueueConfig struct {
// Name uniquely identifies this queue within the system.
// Referenced by Request.Queue.
Name string `json:"name" yaml:"name"`

// VCSType identifies the version control system (e.g., "git", "svn", "perforce").
// A queue operates on exactly one VCS.
VCSType string `json:"vcs_type" yaml:"vcs_type"`

// VCSAddress identifies the repository in the version control system.
// The format is VCS-specific:
// - Git: remote URL (e.g., "git@github.com:uber/submitqueue.git")
// - Perforce: depot path (e.g., "//depot/project")
// - SVN: repository URL (e.g., "https://svn.example.com/repos/project")
VCSAddress string `json:"vcs_address" yaml:"vcs_address"`

// Target is the landing target where changes are merged.
// The format is VCS-specific:
// - Git: branch ref (e.g., "main", "release/v2")
// - Perforce: stream or depot path (e.g., "//depot/main/...")
// - SVN: repository path (e.g., "trunk/")
Target string `json:"target" yaml:"target"`

// BuildRunner identifies the CI pipeline or job that runs builds for this queue.
// Opaque to the system; meaningful only to the build runner extension implementation.
// Examples:
// - Buildkite: "buildkite.com/uber/submitqueue-ci"
// - Jenkins: "jenkins.example.com/job/submitqueue-verify"
BuildRunner string `json:"build_runner" yaml:"build_runner"`

// ChangeProvider identifies the change provider implementation for this queue.
// Opaque to the system; meaningful only to the change provider extension implementation.
// Examples: "github", "gitlab", "phabricator"
ChangeProvider string `json:"change_provider" yaml:"change_provider"`

// MergeChecker identifies the merge checker implementation for this queue.
// Opaque to the system; meaningful only to the merge checker extension implementation.
// Examples: "github", "gitlab"
MergeChecker string `json:"merge_checker" yaml:"merge_checker"`

// LandProvider identifies the land provider implementation for this queue.
// Opaque to the system; meaningful only to the land provider extension implementation.
// Examples: "github", "gitlab"
LandProvider string `json:"land_provider" yaml:"land_provider"`
}
20 changes: 17 additions & 3 deletions submitqueue/extension/buildrunner/build_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ type BuildRunner interface {
// asynchronously. Callers learn the build's progress via Status, not
// via Trigger.
//
// queueName selects the runner-specific job configuration.
// Returns an error if the request is invalid.
// The runner is already bound to its queue's job configuration by the
// Factory that built it. Returns an error if the request is invalid.
Trigger(
ctx context.Context,
queueName string,
base []entity.Change,
head []entity.Change,
metadata entity.BuildMetadata,
Expand All @@ -80,3 +79,18 @@ type BuildRunner interface {
// on already-terminal builds. Returns an error if the build does not exist.
Cancel(ctx context.Context, buildID entity.BuildID) error
}

// Config carries the per-queue identity handed to a Factory. The system knows
// only the queue name; everything an implementation needs (endpoint, pipeline,
// credentials) is injected at construction by the integrator.
type Config struct {
// QueueName identifies the queue this BuildRunner serves.
QueueName string
}

// Factory builds the BuildRunner for a queue. Implementations are provided by
// integrators (and tests) and inject whatever they need at construction.
type Factory interface {
// For returns the BuildRunner for the given queue.
For(cfg Config) (BuildRunner, error)
}
1 change: 1 addition & 0 deletions submitqueue/extension/buildrunner/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//submitqueue/entity",
"//submitqueue/extension/buildrunner",
"@org_uber_go_mock//gomock",
],
)
Loading
Loading