Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions conf/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@

graceful-close-conn-timeout = 15

# fail-backend-list marks backend pod names or backend addresses as failed. TiProxy will stop routing new
# connections to them and migrate existing connections away.
# fail-backend-list = ["db-2033841436272623616-0f6e346b-tidb-0", "10.0.0.10:4000"]

# failover-timeout is measured in seconds. If a failed backend still has remaining connections after the timeout,
# TiProxy will force close them.
# failover-timeout = 60

# possible values:
# "" => enable static routing.
# "pd-addr:pd-port" => automatically tidb discovery.
Expand Down
29 changes: 25 additions & 4 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type ProxyServerOnline struct {
// BackendClusters represents multiple backend clusters that the proxy can route to. It can be reloaded
// online.
BackendClusters []BackendCluster `yaml:"backend-clusters,omitempty" toml:"backend-clusters,omitempty" json:"backend-clusters,omitempty" reloadable:"true"`
// FailBackendList contains backend pod names or backend addresses (IP:port) that should be drained immediately
// and excluded from new routing.
FailBackendList []string `yaml:"fail-backend-list,omitempty" toml:"fail-backend-list,omitempty" json:"fail-backend-list,omitempty" reloadable:"true"`
// FailoverTimeout is the grace period in seconds before force closing the remaining connections on failed backends.
FailoverTimeout int `yaml:"failover-timeout,omitempty" toml:"failover-timeout,omitempty" json:"failover-timeout,omitempty" reloadable:"true"`
}

