feat: add leader election for Discovery and Engine components #1423
thunguo wants to merge 1 commit into apache:develop
Conversation
Pull request overview
Adds a DB-backed leader election mechanism so that, in multi-replica deployments, only the elected leader runs Discovery/Engine “business logic” (e.g., list-watch and DB writes), improving consistency (Issue #1425).
Changes:
- Introduces `pkg/core/leader` with a GORM `leader_leases` model and a `LeaderElection` loop (plus unit tests).
- Wires leader election into the `Discovery` and `Engine` components (conditional on a non-memory store).
- Exposes DB access plumbing from the Store component (and adds a `Pool()` accessor on `GormStore`) to support leader-election DB usage.
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| pkg/store/dbcommon/gorm_store.go | Adds Pool() accessor intended to expose the shared DB connection pool. |
| pkg/core/store/component.go | Adds a leader.DBSource implementation to expose a shared *gorm.DB from the store layer. |
| pkg/core/leader/model.go | Adds the LeaderLease GORM model for leader_leases. |
| pkg/core/leader/leader.go | Implements DB-based leader election (acquire/renew/release + run loop). |
| pkg/core/leader/db_source.go | Adds DBSource interface for components that can provide a *gorm.DB. |
| pkg/core/leader/leader_test.go | Adds unit tests for leader election behavior using in-memory SQLite. |
| pkg/core/engine/component.go | Integrates leader election into Engine startup flow. |
| pkg/core/discovery/component.go | Integrates leader election into Discovery startup flow; fixes a log message typo. |
| go.mod / go.sum | Dependency graph changes (tidy/reclassification + zookeeper direct dep). |
```go
// LeaderLease is the GORM model for the leader_leases table.
// It uses optimistic locking via the Version field to ensure atomic leader elections.
type LeaderLease struct {
	ID         uint      `gorm:"primaryKey;autoIncrement"`
	Component  string    `gorm:"uniqueIndex;size:64;not null"`
	HolderID   string    `gorm:"size:255;not null"`
	AcquiredAt time.Time `gorm:"not null"`
	ExpiresAt  time.Time `gorm:"not null"`
	Version    int64     `gorm:"not null;default:0"`
}
```
The struct/doc comment says the Version field is used for optimistic locking to ensure atomic leader elections, but TryAcquire does not include version in its acquisition UPDATE predicate (it only uses it for Renew). Either update the documentation to match the actual behavior, or extend acquisition to use Version as part of the atomicity guarantee.
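One way to make acquisition honor the `Version` field is a compare-and-swap on the version the caller last read. A minimal in-memory sketch of the intended semantics (names are illustrative, not the PR's GORM code):

```go
package main

import "fmt"

// lease models the relevant columns of leader_leases.
type lease struct {
	holderID string
	version  int64
	expired  bool
}

// tryAcquire performs a compare-and-swap: the takeover only succeeds if the
// version still matches what the caller read, mirroring a SQL predicate like
// `WHERE component = ? AND version = ? AND expires_at < now()`.
func tryAcquire(l *lease, holder string, seenVersion int64) bool {
	if !l.expired || l.version != seenVersion {
		return false // someone else renewed or acquired in between
	}
	l.holderID = holder
	l.version++ // bump version so concurrent acquirers lose the race
	l.expired = false
	return true
}

func main() {
	l := &lease{holderID: "old", version: 3, expired: true}
	fmt.Println(tryAcquire(l, "replica-a", 3)) // true: version matched
	fmt.Println(tryAcquire(l, "replica-b", 3)) // false: version moved on
}
```

With the version in the predicate, two replicas that both observed an expired lease cannot both succeed, which is what "atomic leader elections" promises.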
```go
// RunLeaderElection runs the leader election loop.
// It blocks and runs onStartLeading/onStopLeading callbacks as leadership changes.
// This is designed to be run in a separate goroutine.
func (le *LeaderElection) RunLeaderElection(ctx context.Context, stopCh <-chan struct{},
	onStartLeading func(), onStopLeading func()) {

	ticker := time.NewTicker(le.acquireRetry)
	defer ticker.Stop()

	renewTicker := time.NewTicker(le.renewInterval)
	renewTicker.Stop() // Don't start renewal ticker yet

	isLeader := false

	for {
		select {
		case <-ctx.Done():
			if isLeader {
				le.Release(context.Background())
				onStopLeading()
			}
			return
		case <-stopCh:
			if isLeader {
				le.Release(context.Background())
				onStopLeading()
			}
			return
		case <-ticker.C:
			// Try to acquire leadership if not already leader
			if !isLeader {
				if le.TryAcquire(ctx) {
					logger.Infof("leader election: component %s acquired leadership (holder: %s)", le.component, le.holderID)
					isLeader = true
					renewTicker.Reset(le.renewInterval)
					onStartLeading()
				}
```
RunLeaderElection waits for the first ticker.C before the initial TryAcquire, which can delay leader startup by up to acquireRetry (default 5s). Consider attempting TryAcquire once before starting the ticker loop so a leader can start work immediately on boot.
```go
// poolProvider is an internal interface for stores that provide DB access.
// This avoids circular imports by not referencing dbcommon directly.
type poolProvider interface {
	Pool() interface{} // Returns *ConnectionPool, but we don't type it to avoid import
}
```
poolProvider is defined as Pool() interface{}, but GormStore.Pool() returns *ConnectionPool. Go method return types are not covariant, so store.(poolProvider) will never succeed and GetDB() will always return (nil, false), effectively disabling leader election for DB stores. Consider exposing a small, import-cycle-safe interface on DB-backed stores (e.g. a GetDB() *gorm.DB method on the store itself) or adjust the Pool() signature to exactly match the interface and then type-assert to a GetDB() *gorm.DB interface instead of using reflection.
```go
// GetDB returns the shared DB connection if the underlying store is DB-backed.
// Implements the leader.DBSource interface.
func (sc *storeComponent) GetDB() (*gorm.DB, bool) {
	// Try to get DB from any store that has a Pool() method (all GormStores share the same ConnectionPool)
	for _, store := range sc.stores {
		if pp, ok := store.(poolProvider); ok {
			pool := pp.Pool()
			if pool == nil {
				continue
			}
			// Use reflection to call GetDB() on the pool to avoid importing dbcommon
			poolVal := reflect.ValueOf(pool)
			getDBMethod := poolVal.MethodByName("GetDB")
			if getDBMethod.IsValid() {
				result := getDBMethod.Call(nil)
				if len(result) > 0 {
					if db, ok := result[0].Interface().(*gorm.DB); ok {
						return db, true
					}
				}
```
GetDB() relies on the poolProvider type assertion + reflection. Given the current poolProvider signature mismatch, this loop will never find a DB and leader election will never be initialized. Even after fixing the signature, prefer a direct type assertion to a minimal interface (e.g. interface{ GetDB() *gorm.DB }) instead of reflection to avoid runtime surprises and make this easier to test.
```go
// Run leader election with callbacks for starting/stopping leadership
e.leaderElection.RunLeaderElection(ctx, ch,
	func() { // onStartLeading callback
		logger.Infof("engine: became leader, starting business logic")
		if err := e.startBusinessLogic(ch); err != nil {
			logger.Errorf("engine: failed to start business logic: %v", err)
		}
	},
	func() { // onStopLeading callback
		logger.Warnf("engine: lost leadership, stopping business logic")
	},
)
```
When leadership is acquired, startBusinessLogic(ch) starts informers using the global stop channel ch, but on leadership loss the onStopLeading callback only logs. This means informers/subscribers will continue running (and can be started multiple times on leadership flaps), violating the follower contract and potentially duplicating list-watch + DB writes. Use a per-leadership stop channel/context for informers, and in onStopLeading close it and Unsubscribe any subscribers (or otherwise ensure business logic is idempotent and fully stops on leadership loss).
```go
// Run leader election with callbacks for starting/stopping leadership
d.leaderElection.RunLeaderElection(ctx, ch,
	func() { // onStartLeading callback
		logger.Infof("discovery: became leader, starting business logic")
		if err := d.startBusinessLogic(ch); err != nil {
			logger.Errorf("discovery: failed to start business logic: %v", err)
		}
	},
	func() { // onStopLeading callback
		logger.Warnf("discovery: lost leadership, stopping business logic")
	},
)
```
Same issue as engine: startBusinessLogic(ch) launches informers with the global stop channel, but onStopLeading doesn’t stop them or unsubscribe, so a node that loses leadership will keep running leader-only behavior and repeated elections can start duplicate goroutines / duplicate subscription attempts. Use a dedicated stop channel/context per leadership term and stop/unsubscribe in onStopLeading to ensure followers do not execute list-watch or writes.
```go
if result.Error != nil {
	// If insertion fails, it means another replica just created it
	// This is expected in concurrent scenarios
	logger.Debugf("leader election: failed to insert lease for component %s (probably created by another replica): %v", le.component, result.Error)
	le.isLeader.Store(false)
	return false
```
TryAcquire treats any Create error as a benign race ("probably created by another replica") and logs it at debug level. That will also hide real DB failures (connectivity, permission, schema issues) and can leave the component stuck as follower without a visible signal. Consider checking specifically for unique-constraint violations and logging/handling other errors as warnings (or returning an error).
```go
acquireRetry   time.Duration
isLeader       atomic.Bool
currentVersion int64
stopCh         chan struct{}
```
LeaderElection has a stopCh field that is never used (the stop channel is passed into RunLeaderElection instead). Removing the unused field will simplify the struct and avoid confusion about which stop mechanism is authoritative.
```go
// Pool returns the connection pool for this store.
// Used by other components (e.g., leader election) that need direct DB access.
func (gs *GormStore) Pool() *ConnectionPool {
```
This new Pool() accessor returns *ConnectionPool, but the only consumer (storeComponent.GetDB) currently looks for a Pool() interface{} method, so GormStore will not satisfy that interface and the accessor won’t be used. Align the method signature with the consumer approach (or replace this with a GetDB() *gorm.DB method on GormStore to avoid exposing the pool at all).
Suggested change:
```diff
-func (gs *GormStore) Pool() *ConnectionPool {
+func (gs *GormStore) Pool() interface{} {
```
```go
ticker := time.NewTicker(le.acquireRetry)
defer ticker.Stop()

renewTicker := time.NewTicker(le.renewInterval)
```
renewTicker is created but not defer-stopped on all exit paths (only stopped in some state transitions). If the function returns while renewTicker is active, the ticker goroutine can leak. Add defer renewTicker.Stop() right after creation (similar to ticker).
Suggested change:
```diff
 renewTicker := time.NewTicker(le.renewInterval)
+defer renewTicker.Stop()
```
robocanic left a comment

Great work! I left some comments and hope you can discuss them with me.
```go
}

// Initialize leader election if using DB store
if ctx.Config().Store.Type != storecfg.Memory {
```
Suggestion: the chain of if/else branches here could be cleaner; return early for the cases that don't apply. For example:

if ctx.Config().Store.Type == storecfg.Memory { return }
// ...then the rest of the logic
```go
func (sc *storeComponent) GetDB() (*gorm.DB, bool) {
	// Try to get DB from any store that has a Pool() method (all GormStores share the same ConnectionPool)
	for _, store := range sc.stores {
		if pp, ok := store.(poolProvider); ok {
```
Suggestion: inside this for-loop, `continue` early when the `if` condition is not met; the code will read more cleanly.
```go
// DefaultLeaseDuration is the default duration for a leader lease
DefaultLeaseDuration = 30 * time.Second
// DefaultRenewInterval is the default interval for renewing the lease
DefaultRenewInterval = 10 * time.Second
```
Question: renewInterval here divides leaseDuration evenly. Could the following happen: a node that was the leader in the previous cycle loses the lease at renew time to another node because of a concurrent race and becomes a follower, but the leader-to-follower transition never stops that node's discovery and engine, so in the next cycle two nodes are both doing list-watch and writing to the database, producing dirty data?

There are two core problems here:
- A node that has become leader should, under normal conditions, keep holding the lease; re-election should only happen if that node dies.
- If something abnormal happens and an otherwise healthy leader fails to grab the lease in the next cycle, that node must stop doing leader-only work.
Please provide a description of this PR:
Add leader election for Discovery and Engine components.
Issue #1425
To help us figure out who should review this PR, please put an X in all the areas that this PR affects.
Please check any characteristics that apply to this pull request.