Skip to content

Commit ce40a52

Browse files
committed
evict backend
1 parent 4d841da commit ce40a52

11 files changed

Lines changed: 445 additions & 9 deletions

File tree

conf/proxy.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@
2323

2424
graceful-close-conn-timeout = 15
2525

26+
# fail-backend-list marks backend pod names or backend addresses as failed. TiProxy will stop routing new
27+
# connections to them and migrate existing connections away.
28+
# fail-backend-list = ["db-2033841436272623616-0f6e346b-tidb-0", "10.0.0.10:4000"]
29+
30+
# failover-timeout is measured in seconds. If a failed backend still has remaining connections after the timeout,
31+
# TiProxy will force close them.
32+
# failover-timeout = 60
33+
2634
# possible values:
2735
# "" => enable static routing.
2836
# "pd-addr:pd-port" => automatically tidb discovery.

lib/config/proxy.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ type ProxyServerOnline struct {
6868
// BackendClusters represents multiple backend clusters that the proxy can route to. It can be reloaded
6969
// online.
7070
BackendClusters []BackendCluster `yaml:"backend-clusters,omitempty" toml:"backend-clusters,omitempty" json:"backend-clusters,omitempty" reloadable:"true"`
71+
// FailBackendList contains backend pod names or backend addresses (IP:port) that should be drained immediately
72+
// and excluded from new routing.
73+
FailBackendList []string `yaml:"fail-backend-list,omitempty" toml:"fail-backend-list,omitempty" json:"fail-backend-list,omitempty" reloadable:"true"`
74+
// FailoverTimeout is the grace period in seconds before force closing the remaining connections on failed backends.
75+
FailoverTimeout int `yaml:"failover-timeout,omitempty" toml:"failover-timeout,omitempty" json:"failover-timeout,omitempty" reloadable:"true"`
7176
}
7277

