From 829ceba962b820c825e6dfc60af65d76f890aa7c Mon Sep 17 00:00:00 2001 From: djshow832 Date: Mon, 15 Jun 2026 10:59:33 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #1173 Signed-off-by: ti-chi-bot --- pkg/balance/factor/factor_cpu.go | 5 + pkg/balance/router/group.go | 615 +++++++++++++ pkg/balance/router/router.go | 20 + pkg/balance/router/router_score.go | 132 +++ pkg/balance/router/router_score_test.go | 968 +++++++++++++++++++++ pkg/balance/router/router_static.go | 4 + pkg/proxy/backend/backend_conn_mgr.go | 31 +- pkg/proxy/backend/backend_conn_mgr_test.go | 62 ++ 8 files changed, 1835 insertions(+), 2 deletions(-) create mode 100644 pkg/balance/router/group.go diff --git a/pkg/balance/factor/factor_cpu.go b/pkg/balance/factor/factor_cpu.go index b531a9530..93b655efd 100644 --- a/pkg/balance/factor/factor_cpu.go +++ b/pkg/balance/factor/factor_cpu.go @@ -239,8 +239,13 @@ func (fc *FactorCPU) updateCpuPerConn() { // Estimate the current cpu usage by the latest CPU usage, the latest connection count, and the current connection count. func (fc *FactorCPU) getUsage(backend scoredBackend) (avgUsage, latestUsage float64) { +<<<<<<< HEAD snapshot, ok := fc.snapshot[backend.Addr()] if !ok || snapshot.avgUsage < 0 || latestUsage < 0 { +======= + snapshot, ok := fc.snapshot[backend.ID()] + if !ok || snapshot.avgUsage < 0 || snapshot.latestUsage < 0 { +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) // The metric has missed for minutes. return 1, 1 } diff --git a/pkg/balance/router/group.go b/pkg/balance/router/group.go new file mode 100644 index 000000000..b103e2f41 --- /dev/null +++ b/pkg/balance/router/group.go @@ -0,0 +1,615 @@ +// Copyright 2025 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package router + +import ( + "context" + "net" + "reflect" + "slices" + "sync" + "time" + + glist "github.com/bahlo/generic-list-go" + "github.com/pingcap/tiproxy/lib/config" + "github.com/pingcap/tiproxy/lib/util/errors" + "github.com/pingcap/tiproxy/pkg/balance/observer" + "github.com/pingcap/tiproxy/pkg/balance/policy" + "github.com/pingcap/tiproxy/pkg/manager/backendcluster" + "github.com/pingcap/tiproxy/pkg/metrics" + "github.com/pingcap/tiproxy/pkg/util/netutil" + "go.uber.org/zap" +) + +type MatchType int + +const ( + // Match all connections, used when there is only one backend group. + MatchAll MatchType = iota + // Match connections based on the client CIDR. + MatchClientCIDR + // Match connections based on proxy CIDR. If proxy-protocol is disabled, route by the client CIDR. + MatchProxyCIDR + // Match connections based on the local SQL listener port. + MatchPort +) + +var _ ConnEventReceiver = (*Group)(nil) + +type routeCheckBackend struct { + *backendWrapper + healthy bool +} + +func (b routeCheckBackend) Healthy() bool { + return b.healthy +} + +// Group is used for one backend group that can be matched by CIDR, username, database, or resource group list. +type Group struct { + sync.Mutex + matchType MatchType + lg *zap.Logger + policy policy.BalancePolicy + // The values that this group is matched by. E.g. for MatchCIDR, the value is the CIDR list. + values []string + // parsed CIDR list for faster match + cidrList []*net.IPNet + backends map[string]*backendWrapper + // failoverTargets contains backend pod names or addresses configured in fail-backend-list. + failoverTargets map[string]struct{} + failoverTimeout time.Duration + ignoreFailover bool + // To limit the speed of redirection. + lastRedirectTime time.Time +} + +func NewGroup(values []string, bpCreator func(lg *zap.Logger) policy.BalancePolicy, matchType MatchType, lg *zap.Logger) (*Group, error) { + if len(values) > 0 { + lg = lg.With(zap.Strings("values", values)) + } + lg.Info("new group created") + + group := &Group{ + matchType: matchType, + lg: lg, + values: values, + backends: make(map[string]*backendWrapper), + failoverTargets: make(map[string]struct{}), + policy: bpCreator(lg.Named("policy")), + } + err := group.parseValues() + if err != nil { + err = errors.Wrapf(err, "failed to parse values") + } + return group, err +} + +func (g *Group) parseValues() error { + switch g.matchType { + case MatchClientCIDR, MatchProxyCIDR: + cidrList, parseErr := netutil.ParseCIDRList(g.values) + if parseErr != nil { + return parseErr + } + g.cidrList = cidrList + } + return nil +} + +func (g *Group) Match(clientInfo ClientInfo) bool { + switch g.matchType { + case MatchClientCIDR, MatchProxyCIDR: + addr := clientInfo.ProxyAddr + if g.matchType == MatchClientCIDR { + addr = clientInfo.ClientAddr + } + ip, err := netutil.NetAddr2IP(addr) + if err != nil { + g.lg.Error("checking CIDR failed", zap.Stringer("addr", addr), zap.Error(err)) + return false + } + contains, err := netutil.CIDRContainsIP(g.cidrList, ip) + if err != nil { + g.lg.Error("checking CIDR failed", zap.Stringer("addr", addr), zap.Error(err)) + } + return contains + } + return true +} + +func (g *Group) EqualValues(values []string) bool { + switch g.matchType { + case MatchClientCIDR, MatchProxyCIDR, MatchPort: + if len(g.values) != len(values) { + return false + } + for _, v := range g.values { + if !slices.Contains(values, v) { + return false + } + } + return true + } + return false +} + +// Intersect returns if any cidrs are the same. +// In next-gen, backend cidrs may increase or decrease but they stay in the same group. +// E.g. enable public endpoint (3 cidrs) -> enable private endpoint (6 cidrs) -> disable public endpoint (3 cidrs). +func (g *Group) Intersect(values []string) bool { + switch g.matchType { + case MatchClientCIDR, MatchProxyCIDR, MatchPort: + for _, v := range g.values { + if slices.Contains(values, v) { + return true + } + } + return false + } + return false +} + +// Backend CIDRs may change anytime. +func (g *Group) RefreshCidr() { + g.Lock() + defer g.Unlock() + switch g.matchType { + case MatchClientCIDR, MatchProxyCIDR: + valueMap := make(map[string]struct{}, len(g.values)) + for _, b := range g.backends { + cidrs := b.Cidr() + for _, cidr := range cidrs { + valueMap[cidr] = struct{}{} + } + } + values := make([]string, 0, len(valueMap)) + for k := range valueMap { + values = append(values, k) + } + g.values = values + if err := g.parseValues(); err != nil { + g.lg.Error("failed to parse values", zap.Error(err)) + } + } +} + +func (g *Group) AddBackend(backendID string, backend *backendWrapper) { + g.Lock() + defer g.Unlock() + g.backends[backendID] = backend + backend.group = g +} + +// removeBackendIfIdle removes the backend from the group only if it has no connections and no +// pending incoming/outgoing scores. +func (g *Group) removeBackendIfIdle(backendID string, backend *backendWrapper) (removed, empty bool) { + g.Lock() + defer g.Unlock() + if backend.connList.Len() != 0 || backend.connScore > 0 { + return false, false + } + delete(g.backends, backendID) + return true, len(g.backends) == 0 +} + +func getConnWrapper(conn RedirectableConn) *glist.Element[*connWrapper] { + return conn.Value(_routerKey).(*glist.Element[*connWrapper]) +} + +func setConnWrapper(conn RedirectableConn, ce *glist.Element[*connWrapper]) { + conn.SetValue(_routerKey, ce) +} + +func (g *Group) routeableObservedBackendsLocked(failoverBackendIDs map[string]struct{}) []policy.BackendCtx { + backends := make([]policy.BackendCtx, 0, len(g.backends)) + for _, backend := range g.backends { + if !backend.ObservedHealthy() { + continue + } + healthy := true + if failoverBackendIDs != nil { + _, healthy = failoverBackendIDs[backend.ID()] + healthy = !healthy + } + backends = append(backends, routeCheckBackend{ + backendWrapper: backend, + healthy: healthy, + }) + } + return g.policy.RouteableBackends(backends) +} + +func (g *Group) backendInFailoverListLocked(backend *backendWrapper) bool { + _, active := g.failoverTargets[backend.PodName()] + if !active { + _, active = g.failoverTargets[backend.Addr()] + } + return active +} + +func (g *Group) setFailoverConfigLocked(cfg *config.Config) { + targets := make(map[string]struct{}, len(cfg.Proxy.FailBackendList)) + for _, backend := range cfg.Proxy.FailBackendList { + targets[backend] = struct{}{} + } + g.failoverTargets = targets + g.failoverTimeout = time.Duration(cfg.Proxy.FailoverTimeout) * time.Second +} + +func (g *Group) updateFailoverLocked(now time.Time) { + failoverBackendIDs := make(map[string]struct{}, len(g.backends)) + for _, backend := range g.backends { + if g.backendInFailoverListLocked(backend) { + failoverBackendIDs[backend.ID()] = struct{}{} + } + } + + routeable := g.routeableObservedBackendsLocked(nil) + if len(routeable) > 0 { + remaining := g.routeableObservedBackendsLocked(failoverBackendIDs) + if len(remaining) == 0 { + matched := 0 + for _, backend := range routeable { + if _, ok := failoverBackendIDs[backend.ID()]; ok { + matched++ + } + } + if !g.ignoreFailover { + g.lg.Warn("fail-backend-list would leave no routeable backend in group, ignore the list for this group", + zap.Int("routeable_backend_count", len(routeable)), + zap.Int("matched_routeable_backend_count", matched)) + } + g.ignoreFailover = true + clear(failoverBackendIDs) + } else { + g.ignoreFailover = false + } + } else { + g.ignoreFailover = false + } + + for _, backend := range g.backends { + _, active := failoverBackendIDs[backend.ID()] + since := time.Time{} + if active { + since = now + } + changed, since := backend.setFailover(since) + if !changed { + continue + } + fields := []zap.Field{ + zap.String("backend_addr", backend.Addr()), + zap.String("backend_pod", backend.PodName()), + zap.Duration("failover_timeout", g.failoverTimeout), + } + if active { + fields = append(fields, zap.Time("failover_since", since)) + g.lg.Warn("backend enters failover", fields...) + continue + } + g.lg.Info("backend exits failover", fields...) + } +} + +func (g *Group) UpdateFailover(now time.Time) { + g.Lock() + defer g.Unlock() + g.updateFailoverLocked(now) +} + +func (g *Group) Route(excluded []BackendInst) (policy.BackendCtx, error) { + g.Lock() + defer g.Unlock() + + if len(g.backends) == 0 { + return nil, ErrNoBackend + } + backends := make([]policy.BackendCtx, 0, len(g.backends)) + for _, backend := range g.backends { + if !backend.Healthy() { + continue + } + // Exclude the backends that are already tried. + found := false + for _, e := range excluded { + if backend.ID() == e.ID() { + found = true + break + } + } + if found { + continue + } + backends = append(backends, backend) + } + + idlestBackend := g.policy.BackendToRoute(backends) + if idlestBackend == nil || reflect.ValueOf(idlestBackend).IsNil() { + return nil, ErrNoBackend + } + backend := idlestBackend.(*backendWrapper) + backend.connScore++ + return backend, nil +} + +func (g *Group) Balance(ctx context.Context) { + g.Lock() + defer g.Unlock() + backends := make([]policy.BackendCtx, 0, len(g.backends)) + for _, backend := range g.backends { + backends = append(backends, backend) + } + + busiestBackend, idlestBackend, balanceCount, reason, logFields := g.policy.BackendsToBalance(backends) + if balanceCount == 0 { + return + } + fromBackend, toBackend := busiestBackend.(*backendWrapper), idlestBackend.(*backendWrapper) + + // Control the speed of migration. + curTime := time.Now() + migrationInterval := time.Duration(float64(time.Second) / balanceCount) + count := 0 + if migrationInterval < rebalanceInterval*2 { + // If we need to migrate multiple connections in each round, calculate the connection count for each round. + count = int((rebalanceInterval-1)/migrationInterval) + 1 + } else { + // If we need to wait for multiple rounds to migrate a connection, calculate the interval for each connection. + if curTime.Sub(g.lastRedirectTime) >= migrationInterval { + count = 1 + } else { + return + } + } + // Migrate balanceCount connections. + i := 0 + for ele := fromBackend.connList.Front(); ele != nil && ctx.Err() == nil && i < count; ele = ele.Next() { + conn := ele.Value + if conn.forceClosing { + continue + } + switch conn.phase { + case phaseRedirectNotify: + // A connection cannot be redirected again when it has not finished redirecting. + continue + case phaseRedirectFail: + // If it failed recently, it will probably fail this time. + if conn.lastRedirect.Add(redirectFailMinInterval).After(curTime) { + continue + } + } + if g.redirectConn(conn, fromBackend, toBackend, reason, logFields, curTime) { + g.lastRedirectTime = curTime + i++ + } + } +} + +func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, succeed bool) { + g.Lock() + defer g.Unlock() + backend := g.ensureBackend(backendInst.ID()) + if succeed { + connWrapper := &connWrapper{ + RedirectableConn: conn, + scoreOwner: backend, + createTime: time.Now(), + phase: phaseNotRedirected, + forceClosing: false, + } + g.addConn(backend, connWrapper) + conn.SetEventReceiver(g) + } else { + backend.connScore-- + } +} + +func (g *Group) CloseTimedOutFailoverConnections(now time.Time) { + g.Lock() + defer g.Unlock() + for _, backend := range g.backends { + since := backend.FailoverSince() + if since.IsZero() { + continue + } + if g.failoverTimeout > 0 && since.Add(g.failoverTimeout).After(now) { + continue + } + for ele := backend.connList.Front(); ele != nil; ele = ele.Next() { + conn := ele.Value + if conn.phase == phaseClosed || conn.forceClosing { + continue + } + fields := []zap.Field{ + zap.Uint64("connID", conn.ConnectionID()), + zap.String("backend_addr", backend.addr), + zap.String("backend_pod", backend.PodName()), + zap.Duration("failover_timeout", g.failoverTimeout), + zap.Duration("failover_elapsed", now.Sub(since)), + } + if conn.ForceClose() { + conn.forceClosing = true + g.lg.Info("force close connection on failover backend", fields...) + } + } + } +} + +// removeConn removes the connection from the connList that actually contains it. +// Always remove by `physicalOwner` instead of by the backend that the connection is physically on: +// they differ when a redirect result has not been processed yet, and glist.Remove silently +// does nothing if the element is not in the given list, which leaks the connection forever. +func (g *Group) removeConn(ce *glist.Element[*connWrapper]) { + backend := ce.Value.physicalOwner + if backend == nil { + g.lg.Warn("unexpected nil physical owner for connection") + return + } + oldLen := backend.connList.Len() + backend.connList.Remove(ce) + newLen := backend.connList.Len() + if newLen != oldLen-1 { + g.lg.Warn("the connection is not in the list", zap.String("backend", backend.id)) + } + ce.Value.physicalOwner = nil + setBackendConnMetrics(backend.addr, newLen) +} + +func (g *Group) addConn(backend *backendWrapper, conn *connWrapper) { + if conn.physicalOwner != nil { + g.lg.Warn("unexpected non-nil physical owner for connection") + } + conn.physicalOwner = backend + ce := backend.connList.PushBack(conn) + setBackendConnMetrics(backend.addr, backend.connList.Len()) + setConnWrapper(conn, ce) +} + +// RedirectConnections implements Router.RedirectConnections interface. +// It redirects all connections compulsively. It's only used for testing. +func (g *Group) RedirectConnections() error { + g.Lock() + defer g.Unlock() + for _, backend := range g.backends { + for ce := backend.connList.Front(); ce != nil; ce = ce.Next() { + // This is only for test, so we allow it to reconnect to the same backend. + connWrapper := ce.Value + if connWrapper.phase != phaseRedirectNotify { + connWrapper.phase = phaseRedirectNotify + connWrapper.redirectReason = "test" + if connWrapper.Redirect(backend) { + metrics.PendingMigrateGuage.WithLabelValues(backend.addr, backend.addr, connWrapper.redirectReason).Inc() + } + } + } + } + return nil +} + +func (g *Group) ensureBackend(backendID string) *backendWrapper { + backend, ok := g.backends[backendID] + if ok { + return backend + } + // The backend should always exist if it will be needed. Add a warning and add it back. + g.lg.Warn("backend is not found in the router", zap.String("backend_id", backendID), zap.Stack("stack")) + // Try to parse the IP from the backendID. It's generally not suggested to parse it, but in this + // strange case we tried our best to recover and make the backend ip valid. + // For the formats of backendID, ref `backend_id.go`. It's generated and recorded in `GetTiDBTopology` + // for the first time. + _, addr := backendcluster.ParseBackendID(backendID) + ip, _, _ := net.SplitHostPort(addr) + backend = newBackendWrapper(backendID, observer.BackendHealth{ + BackendInfo: observer.BackendInfo{ + Addr: addr, + IP: ip, + StatusPort: 10080, // impossible anyway + }, + SupportRedirection: true, + Healthy: false, + }) + g.backends[backendID] = backend + return backend +} + +// OnRedirectSucceed implements ConnEventReceiver.OnRedirectSucceed interface. +func (g *Group) OnRedirectSucceed(from, to string, conn RedirectableConn) error { + g.onRedirectFinished(from, to, conn, true) + return nil +} + +// OnRedirectFail implements ConnEventReceiver.OnRedirectFail interface. +func (g *Group) OnRedirectFail(from, to string, conn RedirectableConn) error { + g.onRedirectFinished(from, to, conn, false) + return nil +} + +func (g *Group) onRedirectFinished(from, to string, conn RedirectableConn, succeed bool) { + g.Lock() + defer g.Unlock() + fromBackend := g.ensureBackend(from) + toBackend := g.ensureBackend(to) + connWrapper := getConnWrapper(conn).Value + // The connection may be closed when this function is waiting for the lock. + if connWrapper.phase == phaseClosed { + return + } + + addMigrateMetrics(fromBackend.addr, toBackend.addr, connWrapper.redirectReason, succeed, connWrapper.lastRedirect) + if succeed { + g.removeConn(getConnWrapper(conn)) + g.addConn(toBackend, connWrapper) + connWrapper.phase = phaseRedirectEnd + } else { + connWrapper.transferScore(fromBackend) + connWrapper.phase = phaseRedirectFail + } +} + +// OnConnClosed implements ConnEventReceiver.OnConnClosed interface. +func (g *Group) OnConnClosed(backendID string, conn RedirectableConn) error { + g.Lock() + defer g.Unlock() + connWrapper := getConnWrapper(conn) + cw := connWrapper.Value + // If the physical owner mismatches the score owner, it means the redirect result has not been processed yet. + if cw.physicalOwner != cw.scoreOwner && cw.physicalOwner != nil && cw.scoreOwner != nil { + addMigrateMetrics(cw.physicalOwner.addr, cw.scoreOwner.addr, cw.redirectReason, false, cw.lastRedirect) + } + cw.transferScore(nil) + // A redirect result may have not been processed yet, in which case the connection is still + // in the old backend's connList while `backendID` is the new backend, so remove the connection + // by its physicalOwner. onRedirectFinished won't touch the list once the phase is phaseClosed. + g.removeConn(connWrapper) + cw.phase = phaseClosed + return nil +} + +func (g *Group) redirectConn(conn *connWrapper, fromBackend *backendWrapper, toBackend *backendWrapper, + reason string, logFields []zap.Field, curTime time.Time) bool { + // Skip the connection if it's closing. + fields := []zap.Field{ + zap.Uint64("connID", conn.ConnectionID()), + zap.String("from", fromBackend.addr), + zap.String("to", toBackend.addr), + } + if !conn.lastRedirect.IsZero() { + fields = append(fields, zap.Duration("since_last_redirect", curTime.Sub(conn.lastRedirect))) + } + fields = append(fields, logFields...) + succeed := conn.Redirect(toBackend) + if succeed { + g.lg.Debug("begin redirect connection", fields...) + conn.transferScore(toBackend) + conn.phase = phaseRedirectNotify + conn.redirectReason = reason + metrics.PendingMigrateGuage.WithLabelValues(fromBackend.addr, toBackend.addr, reason).Inc() + } else { + // Avoid it to be redirected again immediately. + conn.phase = phaseRedirectFail + g.lg.Debug("skip redirecting because it's closing", fields...) + } + conn.lastRedirect = curTime + return succeed +} + +func (g *Group) ConnCount() int { + g.Lock() + defer g.Unlock() + j := 0 + for _, backend := range g.backends { + j += backend.connList.Len() + } + return j +} + +func (g *Group) SetConfig(cfg *config.Config) { + g.Lock() + defer g.Unlock() + g.policy.SetConfig(cfg) + g.setFailoverConfigLocked(cfg) + g.updateFailoverLocked(time.Now()) +} diff --git a/pkg/balance/router/router.go b/pkg/balance/router/router.go index ae9a586c0..4ef89a144 100644 --- a/pkg/balance/router/router.go +++ b/pkg/balance/router/router.go @@ -32,7 +32,11 @@ func (b routeCheckBackend) Healthy() bool { type ConnEventReceiver interface { OnRedirectSucceed(from, to string, conn RedirectableConn) error OnRedirectFail(from, to string, conn RedirectableConn) error +<<<<<<< HEAD OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error +======= + OnConnClosed(backendID string, conn RedirectableConn) error +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) } // Router routes client connections to backends. @@ -225,6 +229,12 @@ func (b *backendWrapper) String() string { // connWrapper wraps RedirectableConn. type connWrapper struct { RedirectableConn + // The backend whose connList contains this connection. It may differ from the backend that the + // connection is physically on when a redirect result is not processed yet, so removing the + // connection from the list must always go through `physicalOwner`. + physicalOwner *backendWrapper + // The backend whose connScore includes this connection. + scoreOwner *backendWrapper // The reason why the redirection happens. redirectReason string // Last redirect start time of this connection. @@ -234,6 +244,16 @@ type connWrapper struct { forceClosing bool } +func (c *connWrapper) transferScore(to *backendWrapper) { + if c.scoreOwner != nil { + c.scoreOwner.connScore-- + } + c.scoreOwner = to + if to != nil { + to.connScore++ + } +} + func backendPodNameFromAddr(addr string) string { host, _, err := net.SplitHostPort(addr) if err != nil { diff --git a/pkg/balance/router/router_score.go b/pkg/balance/router/router_score.go index b0dae4b88..ed18ca0ef 100644 --- a/pkg/balance/router/router_score.go +++ b/pkg/balance/router/router_score.go @@ -327,6 +327,138 @@ func (router *ScoreBasedRouter) updateBackendHealth(healthResults observer.Healt if len(serverVersion) > 0 { router.serverVersion = serverVersion } +<<<<<<< HEAD +======= + if router.supportRedirection != supportRedirection { + router.logger.Info("updated supporting redirection", zap.Bool("support", supportRedirection)) + router.supportRedirection = supportRedirection + } +} + +func matchPortValue(clusterName, port string) string { + if clusterName == "" { + return port + } + return fmt.Sprintf("%s:%s", clusterName, port) +} + +func (router *ScoreBasedRouter) backendGroupValues(backend *backendWrapper) []string { + switch router.matchType { + case MatchClientCIDR, MatchProxyCIDR: + return backend.Cidr() + case MatchPort: + port := backend.TiProxyPort() + if port != "" { + return []string{matchPortValue(backend.ClusterName(), port)} + } + } + return nil +} + +func (router *ScoreBasedRouter) rebuildPortConflictDetector() { + if router.matchType != MatchPort { + router.portConflictDetector = nil + return + } + detector := newPortConflictDetector() + for _, group := range router.groups { + for _, value := range group.values { + clusterName, port, ok := strings.Cut(value, ":") + if !ok { + port = value + clusterName = "" + } + detector.bind(port, clusterName, group) + } + } + router.portConflictDetector = detector +} + +// Update the groups after the backend list is updated. +// called in the lock. +func (router *ScoreBasedRouter) updateGroups() { + for _, backend := range router.backends { + // An unhealthy backend can be removed once it has no connections. connList and connScore are + // protected by the group lock instead of the router lock, so read them through + // removeBackendIfIdle to avoid racing with connection events. A backend without a group has + // never been routed to, so it has no connections and can be removed directly. + if !backend.ObservedHealthy() { + removed, empty := true, false + if backend.group != nil { + removed, empty = backend.group.removeBackendIfIdle(backend.id, backend) + } + if removed { + delete(router.backends, backend.id) + // remove empty groups + if backend.group != nil && empty { + router.groups = slices.DeleteFunc(router.groups, func(g *Group) bool { + return g == backend.group + }) + } + continue + } + } + // If the labels were correctly set, we won't update its group even if the labels change. + if backend.group != nil { + switch router.matchType { + case MatchClientCIDR, MatchProxyCIDR, MatchPort: + values := router.backendGroupValues(backend) + if !backend.group.EqualValues(values) { + router.logger.Warn("backend routing values changed, keep the existing group until it is removed", + zap.String("backend_id", backend.id), + zap.String("addr", backend.Addr()), + zap.Strings("current_values", values), + zap.Strings("group_values", backend.group.values)) + } + } + continue + } + + // If the backend is not in any group, add it to a new group if its label is set. + // In operator deployment, the labels are set dynamically. + var group *Group + switch router.matchType { + case MatchAll: + if len(router.groups) == 0 { + group, _ = NewGroup(nil, router.bpCreator, router.matchType, router.logger) + router.groups = append(router.groups, group) + } + group = router.groups[0] + case MatchClientCIDR, MatchProxyCIDR, MatchPort: + values := router.backendGroupValues(backend) + if len(values) == 0 { + break + } + for _, g := range router.groups { + if g.Intersect(values) { + group = g + break + } + } + if group == nil { + g, err := NewGroup(values, router.bpCreator, router.matchType, router.logger) + if err == nil { + group = g + if router.cfgGetter != nil { + if cfg := router.cfgGetter.GetConfig(); cfg != nil { + group.SetConfig(cfg) + } + } + router.groups = append(router.groups, group) + } + // maybe too many logs, ignore the error now + } + } + if group == nil { + continue + } + group.AddBackend(backend.id, backend) + } + for _, group := range router.groups { + group.RefreshCidr() + } + router.rebuildPortConflictDetector() +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) } func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) { diff --git a/pkg/balance/router/router_score_test.go b/pkg/balance/router/router_score_test.go index e82497437..e064394b9 100644 --- a/pkg/balance/router/router_score_test.go +++ b/pkg/balance/router/router_score_test.go @@ -165,7 +165,11 @@ func (tester *routerTester) closeConnections(num int, redirecting bool) { } } for _, conn := range conns { +<<<<<<< HEAD err := tester.router.OnConnClosed(conn.from.Addr(), conn.GetRedirectingAddr(), conn) +======= + err := tester.router.groups[0].OnConnClosed(conn.from.ID(), conn) +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) require.NoError(tester.t, err) delete(tester.conns, conn.connID) } @@ -247,6 +251,34 @@ func (tester *routerTester) checkBackendConnMetrics() { } } +// checkNoConnLeak checks that no connection wrapper leaks in any connList and all connScores are 0 +// after all the connections are closed. +func (tester *routerTester) checkNoConnLeak() { + for _, backend := range tester.router.backends { + require.Equal(tester.t, 0, backend.connList.Len(), "backend %s leaks connections in connList", backend.addr) + require.Equal(tester.t, 0, backend.connScore, "backend %s has a wrong connScore", backend.addr) + } +} + +// prepareRedirectingConn creates one connection on backend "1" and makes it redirect to backend "2". +func (tester *routerTester) prepareRedirectingConn() *mockRedirectableConn { + tester.addBackends(1) + tester.addConnections(1) + tester.addBackends(1) + // Make backend "1" unhealthy so that the connection is redirected to backend "2". + tester.updateBackendStatusByAddr("1", false) + tester.rebalance(1) + conn := tester.conns[1] + require.Equal(tester.t, "2", conn.GetRedirectingBackendID()) + return conn +} + +func (tester *routerTester) readPendingGauge(from, to, reason string) float64 { + val, err := metrics.ReadGauge(metrics.PendingMigrateGuage.WithLabelValues(from, to, reason)) + require.NoError(tester.t, err) + return val +} + func (tester *routerTester) clear() { tester.backendID = 0 tester.connID = 0 @@ -283,6 +315,114 @@ func TestRebalance(t *testing.T) { tester.checkRedirectingNum(20) } +// Test the race that the connection is closed after the redirection succeeds on the connection side +// but before the router processes the redirect result. The connection must be removed from the +// original backend's connList, otherwise it leaks in the connList forever, which inflates b_conn +// metrics and distorts the balance. +func TestCloseRaceWithRedirectSucceed(t *testing.T) { + tester := newRouterTester(t, nil) + conn := tester.prepareRedirectingConn() + reason := getConnWrapper(conn).Value.redirectReason + pendingDuringRedirect := tester.readPendingGauge("1", "2", reason) + + // The redirection succeeds on the connection side, and then the connection is closed + // before the router receives OnRedirectSucceed. + conn.redirectSucceed() + require.NoError(t, tester.router.groups[0].OnConnClosed(conn.from.ID(), conn)) + delete(tester.conns, conn.connID) + // Now the router processes the redirect result. + require.NoError(t, tester.router.groups[0].OnRedirectSucceed("1", "2", conn)) + + tester.checkNoConnLeak() + tester.checkBackendConnMetrics() + require.Equal(t, pendingDuringRedirect-1, tester.readPendingGauge("1", "2", reason)) +} + +// Test the race that the connection is closed after the redirection fails on the connection side +// but before the router processes the redirect result. The connScores moved by the redirection +// must be moved back, otherwise they drift away permanently. +func TestCloseRaceWithRedirectFail(t *testing.T) { + tester := newRouterTester(t, nil) + conn := tester.prepareRedirectingConn() + reason := getConnWrapper(conn).Value.redirectReason + pendingDuringRedirect := tester.readPendingGauge("1", "2", reason) + + // The redirection fails on the connection side, and then the connection is closed + // before the router receives OnRedirectFail. + conn.redirectFail() + require.NoError(t, tester.router.groups[0].OnConnClosed(conn.from.ID(), conn)) + delete(tester.conns, conn.connID) + // Now the router processes the redirect result. + require.NoError(t, tester.router.groups[0].OnRedirectFail("1", "2", conn)) + + tester.checkNoConnLeak() + tester.checkBackendConnMetrics() + require.Equal(t, pendingDuringRedirect-1, tester.readPendingGauge("1", "2", reason)) +} + +// Test the race that the connection is closed before the connection side processes the redirect +// signal. OnConnClosed reverts the redirection, and the aborted redirect result that arrives later +// must not revert it again, otherwise the connScores and the pending gauge are decreased twice. +func TestCloseRaceWithRedirectAborted(t *testing.T) { + tester := newRouterTester(t, nil) + conn := tester.prepareRedirectingConn() + reason := getConnWrapper(conn).Value.redirectReason + pendingDuringRedirect := tester.readPendingGauge("1", "2", reason) + + // The connection is closed before the redirect signal is processed on the connection side. + require.NoError(t, tester.router.groups[0].OnConnClosed(conn.from.ID(), conn)) + delete(tester.conns, conn.connID) + // The aborted redirect result arrives later and must be a no-op. + require.NoError(t, tester.router.groups[0].OnRedirectFail("1", "2", conn)) + + tester.checkNoConnLeak() + tester.checkBackendConnMetrics() + require.Equal(t, pendingDuringRedirect-1, tester.readPendingGauge("1", "2", reason)) +} + +// Test that when the authenticator reconnects to another backend during the handshake, +// the registration of the previous attempt is cleaned up. +func TestReconnectDuringHandshake(t *testing.T) { + tester := newRouterTester(t, nil) + tester.addBackends(2) + conn := tester.createConn() + selector1 := tester.router.GetBackendSelector(ClientInfo{}) + backend1, err := selector1.Next() + require.NoError(t, err) + require.False(t, backend1 == nil || reflect.ValueOf(backend1).IsNil()) + selector1.Finish(conn, true) + // Simulate getBackendIO cleaning up before reconnecting to another backend. + require.NoError(t, tester.router.groups[0].OnConnClosed(backend1.ID(), conn)) + selector2 := tester.router.GetBackendSelector(ClientInfo{}) + var backend2 BackendInst + for { + backend2, err = selector2.Next() + require.NoError(t, err) + require.False(t, backend2 == nil || reflect.ValueOf(backend2).IsNil()) + if backend2.ID() != backend1.ID() { + break + } + selector2.Finish(conn, false) + } + selector2.Finish(conn, true) + conn.from = backend2 + tester.conns[conn.connID] = conn + + // Only the registration of the second attempt remains. + totalConns, totalScore := 0, 0 + for _, backend := range tester.router.backends { + totalConns += backend.connList.Len() + totalScore += backend.connScore + } + require.Equal(t, 1, totalConns) + require.Equal(t, 1, totalScore) + tester.checkBackendConnMetrics() + + require.NoError(t, tester.router.groups[0].OnConnClosed(backend2.ID(), conn)) + delete(tester.conns, conn.connID) + tester.checkNoConnLeak() +} + // Test that the connections are always balanced after rebalance and routing. func TestConnBalanced(t *testing.T) { tester := newRouterTester(t, nil) @@ -618,7 +758,11 @@ func TestConcurrency(t *testing.T) { from, to := conn.getAddr() var err error if i < 1 { +<<<<<<< HEAD err = router.OnConnClosed(from, conn.GetRedirectingAddr(), conn) +======= + err = router.groups[0].OnConnClosed(from, conn) +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) conn = nil } else if i < 3 { conn.redirectFail() @@ -633,8 +777,13 @@ func TestConcurrency(t *testing.T) { i := rand.Intn(10) if i < 2 { // The balancer may happen to redirect it concurrently - that's exactly what may happen. +<<<<<<< HEAD from, _ := conn.getAddr() err := router.OnConnClosed(from, conn.GetRedirectingAddr(), conn) +======= + from, _ := conn.getBackendIDs() + err := router.groups[0].OnConnClosed(from, conn) +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) require.NoError(t, err) conn = nil } @@ -977,7 +1126,178 @@ func TestWatchConfig(t *testing.T) { Labels: map[string]string{"k1": "v2"}, } require.Eventually(t, func() bool { +<<<<<<< HEAD return policy.getConfig().Labels["k1"] == "v2" +======= + return p.getConfig().Labels["k1"] == "v2" + }, 3*time.Second, 10*time.Millisecond) +} + +func TestChannelClosed(t *testing.T) { + tests := []struct { + name string + closeChannel func(cfgCh chan *config.Config, bo *mockBackendObserver) + }{ + { + name: "config", + closeChannel: func(cfgCh chan *config.Config, _ *mockBackendObserver) { + close(cfgCh) + }, + }, + { + name: "health", + closeChannel: func(_ chan *config.Config, bo *mockBackendObserver) { + bo.Close() + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + router := NewScoreBasedRouter(lg) + cfgCh := make(chan *config.Config) + cfg := &config.Config{} + cfgGetter := newMockConfigGetter(cfg) + p := &mockBalancePolicy{} + bpCreator := func(lg *zap.Logger) policy.BalancePolicy { + p.Init(cfg) + return p + } + bo := newMockBackendObserver() + router.Init(context.Background(), bo, bpCreator, cfgGetter, cfgCh) + t.Cleanup(bo.Close) + t.Cleanup(router.Close) + + bo.addBackend("0", nil) + bo.notify(nil) + require.Eventually(t, func() bool { + router.Lock() + defer router.Unlock() + return len(router.groups) == 1 + }, 3*time.Second, 10*time.Millisecond) + + tt.closeChannel(cfgCh, bo) + time.Sleep(100 * time.Millisecond) + }) + } +} + +func TestWatchFailoverConfig(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + router := NewScoreBasedRouter(lg) + cfgCh := make(chan *config.Config) + addr := "db-2033841436272623616-0f6e346b-tidb-0.peer.ns.svc:4000" + addr2 := "db-2033841436272623616-0f6e346b-tidb-1.peer.ns.svc:4000" + cfgGetter := newMockConfigGetter(&config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailoverTimeout: 60, + }, + }, + }) + bo := newMockBackendObserver() + router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, cfgCh) + t.Cleanup(bo.Close) + t.Cleanup(router.Close) + + bo.addBackend(addr, nil) + bo.addBackend(addr2, nil) + bo.notify(nil) + require.Eventually(t, func() bool { + backend := router.backends[addr] + return backend != nil && backend.Healthy() + }, 3*time.Second, 10*time.Millisecond) + + cfgCh <- &config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailBackendList: []string{"db-2033841436272623616-0f6e346b-tidb-0"}, + FailoverTimeout: 60, + }, + }, + } + require.Eventually(t, func() bool { + backend := router.backends[addr] + return backend != nil && !backend.Healthy() + }, 3*time.Second, 10*time.Millisecond) + + cfgCh <- &config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailBackendList: []string{addr}, + FailoverTimeout: 60, + }, + }, + } + require.Eventually(t, func() bool { + backend := router.backends[addr] + return backend != nil && !backend.Healthy() + }, 3*time.Second, 10*time.Millisecond) + + cfgCh <- &config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailoverTimeout: 60, + }, + }, + } + require.Eventually(t, func() bool { + backend := router.backends[addr] + return backend != nil && backend.Healthy() + }, 3*time.Second, 10*time.Millisecond) +} + +func TestNewGroupUsesLatestConfigGetter(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + router := NewScoreBasedRouter(lg) + cfgCh := make(chan *config.Config) + cfgGetter := newMockConfigGetter(&config.Config{ + Balance: config.Balance{ + RoutingRule: config.MatchPortStr, + }, + }) + bo := newMockBackendObserver() + router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, cfgCh) + t.Cleanup(bo.Close) + t.Cleanup(router.Close) + + addr1 := "db-2033841436272623616-0f6e346b-tidb-0.peer.ns.svc:4000" + addr2 := "db-2033841436272623616-0f6e346b-tidb-1.peer.ns.svc:4000" + addr3 := "db-2033841436272623616-0f6e346b-tidb-2.peer.ns.svc:4000" + bo.addBackend(addr1, map[string]string{config.TiProxyPortLabelName: "10080"}) + bo.notify(nil) + require.Eventually(t, func() bool { + router.Lock() + defer router.Unlock() + backend := router.backends[addr1] + return backend != nil && backend.Healthy() + }, 3*time.Second, 10*time.Millisecond) + + nextCfg := &config.Config{ + Balance: config.Balance{ + RoutingRule: config.MatchPortStr, + }, + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailBackendList: []string{addr2}, + FailoverTimeout: 60, + }, + }, + } + cfgGetter.setConfig(nextCfg) + cfgCh <- nextCfg + + bo.addBackend(addr2, map[string]string{config.TiProxyPortLabelName: "10081"}) + bo.addBackend(addr3, map[string]string{config.TiProxyPortLabelName: "10081"}) + bo.notify(nil) + require.Eventually(t, func() bool { + backend := router.backends[addr2] + return backend != nil && !backend.Healthy() + }, 3*time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { + backend := router.backends[addr3] + return backend != nil && backend.Healthy() +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) }, 3*time.Second, 10*time.Millisecond) } @@ -1114,3 +1434,651 @@ func TestRedirectFail(t *testing.T) { require.Equal(t, 1, tester.getBackendByIndex(0).connScore) require.Equal(t, 1, tester.getBackendByIndex(1).connScore) } +<<<<<<< HEAD +======= + +func TestSkipRedirection(t *testing.T) { + tester := newRouterTester(t, nil) + backends := map[string]*observer.BackendHealth{ + "0": { + BackendInfo: observer.BackendInfo{ + Addr: "0", + }, + Healthy: true, + SupportRedirection: false, + }, + "1": { + BackendInfo: observer.BackendInfo{ + Addr: "1", + }, + Healthy: true, + SupportRedirection: true, + }, + } + result := observer.NewHealthResult(backends, nil) + tester.router.updateBackendHealth(result) + require.False(t, tester.router.supportRedirection) + + tester.addConnections(10) + require.Equal(t, 5, tester.getBackendByIndex(0).connScore) + backends["0"].Healthy = false + tester.router.updateBackendHealth(result) + tester.rebalance(1) + require.Equal(t, 5, tester.getBackendByIndex(0).connScore) + + backends["0"].SupportRedirection = true + tester.router.updateBackendHealth(result) + require.True(t, tester.router.supportRedirection) + tester.rebalance(1) + require.NotEqual(t, 5, tester.getBackendByIndex(0).connScore) +} + +func TestGroupBackends(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + router := NewScoreBasedRouter(lg) + cfgCh := make(chan *config.Config) + cfg := &config.Config{ + Balance: config.Balance{ + RoutingRule: config.MatchClientCIDRStr, + }, + } + cfgGetter := newMockConfigGetter(cfg) + p := &mockBalancePolicy{} + bpCreator := func(_ *zap.Logger) policy.BalancePolicy { + p.Init(cfg) + return p + } + bo := newMockBackendObserver() + router.Init(context.Background(), bo, bpCreator, cfgGetter, cfgCh) + t.Cleanup(bo.Close) + t.Cleanup(router.Close) + + tests := []struct { + addr string + labels map[string]string + groupCount int + backendCount int + cidrs []string + }{ + { + addr: "0", + labels: nil, + groupCount: 0, + backendCount: 1, + cidrs: nil, + }, + { + addr: "1", + labels: map[string]string{"cidr": "1.1.1.1/32"}, + groupCount: 1, + backendCount: 2, + cidrs: []string{"1.1.1.1/32"}, + }, + { + addr: "2", + labels: map[string]string{"cidr": "1.1.1.1/32 , "}, + groupCount: 1, + backendCount: 3, + cidrs: []string{"1.1.1.1/32"}, + }, + { + addr: "3", + labels: map[string]string{"cidr": "1.1.2.1/32, 1.1.3.1/32"}, + groupCount: 2, + backendCount: 4, + cidrs: []string{"1.1.2.1/32", "1.1.3.1/32"}, + }, + { + addr: "4", + labels: map[string]string{"cidr": "1.1.2.1/32,, 1.1.3.1/32 "}, + groupCount: 2, + backendCount: 5, + cidrs: []string{"1.1.2.1/32", "1.1.3.1/32"}, + }, + { + addr: "0", + labels: map[string]string{"cidr": " 1.1.1.1/32 "}, + groupCount: 2, + backendCount: 5, + cidrs: []string{"1.1.1.1/32"}, + }, + { + addr: "1", + labels: map[string]string{"cidr": "1.1.1.1/32, 1.1.4.1/32"}, + groupCount: 2, + backendCount: 5, + cidrs: []string{"1.1.1.1/32", "1.1.4.1/32"}, + }, + { + addr: "3", + labels: map[string]string{"cidr": "1.1.2.1/32"}, + groupCount: 2, + backendCount: 5, + cidrs: []string{"1.1.2.1/32", "1.1.3.1/32"}, + }, + } + + for i, test := range tests { + bo.addBackend(test.addr, test.labels) + bo.notify(nil) + require.Eventually(t, func() bool { + router.Lock() + defer router.Unlock() + if len(router.groups) != test.groupCount { + return false + } + if len(router.backends) != test.backendCount { + return false + } + backend, ok := router.backends[test.addr] + if !ok { + return false + } + group := backend.group + if test.cidrs == nil { + return group == nil + } + if group == nil { + return false + } + return group.EqualValues(test.cidrs) + }, 3*time.Second, 10*time.Millisecond, "test %d", i) + } +} + +func TestGroupBackendsByPort(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + router := NewScoreBasedRouter(lg) + cfg := &config.Config{ + Balance: config.Balance{ + RoutingRule: config.MatchPortStr, + }, + } + cfgGetter := newMockConfigGetter(cfg) + bo := newMockBackendObserver() + router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, make(<-chan *config.Config)) + t.Cleanup(bo.Close) + t.Cleanup(router.Close) + + tests := []struct { + addr string + labels map[string]string + groupCount int + backendCount int + port string + }{ + { + addr: "0", + labels: nil, + groupCount: 0, + backendCount: 1, + }, + { + addr: "1", + labels: map[string]string{config.TiProxyPortLabelName: "10080"}, + groupCount: 1, + backendCount: 2, + port: "10080", + }, + { + addr: "2", + labels: map[string]string{config.TiProxyPortLabelName: "10080"}, + groupCount: 1, + backendCount: 3, + port: "10080", + }, + { + addr: "3", + labels: map[string]string{config.TiProxyPortLabelName: "10081"}, + groupCount: 2, + backendCount: 4, + port: "10081", + }, + } + + for i, test := range tests { + bo.addBackend(test.addr, test.labels) + bo.notify(nil) + require.Eventually(t, func() bool { + router.Lock() + defer router.Unlock() + if len(router.groups) != test.groupCount { + return false + } + if len(router.backends) != test.backendCount { + return false + } + group := router.backends[test.addr].group + if test.port == "" { + return group == nil + } + return group != nil && slices.Equal(group.values, []string{test.port}) + }, 3*time.Second, 10*time.Millisecond, "test %d", i) + } +} + +func TestRouteAndRebalanceByPort(t *testing.T) { + cfg := &config.Config{ + Balance: config.Balance{ + RoutingRule: config.MatchPortStr, + }, + } + bp := &mockBalancePolicy{} + tester := newRouterTester(t, bp) + tester.router.matchType = MatchPort + bp.backendToRoute = func(backends []policy.BackendCtx) policy.BackendCtx { + if len(backends) == 0 { + return nil + } + return backends[0] + } + bp.backendsToBalance = func(backends []policy.BackendCtx) (from policy.BackendCtx, to policy.BackendCtx, balanceCount float64, reason string, logFields []zapcore.Field) { + if len(backends) < 2 { + return nil, nil, 0, "", nil + } + var busiest, idlest policy.BackendCtx + for _, backend := range backends { + if busiest == nil || backend.ConnCount() > busiest.ConnCount() { + busiest = backend + } + if idlest == nil || backend.ConnCount() < idlest.ConnCount() { + idlest = backend + } + } + if busiest == nil || idlest == nil || busiest == idlest { + return nil, nil, 0, "", nil + } + return busiest, idlest, 100, "conn", nil + } + tester.router.cfgGetter = newMockConfigGetter(cfg) + + tester.backends["1"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "1", + Labels: map[string]string{config.TiProxyPortLabelName: "10080"}, + }, + } + tester.backends["2"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "2", + Labels: map[string]string{config.TiProxyPortLabelName: "10080"}, + }, + } + tester.backends["3"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "3", + Labels: map[string]string{config.TiProxyPortLabelName: "10081"}, + }, + } + tester.notifyHealth() + + for range 10 { + conn := tester.createConn() + backend := tester.route(conn, ClientInfo{ListenerPort: "10080"}) + require.NotNil(t, backend) + conn.from = backend + tester.conns[conn.connID] = conn + } + for _, conn := range tester.conns { + require.Equal(t, "10080", tester.router.backends[conn.from.ID()].TiProxyPort()) + } + + tester.rebalance(10) + redirecting := 0 + for _, conn := range tester.conns { + if conn.to == nil || reflect.ValueOf(conn.to).IsNil() { + continue + } + redirecting++ + require.Equal(t, "10080", tester.router.backends[conn.to.ID()].TiProxyPort()) + require.NotEqual(t, "3", conn.to.Addr()) + } + require.Greater(t, redirecting, 0) +} + +func TestRouteByPortBlocksConflictingClusters(t *testing.T) { + cfg := &config.Config{ + Balance: config.Balance{ + RoutingRule: config.MatchPortStr, + }, + } + cfgGetter := newMockConfigGetter(cfg) + bo := newMockBackendObserver() + router := NewScoreBasedRouter(zap.NewNop()) + router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, make(<-chan *config.Config)) + t.Cleanup(bo.Close) + t.Cleanup(router.Close) + + bo.addBackendWithCluster("a1", "cluster-a", map[string]string{ + config.TiProxyPortLabelName: "10080", + }) + bo.addBackendWithCluster("b1", "cluster-b", map[string]string{ + config.TiProxyPortLabelName: "10080", + }) + bo.notify(nil) + + require.Eventually(t, func() bool { + router.Lock() + defer router.Unlock() + return len(router.groups) == 2 && router.portConflictDetector != nil + }, 3*time.Second, 10*time.Millisecond) + + selector := router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) + _, err := selector.Next() + require.Error(t, err) + require.True(t, errors.Is(err, ErrPortConflict)) +} + +func TestRouteByPortRecoversAfterConflictIsRemoved(t *testing.T) { + cfg := &config.Config{ + Balance: config.Balance{ + RoutingRule: config.MatchPortStr, + }, + } + cfgGetter := newMockConfigGetter(cfg) + bo := newMockBackendObserver() + router := NewScoreBasedRouter(zap.NewNop()) + router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, make(<-chan *config.Config)) + t.Cleanup(bo.Close) + t.Cleanup(router.Close) + + bo.addBackendWithCluster("a1", "cluster-a", map[string]string{ + config.TiProxyPortLabelName: "10080", + }) + bo.addBackendWithCluster("b1", "cluster-b", map[string]string{ + config.TiProxyPortLabelName: "10080", + }) + bo.notify(nil) + + require.Eventually(t, func() bool { + selector := router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) + _, err := selector.Next() + return errors.Is(err, ErrPortConflict) + }, 3*time.Second, 10*time.Millisecond) + + bo.healthLock.Lock() + delete(bo.healths, "b1") + bo.healthLock.Unlock() + bo.notify(nil) + + require.Eventually(t, func() bool { + selector := router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) + backend, err := selector.Next() + return err == nil && backend != nil && backend.ID() == "a1" + }, 3*time.Second, 10*time.Millisecond) +} + +func TestKeepExistingPortGroupWhenPortLabelChanges(t *testing.T) { + cfg := &config.Config{ + Balance: config.Balance{ + RoutingRule: config.MatchPortStr, + }, + } + cfgGetter := newMockConfigGetter(cfg) + bo := newMockBackendObserver() + lg, text := logger.CreateLoggerForTest(t) + router := NewScoreBasedRouter(lg) + router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, make(<-chan *config.Config)) + t.Cleanup(bo.Close) + t.Cleanup(router.Close) + + bo.addBackendWithCluster("backend-1", "cluster-a", map[string]string{ + config.TiProxyPortLabelName: "10080", + }) + bo.notify(nil) + + var oldGroup *Group + require.Eventually(t, func() bool { + router.Lock() + defer router.Unlock() + backend := router.backends["backend-1"] + if backend == nil { + return false + } + oldGroup = backend.group + return oldGroup != nil && slices.Equal(oldGroup.values, []string{"cluster-a:10080"}) + }, 3*time.Second, 10*time.Millisecond) + + conn := newMockRedirectableConn(t, 1) + selector := router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) + backend, err := selector.Next() + require.NoError(t, err) + selector.Finish(conn, true) + conn.from = backend + + bo.healthLock.Lock() + bo.healths["backend-1"].ClusterName = "cluster-a" + bo.healthLock.Unlock() + bo.setLabels("backend-1", map[string]string{ + config.TiProxyPortLabelName: "10081", + }) + bo.notify(nil) + + require.Eventually(t, func() bool { + router.Lock() + defer router.Unlock() + backend := router.backends["backend-1"] + return backend != nil && backend.group == oldGroup + }, 3*time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { + return strings.Contains(text.String(), "backend routing values changed, keep the existing group until it is removed") + }, 3*time.Second, 10*time.Millisecond) + + conn.Lock() + require.Equal(t, oldGroup, conn.receiver) + conn.Unlock() + + oldSelector := router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) + backend, err = oldSelector.Next() + require.NoError(t, err) + require.Equal(t, "backend-1", backend.ID()) + + newSelector := router.GetBackendSelector(ClientInfo{ListenerPort: "10081"}) + _, err = newSelector.Next() + require.ErrorIs(t, err, ErrNoBackend) +} + +func TestPortConflictGroupsStayClusterScoped(t *testing.T) { + tester := newRouterTester(t, nil) + tester.router.matchType = MatchPort + tester.backends["a1"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "shared-a-1:4000", + ClusterName: "cluster-a", + Labels: map[string]string{ + config.TiProxyPortLabelName: "10080", + }, + }, + } + tester.backends["a2"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "shared-a-2:4000", + ClusterName: "cluster-a", + Labels: map[string]string{ + config.TiProxyPortLabelName: "10080", + }, + }, + } + tester.backends["b1"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "shared-b-1:4000", + ClusterName: "cluster-b", + Labels: map[string]string{ + config.TiProxyPortLabelName: "10080", + }, + }, + } + tester.backends["b2"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "shared-b-2:4000", + ClusterName: "cluster-b", + Labels: map[string]string{ + config.TiProxyPortLabelName: "10080", + }, + }, + } + tester.notifyHealth() + + groupA := findGroupByValues(t, tester.router, []string{"cluster-a:10080"}) + groupB := findGroupByValues(t, tester.router, []string{"cluster-b:10080"}) + require.NotSame(t, groupA, groupB) + for _, backend := range groupA.backends { + require.Equal(t, "cluster-a", backend.ClusterName()) + } + for _, backend := range groupB.backends { + require.Equal(t, "cluster-b", backend.ClusterName()) + } +} + +func TestPortConflictBlocksRoutingButAllowsIntraClusterRebalance(t *testing.T) { + bp := &mockBalancePolicy{} + tester := newRouterTester(t, bp) + tester.router.matchType = MatchPort + bp.backendsToBalance = func(backends []policy.BackendCtx) (from policy.BackendCtx, to policy.BackendCtx, balanceCount float64, reason string, logFields []zapcore.Field) { + if len(backends) < 2 { + return nil, nil, 0, "", nil + } + var busiest, idlest policy.BackendCtx + for _, backend := range backends { + if busiest == nil || backend.ConnCount() > busiest.ConnCount() { + busiest = backend + } + if idlest == nil || backend.ConnCount() < idlest.ConnCount() { + idlest = backend + } + } + if busiest == nil || idlest == nil || busiest == idlest { + return nil, nil, 0, "", nil + } + return busiest, idlest, 100, "conn", nil + } + + tester.backends["a1"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "shared-a-1:4000", + ClusterName: "cluster-a", + Labels: map[string]string{ + config.TiProxyPortLabelName: "10080", + }, + }, + } + tester.backends["a2"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "shared-a-2:4000", + ClusterName: "cluster-a", + Labels: map[string]string{ + config.TiProxyPortLabelName: "10080", + }, + }, + } + tester.backends["b1"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "shared-b-1:4000", + ClusterName: "cluster-b", + Labels: map[string]string{ + config.TiProxyPortLabelName: "10080", + }, + }, + } + tester.notifyHealth() + + selector := tester.router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) + _, err := selector.Next() + require.Error(t, err) + require.True(t, errors.Is(err, ErrPortConflict)) + + groupA := findGroupByValues(t, tester.router, []string{"cluster-a:10080"}) + backendA1 := tester.router.backends["a1"] + for range 6 { + conn := tester.createConn() + groupA.onCreateConn(backendA1, conn, true) + conn.from = backendA1 + tester.conns[conn.connID] = conn + } + + groupA.lastRedirectTime = time.Time{} + groupA.Balance(context.Background()) + + redirecting := 0 + for _, conn := range tester.conns { + if conn.to == nil || reflect.ValueOf(conn.to).IsNil() { + continue + } + redirecting++ + require.Equal(t, "cluster-a", tester.router.backends[conn.to.ID()].ClusterName()) + require.Equal(t, "a2", conn.to.ID()) + } + require.Greater(t, redirecting, 0) +} + +func TestRouteBackendsWithSameAddrDifferentIDs(t *testing.T) { + tester := newRouterTester(t, nil) + tester.router.matchType = MatchAll + tester.backends["cluster-a/shared:4000"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "shared:4000", + ClusterName: "cluster-a", + }, + } + tester.backends["cluster-b/shared:4000"] = &observer.BackendHealth{ + Healthy: true, + SupportRedirection: true, + BackendInfo: observer.BackendInfo{ + Addr: "shared:4000", + ClusterName: "cluster-b", + }, + } + tester.notifyHealth() + + selector := tester.router.GetBackendSelector(ClientInfo{}) + first, err := selector.Next() + require.NoError(t, err) + second, err := selector.Next() + require.NoError(t, err) + + require.Equal(t, "shared:4000", first.Addr()) + require.Equal(t, "shared:4000", second.Addr()) + require.NotEqual(t, first.ID(), second.ID()) +} + +func findGroupByValues(t *testing.T, router *ScoreBasedRouter, values []string) *Group { + t.Helper() + router.Lock() + defer router.Unlock() + for _, group := range router.groups { + if group.matchType == MatchPort { + if slices.Equal(group.values, values) { + return group + } + continue + } + if group.EqualValues(values) { + return group + } + } + require.FailNow(t, "group not found", "values=%v", values) + return nil +} +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) diff --git a/pkg/balance/router/router_static.go b/pkg/balance/router/router_static.go index 7b0173900..bdfc0c08a 100644 --- a/pkg/balance/router/router_static.go +++ b/pkg/balance/router/router_static.go @@ -74,7 +74,11 @@ func (r *StaticRouter) OnRedirectFail(from, to string, conn RedirectableConn) er return nil } +<<<<<<< HEAD func (r *StaticRouter) OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error { +======= +func (r *StaticRouter) OnConnClosed(backendID string, conn RedirectableConn) error { +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) r.cnt-- return nil } diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index 493ae4096..c0d40b0a5 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -258,7 +258,24 @@ func (mgr *BackendConnManager) newExponentialBackOff() *backoff.ExponentialBackO return b } +// abandonRoutedBackend removes the registration left by a previous successful dial during handshake. +// It is called at the beginning of getBackendIO before routing to another backend. +func (mgr *BackendConnManager) abandonRoutedBackend() { + receiver := mgr.getEventReceiver() + if receiver == nil || mgr.curBackend == nil { + return + } + if err := receiver.OnConnClosed(mgr.curBackend.ID(), mgr); err != nil { + mgr.logger.Warn("abandon routed backend failed", zap.String("backend_id", mgr.curBackend.ID()), + zap.String("backend_addr", mgr.curBackend.Addr()), zap.NamedError("notify_err", err)) + } + // Clear the receiver so that Close() won't clean up again. + mgr.eventReceiver.Store(nil) + mgr.redirectInfo.Store(nil) +} + func (mgr *BackendConnManager) getBackendIO(ctx context.Context, cctx ConnContext, resp *pnet.HandshakeResp) (pnet.PacketIO, error) { + mgr.abandonRoutedBackend() r, err := mgr.handshakeHandler.GetRouter(cctx, resp) if err != nil { return nil, errors.Wrap(err, ErrProxyErr) @@ -545,6 +562,7 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) { }() // Even if the connection is closing, the redirection result must still be sent to recover the connection scores. if mgr.closeStatus.Load() >= statusNotifyClose || ctx.Err() != nil { + rs.err = ErrClosing return } // It may have been too long since the redirection signal was sent, and the target backend may be unhealthy now. @@ -798,10 +816,13 @@ func (mgr *BackendConnManager) Close() error { eventReceiver := mgr.getEventReceiver() if eventReceiver != nil { // Notify the receiver if there's any event. - if len(mgr.redirectResCh) > 0 { - mgr.notifyRedirectResult(context.Background(), <-mgr.redirectResCh) + select { + case rs := <-mgr.redirectResCh: + mgr.notifyRedirectResult(context.Background(), rs) + default: } // The connection may have just received the redirecting signal. +<<<<<<< HEAD if len(addr) > 0 { var redirectingAddr string if redirectingBackend := mgr.redirectInfo.Load(); redirectingBackend != nil { @@ -809,6 +830,12 @@ func (mgr *BackendConnManager) Close() error { } if err := eventReceiver.OnConnClosed(addr, redirectingAddr, mgr); err != nil { mgr.logger.Error("close connection error", zap.String("backend_addr", addr), zap.NamedError("notify_err", err)) +======= + if mgr.curBackend != nil { + if err := eventReceiver.OnConnClosed(mgr.curBackend.ID(), mgr); err != nil { + mgr.logger.Error("close connection error", zap.String("backend_id", mgr.curBackend.ID()), + zap.String("backend_addr", mgr.curBackend.Addr()), zap.NamedError("notify_err", err)) +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) } } } diff --git a/pkg/proxy/backend/backend_conn_mgr_test.go b/pkg/proxy/backend/backend_conn_mgr_test.go index b62ef6410..515ce6547 100644 --- a/pkg/proxy/backend/backend_conn_mgr_test.go +++ b/pkg/proxy/backend/backend_conn_mgr_test.go @@ -62,10 +62,16 @@ func (mer *mockEventReceiver) OnRedirectFail(from, to string, conn router.Redire return nil } +<<<<<<< HEAD func (mer *mockEventReceiver) OnConnClosed(from, to string, conn router.RedirectableConn) error { mer.eventCh <- event{ from: from, to: to, +======= +func (mer *mockEventReceiver) OnConnClosed(backendID string, conn router.RedirectableConn) error { + mer.eventCh <- event{ + from: backendID, +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) eventName: eventClose, } return nil @@ -786,6 +792,7 @@ func TestGracefulCloseWhenActive(t *testing.T) { ts.runTests(runners) } +<<<<<<< HEAD type countingPacketIO struct { pnet.PacketIO gracefulCloseCnt atomic.Int32 @@ -800,6 +807,61 @@ func (cp *countingPacketIO) GracefulClose() error { func (cp *countingPacketIO) Close() error { cp.closeCnt.Add(1) return nil +======= +// Test that the redirection aborted by closing is reported as a failure instead of a success. +// Otherwise, the router moves the connection to the target backend in the connList, which +// leaks the connection in the list. +func TestRedirectAbortedByCloseReportsFail(t *testing.T) { + ts := newBackendMgrTester(t) + runners := []runner{ + // 1st handshake + { + client: ts.mc.authenticate, + proxy: ts.firstHandshake4Proxy, + backend: ts.handshake4Backend, + }, + { + proxy: func(_, _ pnet.PacketIO) error { + // Simulate the race that the redirect signal is received but the connection + // starts closing before tryRedirect processes the signal. + backendInst := router.BackendInst(newMockBackendInst(ts)) + ts.mp.redirectInfo.Store(&backendInst) + ts.mp.closeStatus.Store(statusNotifyClose) + ts.mp.processLock.Lock() + ts.mp.tryRedirect(context.Background()) + ts.mp.processLock.Unlock() + // The aborted redirection must be reported as a failure, not a success. + ts.mp.getEventReceiver().(*mockEventReceiver).checkEvent(ts.t, eventFail) + ts.mp.closeStatus.Store(statusActive) + return nil + }, + }, + } + ts.runTests(runners) +} + +func TestGracefulCloseBeforeHandshake(t *testing.T) { + ts := newBackendMgrTester(t) + runners := []runner{ + // try to gracefully close before handshake + { + proxy: func(_, _ pnet.PacketIO) error { + ts.mp.GracefulClose() + return nil + }, + }, + // connect fails + { + proxy: func(clientIO, backendIO pnet.PacketIO) error { + err := ts.mp.Connect(context.Background(), clientIO, ts.mp.frontendTLSConfig, ts.mp.backendTLSConfig, "", "", "") + require.Error(ts.t, err) + require.Equal(t, SrcProxyQuit, ts.mp.QuitSource()) + return nil + }, + }, + } + ts.runTests(runners) +>>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) } func TestForceClose(t *testing.T) { From 4f8821356028a49ae141dca82524d4a3970e4457 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Mon, 15 Jun 2026 19:51:47 +0800 Subject: [PATCH 2/2] fix conflicts --- pkg/balance/factor/factor_cpu.go | 5 - pkg/balance/router/group.go | 615 --------------- pkg/balance/router/router.go | 6 +- pkg/balance/router/router_score.go | 193 +---- pkg/balance/router/router_score_test.go | 868 +-------------------- pkg/balance/router/router_static.go | 6 +- pkg/proxy/backend/backend_conn_mgr.go | 22 +- pkg/proxy/backend/backend_conn_mgr_test.go | 39 +- 8 files changed, 66 insertions(+), 1688 deletions(-) delete mode 100644 pkg/balance/router/group.go diff --git a/pkg/balance/factor/factor_cpu.go b/pkg/balance/factor/factor_cpu.go index 93b655efd..0e45635a9 100644 --- a/pkg/balance/factor/factor_cpu.go +++ b/pkg/balance/factor/factor_cpu.go @@ -239,13 +239,8 @@ func (fc *FactorCPU) updateCpuPerConn() { // Estimate the current cpu usage by the latest CPU usage, the latest connection count, and the current connection count. func (fc *FactorCPU) getUsage(backend scoredBackend) (avgUsage, latestUsage float64) { -<<<<<<< HEAD snapshot, ok := fc.snapshot[backend.Addr()] - if !ok || snapshot.avgUsage < 0 || latestUsage < 0 { -======= - snapshot, ok := fc.snapshot[backend.ID()] if !ok || snapshot.avgUsage < 0 || snapshot.latestUsage < 0 { ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) // The metric has missed for minutes. return 1, 1 } diff --git a/pkg/balance/router/group.go b/pkg/balance/router/group.go deleted file mode 100644 index b103e2f41..000000000 --- a/pkg/balance/router/group.go +++ /dev/null @@ -1,615 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// SPDX-License-Identifier: Apache-2.0 - -package router - -import ( - "context" - "net" - "reflect" - "slices" - "sync" - "time" - - glist "github.com/bahlo/generic-list-go" - "github.com/pingcap/tiproxy/lib/config" - "github.com/pingcap/tiproxy/lib/util/errors" - "github.com/pingcap/tiproxy/pkg/balance/observer" - "github.com/pingcap/tiproxy/pkg/balance/policy" - "github.com/pingcap/tiproxy/pkg/manager/backendcluster" - "github.com/pingcap/tiproxy/pkg/metrics" - "github.com/pingcap/tiproxy/pkg/util/netutil" - "go.uber.org/zap" -) - -type MatchType int - -const ( - // Match all connections, used when there is only one backend group. - MatchAll MatchType = iota - // Match connections based on the client CIDR. - MatchClientCIDR - // Match connections based on proxy CIDR. If proxy-protocol is disabled, route by the client CIDR. - MatchProxyCIDR - // Match connections based on the local SQL listener port. - MatchPort -) - -var _ ConnEventReceiver = (*Group)(nil) - -type routeCheckBackend struct { - *backendWrapper - healthy bool -} - -func (b routeCheckBackend) Healthy() bool { - return b.healthy -} - -// Group is used for one backend group that can be matched by CIDR, username, database, or resource group list. -type Group struct { - sync.Mutex - matchType MatchType - lg *zap.Logger - policy policy.BalancePolicy - // The values that this group is matched by. E.g. for MatchCIDR, the value is the CIDR list. - values []string - // parsed CIDR list for faster match - cidrList []*net.IPNet - backends map[string]*backendWrapper - // failoverTargets contains backend pod names or addresses configured in fail-backend-list. - failoverTargets map[string]struct{} - failoverTimeout time.Duration - ignoreFailover bool - // To limit the speed of redirection. - lastRedirectTime time.Time -} - -func NewGroup(values []string, bpCreator func(lg *zap.Logger) policy.BalancePolicy, matchType MatchType, lg *zap.Logger) (*Group, error) { - if len(values) > 0 { - lg = lg.With(zap.Strings("values", values)) - } - lg.Info("new group created") - - group := &Group{ - matchType: matchType, - lg: lg, - values: values, - backends: make(map[string]*backendWrapper), - failoverTargets: make(map[string]struct{}), - policy: bpCreator(lg.Named("policy")), - } - err := group.parseValues() - if err != nil { - err = errors.Wrapf(err, "failed to parse values") - } - return group, err -} - -func (g *Group) parseValues() error { - switch g.matchType { - case MatchClientCIDR, MatchProxyCIDR: - cidrList, parseErr := netutil.ParseCIDRList(g.values) - if parseErr != nil { - return parseErr - } - g.cidrList = cidrList - } - return nil -} - -func (g *Group) Match(clientInfo ClientInfo) bool { - switch g.matchType { - case MatchClientCIDR, MatchProxyCIDR: - addr := clientInfo.ProxyAddr - if g.matchType == MatchClientCIDR { - addr = clientInfo.ClientAddr - } - ip, err := netutil.NetAddr2IP(addr) - if err != nil { - g.lg.Error("checking CIDR failed", zap.Stringer("addr", addr), zap.Error(err)) - return false - } - contains, err := netutil.CIDRContainsIP(g.cidrList, ip) - if err != nil { - g.lg.Error("checking CIDR failed", zap.Stringer("addr", addr), zap.Error(err)) - } - return contains - } - return true -} - -func (g *Group) EqualValues(values []string) bool { - switch g.matchType { - case MatchClientCIDR, MatchProxyCIDR, MatchPort: - if len(g.values) != len(values) { - return false - } - for _, v := range g.values { - if !slices.Contains(values, v) { - return false - } - } - return true - } - return false -} - -// Intersect returns if any cidrs are the same. -// In next-gen, backend cidrs may increase or decrease but they stay in the same group. -// E.g. enable public endpoint (3 cidrs) -> enable private endpoint (6 cidrs) -> disable public endpoint (3 cidrs). -func (g *Group) Intersect(values []string) bool { - switch g.matchType { - case MatchClientCIDR, MatchProxyCIDR, MatchPort: - for _, v := range g.values { - if slices.Contains(values, v) { - return true - } - } - return false - } - return false -} - -// Backend CIDRs may change anytime. -func (g *Group) RefreshCidr() { - g.Lock() - defer g.Unlock() - switch g.matchType { - case MatchClientCIDR, MatchProxyCIDR: - valueMap := make(map[string]struct{}, len(g.values)) - for _, b := range g.backends { - cidrs := b.Cidr() - for _, cidr := range cidrs { - valueMap[cidr] = struct{}{} - } - } - values := make([]string, 0, len(valueMap)) - for k := range valueMap { - values = append(values, k) - } - g.values = values - if err := g.parseValues(); err != nil { - g.lg.Error("failed to parse values", zap.Error(err)) - } - } -} - -func (g *Group) AddBackend(backendID string, backend *backendWrapper) { - g.Lock() - defer g.Unlock() - g.backends[backendID] = backend - backend.group = g -} - -// removeBackendIfIdle removes the backend from the group only if it has no connections and no -// pending incoming/outgoing scores. -func (g *Group) removeBackendIfIdle(backendID string, backend *backendWrapper) (removed, empty bool) { - g.Lock() - defer g.Unlock() - if backend.connList.Len() != 0 || backend.connScore > 0 { - return false, false - } - delete(g.backends, backendID) - return true, len(g.backends) == 0 -} - -func getConnWrapper(conn RedirectableConn) *glist.Element[*connWrapper] { - return conn.Value(_routerKey).(*glist.Element[*connWrapper]) -} - -func setConnWrapper(conn RedirectableConn, ce *glist.Element[*connWrapper]) { - conn.SetValue(_routerKey, ce) -} - -func (g *Group) routeableObservedBackendsLocked(failoverBackendIDs map[string]struct{}) []policy.BackendCtx { - backends := make([]policy.BackendCtx, 0, len(g.backends)) - for _, backend := range g.backends { - if !backend.ObservedHealthy() { - continue - } - healthy := true - if failoverBackendIDs != nil { - _, healthy = failoverBackendIDs[backend.ID()] - healthy = !healthy - } - backends = append(backends, routeCheckBackend{ - backendWrapper: backend, - healthy: healthy, - }) - } - return g.policy.RouteableBackends(backends) -} - -func (g *Group) backendInFailoverListLocked(backend *backendWrapper) bool { - _, active := g.failoverTargets[backend.PodName()] - if !active { - _, active = g.failoverTargets[backend.Addr()] - } - return active -} - -func (g *Group) setFailoverConfigLocked(cfg *config.Config) { - targets := make(map[string]struct{}, len(cfg.Proxy.FailBackendList)) - for _, backend := range cfg.Proxy.FailBackendList { - targets[backend] = struct{}{} - } - g.failoverTargets = targets - g.failoverTimeout = time.Duration(cfg.Proxy.FailoverTimeout) * time.Second -} - -func (g *Group) updateFailoverLocked(now time.Time) { - failoverBackendIDs := make(map[string]struct{}, len(g.backends)) - for _, backend := range g.backends { - if g.backendInFailoverListLocked(backend) { - failoverBackendIDs[backend.ID()] = struct{}{} - } - } - - routeable := g.routeableObservedBackendsLocked(nil) - if len(routeable) > 0 { - remaining := g.routeableObservedBackendsLocked(failoverBackendIDs) - if len(remaining) == 0 { - matched := 0 - for _, backend := range routeable { - if _, ok := failoverBackendIDs[backend.ID()]; ok { - matched++ - } - } - if !g.ignoreFailover { - g.lg.Warn("fail-backend-list would leave no routeable backend in group, ignore the list for this group", - zap.Int("routeable_backend_count", len(routeable)), - zap.Int("matched_routeable_backend_count", matched)) - } - g.ignoreFailover = true - clear(failoverBackendIDs) - } else { - g.ignoreFailover = false - } - } else { - g.ignoreFailover = false - } - - for _, backend := range g.backends { - _, active := failoverBackendIDs[backend.ID()] - since := time.Time{} - if active { - since = now - } - changed, since := backend.setFailover(since) - if !changed { - continue - } - fields := []zap.Field{ - zap.String("backend_addr", backend.Addr()), - zap.String("backend_pod", backend.PodName()), - zap.Duration("failover_timeout", g.failoverTimeout), - } - if active { - fields = append(fields, zap.Time("failover_since", since)) - g.lg.Warn("backend enters failover", fields...) - continue - } - g.lg.Info("backend exits failover", fields...) - } -} - -func (g *Group) UpdateFailover(now time.Time) { - g.Lock() - defer g.Unlock() - g.updateFailoverLocked(now) -} - -func (g *Group) Route(excluded []BackendInst) (policy.BackendCtx, error) { - g.Lock() - defer g.Unlock() - - if len(g.backends) == 0 { - return nil, ErrNoBackend - } - backends := make([]policy.BackendCtx, 0, len(g.backends)) - for _, backend := range g.backends { - if !backend.Healthy() { - continue - } - // Exclude the backends that are already tried. - found := false - for _, e := range excluded { - if backend.ID() == e.ID() { - found = true - break - } - } - if found { - continue - } - backends = append(backends, backend) - } - - idlestBackend := g.policy.BackendToRoute(backends) - if idlestBackend == nil || reflect.ValueOf(idlestBackend).IsNil() { - return nil, ErrNoBackend - } - backend := idlestBackend.(*backendWrapper) - backend.connScore++ - return backend, nil -} - -func (g *Group) Balance(ctx context.Context) { - g.Lock() - defer g.Unlock() - backends := make([]policy.BackendCtx, 0, len(g.backends)) - for _, backend := range g.backends { - backends = append(backends, backend) - } - - busiestBackend, idlestBackend, balanceCount, reason, logFields := g.policy.BackendsToBalance(backends) - if balanceCount == 0 { - return - } - fromBackend, toBackend := busiestBackend.(*backendWrapper), idlestBackend.(*backendWrapper) - - // Control the speed of migration. - curTime := time.Now() - migrationInterval := time.Duration(float64(time.Second) / balanceCount) - count := 0 - if migrationInterval < rebalanceInterval*2 { - // If we need to migrate multiple connections in each round, calculate the connection count for each round. - count = int((rebalanceInterval-1)/migrationInterval) + 1 - } else { - // If we need to wait for multiple rounds to migrate a connection, calculate the interval for each connection. - if curTime.Sub(g.lastRedirectTime) >= migrationInterval { - count = 1 - } else { - return - } - } - // Migrate balanceCount connections. - i := 0 - for ele := fromBackend.connList.Front(); ele != nil && ctx.Err() == nil && i < count; ele = ele.Next() { - conn := ele.Value - if conn.forceClosing { - continue - } - switch conn.phase { - case phaseRedirectNotify: - // A connection cannot be redirected again when it has not finished redirecting. - continue - case phaseRedirectFail: - // If it failed recently, it will probably fail this time. - if conn.lastRedirect.Add(redirectFailMinInterval).After(curTime) { - continue - } - } - if g.redirectConn(conn, fromBackend, toBackend, reason, logFields, curTime) { - g.lastRedirectTime = curTime - i++ - } - } -} - -func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, succeed bool) { - g.Lock() - defer g.Unlock() - backend := g.ensureBackend(backendInst.ID()) - if succeed { - connWrapper := &connWrapper{ - RedirectableConn: conn, - scoreOwner: backend, - createTime: time.Now(), - phase: phaseNotRedirected, - forceClosing: false, - } - g.addConn(backend, connWrapper) - conn.SetEventReceiver(g) - } else { - backend.connScore-- - } -} - -func (g *Group) CloseTimedOutFailoverConnections(now time.Time) { - g.Lock() - defer g.Unlock() - for _, backend := range g.backends { - since := backend.FailoverSince() - if since.IsZero() { - continue - } - if g.failoverTimeout > 0 && since.Add(g.failoverTimeout).After(now) { - continue - } - for ele := backend.connList.Front(); ele != nil; ele = ele.Next() { - conn := ele.Value - if conn.phase == phaseClosed || conn.forceClosing { - continue - } - fields := []zap.Field{ - zap.Uint64("connID", conn.ConnectionID()), - zap.String("backend_addr", backend.addr), - zap.String("backend_pod", backend.PodName()), - zap.Duration("failover_timeout", g.failoverTimeout), - zap.Duration("failover_elapsed", now.Sub(since)), - } - if conn.ForceClose() { - conn.forceClosing = true - g.lg.Info("force close connection on failover backend", fields...) - } - } - } -} - -// removeConn removes the connection from the connList that actually contains it. -// Always remove by `physicalOwner` instead of by the backend that the connection is physically on: -// they differ when a redirect result has not been processed yet, and glist.Remove silently -// does nothing if the element is not in the given list, which leaks the connection forever. -func (g *Group) removeConn(ce *glist.Element[*connWrapper]) { - backend := ce.Value.physicalOwner - if backend == nil { - g.lg.Warn("unexpected nil physical owner for connection") - return - } - oldLen := backend.connList.Len() - backend.connList.Remove(ce) - newLen := backend.connList.Len() - if newLen != oldLen-1 { - g.lg.Warn("the connection is not in the list", zap.String("backend", backend.id)) - } - ce.Value.physicalOwner = nil - setBackendConnMetrics(backend.addr, newLen) -} - -func (g *Group) addConn(backend *backendWrapper, conn *connWrapper) { - if conn.physicalOwner != nil { - g.lg.Warn("unexpected non-nil physical owner for connection") - } - conn.physicalOwner = backend - ce := backend.connList.PushBack(conn) - setBackendConnMetrics(backend.addr, backend.connList.Len()) - setConnWrapper(conn, ce) -} - -// RedirectConnections implements Router.RedirectConnections interface. -// It redirects all connections compulsively. It's only used for testing. -func (g *Group) RedirectConnections() error { - g.Lock() - defer g.Unlock() - for _, backend := range g.backends { - for ce := backend.connList.Front(); ce != nil; ce = ce.Next() { - // This is only for test, so we allow it to reconnect to the same backend. - connWrapper := ce.Value - if connWrapper.phase != phaseRedirectNotify { - connWrapper.phase = phaseRedirectNotify - connWrapper.redirectReason = "test" - if connWrapper.Redirect(backend) { - metrics.PendingMigrateGuage.WithLabelValues(backend.addr, backend.addr, connWrapper.redirectReason).Inc() - } - } - } - } - return nil -} - -func (g *Group) ensureBackend(backendID string) *backendWrapper { - backend, ok := g.backends[backendID] - if ok { - return backend - } - // The backend should always exist if it will be needed. Add a warning and add it back. - g.lg.Warn("backend is not found in the router", zap.String("backend_id", backendID), zap.Stack("stack")) - // Try to parse the IP from the backendID. It's generally not suggested to parse it, but in this - // strange case we tried our best to recover and make the backend ip valid. - // For the formats of backendID, ref `backend_id.go`. It's generated and recorded in `GetTiDBTopology` - // for the first time. - _, addr := backendcluster.ParseBackendID(backendID) - ip, _, _ := net.SplitHostPort(addr) - backend = newBackendWrapper(backendID, observer.BackendHealth{ - BackendInfo: observer.BackendInfo{ - Addr: addr, - IP: ip, - StatusPort: 10080, // impossible anyway - }, - SupportRedirection: true, - Healthy: false, - }) - g.backends[backendID] = backend - return backend -} - -// OnRedirectSucceed implements ConnEventReceiver.OnRedirectSucceed interface. -func (g *Group) OnRedirectSucceed(from, to string, conn RedirectableConn) error { - g.onRedirectFinished(from, to, conn, true) - return nil -} - -// OnRedirectFail implements ConnEventReceiver.OnRedirectFail interface. -func (g *Group) OnRedirectFail(from, to string, conn RedirectableConn) error { - g.onRedirectFinished(from, to, conn, false) - return nil -} - -func (g *Group) onRedirectFinished(from, to string, conn RedirectableConn, succeed bool) { - g.Lock() - defer g.Unlock() - fromBackend := g.ensureBackend(from) - toBackend := g.ensureBackend(to) - connWrapper := getConnWrapper(conn).Value - // The connection may be closed when this function is waiting for the lock. - if connWrapper.phase == phaseClosed { - return - } - - addMigrateMetrics(fromBackend.addr, toBackend.addr, connWrapper.redirectReason, succeed, connWrapper.lastRedirect) - if succeed { - g.removeConn(getConnWrapper(conn)) - g.addConn(toBackend, connWrapper) - connWrapper.phase = phaseRedirectEnd - } else { - connWrapper.transferScore(fromBackend) - connWrapper.phase = phaseRedirectFail - } -} - -// OnConnClosed implements ConnEventReceiver.OnConnClosed interface. -func (g *Group) OnConnClosed(backendID string, conn RedirectableConn) error { - g.Lock() - defer g.Unlock() - connWrapper := getConnWrapper(conn) - cw := connWrapper.Value - // If the physical owner mismatches the score owner, it means the redirect result has not been processed yet. - if cw.physicalOwner != cw.scoreOwner && cw.physicalOwner != nil && cw.scoreOwner != nil { - addMigrateMetrics(cw.physicalOwner.addr, cw.scoreOwner.addr, cw.redirectReason, false, cw.lastRedirect) - } - cw.transferScore(nil) - // A redirect result may have not been processed yet, in which case the connection is still - // in the old backend's connList while `backendID` is the new backend, so remove the connection - // by its physicalOwner. onRedirectFinished won't touch the list once the phase is phaseClosed. - g.removeConn(connWrapper) - cw.phase = phaseClosed - return nil -} - -func (g *Group) redirectConn(conn *connWrapper, fromBackend *backendWrapper, toBackend *backendWrapper, - reason string, logFields []zap.Field, curTime time.Time) bool { - // Skip the connection if it's closing. - fields := []zap.Field{ - zap.Uint64("connID", conn.ConnectionID()), - zap.String("from", fromBackend.addr), - zap.String("to", toBackend.addr), - } - if !conn.lastRedirect.IsZero() { - fields = append(fields, zap.Duration("since_last_redirect", curTime.Sub(conn.lastRedirect))) - } - fields = append(fields, logFields...) - succeed := conn.Redirect(toBackend) - if succeed { - g.lg.Debug("begin redirect connection", fields...) - conn.transferScore(toBackend) - conn.phase = phaseRedirectNotify - conn.redirectReason = reason - metrics.PendingMigrateGuage.WithLabelValues(fromBackend.addr, toBackend.addr, reason).Inc() - } else { - // Avoid it to be redirected again immediately. - conn.phase = phaseRedirectFail - g.lg.Debug("skip redirecting because it's closing", fields...) - } - conn.lastRedirect = curTime - return succeed -} - -func (g *Group) ConnCount() int { - g.Lock() - defer g.Unlock() - j := 0 - for _, backend := range g.backends { - j += backend.connList.Len() - } - return j -} - -func (g *Group) SetConfig(cfg *config.Config) { - g.Lock() - defer g.Unlock() - g.policy.SetConfig(cfg) - g.setFailoverConfigLocked(cfg) - g.updateFailoverLocked(time.Now()) -} diff --git a/pkg/balance/router/router.go b/pkg/balance/router/router.go index 4ef89a144..e7d6d63e6 100644 --- a/pkg/balance/router/router.go +++ b/pkg/balance/router/router.go @@ -32,11 +32,7 @@ func (b routeCheckBackend) Healthy() bool { type ConnEventReceiver interface { OnRedirectSucceed(from, to string, conn RedirectableConn) error OnRedirectFail(from, to string, conn RedirectableConn) error -<<<<<<< HEAD - OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error -======= - OnConnClosed(backendID string, conn RedirectableConn) error ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) + OnConnClosed(backendAddr string, conn RedirectableConn) error } // Router routes client connections to backends. diff --git a/pkg/balance/router/router_score.go b/pkg/balance/router/router_score.go index ed18ca0ef..11187bf33 100644 --- a/pkg/balance/router/router_score.go +++ b/pkg/balance/router/router_score.go @@ -157,6 +157,7 @@ func (router *ScoreBasedRouter) onCreateConn(backendInst BackendInst, conn Redir if succeed { connWrapper := &connWrapper{ RedirectableConn: conn, + scoreOwner: backend, createTime: time.Now(), phase: phaseNotRedirected, forceClosing: false, @@ -168,13 +169,32 @@ func (router *ScoreBasedRouter) onCreateConn(backendInst BackendInst, conn Redir } } -func (router *ScoreBasedRouter) removeConn(backend *backendWrapper, ce *glist.Element[*connWrapper]) { +// removeConn removes the connection from the connList that actually contains it. +// Always remove by `physicalOwner` instead of by the backend that the connection is physically on: +// they differ when a redirect result has not been processed yet, and glist.Remove silently +// does nothing if the element is not in the given list, which leaks the connection forever. +func (router *ScoreBasedRouter) removeConn(ce *glist.Element[*connWrapper]) { + backend := ce.Value.physicalOwner + if backend == nil { + router.logger.Warn("unexpected nil physical owner for connection") + return + } + oldLen := backend.connList.Len() backend.connList.Remove(ce) - setBackendConnMetrics(backend.addr, backend.connList.Len()) + newLen := backend.connList.Len() + if newLen != oldLen-1 { + router.logger.Warn("the connection is not in the list", zap.String("backend", backend.addr)) + } + ce.Value.physicalOwner = nil + setBackendConnMetrics(backend.addr, newLen) router.removeBackendIfEmpty(backend) } func (router *ScoreBasedRouter) addConn(backend *backendWrapper, conn *connWrapper) { + if conn.physicalOwner != nil { + router.logger.Warn("unexpected non-nil physical owner for connection") + } + conn.physicalOwner = backend ce := backend.connList.PushBack(conn) setBackendConnMetrics(backend.addr, backend.connList.Len()) router.setConnWrapper(conn, ce) @@ -243,41 +263,38 @@ func (router *ScoreBasedRouter) onRedirectFinished(from, to string, conn Redirec fromBackend := router.ensureBackend(from) toBackend := router.ensureBackend(to) connWrapper := router.getConnWrapper(conn).Value - addMigrateMetrics(from, to, connWrapper.redirectReason, succeed, connWrapper.lastRedirect) // The connection may be closed when this function is waiting for the lock. if connWrapper.phase == phaseClosed { return } + addMigrateMetrics(fromBackend.addr, toBackend.addr, connWrapper.redirectReason, succeed, connWrapper.lastRedirect) if succeed { - router.removeConn(fromBackend, router.getConnWrapper(conn)) + router.removeConn(router.getConnWrapper(conn)) router.addConn(toBackend, connWrapper) connWrapper.phase = phaseRedirectEnd } else { - fromBackend.connScore++ - toBackend.connScore-- - router.removeBackendIfEmpty(toBackend) + connWrapper.transferScore(fromBackend) connWrapper.phase = phaseRedirectFail } } // OnConnClosed implements ConnEventReceiver.OnConnClosed interface. -func (router *ScoreBasedRouter) OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error { +func (router *ScoreBasedRouter) OnConnClosed(backendAddr string, conn RedirectableConn) error { router.Lock() defer router.Unlock() - backend := router.ensureBackend(addr) connWrapper := router.getConnWrapper(conn) - // If this connection has not redirected yet, decrease the score of the target backend. - if redirectingAddr != "" { - redirectingBackend := router.ensureBackend(redirectingAddr) - redirectingBackend.connScore-- - router.removeBackendIfEmpty(redirectingBackend) - metrics.PendingMigrateGuage.WithLabelValues(addr, redirectingAddr, connWrapper.Value.redirectReason).Dec() - } else { - backend.connScore-- - } - router.removeConn(backend, connWrapper) - connWrapper.Value.phase = phaseClosed + cw := connWrapper.Value + // If the physical owner mismatches the score owner, it means the redirect result has not been processed yet. + if cw.physicalOwner != cw.scoreOwner && cw.physicalOwner != nil && cw.scoreOwner != nil { + addMigrateMetrics(cw.physicalOwner.addr, cw.scoreOwner.addr, cw.redirectReason, false, cw.lastRedirect) + } + cw.transferScore(nil) + // A redirect result may have not been processed yet, in which case the connection is still + // in the old backend's connList while `backendAddr` is the new backend, so remove the connection + // by its physicalOwner. onRedirectFinished won't touch the list once the phase is phaseClosed. + router.removeConn(connWrapper) + cw.phase = phaseClosed return nil } @@ -327,138 +344,6 @@ func (router *ScoreBasedRouter) updateBackendHealth(healthResults observer.Healt if len(serverVersion) > 0 { router.serverVersion = serverVersion } -<<<<<<< HEAD -======= - if router.supportRedirection != supportRedirection { - router.logger.Info("updated supporting redirection", zap.Bool("support", supportRedirection)) - router.supportRedirection = supportRedirection - } -} - -func matchPortValue(clusterName, port string) string { - if clusterName == "" { - return port - } - return fmt.Sprintf("%s:%s", clusterName, port) -} - -func (router *ScoreBasedRouter) backendGroupValues(backend *backendWrapper) []string { - switch router.matchType { - case MatchClientCIDR, MatchProxyCIDR: - return backend.Cidr() - case MatchPort: - port := backend.TiProxyPort() - if port != "" { - return []string{matchPortValue(backend.ClusterName(), port)} - } - } - return nil -} - -func (router *ScoreBasedRouter) rebuildPortConflictDetector() { - if router.matchType != MatchPort { - router.portConflictDetector = nil - return - } - detector := newPortConflictDetector() - for _, group := range router.groups { - for _, value := range group.values { - clusterName, port, ok := strings.Cut(value, ":") - if !ok { - port = value - clusterName = "" - } - detector.bind(port, clusterName, group) - } - } - router.portConflictDetector = detector -} - -// Update the groups after the backend list is updated. -// called in the lock. -func (router *ScoreBasedRouter) updateGroups() { - for _, backend := range router.backends { - // An unhealthy backend can be removed once it has no connections. connList and connScore are - // protected by the group lock instead of the router lock, so read them through - // removeBackendIfIdle to avoid racing with connection events. A backend without a group has - // never been routed to, so it has no connections and can be removed directly. - if !backend.ObservedHealthy() { - removed, empty := true, false - if backend.group != nil { - removed, empty = backend.group.removeBackendIfIdle(backend.id, backend) - } - if removed { - delete(router.backends, backend.id) - // remove empty groups - if backend.group != nil && empty { - router.groups = slices.DeleteFunc(router.groups, func(g *Group) bool { - return g == backend.group - }) - } - continue - } - } - // If the labels were correctly set, we won't update its group even if the labels change. - if backend.group != nil { - switch router.matchType { - case MatchClientCIDR, MatchProxyCIDR, MatchPort: - values := router.backendGroupValues(backend) - if !backend.group.EqualValues(values) { - router.logger.Warn("backend routing values changed, keep the existing group until it is removed", - zap.String("backend_id", backend.id), - zap.String("addr", backend.Addr()), - zap.Strings("current_values", values), - zap.Strings("group_values", backend.group.values)) - } - } - continue - } - - // If the backend is not in any group, add it to a new group if its label is set. - // In operator deployment, the labels are set dynamically. - var group *Group - switch router.matchType { - case MatchAll: - if len(router.groups) == 0 { - group, _ = NewGroup(nil, router.bpCreator, router.matchType, router.logger) - router.groups = append(router.groups, group) - } - group = router.groups[0] - case MatchClientCIDR, MatchProxyCIDR, MatchPort: - values := router.backendGroupValues(backend) - if len(values) == 0 { - break - } - for _, g := range router.groups { - if g.Intersect(values) { - group = g - break - } - } - if group == nil { - g, err := NewGroup(values, router.bpCreator, router.matchType, router.logger) - if err == nil { - group = g - if router.cfgGetter != nil { - if cfg := router.cfgGetter.GetConfig(); cfg != nil { - group.SetConfig(cfg) - } - } - router.groups = append(router.groups, group) - } - // maybe too many logs, ignore the error now - } - } - if group == nil { - continue - } - group.AddBackend(backend.id, backend) - } - for _, group := range router.groups { - group.RefreshCidr() - } - router.rebuildPortConflictDetector() ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) } func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) { @@ -564,9 +449,7 @@ func (router *ScoreBasedRouter) redirectConn(conn *connWrapper, fromBackend *bac succeed := conn.Redirect(toBackend) if succeed { router.logger.Debug("begin redirect connection", fields...) - fromBackend.connScore-- - router.removeBackendIfEmpty(fromBackend) - toBackend.connScore++ + conn.transferScore(toBackend) conn.phase = phaseRedirectNotify conn.redirectReason = reason metrics.PendingMigrateGuage.WithLabelValues(fromBackend.addr, toBackend.addr, reason).Inc() diff --git a/pkg/balance/router/router_score_test.go b/pkg/balance/router/router_score_test.go index e064394b9..2d713130a 100644 --- a/pkg/balance/router/router_score_test.go +++ b/pkg/balance/router/router_score_test.go @@ -165,11 +165,7 @@ func (tester *routerTester) closeConnections(num int, redirecting bool) { } } for _, conn := range conns { -<<<<<<< HEAD - err := tester.router.OnConnClosed(conn.from.Addr(), conn.GetRedirectingAddr(), conn) -======= - err := tester.router.groups[0].OnConnClosed(conn.from.ID(), conn) ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) + err := tester.router.OnConnClosed(conn.from.Addr(), conn) require.NoError(tester.t, err) delete(tester.conns, conn.connID) } @@ -269,7 +265,7 @@ func (tester *routerTester) prepareRedirectingConn() *mockRedirectableConn { tester.updateBackendStatusByAddr("1", false) tester.rebalance(1) conn := tester.conns[1] - require.Equal(tester.t, "2", conn.GetRedirectingBackendID()) + require.Equal(tester.t, "2", conn.GetRedirectingAddr()) return conn } @@ -322,16 +318,16 @@ func TestRebalance(t *testing.T) { func TestCloseRaceWithRedirectSucceed(t *testing.T) { tester := newRouterTester(t, nil) conn := tester.prepareRedirectingConn() - reason := getConnWrapper(conn).Value.redirectReason + reason := tester.router.getConnWrapper(conn).Value.redirectReason pendingDuringRedirect := tester.readPendingGauge("1", "2", reason) // The redirection succeeds on the connection side, and then the connection is closed // before the router receives OnRedirectSucceed. conn.redirectSucceed() - require.NoError(t, tester.router.groups[0].OnConnClosed(conn.from.ID(), conn)) + require.NoError(t, tester.router.OnConnClosed(conn.from.Addr(), conn)) delete(tester.conns, conn.connID) // Now the router processes the redirect result. - require.NoError(t, tester.router.groups[0].OnRedirectSucceed("1", "2", conn)) + require.NoError(t, tester.router.OnRedirectSucceed("1", "2", conn)) tester.checkNoConnLeak() tester.checkBackendConnMetrics() @@ -344,16 +340,16 @@ func TestCloseRaceWithRedirectSucceed(t *testing.T) { func TestCloseRaceWithRedirectFail(t *testing.T) { tester := newRouterTester(t, nil) conn := tester.prepareRedirectingConn() - reason := getConnWrapper(conn).Value.redirectReason + reason := tester.router.getConnWrapper(conn).Value.redirectReason pendingDuringRedirect := tester.readPendingGauge("1", "2", reason) // The redirection fails on the connection side, and then the connection is closed // before the router receives OnRedirectFail. conn.redirectFail() - require.NoError(t, tester.router.groups[0].OnConnClosed(conn.from.ID(), conn)) + require.NoError(t, tester.router.OnConnClosed(conn.from.Addr(), conn)) delete(tester.conns, conn.connID) // Now the router processes the redirect result. - require.NoError(t, tester.router.groups[0].OnRedirectFail("1", "2", conn)) + require.NoError(t, tester.router.OnRedirectFail("1", "2", conn)) tester.checkNoConnLeak() tester.checkBackendConnMetrics() @@ -366,14 +362,14 @@ func TestCloseRaceWithRedirectFail(t *testing.T) { func TestCloseRaceWithRedirectAborted(t *testing.T) { tester := newRouterTester(t, nil) conn := tester.prepareRedirectingConn() - reason := getConnWrapper(conn).Value.redirectReason + reason := tester.router.getConnWrapper(conn).Value.redirectReason pendingDuringRedirect := tester.readPendingGauge("1", "2", reason) // The connection is closed before the redirect signal is processed on the connection side. - require.NoError(t, tester.router.groups[0].OnConnClosed(conn.from.ID(), conn)) + require.NoError(t, tester.router.OnConnClosed(conn.from.Addr(), conn)) delete(tester.conns, conn.connID) // The aborted redirect result arrives later and must be a no-op. - require.NoError(t, tester.router.groups[0].OnRedirectFail("1", "2", conn)) + require.NoError(t, tester.router.OnRedirectFail("1", "2", conn)) tester.checkNoConnLeak() tester.checkBackendConnMetrics() @@ -386,20 +382,20 @@ func TestReconnectDuringHandshake(t *testing.T) { tester := newRouterTester(t, nil) tester.addBackends(2) conn := tester.createConn() - selector1 := tester.router.GetBackendSelector(ClientInfo{}) + selector1 := tester.router.GetBackendSelector() backend1, err := selector1.Next() require.NoError(t, err) require.False(t, backend1 == nil || reflect.ValueOf(backend1).IsNil()) selector1.Finish(conn, true) // Simulate getBackendIO cleaning up before reconnecting to another backend. - require.NoError(t, tester.router.groups[0].OnConnClosed(backend1.ID(), conn)) - selector2 := tester.router.GetBackendSelector(ClientInfo{}) + require.NoError(t, tester.router.OnConnClosed(backend1.Addr(), conn)) + selector2 := tester.router.GetBackendSelector() var backend2 BackendInst for { backend2, err = selector2.Next() require.NoError(t, err) require.False(t, backend2 == nil || reflect.ValueOf(backend2).IsNil()) - if backend2.ID() != backend1.ID() { + if backend2.Addr() != backend1.Addr() { break } selector2.Finish(conn, false) @@ -418,7 +414,7 @@ func TestReconnectDuringHandshake(t *testing.T) { require.Equal(t, 1, totalScore) tester.checkBackendConnMetrics() - require.NoError(t, tester.router.groups[0].OnConnClosed(backend2.ID(), conn)) + require.NoError(t, tester.router.OnConnClosed(backend2.Addr(), conn)) delete(tester.conns, conn.connID) tester.checkNoConnLeak() } @@ -758,11 +754,7 @@ func TestConcurrency(t *testing.T) { from, to := conn.getAddr() var err error if i < 1 { -<<<<<<< HEAD - err = router.OnConnClosed(from, conn.GetRedirectingAddr(), conn) -======= - err = router.groups[0].OnConnClosed(from, conn) ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) + err = router.OnConnClosed(from, conn) conn = nil } else if i < 3 { conn.redirectFail() @@ -777,13 +769,8 @@ func TestConcurrency(t *testing.T) { i := rand.Intn(10) if i < 2 { // The balancer may happen to redirect it concurrently - that's exactly what may happen. -<<<<<<< HEAD from, _ := conn.getAddr() - err := router.OnConnClosed(from, conn.GetRedirectingAddr(), conn) -======= - from, _ := conn.getBackendIDs() - err := router.groups[0].OnConnClosed(from, conn) ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) + err := router.OnConnClosed(from, conn) require.NoError(t, err) conn = nil } @@ -1126,178 +1113,7 @@ func TestWatchConfig(t *testing.T) { Labels: map[string]string{"k1": "v2"}, } require.Eventually(t, func() bool { -<<<<<<< HEAD return policy.getConfig().Labels["k1"] == "v2" -======= - return p.getConfig().Labels["k1"] == "v2" - }, 3*time.Second, 10*time.Millisecond) -} - -func TestChannelClosed(t *testing.T) { - tests := []struct { - name string - closeChannel func(cfgCh chan *config.Config, bo *mockBackendObserver) - }{ - { - name: "config", - closeChannel: func(cfgCh chan *config.Config, _ *mockBackendObserver) { - close(cfgCh) - }, - }, - { - name: "health", - closeChannel: func(_ chan *config.Config, bo *mockBackendObserver) { - bo.Close() - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - lg, _ := logger.CreateLoggerForTest(t) - router := NewScoreBasedRouter(lg) - cfgCh := make(chan *config.Config) - cfg := &config.Config{} - cfgGetter := newMockConfigGetter(cfg) - p := &mockBalancePolicy{} - bpCreator := func(lg *zap.Logger) policy.BalancePolicy { - p.Init(cfg) - return p - } - bo := newMockBackendObserver() - router.Init(context.Background(), bo, bpCreator, cfgGetter, cfgCh) - t.Cleanup(bo.Close) - t.Cleanup(router.Close) - - bo.addBackend("0", nil) - bo.notify(nil) - require.Eventually(t, func() bool { - router.Lock() - defer router.Unlock() - return len(router.groups) == 1 - }, 3*time.Second, 10*time.Millisecond) - - tt.closeChannel(cfgCh, bo) - time.Sleep(100 * time.Millisecond) - }) - } -} - -func TestWatchFailoverConfig(t *testing.T) { - lg, _ := logger.CreateLoggerForTest(t) - router := NewScoreBasedRouter(lg) - cfgCh := make(chan *config.Config) - addr := "db-2033841436272623616-0f6e346b-tidb-0.peer.ns.svc:4000" - addr2 := "db-2033841436272623616-0f6e346b-tidb-1.peer.ns.svc:4000" - cfgGetter := newMockConfigGetter(&config.Config{ - Proxy: config.ProxyServer{ - ProxyServerOnline: config.ProxyServerOnline{ - FailoverTimeout: 60, - }, - }, - }) - bo := newMockBackendObserver() - router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, cfgCh) - t.Cleanup(bo.Close) - t.Cleanup(router.Close) - - bo.addBackend(addr, nil) - bo.addBackend(addr2, nil) - bo.notify(nil) - require.Eventually(t, func() bool { - backend := router.backends[addr] - return backend != nil && backend.Healthy() - }, 3*time.Second, 10*time.Millisecond) - - cfgCh <- &config.Config{ - Proxy: config.ProxyServer{ - ProxyServerOnline: config.ProxyServerOnline{ - FailBackendList: []string{"db-2033841436272623616-0f6e346b-tidb-0"}, - FailoverTimeout: 60, - }, - }, - } - require.Eventually(t, func() bool { - backend := router.backends[addr] - return backend != nil && !backend.Healthy() - }, 3*time.Second, 10*time.Millisecond) - - cfgCh <- &config.Config{ - Proxy: config.ProxyServer{ - ProxyServerOnline: config.ProxyServerOnline{ - FailBackendList: []string{addr}, - FailoverTimeout: 60, - }, - }, - } - require.Eventually(t, func() bool { - backend := router.backends[addr] - return backend != nil && !backend.Healthy() - }, 3*time.Second, 10*time.Millisecond) - - cfgCh <- &config.Config{ - Proxy: config.ProxyServer{ - ProxyServerOnline: config.ProxyServerOnline{ - FailoverTimeout: 60, - }, - }, - } - require.Eventually(t, func() bool { - backend := router.backends[addr] - return backend != nil && backend.Healthy() - }, 3*time.Second, 10*time.Millisecond) -} - -func TestNewGroupUsesLatestConfigGetter(t *testing.T) { - lg, _ := logger.CreateLoggerForTest(t) - router := NewScoreBasedRouter(lg) - cfgCh := make(chan *config.Config) - cfgGetter := newMockConfigGetter(&config.Config{ - Balance: config.Balance{ - RoutingRule: config.MatchPortStr, - }, - }) - bo := newMockBackendObserver() - router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, cfgCh) - t.Cleanup(bo.Close) - t.Cleanup(router.Close) - - addr1 := "db-2033841436272623616-0f6e346b-tidb-0.peer.ns.svc:4000" - addr2 := "db-2033841436272623616-0f6e346b-tidb-1.peer.ns.svc:4000" - addr3 := "db-2033841436272623616-0f6e346b-tidb-2.peer.ns.svc:4000" - bo.addBackend(addr1, map[string]string{config.TiProxyPortLabelName: "10080"}) - bo.notify(nil) - require.Eventually(t, func() bool { - router.Lock() - defer router.Unlock() - backend := router.backends[addr1] - return backend != nil && backend.Healthy() - }, 3*time.Second, 10*time.Millisecond) - - nextCfg := &config.Config{ - Balance: config.Balance{ - RoutingRule: config.MatchPortStr, - }, - Proxy: config.ProxyServer{ - ProxyServerOnline: config.ProxyServerOnline{ - FailBackendList: []string{addr2}, - FailoverTimeout: 60, - }, - }, - } - cfgGetter.setConfig(nextCfg) - cfgCh <- nextCfg - - bo.addBackend(addr2, map[string]string{config.TiProxyPortLabelName: "10081"}) - bo.addBackend(addr3, map[string]string{config.TiProxyPortLabelName: "10081"}) - bo.notify(nil) - require.Eventually(t, func() bool { - backend := router.backends[addr2] - return backend != nil && !backend.Healthy() - }, 3*time.Second, 10*time.Millisecond) - require.Eventually(t, func() bool { - backend := router.backends[addr3] - return backend != nil && backend.Healthy() ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) }, 3*time.Second, 10*time.Millisecond) } @@ -1434,651 +1250,3 @@ func TestRedirectFail(t *testing.T) { require.Equal(t, 1, tester.getBackendByIndex(0).connScore) require.Equal(t, 1, tester.getBackendByIndex(1).connScore) } -<<<<<<< HEAD -======= - -func TestSkipRedirection(t *testing.T) { - tester := newRouterTester(t, nil) - backends := map[string]*observer.BackendHealth{ - "0": { - BackendInfo: observer.BackendInfo{ - Addr: "0", - }, - Healthy: true, - SupportRedirection: false, - }, - "1": { - BackendInfo: observer.BackendInfo{ - Addr: "1", - }, - Healthy: true, - SupportRedirection: true, - }, - } - result := observer.NewHealthResult(backends, nil) - tester.router.updateBackendHealth(result) - require.False(t, tester.router.supportRedirection) - - tester.addConnections(10) - require.Equal(t, 5, tester.getBackendByIndex(0).connScore) - backends["0"].Healthy = false - tester.router.updateBackendHealth(result) - tester.rebalance(1) - require.Equal(t, 5, tester.getBackendByIndex(0).connScore) - - backends["0"].SupportRedirection = true - tester.router.updateBackendHealth(result) - require.True(t, tester.router.supportRedirection) - tester.rebalance(1) - require.NotEqual(t, 5, tester.getBackendByIndex(0).connScore) -} - -func TestGroupBackends(t *testing.T) { - lg, _ := logger.CreateLoggerForTest(t) - router := NewScoreBasedRouter(lg) - cfgCh := make(chan *config.Config) - cfg := &config.Config{ - Balance: config.Balance{ - RoutingRule: config.MatchClientCIDRStr, - }, - } - cfgGetter := newMockConfigGetter(cfg) - p := &mockBalancePolicy{} - bpCreator := func(_ *zap.Logger) policy.BalancePolicy { - p.Init(cfg) - return p - } - bo := newMockBackendObserver() - router.Init(context.Background(), bo, bpCreator, cfgGetter, cfgCh) - t.Cleanup(bo.Close) - t.Cleanup(router.Close) - - tests := []struct { - addr string - labels map[string]string - groupCount int - backendCount int - cidrs []string - }{ - { - addr: "0", - labels: nil, - groupCount: 0, - backendCount: 1, - cidrs: nil, - }, - { - addr: "1", - labels: map[string]string{"cidr": "1.1.1.1/32"}, - groupCount: 1, - backendCount: 2, - cidrs: []string{"1.1.1.1/32"}, - }, - { - addr: "2", - labels: map[string]string{"cidr": "1.1.1.1/32 , "}, - groupCount: 1, - backendCount: 3, - cidrs: []string{"1.1.1.1/32"}, - }, - { - addr: "3", - labels: map[string]string{"cidr": "1.1.2.1/32, 1.1.3.1/32"}, - groupCount: 2, - backendCount: 4, - cidrs: []string{"1.1.2.1/32", "1.1.3.1/32"}, - }, - { - addr: "4", - labels: map[string]string{"cidr": "1.1.2.1/32,, 1.1.3.1/32 "}, - groupCount: 2, - backendCount: 5, - cidrs: []string{"1.1.2.1/32", "1.1.3.1/32"}, - }, - { - addr: "0", - labels: map[string]string{"cidr": " 1.1.1.1/32 "}, - groupCount: 2, - backendCount: 5, - cidrs: []string{"1.1.1.1/32"}, - }, - { - addr: "1", - labels: map[string]string{"cidr": "1.1.1.1/32, 1.1.4.1/32"}, - groupCount: 2, - backendCount: 5, - cidrs: []string{"1.1.1.1/32", "1.1.4.1/32"}, - }, - { - addr: "3", - labels: map[string]string{"cidr": "1.1.2.1/32"}, - groupCount: 2, - backendCount: 5, - cidrs: []string{"1.1.2.1/32", "1.1.3.1/32"}, - }, - } - - for i, test := range tests { - bo.addBackend(test.addr, test.labels) - bo.notify(nil) - require.Eventually(t, func() bool { - router.Lock() - defer router.Unlock() - if len(router.groups) != test.groupCount { - return false - } - if len(router.backends) != test.backendCount { - return false - } - backend, ok := router.backends[test.addr] - if !ok { - return false - } - group := backend.group - if test.cidrs == nil { - return group == nil - } - if group == nil { - return false - } - return group.EqualValues(test.cidrs) - }, 3*time.Second, 10*time.Millisecond, "test %d", i) - } -} - -func TestGroupBackendsByPort(t *testing.T) { - lg, _ := logger.CreateLoggerForTest(t) - router := NewScoreBasedRouter(lg) - cfg := &config.Config{ - Balance: config.Balance{ - RoutingRule: config.MatchPortStr, - }, - } - cfgGetter := newMockConfigGetter(cfg) - bo := newMockBackendObserver() - router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, make(<-chan *config.Config)) - t.Cleanup(bo.Close) - t.Cleanup(router.Close) - - tests := []struct { - addr string - labels map[string]string - groupCount int - backendCount int - port string - }{ - { - addr: "0", - labels: nil, - groupCount: 0, - backendCount: 1, - }, - { - addr: "1", - labels: map[string]string{config.TiProxyPortLabelName: "10080"}, - groupCount: 1, - backendCount: 2, - port: "10080", - }, - { - addr: "2", - labels: map[string]string{config.TiProxyPortLabelName: "10080"}, - groupCount: 1, - backendCount: 3, - port: "10080", - }, - { - addr: "3", - labels: map[string]string{config.TiProxyPortLabelName: "10081"}, - groupCount: 2, - backendCount: 4, - port: "10081", - }, - } - - for i, test := range tests { - bo.addBackend(test.addr, test.labels) - bo.notify(nil) - require.Eventually(t, func() bool { - router.Lock() - defer router.Unlock() - if len(router.groups) != test.groupCount { - return false - } - if len(router.backends) != test.backendCount { - return false - } - group := router.backends[test.addr].group - if test.port == "" { - return group == nil - } - return group != nil && slices.Equal(group.values, []string{test.port}) - }, 3*time.Second, 10*time.Millisecond, "test %d", i) - } -} - -func TestRouteAndRebalanceByPort(t *testing.T) { - cfg := &config.Config{ - Balance: config.Balance{ - RoutingRule: config.MatchPortStr, - }, - } - bp := &mockBalancePolicy{} - tester := newRouterTester(t, bp) - tester.router.matchType = MatchPort - bp.backendToRoute = func(backends []policy.BackendCtx) policy.BackendCtx { - if len(backends) == 0 { - return nil - } - return backends[0] - } - bp.backendsToBalance = func(backends []policy.BackendCtx) (from policy.BackendCtx, to policy.BackendCtx, balanceCount float64, reason string, logFields []zapcore.Field) { - if len(backends) < 2 { - return nil, nil, 0, "", nil - } - var busiest, idlest policy.BackendCtx - for _, backend := range backends { - if busiest == nil || backend.ConnCount() > busiest.ConnCount() { - busiest = backend - } - if idlest == nil || backend.ConnCount() < idlest.ConnCount() { - idlest = backend - } - } - if busiest == nil || idlest == nil || busiest == idlest { - return nil, nil, 0, "", nil - } - return busiest, idlest, 100, "conn", nil - } - tester.router.cfgGetter = newMockConfigGetter(cfg) - - tester.backends["1"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "1", - Labels: map[string]string{config.TiProxyPortLabelName: "10080"}, - }, - } - tester.backends["2"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "2", - Labels: map[string]string{config.TiProxyPortLabelName: "10080"}, - }, - } - tester.backends["3"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "3", - Labels: map[string]string{config.TiProxyPortLabelName: "10081"}, - }, - } - tester.notifyHealth() - - for range 10 { - conn := tester.createConn() - backend := tester.route(conn, ClientInfo{ListenerPort: "10080"}) - require.NotNil(t, backend) - conn.from = backend - tester.conns[conn.connID] = conn - } - for _, conn := range tester.conns { - require.Equal(t, "10080", tester.router.backends[conn.from.ID()].TiProxyPort()) - } - - tester.rebalance(10) - redirecting := 0 - for _, conn := range tester.conns { - if conn.to == nil || reflect.ValueOf(conn.to).IsNil() { - continue - } - redirecting++ - require.Equal(t, "10080", tester.router.backends[conn.to.ID()].TiProxyPort()) - require.NotEqual(t, "3", conn.to.Addr()) - } - require.Greater(t, redirecting, 0) -} - -func TestRouteByPortBlocksConflictingClusters(t *testing.T) { - cfg := &config.Config{ - Balance: config.Balance{ - RoutingRule: config.MatchPortStr, - }, - } - cfgGetter := newMockConfigGetter(cfg) - bo := newMockBackendObserver() - router := NewScoreBasedRouter(zap.NewNop()) - router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, make(<-chan *config.Config)) - t.Cleanup(bo.Close) - t.Cleanup(router.Close) - - bo.addBackendWithCluster("a1", "cluster-a", map[string]string{ - config.TiProxyPortLabelName: "10080", - }) - bo.addBackendWithCluster("b1", "cluster-b", map[string]string{ - config.TiProxyPortLabelName: "10080", - }) - bo.notify(nil) - - require.Eventually(t, func() bool { - router.Lock() - defer router.Unlock() - return len(router.groups) == 2 && router.portConflictDetector != nil - }, 3*time.Second, 10*time.Millisecond) - - selector := router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) - _, err := selector.Next() - require.Error(t, err) - require.True(t, errors.Is(err, ErrPortConflict)) -} - -func TestRouteByPortRecoversAfterConflictIsRemoved(t *testing.T) { - cfg := &config.Config{ - Balance: config.Balance{ - RoutingRule: config.MatchPortStr, - }, - } - cfgGetter := newMockConfigGetter(cfg) - bo := newMockBackendObserver() - router := NewScoreBasedRouter(zap.NewNop()) - router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, make(<-chan *config.Config)) - t.Cleanup(bo.Close) - t.Cleanup(router.Close) - - bo.addBackendWithCluster("a1", "cluster-a", map[string]string{ - config.TiProxyPortLabelName: "10080", - }) - bo.addBackendWithCluster("b1", "cluster-b", map[string]string{ - config.TiProxyPortLabelName: "10080", - }) - bo.notify(nil) - - require.Eventually(t, func() bool { - selector := router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) - _, err := selector.Next() - return errors.Is(err, ErrPortConflict) - }, 3*time.Second, 10*time.Millisecond) - - bo.healthLock.Lock() - delete(bo.healths, "b1") - bo.healthLock.Unlock() - bo.notify(nil) - - require.Eventually(t, func() bool { - selector := router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) - backend, err := selector.Next() - return err == nil && backend != nil && backend.ID() == "a1" - }, 3*time.Second, 10*time.Millisecond) -} - -func TestKeepExistingPortGroupWhenPortLabelChanges(t *testing.T) { - cfg := &config.Config{ - Balance: config.Balance{ - RoutingRule: config.MatchPortStr, - }, - } - cfgGetter := newMockConfigGetter(cfg) - bo := newMockBackendObserver() - lg, text := logger.CreateLoggerForTest(t) - router := NewScoreBasedRouter(lg) - router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, make(<-chan *config.Config)) - t.Cleanup(bo.Close) - t.Cleanup(router.Close) - - bo.addBackendWithCluster("backend-1", "cluster-a", map[string]string{ - config.TiProxyPortLabelName: "10080", - }) - bo.notify(nil) - - var oldGroup *Group - require.Eventually(t, func() bool { - router.Lock() - defer router.Unlock() - backend := router.backends["backend-1"] - if backend == nil { - return false - } - oldGroup = backend.group - return oldGroup != nil && slices.Equal(oldGroup.values, []string{"cluster-a:10080"}) - }, 3*time.Second, 10*time.Millisecond) - - conn := newMockRedirectableConn(t, 1) - selector := router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) - backend, err := selector.Next() - require.NoError(t, err) - selector.Finish(conn, true) - conn.from = backend - - bo.healthLock.Lock() - bo.healths["backend-1"].ClusterName = "cluster-a" - bo.healthLock.Unlock() - bo.setLabels("backend-1", map[string]string{ - config.TiProxyPortLabelName: "10081", - }) - bo.notify(nil) - - require.Eventually(t, func() bool { - router.Lock() - defer router.Unlock() - backend := router.backends["backend-1"] - return backend != nil && backend.group == oldGroup - }, 3*time.Second, 10*time.Millisecond) - require.Eventually(t, func() bool { - return strings.Contains(text.String(), "backend routing values changed, keep the existing group until it is removed") - }, 3*time.Second, 10*time.Millisecond) - - conn.Lock() - require.Equal(t, oldGroup, conn.receiver) - conn.Unlock() - - oldSelector := router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) - backend, err = oldSelector.Next() - require.NoError(t, err) - require.Equal(t, "backend-1", backend.ID()) - - newSelector := router.GetBackendSelector(ClientInfo{ListenerPort: "10081"}) - _, err = newSelector.Next() - require.ErrorIs(t, err, ErrNoBackend) -} - -func TestPortConflictGroupsStayClusterScoped(t *testing.T) { - tester := newRouterTester(t, nil) - tester.router.matchType = MatchPort - tester.backends["a1"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "shared-a-1:4000", - ClusterName: "cluster-a", - Labels: map[string]string{ - config.TiProxyPortLabelName: "10080", - }, - }, - } - tester.backends["a2"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "shared-a-2:4000", - ClusterName: "cluster-a", - Labels: map[string]string{ - config.TiProxyPortLabelName: "10080", - }, - }, - } - tester.backends["b1"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "shared-b-1:4000", - ClusterName: "cluster-b", - Labels: map[string]string{ - config.TiProxyPortLabelName: "10080", - }, - }, - } - tester.backends["b2"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "shared-b-2:4000", - ClusterName: "cluster-b", - Labels: map[string]string{ - config.TiProxyPortLabelName: "10080", - }, - }, - } - tester.notifyHealth() - - groupA := findGroupByValues(t, tester.router, []string{"cluster-a:10080"}) - groupB := findGroupByValues(t, tester.router, []string{"cluster-b:10080"}) - require.NotSame(t, groupA, groupB) - for _, backend := range groupA.backends { - require.Equal(t, "cluster-a", backend.ClusterName()) - } - for _, backend := range groupB.backends { - require.Equal(t, "cluster-b", backend.ClusterName()) - } -} - -func TestPortConflictBlocksRoutingButAllowsIntraClusterRebalance(t *testing.T) { - bp := &mockBalancePolicy{} - tester := newRouterTester(t, bp) - tester.router.matchType = MatchPort - bp.backendsToBalance = func(backends []policy.BackendCtx) (from policy.BackendCtx, to policy.BackendCtx, balanceCount float64, reason string, logFields []zapcore.Field) { - if len(backends) < 2 { - return nil, nil, 0, "", nil - } - var busiest, idlest policy.BackendCtx - for _, backend := range backends { - if busiest == nil || backend.ConnCount() > busiest.ConnCount() { - busiest = backend - } - if idlest == nil || backend.ConnCount() < idlest.ConnCount() { - idlest = backend - } - } - if busiest == nil || idlest == nil || busiest == idlest { - return nil, nil, 0, "", nil - } - return busiest, idlest, 100, "conn", nil - } - - tester.backends["a1"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "shared-a-1:4000", - ClusterName: "cluster-a", - Labels: map[string]string{ - config.TiProxyPortLabelName: "10080", - }, - }, - } - tester.backends["a2"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "shared-a-2:4000", - ClusterName: "cluster-a", - Labels: map[string]string{ - config.TiProxyPortLabelName: "10080", - }, - }, - } - tester.backends["b1"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "shared-b-1:4000", - ClusterName: "cluster-b", - Labels: map[string]string{ - config.TiProxyPortLabelName: "10080", - }, - }, - } - tester.notifyHealth() - - selector := tester.router.GetBackendSelector(ClientInfo{ListenerPort: "10080"}) - _, err := selector.Next() - require.Error(t, err) - require.True(t, errors.Is(err, ErrPortConflict)) - - groupA := findGroupByValues(t, tester.router, []string{"cluster-a:10080"}) - backendA1 := tester.router.backends["a1"] - for range 6 { - conn := tester.createConn() - groupA.onCreateConn(backendA1, conn, true) - conn.from = backendA1 - tester.conns[conn.connID] = conn - } - - groupA.lastRedirectTime = time.Time{} - groupA.Balance(context.Background()) - - redirecting := 0 - for _, conn := range tester.conns { - if conn.to == nil || reflect.ValueOf(conn.to).IsNil() { - continue - } - redirecting++ - require.Equal(t, "cluster-a", tester.router.backends[conn.to.ID()].ClusterName()) - require.Equal(t, "a2", conn.to.ID()) - } - require.Greater(t, redirecting, 0) -} - -func TestRouteBackendsWithSameAddrDifferentIDs(t *testing.T) { - tester := newRouterTester(t, nil) - tester.router.matchType = MatchAll - tester.backends["cluster-a/shared:4000"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "shared:4000", - ClusterName: "cluster-a", - }, - } - tester.backends["cluster-b/shared:4000"] = &observer.BackendHealth{ - Healthy: true, - SupportRedirection: true, - BackendInfo: observer.BackendInfo{ - Addr: "shared:4000", - ClusterName: "cluster-b", - }, - } - tester.notifyHealth() - - selector := tester.router.GetBackendSelector(ClientInfo{}) - first, err := selector.Next() - require.NoError(t, err) - second, err := selector.Next() - require.NoError(t, err) - - require.Equal(t, "shared:4000", first.Addr()) - require.Equal(t, "shared:4000", second.Addr()) - require.NotEqual(t, first.ID(), second.ID()) -} - -func findGroupByValues(t *testing.T, router *ScoreBasedRouter, values []string) *Group { - t.Helper() - router.Lock() - defer router.Unlock() - for _, group := range router.groups { - if group.matchType == MatchPort { - if slices.Equal(group.values, values) { - return group - } - continue - } - if group.EqualValues(values) { - return group - } - } - require.FailNow(t, "group not found", "values=%v", values) - return nil -} ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) diff --git a/pkg/balance/router/router_static.go b/pkg/balance/router/router_static.go index bdfc0c08a..11e938543 100644 --- a/pkg/balance/router/router_static.go +++ b/pkg/balance/router/router_static.go @@ -74,11 +74,7 @@ func (r *StaticRouter) OnRedirectFail(from, to string, conn RedirectableConn) er return nil } -<<<<<<< HEAD -func (r *StaticRouter) OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error { -======= -func (r *StaticRouter) OnConnClosed(backendID string, conn RedirectableConn) error { ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) +func (r *StaticRouter) OnConnClosed(backendAddr string, conn RedirectableConn) error { r.cnt-- return nil } diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index c0d40b0a5..5adf25e6b 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -265,9 +265,8 @@ func (mgr *BackendConnManager) abandonRoutedBackend() { if receiver == nil || mgr.curBackend == nil { return } - if err := receiver.OnConnClosed(mgr.curBackend.ID(), mgr); err != nil { - mgr.logger.Warn("abandon routed backend failed", zap.String("backend_id", mgr.curBackend.ID()), - zap.String("backend_addr", mgr.curBackend.Addr()), zap.NamedError("notify_err", err)) + if err := receiver.OnConnClosed(mgr.curBackend.Addr(), mgr); err != nil { + mgr.logger.Warn("abandon routed backend failed", zap.String("backend_addr", mgr.curBackend.Addr()), zap.NamedError("notify_err", err)) } // Clear the receiver so that Close() won't clean up again. mgr.eventReceiver.Store(nil) @@ -807,9 +806,7 @@ func (mgr *BackendConnManager) Close() error { handErr := mgr.handshakeHandler.OnConnClose(mgr, mgr.quitSource) var connErr error - var addr string if backendIO := mgr.backendIO.Swap(nil); backendIO != nil { - addr = (*backendIO).RemoteAddr().String() connErr = (*backendIO).Close() } @@ -822,20 +819,9 @@ func (mgr *BackendConnManager) Close() error { default: } // The connection may have just received the redirecting signal. -<<<<<<< HEAD - if len(addr) > 0 { - var redirectingAddr string - if redirectingBackend := mgr.redirectInfo.Load(); redirectingBackend != nil { - redirectingAddr = (*redirectingBackend).Addr() - } - if err := eventReceiver.OnConnClosed(addr, redirectingAddr, mgr); err != nil { - mgr.logger.Error("close connection error", zap.String("backend_addr", addr), zap.NamedError("notify_err", err)) -======= if mgr.curBackend != nil { - if err := eventReceiver.OnConnClosed(mgr.curBackend.ID(), mgr); err != nil { - mgr.logger.Error("close connection error", zap.String("backend_id", mgr.curBackend.ID()), - zap.String("backend_addr", mgr.curBackend.Addr()), zap.NamedError("notify_err", err)) ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) + if err := eventReceiver.OnConnClosed(mgr.curBackend.Addr(), mgr); err != nil { + mgr.logger.Error("close connection error", zap.String("backend_addr", mgr.curBackend.Addr()), zap.NamedError("notify_err", err)) } } } diff --git a/pkg/proxy/backend/backend_conn_mgr_test.go b/pkg/proxy/backend/backend_conn_mgr_test.go index 515ce6547..32b443502 100644 --- a/pkg/proxy/backend/backend_conn_mgr_test.go +++ b/pkg/proxy/backend/backend_conn_mgr_test.go @@ -62,16 +62,9 @@ func (mer *mockEventReceiver) OnRedirectFail(from, to string, conn router.Redire return nil } -<<<<<<< HEAD -func (mer *mockEventReceiver) OnConnClosed(from, to string, conn router.RedirectableConn) error { +func (mer *mockEventReceiver) OnConnClosed(backendAddr string, conn router.RedirectableConn) error { mer.eventCh <- event{ - from: from, - to: to, -======= -func (mer *mockEventReceiver) OnConnClosed(backendID string, conn router.RedirectableConn) error { - mer.eventCh <- event{ - from: backendID, ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) + from: backendAddr, eventName: eventClose, } return nil @@ -792,7 +785,6 @@ func TestGracefulCloseWhenActive(t *testing.T) { ts.runTests(runners) } -<<<<<<< HEAD type countingPacketIO struct { pnet.PacketIO gracefulCloseCnt atomic.Int32 @@ -807,7 +799,8 @@ func (cp *countingPacketIO) GracefulClose() error { func (cp *countingPacketIO) Close() error { cp.closeCnt.Add(1) return nil -======= +} + // Test that the redirection aborted by closing is reported as a failure instead of a success. // Otherwise, the router moves the connection to the target backend in the connList, which // leaks the connection in the list. @@ -840,30 +833,6 @@ func TestRedirectAbortedByCloseReportsFail(t *testing.T) { ts.runTests(runners) } -func TestGracefulCloseBeforeHandshake(t *testing.T) { - ts := newBackendMgrTester(t) - runners := []runner{ - // try to gracefully close before handshake - { - proxy: func(_, _ pnet.PacketIO) error { - ts.mp.GracefulClose() - return nil - }, - }, - // connect fails - { - proxy: func(clientIO, backendIO pnet.PacketIO) error { - err := ts.mp.Connect(context.Background(), clientIO, ts.mp.frontendTLSConfig, ts.mp.backendTLSConfig, "", "", "") - require.Error(ts.t, err) - require.Equal(t, SrcProxyQuit, ts.mp.QuitSource()) - return nil - }, - }, - } - ts.runTests(runners) ->>>>>>> 9fafc2f1 (balance, proxy: fix TiDB CPU imbalance when balance.policy="resource" (#1173)) -} - func TestForceClose(t *testing.T) { ts := newBackendMgrTester(t) runners := []runner{