type ProxyServer struct {
Expand Down Expand Up @@ -136,6 +141,7 @@ func NewConfig() *Config {
cfg.Proxy.FrontendKeepalive, cfg.Proxy.BackendHealthyKeepalive, cfg.Proxy.BackendUnhealthyKeepalive = DefaultKeepAlive()
cfg.Proxy.PDAddrs = "127.0.0.1:2379"
cfg.Proxy.GracefulCloseConnTimeout = 15
cfg.Proxy.FailoverTimeout = 60

cfg.API.Addr = "0.0.0.0:3080"

Expand All @@ -162,6 +168,7 @@ func (cfg *Config) Clone() *Config {
newCfg.Labels = maps.Clone(cfg.Labels)
newCfg.Proxy.PublicEndpoints = slices.Clone(cfg.Proxy.PublicEndpoints)
newCfg.Proxy.BackendClusters = slices.Clone(cfg.Proxy.BackendClusters)
newCfg.Proxy.FailBackendList = slices.Clone(cfg.Proxy.FailBackendList)
for i := range newCfg.Proxy.BackendClusters {
newCfg.Proxy.BackendClusters[i].NSServers = slices.Clone(newCfg.Proxy.BackendClusters[i].NSServers)
}
Expand Down Expand Up @@ -260,10 +267,6 @@ func (ps *ProxyServer) Check() error {
if _, err := ps.GetSQLAddrs(); err != nil {
return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.addr or proxy.port-range: %s", err.Error())
}
if len(ps.BackendClusters) == 0 {
return nil
}

clusterNames := make(map[string]struct{}, len(ps.BackendClusters))
for i, cluster := range ps.BackendClusters {
name := strings.TrimSpace(cluster.Name)
Expand All @@ -281,6 +284,24 @@ func (ps *ProxyServer) Check() error {
return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.backend-clusters.ns-servers: %s", err.Error())
}
}

if ps.FailoverTimeout < 0 {
return errors.Wrapf(ErrInvalidConfigValue, "proxy.failover-timeout must be greater than or equal to 0")
}
failBackends := ps.FailBackendList[:0]
failBackendSet := make(map[string]struct{}, len(ps.FailBackendList))
for i, backendName := range ps.FailBackendList {
backendName = strings.TrimSpace(backendName)
if backendName == "" {
return errors.Wrapf(ErrInvalidConfigValue, "proxy.fail-backend-list[%d] is empty", i)
}
if _, ok := failBackendSet[backendName]; ok {
continue
}
failBackendSet[backendName] = struct{}{}
failBackends = append(failBackends, backendName)
}
ps.FailBackendList = failBackends
return nil
}

Expand Down
16 changes: 16 additions & 0 deletions lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var testProxyConfig = Config{
FrontendKeepalive: KeepAlive{Enabled: true},
ProxyProtocol: "v2",
GracefulWaitBeforeShutdown: 10,
FailBackendList: []string{"db-tidb-0", "db-tidb-1"},
FailoverTimeout: 60,
ConnBufferSize: 32 * 1024,
BackendClusters: []BackendCluster{
{
Expand Down Expand Up @@ -188,6 +190,18 @@ func TestProxyCheck(t *testing.T) {
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.Proxy.FailBackendList = []string{"db-tidb-0", " "}
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.Proxy.FailoverTimeout = -1
},
err: ErrInvalidConfigValue,
},
}
for _, tc := range testcases {
cfg := testProxyConfig
Expand Down Expand Up @@ -311,10 +325,12 @@ func TestCloneConfig(t *testing.T) {
require.Equal(t, cfg, *clone)
cfg.Labels["c"] = "d"
cfg.Proxy.PublicEndpoints[0] = "2.2.2.0/24"
cfg.Proxy.FailBackendList[0] = "db-tidb-9"
cfg.Proxy.BackendClusters[0].Name = "cluster-updated"
cfg.Proxy.BackendClusters[0].NSServers[0] = "10.0.0.9"
require.NotContains(t, clone.Labels, "c")
require.Equal(t, []string{"1.1.1.0/24"}, clone.Proxy.PublicEndpoints)
require.Equal(t, []string{"db-tidb-0", "db-tidb-1"}, clone.Proxy.FailBackendList)
require.Equal(t, "cluster-a", clone.Proxy.BackendClusters[0].Name)
require.Equal(t, []string{"10.0.0.2", "10.0.0.3"}, clone.Proxy.BackendClusters[0].NSServers)
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/balance/router/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ func (g *Group) Balance(ctx context.Context) {
i := 0
for ele := fromBackend.connList.Front(); ele != nil && ctx.Err() == nil && i < count; ele = ele.Next() {
conn := ele.Value
if conn.forceClosing {
continue
}
switch conn.phase {
case phaseRedirectNotify:
// A connection cannot be redirected again when it has not finished redirecting.
Expand All @@ -282,6 +285,7 @@ func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, suc
RedirectableConn: conn,
createTime: time.Now(),
phase: phaseNotRedirected,
forceClosing: false,
}
g.addConn(backend, connWrapper)
conn.SetEventReceiver(g)
Expand All @@ -290,6 +294,37 @@ func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, suc
}
}

func (g *Group) CloseTimedOutFailoverConnections(now time.Time, timeout time.Duration) {
g.Lock()
defer g.Unlock()
for _, backend := range g.backends {
since := backend.FailoverSince()
if since.IsZero() {
continue
}
if timeout > 0 && since.Add(timeout).After(now) {
continue
}
for ele := backend.connList.Front(); ele != nil; ele = ele.Next() {
conn := ele.Value
if conn.phase == phaseClosed || conn.forceClosing {
continue
Comment on lines +310 to +311
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Skip force-closing connections that are mid-redirect

CloseTimedOutFailoverConnections currently force-closes any non-closed connection, including ones in phaseRedirectNotify. With failover-timeout=0 (or a very short timeout), rebalance() can queue a redirect in Balance() and then immediately call ForceClose() on the same session in the same tick, which races with the pending redirect signal and can leave router bookkeeping inconsistent (incorrect connScore/list state when close and redirect callbacks arrive in opposite order). Guarding phaseRedirectNotify here (or cancelling/marking the pending redirect before close) avoids this failover race.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BackendConnManager.Close() handles the concurrency problem.

}
fields := []zap.Field{
zap.Uint64("connID", conn.ConnectionID()),
zap.String("backend_addr", backend.addr),
zap.String("backend_pod", backend.PodName()),
zap.Duration("failover_timeout", timeout),
zap.Duration("failover_elapsed", now.Sub(since)),
}
if conn.ForceClose() {
conn.forceClosing = true
g.lg.Info("force close connection on failover backend", fields...)
}
}
}
}

func (g *Group) removeConn(backend *backendWrapper, ce *glist.Element[*connWrapper]) {
backend.connList.Remove(ce)
setBackendConnMetrics(backend.addr, backend.connList.Len())
Expand Down
10 changes: 10 additions & 0 deletions pkg/balance/router/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func (conn *mockRedirectableConn) Redirect(inst BackendInst) bool {
return true
}

// ForceClose marks the mock connection as closing and reports whether this
// call performed the transition; repeated calls return false.
func (conn *mockRedirectableConn) ForceClose() bool {
	conn.Lock()
	defer conn.Unlock()
	alreadyClosing := conn.closing
	conn.closing = true
	return !alreadyClosing
}

func (conn *mockRedirectableConn) GetRedirectingBackendID() string {
conn.Lock()
defer conn.Unlock()
Expand Down
63 changes: 61 additions & 2 deletions pkg/balance/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package router

import (
"net"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -69,6 +70,8 @@ type RedirectableConn interface {
Value(key any) any
// Redirect returns false if the current conn is not redirectable.
Redirect(backend BackendInst) bool
// ForceClose closes the connection immediately and returns false if it's already closing.
ForceClose() bool
ConnectionID() uint64
ConnInfo() []zap.Field
}
Expand All @@ -88,9 +91,11 @@ type backendWrapper struct {
mu struct {
sync.RWMutex
observer.BackendHealth
failoverSince time.Time
}
id string
addr string
id string
addr string
podName string
// connScore is used for calculating backend scores and check if the backend can be removed from the list.
// connScore = connList.Len() + incoming connections - outgoing connections.
connScore int
Expand All @@ -105,6 +110,7 @@ func newBackendWrapper(id string, health observer.BackendHealth) *backendWrapper
wrapper := &backendWrapper{
id: id,
addr: health.Addr,
podName: backendPodNameFromAddr(health.Addr),
connList: glist.New[*connWrapper](),
}
wrapper.setHealth(health)
Expand Down Expand Up @@ -137,12 +143,47 @@ func (b *backendWrapper) Addr() string {
}

// Healthy reports whether the backend is routable: it must be observed as
// healthy by the health check and must not be marked for failover.
func (b *backendWrapper) Healthy() bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.Healthy && b.mu.failoverSince.IsZero()
}

// ObservedHealthy reports the raw health-check result, ignoring any failover
// marking. Contrast with Healthy, which also requires no failover.
func (b *backendWrapper) ObservedHealthy() bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.Healthy
}

// PodName returns the pod name derived from the backend address. It is set
// once in newBackendWrapper (via backendPodNameFromAddr) and not modified
// afterwards, so it is read without taking b.mu.
func (b *backendWrapper) PodName() string {
	return b.podName
}

// setFailover marks the backend as failed (non-zero since) or clears the mark
// (zero since). It returns whether the state actually changed, together with
// the effective failover start time. Marking an already-failed backend keeps
// the original timestamp and reports no change.
func (b *backendWrapper) setFailover(since time.Time) (changed bool, failoverSince time.Time) {
	b.mu.Lock()
	defer b.mu.Unlock()
	switch {
	case since.IsZero():
		// Clear request.
		if b.mu.failoverSince.IsZero() {
			return false, time.Time{}
		}
		b.mu.failoverSince = time.Time{}
		return true, time.Time{}
	case b.mu.failoverSince.IsZero():
		// First time the backend is marked as failed.
		b.mu.failoverSince = since
		return true, since
	default:
		// Already failed: keep the original timestamp.
		return false, b.mu.failoverSince
	}
}

// FailoverSince returns the time at which the backend was marked as failed,
// or the zero time if it is not in failover state.
func (b *backendWrapper) FailoverSince() time.Time {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.failoverSince
}

func (b *backendWrapper) ServerVersion() string {
b.mu.RLock()
version := b.mu.ServerVersion
Expand Down Expand Up @@ -236,4 +277,22 @@ type connWrapper struct {
lastRedirect time.Time
createTime time.Time
phase connPhase
forceClosing bool
}

// backendPodNameFromAddr derives a pod name from a backend address. The
// address may be "host:port" or a bare host. An empty host yields "", an IP
// host is returned as-is, and a DNS name is truncated at the first dot, which
// maps a Kubernetes FQDN like "pod-0.peer.ns.svc" to its pod name "pod-0".
func backendPodNameFromAddr(addr string) string {
	host := addr
	if h, _, err := net.SplitHostPort(addr); err == nil {
		host = h
	}
	if host == "" {
		return ""
	}
	if net.ParseIP(host) != nil {
		// IP addresses have no pod name; keep them intact.
		return host
	}
	// Cut returns (host, "", false) when there is no dot, so a bare pod name
	// passes through unchanged.
	podName, _, _ := strings.Cut(host, ".")
	return podName
}
Loading