7378
type ProxyServer struct {
@@ -134,6 +139,7 @@ func NewConfig() *Config {
134139
cfg.Proxy.FrontendKeepalive, cfg.Proxy.BackendHealthyKeepalive, cfg.Proxy.BackendUnhealthyKeepalive = DefaultKeepAlive()
135140
cfg.Proxy.PDAddrs = "127.0.0.1:2379"
136141
cfg.Proxy.GracefulCloseConnTimeout = 15
142+
cfg.Proxy.FailoverTimeout = 60
137143

138144
cfg.API.Addr = "0.0.0.0:3080"
139145

@@ -160,6 +166,7 @@ func (cfg *Config) Clone() *Config {
160166
newCfg.Labels = maps.Clone(cfg.Labels)
161167
newCfg.Proxy.PublicEndpoints = slices.Clone(cfg.Proxy.PublicEndpoints)
162168
newCfg.Proxy.BackendClusters = slices.Clone(cfg.Proxy.BackendClusters)
169+
newCfg.Proxy.FailBackendList = slices.Clone(cfg.Proxy.FailBackendList)
163170
for i := range newCfg.Proxy.BackendClusters {
164171
newCfg.Proxy.BackendClusters[i].NSServers = slices.Clone(newCfg.Proxy.BackendClusters[i].NSServers)
165172
}
@@ -279,6 +286,23 @@ func (ps *ProxyServer) Check() error {
279286
return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.backend-clusters.ns-servers: %s", err.Error())
280287
}
281288
}
289+
if ps.FailoverTimeout < 0 {
290+
return errors.Wrapf(ErrInvalidConfigValue, "proxy.failover-timeout must be greater than or equal to 0")
291+
}
292+
failBackends := ps.FailBackendList[:0]
293+
failBackendSet := make(map[string]struct{}, len(ps.FailBackendList))
294+
for i, backendName := range ps.FailBackendList {
295+
backendName = strings.TrimSpace(backendName)
296+
if backendName == "" {
297+
return errors.Wrapf(ErrInvalidConfigValue, "proxy.fail-backend-list[%d] is empty", i)
298+
}
299+
if _, ok := failBackendSet[backendName]; ok {
300+
return errors.Wrapf(ErrInvalidConfigValue, "duplicate proxy.fail-backend-list entry %s", backendName)
301+
}
302+
failBackendSet[backendName] = struct{}{}
303+
failBackends = append(failBackends, backendName)
304+
}
305+
ps.FailBackendList = failBackends
282306
return nil
283307
}
284308

lib/config/proxy_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ var testProxyConfig = Config{
2626
FrontendKeepalive: KeepAlive{Enabled: true},
2727
ProxyProtocol: "v2",
2828
GracefulWaitBeforeShutdown: 10,
29+
FailBackendList: []string{"db-tidb-0", "db-tidb-1"},
30+
FailoverTimeout: 60,
2931
ConnBufferSize: 32 * 1024,
3032
BackendClusters: []BackendCluster{
3133
{
@@ -188,6 +190,24 @@ func TestProxyCheck(t *testing.T) {
188190
},
189191
err: ErrInvalidConfigValue,
190192
},
193+
{
194+
pre: func(t *testing.T, c *Config) {
195+
c.Proxy.FailBackendList = []string{"db-tidb-0", " "}
196+
},
197+
err: ErrInvalidConfigValue,
198+
},
199+
{
200+
pre: func(t *testing.T, c *Config) {
201+
c.Proxy.FailBackendList = []string{"db-tidb-0", "db-tidb-0"}
202+
},
203+
err: ErrInvalidConfigValue,
204+
},
205+
{
206+
pre: func(t *testing.T, c *Config) {
207+
c.Proxy.FailoverTimeout = -1
208+
},
209+
err: ErrInvalidConfigValue,
210+
},
191211
}
192212
for _, tc := range testcases {
193213
cfg := testProxyConfig
@@ -311,10 +331,12 @@ func TestCloneConfig(t *testing.T) {
311331
require.Equal(t, cfg, *clone)
312332
cfg.Labels["c"] = "d"
313333
cfg.Proxy.PublicEndpoints[0] = "2.2.2.0/24"
334+
cfg.Proxy.FailBackendList[0] = "db-tidb-9"
314335
cfg.Proxy.BackendClusters[0].Name = "cluster-updated"
315336
cfg.Proxy.BackendClusters[0].NSServers[0] = "10.0.0.9"
316337
require.NotContains(t, clone.Labels, "c")
317338
require.Equal(t, []string{"1.1.1.0/24"}, clone.Proxy.PublicEndpoints)
339+
require.Equal(t, []string{"db-tidb-0", "db-tidb-1"}, clone.Proxy.FailBackendList)
318340
require.Equal(t, "cluster-a", clone.Proxy.BackendClusters[0].Name)
319341
require.Equal(t, []string{"10.0.0.2", "10.0.0.3"}, clone.Proxy.BackendClusters[0].NSServers)
320342
}

pkg/balance/router/group.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,9 @@ func (g *Group) Balance(ctx context.Context) {
253253
i := 0
254254
for ele := fromBackend.connList.Front(); ele != nil && ctx.Err() == nil && i < count; ele = ele.Next() {
255255
conn := ele.Value
256+
if conn.forceClosing {
257+
continue
258+
}
256259
switch conn.phase {
257260
case phaseRedirectNotify:
258261
// A connection cannot be redirected again when it has not finished redirecting.
@@ -279,6 +282,7 @@ func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, suc
279282
RedirectableConn: conn,
280283
createTime: time.Now(),
281284
phase: phaseNotRedirected,
285+
forceClosing: false,
282286
}
283287
g.addConn(backend, connWrapper)
284288
conn.SetEventReceiver(g)
@@ -287,6 +291,37 @@ func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, suc
287291
}
288292
}
289293

294+
func (g *Group) CloseTimedOutFailoverConnections(now time.Time, timeout time.Duration) {
295+
g.Lock()
296+
defer g.Unlock()
297+
for _, backend := range g.backends {
298+
active, since := backend.Failover()
299+
if !active {
300+
continue
301+
}
302+
if timeout > 0 && since.Add(timeout).After(now) {
303+
continue
304+
}
305+
for ele := backend.connList.Front(); ele != nil; ele = ele.Next() {
306+
conn := ele.Value
307+
if conn.phase == phaseClosed || conn.forceClosing {
308+
continue
309+
}
310+
fields := []zap.Field{
311+
zap.Uint64("connID", conn.ConnectionID()),
312+
zap.String("backend_addr", backend.addr),
313+
zap.String("backend_pod", backend.PodName()),
314+
zap.Duration("failover_timeout", timeout),
315+
zap.Duration("failover_elapsed", now.Sub(since)),
316+
}
317+
if conn.ForceClose() {
318+
conn.forceClosing = true
319+
g.lg.Warn("force close connection on failover backend", fields...)
320+
}
321+
}
322+
}
323+
}
324+
290325
func (g *Group) removeConn(backend *backendWrapper, ce *glist.Element[*connWrapper]) {
291326
backend.connList.Remove(ce)
292327
setBackendConnMetrics(backend.addr, backend.connList.Len())

pkg/balance/router/mock_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,16 @@ func (conn *mockRedirectableConn) Redirect(inst BackendInst) bool {
6969
return true
7070
}
7171

72+
func (conn *mockRedirectableConn) ForceClose() bool {
73+
conn.Lock()
74+
defer conn.Unlock()
75+
if conn.closing {
76+
return false
77+
}
78+
conn.closing = true
79+
return true
80+
}
81+
7282
func (conn *mockRedirectableConn) GetRedirectingAddr() string {
7383
conn.Lock()
7484
defer conn.Unlock()

pkg/balance/router/router.go

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package router
55

66
import (
7+
"net"
78
"strings"
89
"sync"
910
"time"
@@ -68,6 +69,8 @@ type RedirectableConn interface {
6869
Value(key any) any
6970
// Redirect returns false if the current conn is not redirectable.
7071
Redirect(backend BackendInst) bool
72+
// ForceClose closes the connection immediately and returns false if it's already closing.
73+
ForceClose() bool
7174
ConnectionID() uint64
7275
ConnInfo() []zap.Field
7376
}
@@ -85,8 +88,11 @@ type backendWrapper struct {
8588
mu struct {
8689
sync.RWMutex
8790
observer.BackendHealth
91+
failoverActive bool
92+
failoverSince time.Time
8893
}
89-
addr string
94+
addr string
95+
podName string
9096
// connScore is used for calculating backend scores and check if the backend can be removed from the list.
9197
// connScore = connList.Len() + incoming connections - outgoing connections.
9298
connScore int
@@ -100,6 +106,7 @@ type backendWrapper struct {
100106
func newBackendWrapper(addr string, health observer.BackendHealth) *backendWrapper {
101107
wrapper := &backendWrapper{
102108
addr: addr,
109+
podName: backendPodNameFromAddr(addr),
103110
connList: glist.New[*connWrapper](),
104111
}
105112
wrapper.setHealth(health)
@@ -128,12 +135,50 @@ func (b *backendWrapper) Addr() string {
128135
}
129136

130137
func (b *backendWrapper) Healthy() bool {
138+
b.mu.RLock()
139+
healthy := b.mu.Healthy && !b.mu.failoverActive
140+
b.mu.RUnlock()
141+
return healthy
142+
}
143+
144+
func (b *backendWrapper) ObservedHealthy() bool {
131145
b.mu.RLock()
132146
healthy := b.mu.Healthy
133147
b.mu.RUnlock()
134148
return healthy
135149
}
136150

151+
func (b *backendWrapper) PodName() string {
152+
return b.podName
153+
}
154+
155+
func (b *backendWrapper) setFailover(active bool, since time.Time) (changed bool, failoverSince time.Time) {
156+
b.mu.Lock()
157+
defer b.mu.Unlock()
158+
if active {
159+
if b.mu.failoverActive {
160+
return false, b.mu.failoverSince
161+
}
162+
b.mu.failoverActive = true
163+
b.mu.failoverSince = since
164+
return true, b.mu.failoverSince
165+
}
166+
if !b.mu.failoverActive {
167+
return false, time.Time{}
168+
}
169+
b.mu.failoverActive = false
170+
b.mu.failoverSince = time.Time{}
171+
return true, time.Time{}
172+
}
173+
174+
func (b *backendWrapper) Failover() (active bool, since time.Time) {
175+
b.mu.RLock()
176+
active = b.mu.failoverActive
177+
since = b.mu.failoverSince
178+
b.mu.RUnlock()
179+
return
180+
}
181+
137182
func (b *backendWrapper) ServerVersion() string {
138183
b.mu.RLock()
139184
version := b.mu.ServerVersion
@@ -213,4 +258,22 @@ type connWrapper struct {
213258
lastRedirect time.Time
214259
createTime time.Time
215260
phase connPhase
261+
forceClosing bool
262+
}
263+
264+
func backendPodNameFromAddr(addr string) string {
265+
host, _, err := net.SplitHostPort(addr)
266+
if err != nil {
267+
host = addr
268+
}
269+
if host == "" {
270+
return ""
271+
}
272+
if ip := net.ParseIP(host); ip != nil {
273+
return host
274+
}
275+
if idx := strings.IndexByte(host, '.'); idx >= 0 {
276+
return host[:idx]
277+
}
278+
return host
216279
}

0 commit comments

Comments
 (0)