From 5170a0000ab6a766e54dbaeecfafa1f3ae0c0bca Mon Sep 17 00:00:00 2001 From: Miguel Martinez Trivino Date: Sat, 16 May 2026 23:04:25 +0200 Subject: [PATCH] perf(controlplane): single-pod CAS backend validation via advisory lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Background CAS-backend validation was running once per replica, hitting external providers N times per tick. The middleware revalidation window (5 min) was also shorter than the background cadence (30 min), so the bulk of validation work happened synchronously on the request path. Changes: - Add DistributedLock interface (in casbackend_checker.go) with a Postgres pg_try_advisory_lock implementation in pkg/data/lock.go. Postgres is used because it's the only mandatory piece of infrastructure across deployments — NATS is optional. - CASBackendChecker.checkBackends acquires the lock per tick; replicas that lose the race skip the tick, so the validation runs exactly once across the cluster. Per-scope keys keep the defaults (30 min) and all-backends (24 h) checkers from blocking each other. - Cap the lock-hold time at 25 min so a hung validation doesn't pin the lock past one tick. Bound the pg_advisory_unlock call at 5 s so a stuck session can't hang the release path — Postgres releases the lock on session disconnect anyway. - Data exposes the underlying *sql.DB so raw SQL features ent doesn't surface (session-scoped locks) are reachable from the data layer. - validationTimeOffset raised from 5 min to 35 min so the middleware no longer revalidates ahead of the background loop. Refs #3125 Assisted-by: Claude Code Signed-off-by: Miguel Martinez Trivino Chainloop-Trace-Sessions: 052e8b56-72b5-4c6c-8d82-ab2d00728889 --- app/controlplane/cmd/wire_gen.go | 3 +- .../usercontext/orgrequirements_middleware.go | 5 +- .../pkg/biz/casbackend_checker.go | 43 +++++++- app/controlplane/pkg/data/data.go | 17 +-- app/controlplane/pkg/data/lock.go | 101 ++++++++++++++++++ 5 files changed, 160 insertions(+), 9 deletions(-) create mode 100644 app/controlplane/pkg/data/lock.go diff --git a/app/controlplane/cmd/wire_gen.go b/app/controlplane/cmd/wire_gen.go index dfa28f207..9ebb39ec8 100644 --- a/app/controlplane/cmd/wire_gen.go +++ b/app/controlplane/cmd/wire_gen.go @@ -410,7 +410,8 @@ func wireApp(contextContext context.Context, bootstrap *conf.Bootstrap, readerWr return nil, nil, err } workflowRunExpirerUseCase := biz.NewWorkflowRunExpirerUseCase(workflowRunRepo, prometheusUseCase, logger) - casBackendChecker := biz.NewCASBackendChecker(logger, casBackendRepo, casBackendUseCase) + distributedLock := data.NewPostgresLock(dataData, logger) + casBackendChecker := biz.NewCASBackendChecker(logger, casBackendRepo, casBackendUseCase, distributedLock) apiTokenStaleRevoker := biz.NewAPITokenStaleRevoker(organizationRepo, apiTokenRepo, apiTokenUseCase, logger) mainApp := newApp(logger, grpcServer, httpServer, httpMetricsServer, httpProfilerServer, workflowRunExpirerUseCase, availablePlugins, userAccessSyncerUseCase, casBackendChecker, apiTokenStaleRevoker, bootstrap) return mainApp, func() { diff --git a/app/controlplane/internal/usercontext/orgrequirements_middleware.go b/app/controlplane/internal/usercontext/orgrequirements_middleware.go index 8c6d7ac70..06da5a2c4 100644 --- a/app/controlplane/internal/usercontext/orgrequirements_middleware.go +++ b/app/controlplane/internal/usercontext/orgrequirements_middleware.go @@ -30,7 +30,10 @@ import ( var orgRequirementsTracer = otelx.Tracer("chainloop-controlplane", "middleware/orgrequirements") -const validationTimeOffset = 5 * time.Minute +// Slightly larger than the background CAS backend checker cadence for default/fallback +// backends (30 min, see cmd/main.go), so the request path doesn't re-validate ahead of +// the background loop while it's still finishing a tick. +const validationTimeOffset = 35 * time.Minute // ValidateCASBackend checks that the current organization has a valid CAS Backend configured // If the last validation happened more than validationTimeOffset ago it will re-run the validation diff --git a/app/controlplane/pkg/biz/casbackend_checker.go b/app/controlplane/pkg/biz/casbackend_checker.go index 6bc106ca1..412a1fe11 100644 --- a/app/controlplane/pkg/biz/casbackend_checker.go +++ b/app/controlplane/pkg/biz/casbackend_checker.go @@ -30,14 +30,35 @@ var casBackendCheckerTracer = otelx.Tracer("chainloop-controlplane", "biz/casbac const ( defaultInterval = 30 * time.Minute defaultValidationTimeout = 10 * time.Second + // Upper bound on how long a single tick is allowed to hold the + // distributed lock. Defends against a hung validation pinning the lock + // past one tick; the next tick will retry. + defaultMaxTickDuration = 25 * time.Minute + + // Separate keys per scope so the two checker goroutines (defaults vs all backends) + // don't block each other. + lockKeyDefaultsScope = "cas-backend-checker:defaults" + lockKeyAllScope = "cas-backend-checker:all" ) +// DistributedLock is a best-effort, cluster-wide mutex used to make sure +// background jobs that should run on a single replica at a time aren't +// duplicated across pods. +type DistributedLock interface { + // TryAcquire attempts to acquire the lock identified by key without + // blocking. If acquired is true, the caller MUST invoke release when + // done. The lock is also released automatically if the underlying + // session is lost (e.g. pod crash). + TryAcquire(ctx context.Context, key string) (acquired bool, release func(), err error) +} + type CASBackendChecker struct { logger *log.Helper casBackendRepo CASBackendRepo caseBackendUseCase *CASBackendUseCase // Validation timeout for each backend check validationTimeout time.Duration + lock DistributedLock } type CASBackendCheckerOpts struct { @@ -53,12 +74,13 @@ type CASBackendCheckerOpts struct { // NewCASBackendChecker creates a new CAS backend checker that will periodically validate // the status of CAS backends -func NewCASBackendChecker(logger log.Logger, casBackendRepo CASBackendRepo, casBackendUseCase *CASBackendUseCase) *CASBackendChecker { +func NewCASBackendChecker(logger log.Logger, casBackendRepo CASBackendRepo, casBackendUseCase *CASBackendUseCase, lock DistributedLock) *CASBackendChecker { return &CASBackendChecker{ logger: log.NewHelper(log.With(logger, "component", "biz/CASBackendChecker")), casBackendRepo: casBackendRepo, caseBackendUseCase: casBackendUseCase, validationTimeout: defaultValidationTimeout, + lock: lock, } } @@ -120,6 +142,25 @@ func (c *CASBackendChecker) Start(ctx context.Context, opts *CASBackendCheckerOp // checkBackends validates all CAS backends (or just default and fallback ones based on configuration) // using a worker pool for parallel processing with timeouts func (c *CASBackendChecker) checkBackends(ctx context.Context, defaultsOrFallbacks bool) error { + key := lockKeyAllScope + if defaultsOrFallbacks { + key = lockKeyDefaultsScope + } + acquired, release, err := c.lock.TryAcquire(ctx, key) + if err != nil { + return fmt.Errorf("acquiring checker lock: %w", err) + } + if !acquired { + c.logger.Debugw("msg", "another replica is running the CAS backend check, skipping", "scope", key) + return nil + } + defer release() + + // Cap how long we can hold the lock. If validations hang, the next tick + // retries instead of one stuck pod pinning the lock indefinitely. + ctx, cancel := context.WithTimeout(ctx, defaultMaxTickDuration) + defer cancel() + ctx, span := otelx.Start(ctx, casBackendCheckerTracer, "CASBackendChecker.checkBackends") defer span.End() diff --git a/app/controlplane/pkg/data/data.go b/app/controlplane/pkg/data/data.go index 6ddb47cd0..04fb558c3 100644 --- a/app/controlplane/pkg/data/data.go +++ b/app/controlplane/pkg/data/data.go @@ -17,6 +17,7 @@ package data import ( "context" + "database/sql" "fmt" "io" "time" @@ -59,11 +60,15 @@ var ProviderSet = wire.NewSet( NewProjectVersionRepo, NewProjectsRepo, NewGroupRepo, + NewPostgresLock, ) // Data . type Data struct { DB *ent.Client + // Exposed for code paths that need raw SQL features ent doesn't surface, + // e.g. session-scoped advisory locks (pg_try_advisory_lock). + SQLDB *sql.DB } // Load DB schema @@ -80,7 +85,7 @@ func NewData(c *config.DatabaseConfig, tp trace.TracerProvider, logger log.Logge } log := log.NewHelper(logger) - db, err := initSQLDatabase(c, tp, log) + db, sqlDB, err := initSQLDatabase(c, tp, log) if err != nil { log.Errorf("error initialing the DB : %v", err) return nil, nil, fmt.Errorf("failed to initialized db: %w", err) @@ -93,12 +98,12 @@ func NewData(c *config.DatabaseConfig, tp trace.TracerProvider, logger log.Logge } } - return &Data{DB: db}, cleanup, nil + return &Data{DB: db, SQLDB: sqlDB}, cleanup, nil } -func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log.Helper) (*ent.Client, error) { +func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log.Helper) (*ent.Client, *sql.DB, error) { if c.Driver != "pgx" { - return nil, fmt.Errorf("unsupported driver: %s", c.Driver) + return nil, nil, fmt.Errorf("unsupported driver: %s", c.Driver) } log.Debugf("connecting to db: driver=%s", c.Driver) @@ -115,7 +120,7 @@ func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log }), ) if err != nil { - return nil, fmt.Errorf("error opening the connection, driver=%s: %w", c.Driver, err) + return nil, nil, fmt.Errorf("error opening the connection, driver=%s: %w", c.Driver, err) } if c.MaxOpenConns > 0 { @@ -139,7 +144,7 @@ func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log // NOTE: We do not run migrations automatically anymore // Instead we leverage atlas cli to run migrations - return client, nil + return client, db, nil } func toTimePtr(t time.Time) *time.Time { diff --git a/app/controlplane/pkg/data/lock.go b/app/controlplane/pkg/data/lock.go new file mode 100644 index 000000000..bcd4749b3 --- /dev/null +++ b/app/controlplane/pkg/data/lock.go @@ -0,0 +1,101 @@ +// +// Copyright 2026 The Chainloop Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package data + +import ( + "context" + "database/sql" + "fmt" + "hash/fnv" + "time" + + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/biz" + "github.com/go-kratos/kratos/v2/log" +) + +// Cap on how long the release path may block. Defends against a stuck +// session: if pg_advisory_unlock can't return, we drop the connection +// and let Postgres release the lock on session disconnect. +const advisoryUnlockTimeout = 5 * time.Second + +// PostgresLock implements biz.DistributedLock using Postgres session-level +// advisory locks (pg_try_advisory_lock / pg_advisory_unlock). +// +// Postgres is the only piece of infrastructure that's mandatory for the +// control plane — NATS is optional (used for distributed caches when +// present). Using advisory locks lets us coordinate background jobs +// across replicas without adding a new dependency (a NATS KV lease or a +// dedicated queue) that wouldn't be available in every deployment. +// +// Each lock holds a dedicated connection for its lifetime; releasing the +// lock returns the connection to the pool. If the pod crashes mid-run the +// connection drops and Postgres releases the lock automatically. +type PostgresLock struct { + db *sql.DB + log *log.Helper +} + +func NewPostgresLock(d *Data, logger log.Logger) biz.DistributedLock { + return &PostgresLock{ + db: d.SQLDB, + log: log.NewHelper(logger), + } +} + +func (l *PostgresLock) TryAcquire(ctx context.Context, key string) (bool, func(), error) { + intKey := hashKey(key) + + conn, err := l.db.Conn(ctx) + if err != nil { + return false, nil, fmt.Errorf("acquiring DB connection: %w", err) + } + + var acquired bool + if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", intKey).Scan(&acquired); err != nil { + _ = conn.Close() + return false, nil, fmt.Errorf("pg_try_advisory_lock: %w", err) + } + + if !acquired { + _ = conn.Close() + return false, nil, nil + } + + release := func() { + // pg_advisory_unlock must run on the same session that took the lock, + // and must run even if the caller's context was cancelled (e.g. shutdown). + // Bounded so a stuck session can't hang the release path. + releaseCtx, cancel := context.WithTimeout(context.Background(), advisoryUnlockTimeout) + defer cancel() + if _, err := conn.ExecContext(releaseCtx, "SELECT pg_advisory_unlock($1)", intKey); err != nil { + l.log.Warnw("msg", "failed to release advisory lock", "key", key, "error", err) + } + if err := conn.Close(); err != nil { + l.log.Warnw("msg", "failed to return DB connection to pool", "key", key, "error", err) + } + } + return true, release, nil +} + +// hashKey turns an opaque string key into the int64 that +// pg_advisory_lock expects. FNV-1a gives a stable, well-distributed +// mapping with effectively no collision risk for the handful of named +// locks this package uses. +func hashKey(key string) int64 { + h := fnv.New64a() + _, _ = h.Write([]byte(key)) + return int64(h.Sum64()) //nolint:gosec // intentional wraparound; pg accepts any int64 +}