diff --git a/app/controlplane/cmd/wire_gen.go b/app/controlplane/cmd/wire_gen.go index dfa28f207..9ebb39ec8 100644 --- a/app/controlplane/cmd/wire_gen.go +++ b/app/controlplane/cmd/wire_gen.go @@ -410,7 +410,8 @@ func wireApp(contextContext context.Context, bootstrap *conf.Bootstrap, readerWr return nil, nil, err } workflowRunExpirerUseCase := biz.NewWorkflowRunExpirerUseCase(workflowRunRepo, prometheusUseCase, logger) - casBackendChecker := biz.NewCASBackendChecker(logger, casBackendRepo, casBackendUseCase) + distributedLock := data.NewPostgresLock(dataData, logger) + casBackendChecker := biz.NewCASBackendChecker(logger, casBackendRepo, casBackendUseCase, distributedLock) apiTokenStaleRevoker := biz.NewAPITokenStaleRevoker(organizationRepo, apiTokenRepo, apiTokenUseCase, logger) mainApp := newApp(logger, grpcServer, httpServer, httpMetricsServer, httpProfilerServer, workflowRunExpirerUseCase, availablePlugins, userAccessSyncerUseCase, casBackendChecker, apiTokenStaleRevoker, bootstrap) return mainApp, func() { diff --git a/app/controlplane/internal/usercontext/orgrequirements_middleware.go b/app/controlplane/internal/usercontext/orgrequirements_middleware.go index 8c6d7ac70..06da5a2c4 100644 --- a/app/controlplane/internal/usercontext/orgrequirements_middleware.go +++ b/app/controlplane/internal/usercontext/orgrequirements_middleware.go @@ -30,7 +30,10 @@ import ( var orgRequirementsTracer = otelx.Tracer("chainloop-controlplane", "middleware/orgrequirements") -const validationTimeOffset = 5 * time.Minute +// Slightly larger than the background CAS backend checker cadence for default/fallback +// backends (30 min, see cmd/main.go), so the request path doesn't re-validate ahead of +// the background loop while it's still finishing a tick. 
+const validationTimeOffset = 35 * time.Minute // ValidateCASBackend checks that the current organization has a valid CAS Backend configured // If the last validation happened more than validationTimeOffset ago it will re-run the validation diff --git a/app/controlplane/pkg/biz/casbackend_checker.go b/app/controlplane/pkg/biz/casbackend_checker.go index 6bc106ca1..412a1fe11 100644 --- a/app/controlplane/pkg/biz/casbackend_checker.go +++ b/app/controlplane/pkg/biz/casbackend_checker.go @@ -30,14 +30,35 @@ var casBackendCheckerTracer = otelx.Tracer("chainloop-controlplane", "biz/casbac const ( defaultInterval = 30 * time.Minute defaultValidationTimeout = 10 * time.Second + // Upper bound on how long a single tick is allowed to hold the + // distributed lock. Defends against a hung validation pinning the lock + // past one tick; the next tick will retry. + defaultMaxTickDuration = 25 * time.Minute + + // Separate keys per scope so the two checker goroutines (defaults vs all backends) + // don't block each other. + lockKeyDefaultsScope = "cas-backend-checker:defaults" + lockKeyAllScope = "cas-backend-checker:all" ) +// DistributedLock is a best-effort, cluster-wide mutex used to make sure +// background jobs that should run on a single replica at a time aren't +// duplicated across pods. +type DistributedLock interface { + // TryAcquire attempts to acquire the lock identified by key without + // blocking. If acquired is true, the caller MUST invoke release when + // done. The lock is also released automatically if the underlying + // session is lost (e.g. pod crash). 
+ TryAcquire(ctx context.Context, key string) (acquired bool, release func(), err error) +} + type CASBackendChecker struct { logger *log.Helper casBackendRepo CASBackendRepo caseBackendUseCase *CASBackendUseCase // Validation timeout for each backend check validationTimeout time.Duration + lock DistributedLock } type CASBackendCheckerOpts struct { @@ -53,12 +74,13 @@ type CASBackendCheckerOpts struct { // NewCASBackendChecker creates a new CAS backend checker that will periodically validate // the status of CAS backends -func NewCASBackendChecker(logger log.Logger, casBackendRepo CASBackendRepo, casBackendUseCase *CASBackendUseCase) *CASBackendChecker { +func NewCASBackendChecker(logger log.Logger, casBackendRepo CASBackendRepo, casBackendUseCase *CASBackendUseCase, lock DistributedLock) *CASBackendChecker { return &CASBackendChecker{ logger: log.NewHelper(log.With(logger, "component", "biz/CASBackendChecker")), casBackendRepo: casBackendRepo, caseBackendUseCase: casBackendUseCase, validationTimeout: defaultValidationTimeout, + lock: lock, } } @@ -120,6 +142,25 @@ func (c *CASBackendChecker) Start(ctx context.Context, opts *CASBackendCheckerOp // checkBackends validates all CAS backends (or just default and fallback ones based on configuration) // using a worker pool for parallel processing with timeouts func (c *CASBackendChecker) checkBackends(ctx context.Context, defaultsOrFallbacks bool) error { + key := lockKeyAllScope + if defaultsOrFallbacks { + key = lockKeyDefaultsScope + } + acquired, release, err := c.lock.TryAcquire(ctx, key) + if err != nil { + return fmt.Errorf("acquiring checker lock: %w", err) + } + if !acquired { + c.logger.Debugw("msg", "another replica is running the CAS backend check, skipping", "scope", key) + return nil + } + defer release() + + // Cap how long we can hold the lock. If validations hang, the next tick + // retries instead of one stuck pod pinning the lock indefinitely. 
+ ctx, cancel := context.WithTimeout(ctx, defaultMaxTickDuration) + defer cancel() + ctx, span := otelx.Start(ctx, casBackendCheckerTracer, "CASBackendChecker.checkBackends") defer span.End() diff --git a/app/controlplane/pkg/data/data.go b/app/controlplane/pkg/data/data.go index 6ddb47cd0..04fb558c3 100644 --- a/app/controlplane/pkg/data/data.go +++ b/app/controlplane/pkg/data/data.go @@ -17,6 +17,7 @@ package data import ( "context" + "database/sql" "fmt" "io" "time" @@ -59,11 +60,15 @@ var ProviderSet = wire.NewSet( NewProjectVersionRepo, NewProjectsRepo, NewGroupRepo, + NewPostgresLock, ) // Data . type Data struct { DB *ent.Client + // Exposed for code paths that need raw SQL features ent doesn't surface, + // e.g. session-scoped advisory locks (pg_try_advisory_lock). + SQLDB *sql.DB } // Load DB schema @@ -80,7 +85,7 @@ func NewData(c *config.DatabaseConfig, tp trace.TracerProvider, logger log.Logge } log := log.NewHelper(logger) - db, err := initSQLDatabase(c, tp, log) + db, sqlDB, err := initSQLDatabase(c, tp, log) if err != nil { log.Errorf("error initialing the DB : %v", err) return nil, nil, fmt.Errorf("failed to initialized db: %w", err) @@ -93,12 +98,12 @@ func NewData(c *config.DatabaseConfig, tp trace.TracerProvider, logger log.Logge } } - return &Data{DB: db}, cleanup, nil + return &Data{DB: db, SQLDB: sqlDB}, cleanup, nil } -func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log.Helper) (*ent.Client, error) { +func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log.Helper) (*ent.Client, *sql.DB, error) { if c.Driver != "pgx" { - return nil, fmt.Errorf("unsupported driver: %s", c.Driver) + return nil, nil, fmt.Errorf("unsupported driver: %s", c.Driver) } log.Debugf("connecting to db: driver=%s", c.Driver) @@ -115,7 +120,7 @@ func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log }), ) if err != nil { - return nil, fmt.Errorf("error opening the connection, driver=%s: %w", 
c.Driver, err) + return nil, nil, fmt.Errorf("error opening the connection, driver=%s: %w", c.Driver, err) } if c.MaxOpenConns > 0 { @@ -139,7 +144,7 @@ func initSQLDatabase(c *config.DatabaseConfig, tp trace.TracerProvider, log *log // NOTE: We do not run migrations automatically anymore // Instead we leverage atlas cli to run migrations - return client, nil + return client, db, nil } func toTimePtr(t time.Time) *time.Time { diff --git a/app/controlplane/pkg/data/lock.go b/app/controlplane/pkg/data/lock.go new file mode 100644 index 000000000..bcd4749b3 --- /dev/null +++ b/app/controlplane/pkg/data/lock.go @@ -0,0 +1,101 @@ +// +// Copyright 2026 The Chainloop Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package data + +import ( + "context" + "database/sql" + "database/sql/driver" + "fmt" + "hash/fnv" + "time" + + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/biz" + "github.com/go-kratos/kratos/v2/log" +) + +// Cap on how long the release path may block. Defends against a stuck +// session: if pg_advisory_unlock can't return, we drop the connection +// and let Postgres release the lock on session disconnect. +const advisoryUnlockTimeout = 5 * time.Second + +// PostgresLock implements biz.DistributedLock using Postgres session-level +// advisory locks (pg_try_advisory_lock / pg_advisory_unlock). +// +// Postgres is the only piece of infrastructure that's mandatory for the +// control plane — NATS is optional (used for distributed caches when +// present). 
Using advisory locks lets us coordinate background jobs +// across replicas without adding a new dependency (a NATS KV lease or a +// dedicated queue) that wouldn't be available in every deployment. +// +// Each lock holds a dedicated connection for its lifetime; releasing the +// lock returns the connection to the pool. If the pod crashes mid-run the +// connection drops and Postgres releases the lock automatically. +type PostgresLock struct { + db *sql.DB + log *log.Helper +} + +func NewPostgresLock(d *Data, logger log.Logger) biz.DistributedLock { + return &PostgresLock{ + db: d.SQLDB, + log: log.NewHelper(logger), + } +} + +func (l *PostgresLock) TryAcquire(ctx context.Context, key string) (bool, func(), error) { + intKey := hashKey(key) + + conn, err := l.db.Conn(ctx) + if err != nil { + return false, nil, fmt.Errorf("acquiring DB connection: %w", err) + } + + var acquired bool + if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", intKey).Scan(&acquired); err != nil { + _ = conn.Close() + return false, nil, fmt.Errorf("pg_try_advisory_lock: %w", err) + } + + if !acquired { + _ = conn.Close() + return false, nil, nil + } + + release := func() { + // pg_advisory_unlock must run on the same session that took the lock, + // and must run even if the caller's context was cancelled (e.g. shutdown). + // Bounded so a stuck session can't hang the release path. + releaseCtx, cancel := context.WithTimeout(context.Background(), advisoryUnlockTimeout) + defer cancel() + if _, err := conn.ExecContext(releaseCtx, "SELECT pg_advisory_unlock($1)", intKey); err != nil { + l.log.Warnw("msg", "failed to release advisory lock", "key", key, "error", err) + } + if err := conn.Close(); err != nil { + l.log.Warnw("msg", "failed to return DB connection to pool", "key", key, "error", err) + } + } + return true, release, nil +} + +// hashKey turns an opaque string key into the int64 that +// pg_advisory_lock expects. 
// FNV-1a is deterministic, so every replica derives the same
// lock ID from the same name, and with only a handful of named locks in
// this package a 64-bit collision is not a practical concern.
func hashKey(key string) int64 {
	hasher := fnv.New64a()
	// hash.Hash documents that Write never returns an error.
	_, _ = hasher.Write([]byte(key))
	sum := hasher.Sum64()
	return int64(sum) //nolint:gosec // intentional wraparound; pg accepts any int64
}