diff --git a/pkg/balance/factor/factor_cpu.go b/pkg/balance/factor/factor_cpu.go index b531a953..0e45635a 100644 --- a/pkg/balance/factor/factor_cpu.go +++ b/pkg/balance/factor/factor_cpu.go @@ -240,7 +240,7 @@ func (fc *FactorCPU) updateCpuPerConn() { // Estimate the current cpu usage by the latest CPU usage, the latest connection count, and the current connection count. func (fc *FactorCPU) getUsage(backend scoredBackend) (avgUsage, latestUsage float64) { snapshot, ok := fc.snapshot[backend.Addr()] - if !ok || snapshot.avgUsage < 0 || latestUsage < 0 { + if !ok || snapshot.avgUsage < 0 || snapshot.latestUsage < 0 { // The metric has missed for minutes. return 1, 1 } diff --git a/pkg/balance/router/router.go b/pkg/balance/router/router.go index ae9a586c..e7d6d63e 100644 --- a/pkg/balance/router/router.go +++ b/pkg/balance/router/router.go @@ -32,7 +32,7 @@ func (b routeCheckBackend) Healthy() bool { type ConnEventReceiver interface { OnRedirectSucceed(from, to string, conn RedirectableConn) error OnRedirectFail(from, to string, conn RedirectableConn) error - OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error + OnConnClosed(backendAddr string, conn RedirectableConn) error } // Router routes client connections to backends. @@ -225,6 +225,12 @@ func (b *backendWrapper) String() string { // connWrapper wraps RedirectableConn. type connWrapper struct { RedirectableConn + // The backend whose connList contains this connection. It may differ from the backend that the + // connection is physically on when a redirect result is not processed yet, so removing the + // connection from the list must always go through `physicalOwner`. + physicalOwner *backendWrapper + // The backend whose connScore includes this connection. + scoreOwner *backendWrapper // The reason why the redirection happens. redirectReason string // Last redirect start time of this connection. @@ -234,6 +240,16 @@ type connWrapper struct { forceClosing bool } +func (c *connWrapper) transferScore(to *backendWrapper) { + if c.scoreOwner != nil { + c.scoreOwner.connScore-- + } + c.scoreOwner = to + if to != nil { + to.connScore++ + } +} + func backendPodNameFromAddr(addr string) string { host, _, err := net.SplitHostPort(addr) if err != nil { diff --git a/pkg/balance/router/router_score.go b/pkg/balance/router/router_score.go index b0dae4b8..11187bf3 100644 --- a/pkg/balance/router/router_score.go +++ b/pkg/balance/router/router_score.go @@ -157,6 +157,7 @@ func (router *ScoreBasedRouter) onCreateConn(backendInst BackendInst, conn Redir if succeed { connWrapper := &connWrapper{ RedirectableConn: conn, + scoreOwner: backend, createTime: time.Now(), phase: phaseNotRedirected, forceClosing: false, @@ -168,13 +169,32 @@ func (router *ScoreBasedRouter) onCreateConn(backendInst BackendInst, conn Redir } } -func (router *ScoreBasedRouter) removeConn(backend *backendWrapper, ce *glist.Element[*connWrapper]) { +// removeConn removes the connection from the connList that actually contains it. +// Always remove by `physicalOwner` instead of by the backend that the connection is physically on: +// they differ when a redirect result has not been processed yet, and glist.Remove silently +// does nothing if the element is not in the given list, which leaks the connection forever. +func (router *ScoreBasedRouter) removeConn(ce *glist.Element[*connWrapper]) { + backend := ce.Value.physicalOwner + if backend == nil { + router.logger.Warn("unexpected nil physical owner for connection") + return + } + oldLen := backend.connList.Len() backend.connList.Remove(ce) - setBackendConnMetrics(backend.addr, backend.connList.Len()) + newLen := backend.connList.Len() + if newLen != oldLen-1 { + router.logger.Warn("the connection is not in the list", zap.String("backend", backend.addr)) + } + ce.Value.physicalOwner = nil + setBackendConnMetrics(backend.addr, newLen) router.removeBackendIfEmpty(backend) } func (router *ScoreBasedRouter) addConn(backend *backendWrapper, conn *connWrapper) { + if conn.physicalOwner != nil { + router.logger.Warn("unexpected non-nil physical owner for connection") + } + conn.physicalOwner = backend ce := backend.connList.PushBack(conn) setBackendConnMetrics(backend.addr, backend.connList.Len()) router.setConnWrapper(conn, ce) @@ -243,41 +263,38 @@ func (router *ScoreBasedRouter) onRedirectFinished(from, to string, conn Redirec fromBackend := router.ensureBackend(from) toBackend := router.ensureBackend(to) connWrapper := router.getConnWrapper(conn).Value - addMigrateMetrics(from, to, connWrapper.redirectReason, succeed, connWrapper.lastRedirect) // The connection may be closed when this function is waiting for the lock. if connWrapper.phase == phaseClosed { return } + addMigrateMetrics(fromBackend.addr, toBackend.addr, connWrapper.redirectReason, succeed, connWrapper.lastRedirect) if succeed { - router.removeConn(fromBackend, router.getConnWrapper(conn)) + router.removeConn(router.getConnWrapper(conn)) router.addConn(toBackend, connWrapper) connWrapper.phase = phaseRedirectEnd } else { - fromBackend.connScore++ - toBackend.connScore-- - router.removeBackendIfEmpty(toBackend) + connWrapper.transferScore(fromBackend) connWrapper.phase = phaseRedirectFail } } // OnConnClosed implements ConnEventReceiver.OnConnClosed interface. -func (router *ScoreBasedRouter) OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error { +func (router *ScoreBasedRouter) OnConnClosed(backendAddr string, conn RedirectableConn) error { router.Lock() defer router.Unlock() - backend := router.ensureBackend(addr) connWrapper := router.getConnWrapper(conn) - // If this connection has not redirected yet, decrease the score of the target backend. - if redirectingAddr != "" { - redirectingBackend := router.ensureBackend(redirectingAddr) - redirectingBackend.connScore-- - router.removeBackendIfEmpty(redirectingBackend) - metrics.PendingMigrateGuage.WithLabelValues(addr, redirectingAddr, connWrapper.Value.redirectReason).Dec() - } else { - backend.connScore-- - } - router.removeConn(backend, connWrapper) - connWrapper.Value.phase = phaseClosed + cw := connWrapper.Value + // If the physical owner mismatches the score owner, it means the redirect result has not been processed yet. + if cw.physicalOwner != cw.scoreOwner && cw.physicalOwner != nil && cw.scoreOwner != nil { + addMigrateMetrics(cw.physicalOwner.addr, cw.scoreOwner.addr, cw.redirectReason, false, cw.lastRedirect) + } + cw.transferScore(nil) + // A redirect result may have not been processed yet, in which case the connection is still + // in the old backend's connList while `backendAddr` is the new backend, so remove the connection + // by its physicalOwner. onRedirectFinished won't touch the list once the phase is phaseClosed. + router.removeConn(connWrapper) + cw.phase = phaseClosed return nil } @@ -432,9 +449,7 @@ func (router *ScoreBasedRouter) redirectConn(conn *connWrapper, fromBackend *bac succeed := conn.Redirect(toBackend) if succeed { router.logger.Debug("begin redirect connection", fields...) - fromBackend.connScore-- - router.removeBackendIfEmpty(fromBackend) - toBackend.connScore++ + conn.transferScore(toBackend) conn.phase = phaseRedirectNotify conn.redirectReason = reason metrics.PendingMigrateGuage.WithLabelValues(fromBackend.addr, toBackend.addr, reason).Inc() diff --git a/pkg/balance/router/router_score_test.go b/pkg/balance/router/router_score_test.go index e8249743..2d713130 100644 --- a/pkg/balance/router/router_score_test.go +++ b/pkg/balance/router/router_score_test.go @@ -165,7 +165,7 @@ func (tester *routerTester) closeConnections(num int, redirecting bool) { } } for _, conn := range conns { - err := tester.router.OnConnClosed(conn.from.Addr(), conn.GetRedirectingAddr(), conn) + err := tester.router.OnConnClosed(conn.from.Addr(), conn) require.NoError(tester.t, err) delete(tester.conns, conn.connID) } @@ -247,6 +247,34 @@ func (tester *routerTester) checkBackendConnMetrics() { } } +// checkNoConnLeak checks that no connection wrapper leaks in any connList and all connScores are 0 +// after all the connections are closed. +func (tester *routerTester) checkNoConnLeak() { + for _, backend := range tester.router.backends { + require.Equal(tester.t, 0, backend.connList.Len(), "backend %s leaks connections in connList", backend.addr) + require.Equal(tester.t, 0, backend.connScore, "backend %s has a wrong connScore", backend.addr) + } +} + +// prepareRedirectingConn creates one connection on backend "1" and makes it redirect to backend "2". +func (tester *routerTester) prepareRedirectingConn() *mockRedirectableConn { + tester.addBackends(1) + tester.addConnections(1) + tester.addBackends(1) + // Make backend "1" unhealthy so that the connection is redirected to backend "2". + tester.updateBackendStatusByAddr("1", false) + tester.rebalance(1) + conn := tester.conns[1] + require.Equal(tester.t, "2", conn.GetRedirectingAddr()) + return conn +} + +func (tester *routerTester) readPendingGauge(from, to, reason string) float64 { + val, err := metrics.ReadGauge(metrics.PendingMigrateGuage.WithLabelValues(from, to, reason)) + require.NoError(tester.t, err) + return val +} + func (tester *routerTester) clear() { tester.backendID = 0 tester.connID = 0 @@ -283,6 +311,114 @@ func TestRebalance(t *testing.T) { tester.checkRedirectingNum(20) } +// Test the race that the connection is closed after the redirection succeeds on the connection side +// but before the router processes the redirect result. The connection must be removed from the +// original backend's connList, otherwise it leaks in the connList forever, which inflates b_conn +// metrics and distorts the balance. +func TestCloseRaceWithRedirectSucceed(t *testing.T) { + tester := newRouterTester(t, nil) + conn := tester.prepareRedirectingConn() + reason := tester.router.getConnWrapper(conn).Value.redirectReason + pendingDuringRedirect := tester.readPendingGauge("1", "2", reason) + + // The redirection succeeds on the connection side, and then the connection is closed + // before the router receives OnRedirectSucceed. + conn.redirectSucceed() + require.NoError(t, tester.router.OnConnClosed(conn.from.Addr(), conn)) + delete(tester.conns, conn.connID) + // Now the router processes the redirect result. + require.NoError(t, tester.router.OnRedirectSucceed("1", "2", conn)) + + tester.checkNoConnLeak() + tester.checkBackendConnMetrics() + require.Equal(t, pendingDuringRedirect-1, tester.readPendingGauge("1", "2", reason)) +} + +// Test the race that the connection is closed after the redirection fails on the connection side +// but before the router processes the redirect result. The connScores moved by the redirection +// must be moved back, otherwise they drift away permanently. +func TestCloseRaceWithRedirectFail(t *testing.T) { + tester := newRouterTester(t, nil) + conn := tester.prepareRedirectingConn() + reason := tester.router.getConnWrapper(conn).Value.redirectReason + pendingDuringRedirect := tester.readPendingGauge("1", "2", reason) + + // The redirection fails on the connection side, and then the connection is closed + // before the router receives OnRedirectFail. + conn.redirectFail() + require.NoError(t, tester.router.OnConnClosed(conn.from.Addr(), conn)) + delete(tester.conns, conn.connID) + // Now the router processes the redirect result. + require.NoError(t, tester.router.OnRedirectFail("1", "2", conn)) + + tester.checkNoConnLeak() + tester.checkBackendConnMetrics() + require.Equal(t, pendingDuringRedirect-1, tester.readPendingGauge("1", "2", reason)) +} + +// Test the race that the connection is closed before the connection side processes the redirect +// signal. OnConnClosed reverts the redirection, and the aborted redirect result that arrives later +// must not revert it again, otherwise the connScores and the pending gauge are decreased twice. +func TestCloseRaceWithRedirectAborted(t *testing.T) { + tester := newRouterTester(t, nil) + conn := tester.prepareRedirectingConn() + reason := tester.router.getConnWrapper(conn).Value.redirectReason + pendingDuringRedirect := tester.readPendingGauge("1", "2", reason) + + // The connection is closed before the redirect signal is processed on the connection side. + require.NoError(t, tester.router.OnConnClosed(conn.from.Addr(), conn)) + delete(tester.conns, conn.connID) + // The aborted redirect result arrives later and must be a no-op. + require.NoError(t, tester.router.OnRedirectFail("1", "2", conn)) + + tester.checkNoConnLeak() + tester.checkBackendConnMetrics() + require.Equal(t, pendingDuringRedirect-1, tester.readPendingGauge("1", "2", reason)) +} + +// Test that when the authenticator reconnects to another backend during the handshake, +// the registration of the previous attempt is cleaned up. +func TestReconnectDuringHandshake(t *testing.T) { + tester := newRouterTester(t, nil) + tester.addBackends(2) + conn := tester.createConn() + selector1 := tester.router.GetBackendSelector() + backend1, err := selector1.Next() + require.NoError(t, err) + require.False(t, backend1 == nil || reflect.ValueOf(backend1).IsNil()) + selector1.Finish(conn, true) + // Simulate getBackendIO cleaning up before reconnecting to another backend. + require.NoError(t, tester.router.OnConnClosed(backend1.Addr(), conn)) + selector2 := tester.router.GetBackendSelector() + var backend2 BackendInst + for { + backend2, err = selector2.Next() + require.NoError(t, err) + require.False(t, backend2 == nil || reflect.ValueOf(backend2).IsNil()) + if backend2.Addr() != backend1.Addr() { + break + } + selector2.Finish(conn, false) + } + selector2.Finish(conn, true) + conn.from = backend2 + tester.conns[conn.connID] = conn + + // Only the registration of the second attempt remains. + totalConns, totalScore := 0, 0 + for _, backend := range tester.router.backends { + totalConns += backend.connList.Len() + totalScore += backend.connScore + } + require.Equal(t, 1, totalConns) + require.Equal(t, 1, totalScore) + tester.checkBackendConnMetrics() + + require.NoError(t, tester.router.OnConnClosed(backend2.Addr(), conn)) + delete(tester.conns, conn.connID) + tester.checkNoConnLeak() +} + // Test that the connections are always balanced after rebalance and routing. func TestConnBalanced(t *testing.T) { tester := newRouterTester(t, nil) @@ -618,7 +754,7 @@ func TestConcurrency(t *testing.T) { from, to := conn.getAddr() var err error if i < 1 { - err = router.OnConnClosed(from, conn.GetRedirectingAddr(), conn) + err = router.OnConnClosed(from, conn) conn = nil } else if i < 3 { conn.redirectFail() @@ -634,7 +770,7 @@ func TestConcurrency(t *testing.T) { if i < 2 { // The balancer may happen to redirect it concurrently - that's exactly what may happen. from, _ := conn.getAddr() - err := router.OnConnClosed(from, conn.GetRedirectingAddr(), conn) + err := router.OnConnClosed(from, conn) require.NoError(t, err) conn = nil } diff --git a/pkg/balance/router/router_static.go b/pkg/balance/router/router_static.go index 7b017390..11e93854 100644 --- a/pkg/balance/router/router_static.go +++ b/pkg/balance/router/router_static.go @@ -74,7 +74,7 @@ func (r *StaticRouter) OnRedirectFail(from, to string, conn RedirectableConn) er return nil } -func (r *StaticRouter) OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error { +func (r *StaticRouter) OnConnClosed(backendAddr string, conn RedirectableConn) error { r.cnt-- return nil } diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index 493ae409..5adf25e6 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -258,7 +258,23 @@ func (mgr *BackendConnManager) newExponentialBackOff() *backoff.ExponentialBackO return b } +// abandonRoutedBackend removes the registration left by a previous successful dial during handshake. +// It is called at the beginning of getBackendIO before routing to another backend. +func (mgr *BackendConnManager) abandonRoutedBackend() { + receiver := mgr.getEventReceiver() + if receiver == nil || mgr.curBackend == nil { + return + } + if err := receiver.OnConnClosed(mgr.curBackend.Addr(), mgr); err != nil { + mgr.logger.Warn("abandon routed backend failed", zap.String("backend_addr", mgr.curBackend.Addr()), zap.NamedError("notify_err", err)) + } + // Clear the receiver so that Close() won't clean up again. + mgr.eventReceiver.Store(nil) + mgr.redirectInfo.Store(nil) +} + func (mgr *BackendConnManager) getBackendIO(ctx context.Context, cctx ConnContext, resp *pnet.HandshakeResp) (pnet.PacketIO, error) { + mgr.abandonRoutedBackend() r, err := mgr.handshakeHandler.GetRouter(cctx, resp) if err != nil { return nil, errors.Wrap(err, ErrProxyErr) @@ -545,6 +561,7 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) { }() // Even if the connection is closing, the redirection result must still be sent to recover the connection scores. if mgr.closeStatus.Load() >= statusNotifyClose || ctx.Err() != nil { + rs.err = ErrClosing return } // It may have been too long since the redirection signal was sent, and the target backend may be unhealthy now. @@ -789,26 +806,22 @@ func (mgr *BackendConnManager) Close() error { handErr := mgr.handshakeHandler.OnConnClose(mgr, mgr.quitSource) var connErr error - var addr string if backendIO := mgr.backendIO.Swap(nil); backendIO != nil { - addr = (*backendIO).RemoteAddr().String() connErr = (*backendIO).Close() } eventReceiver := mgr.getEventReceiver() if eventReceiver != nil { // Notify the receiver if there's any event. - if len(mgr.redirectResCh) > 0 { - mgr.notifyRedirectResult(context.Background(), <-mgr.redirectResCh) + select { + case rs := <-mgr.redirectResCh: + mgr.notifyRedirectResult(context.Background(), rs) + default: } // The connection may have just received the redirecting signal. - if len(addr) > 0 { - var redirectingAddr string - if redirectingBackend := mgr.redirectInfo.Load(); redirectingBackend != nil { - redirectingAddr = (*redirectingBackend).Addr() - } - if err := eventReceiver.OnConnClosed(addr, redirectingAddr, mgr); err != nil { - mgr.logger.Error("close connection error", zap.String("backend_addr", addr), zap.NamedError("notify_err", err)) + if mgr.curBackend != nil { + if err := eventReceiver.OnConnClosed(mgr.curBackend.Addr(), mgr); err != nil { + mgr.logger.Error("close connection error", zap.String("backend_addr", mgr.curBackend.Addr()), zap.NamedError("notify_err", err)) } } } diff --git a/pkg/proxy/backend/backend_conn_mgr_test.go b/pkg/proxy/backend/backend_conn_mgr_test.go index b62ef641..32b44350 100644 --- a/pkg/proxy/backend/backend_conn_mgr_test.go +++ b/pkg/proxy/backend/backend_conn_mgr_test.go @@ -62,10 +62,9 @@ func (mer *mockEventReceiver) OnRedirectFail(from, to string, conn router.Redire return nil } -func (mer *mockEventReceiver) OnConnClosed(from, to string, conn router.RedirectableConn) error { +func (mer *mockEventReceiver) OnConnClosed(backendAddr string, conn router.RedirectableConn) error { mer.eventCh <- event{ - from: from, - to: to, + from: backendAddr, eventName: eventClose, } return nil @@ -802,6 +801,38 @@ func (cp *countingPacketIO) Close() error { return nil } +// Test that the redirection aborted by closing is reported as a failure instead of a success. +// Otherwise, the router moves the connection to the target backend in the connList, which +// leaks the connection in the list. +func TestRedirectAbortedByCloseReportsFail(t *testing.T) { + ts := newBackendMgrTester(t) + runners := []runner{ + // 1st handshake + { + client: ts.mc.authenticate, + proxy: ts.firstHandshake4Proxy, + backend: ts.handshake4Backend, + }, + { + proxy: func(_, _ pnet.PacketIO) error { + // Simulate the race that the redirect signal is received but the connection + // starts closing before tryRedirect processes the signal. + backendInst := router.BackendInst(newMockBackendInst(ts)) + ts.mp.redirectInfo.Store(&backendInst) + ts.mp.closeStatus.Store(statusNotifyClose) + ts.mp.processLock.Lock() + ts.mp.tryRedirect(context.Background()) + ts.mp.processLock.Unlock() + // The aborted redirection must be reported as a failure, not a success. + ts.mp.getEventReceiver().(*mockEventReceiver).checkEvent(ts.t, eventFail) + ts.mp.closeStatus.Store(statusActive) + return nil + }, + }, + } + ts.runTests(runners) +} + func TestForceClose(t *testing.T) { ts := newBackendMgrTester(t) runners := []runner{