Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion example/submitqueue/gateway/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ go_library(
"//extension/messagequeue/mysql",
"//submitqueue/core/consumer",
"//submitqueue/extension/queueconfig/yaml",
"//submitqueue/extension/storage",
"//submitqueue/extension/storage/mysql",
"//submitqueue/gateway/controller",
"//submitqueue/gateway/protopb",
Expand Down
8 changes: 3 additions & 5 deletions example/submitqueue/gateway/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
"github.com/uber/submitqueue/submitqueue/core/consumer"
yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml"
"github.com/uber/submitqueue/submitqueue/extension/storage"
mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql"
"github.com/uber/submitqueue/submitqueue/gateway/controller"
pb "github.com/uber/submitqueue/submitqueue/gateway/protopb"
Expand Down Expand Up @@ -201,13 +200,12 @@ func run() error {
))

// Initialize storage from the shared app database connection. The land
// controller takes a storage.Factory (static: every queue → this store);
// cancel/status use the request log store directly.
// controller writes to this store directly; cancel/status use the request
// log store directly.
store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage"))
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
stores := storage.NewStaticFactory(store)
requestLogStore := store.GetRequestLogStore()

// Load queue configurations from YAML. Path is required so the gateway
Expand All @@ -223,7 +221,7 @@ func run() error {

// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger.Sugar(), scope, cnt, stores, queueConfigs, registry)
landController := controller.NewLandController(logger.Sugar(), scope, cnt, store, queueConfigs, registry)
cancelController := controller.NewCancelController(logger.Sugar(), scope, requestLogStore, registry)
statusController := controller.NewStatusController(logger.Sugar(), scope, requestLogStore)
gatewayServer := &GatewayServer{
Expand Down
26 changes: 11 additions & 15 deletions example/submitqueue/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,14 +461,10 @@ type conflictFactory struct{ impl conflict.Analyzer }
func (f conflictFactory) For(conflict.Config) (conflict.Analyzer, error) { return f.impl, nil }

func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
// Static storage factory: every queue resolves to the one configured store.
// The factory is the injection point for future per-queue backends.
stores := storage.NewStaticFactory(store)

requestController := start.NewController(
logger,
scope,
stores,
store,
changeStore,
registry,
consumer.TopicKeyStart,
Expand All @@ -481,7 +477,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
cancelController := cancel.NewController(
logger,
scope,
stores,
store,
registry,
consumer.TopicKeyCancel,
"orchestrator-cancel",
Expand All @@ -493,7 +489,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
validateController := validate.NewController(
logger,
scope,
stores,
store,
changeStore,
registry,
mergeCheckerFactory{impl: mc},
Expand All @@ -510,7 +506,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
scope,
registry,
cnt,
stores,
store,
// TODO: replace with a real conflict analyzer (e.g. one backed by
// Tango target analysis). The "all" stub serializes the queue.
conflictFactory{impl: all.New()},
Expand All @@ -524,7 +520,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
scoreController := score.NewController(
logger,
scope,
stores,
store,
// TODO: replace with a real scorer
scorerFactory{impl: heuristic.New(
[]heuristic.Bucket{
Expand All @@ -549,7 +545,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
speculateController := speculate.NewController(
logger,
scope,
stores,
store,
registry,
consumer.TopicKeySpeculate,
"orchestrator-speculate",
Expand All @@ -561,7 +557,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
buildController := build.NewController(
logger,
scope,
stores,
store,
buildRunnerFactory{impl: br},
registry,
consumer.TopicKeyBuild,
Expand All @@ -574,7 +570,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
buildsignalController := buildsignal.NewController(
logger,
scope,
stores,
store,
buildRunnerFactory{impl: br},
registry,
consumer.TopicKeyBuildSignal,
Expand All @@ -587,7 +583,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
mergeController := merge.NewController(
logger,
scope,
stores,
store,
registry,
pusherFactory{impl: psh},
consumer.TopicKeyMerge,
Expand All @@ -600,7 +596,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
concludeController := conclude.NewController(
logger,
scope,
stores,
store,
registry,
consumer.TopicKeyConclude,
"orchestrator-conclude",
Expand All @@ -612,7 +608,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
logController := logctrl.NewController(
logger,
scope,
stores,
store,
consumer.TopicKeyLog,
"orchestrator-log",
)
Expand Down
13 changes: 1 addition & 12 deletions submitqueue/extension/storage/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "storage",
Expand All @@ -7,7 +7,6 @@ go_library(
"batch_store.go",
"build_store.go",
"change_provider_store.go",
"factory.go",
"request_log_store.go",
"request_store.go",
"speculation_tree_store.go",
Expand All @@ -17,13 +16,3 @@ go_library(
visibility = ["//visibility:public"],
deps = ["//submitqueue/entity"],
)

go_test(
name = "storage_test",
srcs = ["factory_test.go"],
embed = [":storage"],
deps = [
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
34 changes: 0 additions & 34 deletions submitqueue/extension/storage/factory.go

This file was deleted.

48 changes: 0 additions & 48 deletions submitqueue/extension/storage/factory_test.go

This file was deleted.

39 changes: 0 additions & 39 deletions submitqueue/extension/storage/mock/storage_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 0 additions & 12 deletions submitqueue/extension/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,3 @@ type Storage interface {
// Close closes the storage and all underlying connections. Should only be called once at the end of the program.
Close() error
}

// Factory returns the Storage backing a named queue. It exists so a single
// queue can be migrated to a different backend without affecting others; the
// default implementation (NewStaticFactory) returns the same Storage for
// every queue. Callers resolve the Storage per message from the queue name
// carried in the message envelope, before any entity lookup.
type Factory interface {
// For returns the Storage for the named queue. An empty name selects the
// default backend. It returns an error if no backend is configured for the
// queue.
For(name string) (Storage, error)
}
16 changes: 4 additions & 12 deletions submitqueue/gateway/controller/land.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,20 @@ type LandController struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
counter counter.Counter
stores storage.Factory
store storage.Storage
queueConfigs queueconfig.Store
registry consumer.TopicRegistry
}

// NewLandController creates a new instance of the gateway land controller.
// The controller publishes land requests to the topic registered under
// consumer.TopicKeyStart in the registry.
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, stores storage.Factory, queueConfigs queueconfig.Store, registry consumer.TopicRegistry) *LandController {
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, store storage.Storage, queueConfigs queueconfig.Store, registry consumer.TopicRegistry) *LandController {
return &LandController{
logger: logger,
metricsScope: scope.SubScope("land_controller"),
counter: counter,
stores: stores,
store: store,
queueConfigs: queueConfigs,
registry: registry,
}
Expand Down Expand Up @@ -110,14 +110,6 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p
return nil, fmt.Errorf("LandController failed to look up queue %q: %w", queue, err)
}

// Resolve the storage backend for this entityqueue. The queue is known up front
// here (from the request), so this is the one place per-queue storage
// routing is actionable.
store, err := c.stores.For(queue)
if err != nil {
return nil, fmt.Errorf("LandController failed to resolve storage for queue=%s: %w", queue, err)
}

// TODO: pass default queue land strategy to resolver function to process a default.
strategy, err := resolveRequestLandStrategy(req.Strategy)
if err != nil {
Expand All @@ -141,7 +133,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p
// It is important to record the status before publishing to the queue for processing. It is important to publish straight to the database and not via a entityqueue.
// Gateway has to stay consistent with the request log.
logEntry := entity.NewRequestLog(landRequest.ID, entity.RequestStatusAccepted, 0, "", nil)
if err := store.GetRequestLogStore().Insert(ctx, logEntry); err != nil {
if err := c.store.GetRequestLogStore().Insert(ctx, logEntry); err != nil {
return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", landRequest.ID, err)
}

Expand Down
Loading
Loading