Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/balance/factor/factor_cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (fc *FactorCPU) updateCpuPerConn() {
// Estimate the current cpu usage by the latest CPU usage, the latest connection count, and the current connection count.
func (fc *FactorCPU) getUsage(backend scoredBackend) (avgUsage, latestUsage float64) {
snapshot, ok := fc.snapshot[backend.Addr()]
if !ok || snapshot.avgUsage < 0 || latestUsage < 0 {
if !ok || snapshot.avgUsage < 0 || snapshot.latestUsage < 0 {
// The metric has missed for minutes.
return 1, 1
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/balance/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (b routeCheckBackend) Healthy() bool {
type ConnEventReceiver interface {
OnRedirectSucceed(from, to string, conn RedirectableConn) error
OnRedirectFail(from, to string, conn RedirectableConn) error
OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error
OnConnClosed(backendAddr string, conn RedirectableConn) error
}

// Router routes client connections to backends.
Expand Down Expand Up @@ -225,6 +225,12 @@ func (b *backendWrapper) String() string {
// connWrapper wraps RedirectableConn.
type connWrapper struct {
RedirectableConn
// The backend whose connList contains this connection. It may differ from the backend that the
// connection is physically on when a redirect result is not processed yet, so removing the
// connection from the list must always go through `physicalOwner`.
physicalOwner *backendWrapper
// The backend whose connScore includes this connection.
scoreOwner *backendWrapper
// The reason why the redirection happens.
redirectReason string
// Last redirect start time of this connection.
Expand All @@ -234,6 +240,16 @@ type connWrapper struct {
forceClosing bool
}

func (c *connWrapper) transferScore(to *backendWrapper) {
if c.scoreOwner != nil {
c.scoreOwner.connScore--
}
c.scoreOwner = to
if to != nil {
to.connScore++
}
}

func backendPodNameFromAddr(addr string) string {
host, _, err := net.SplitHostPort(addr)
if err != nil {
Expand Down
61 changes: 38 additions & 23 deletions pkg/balance/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (router *ScoreBasedRouter) onCreateConn(backendInst BackendInst, conn Redir
if succeed {
connWrapper := &connWrapper{
RedirectableConn: conn,
scoreOwner: backend,
createTime: time.Now(),
phase: phaseNotRedirected,
forceClosing: false,
Expand All @@ -168,13 +169,32 @@ func (router *ScoreBasedRouter) onCreateConn(backendInst BackendInst, conn Redir
}
}

func (router *ScoreBasedRouter) removeConn(backend *backendWrapper, ce *glist.Element[*connWrapper]) {
// removeConn removes the connection from the connList that actually contains it.
// Always remove by `physicalOwner` instead of by the backend that the connection is physically on:
// they differ when a redirect result has not been processed yet, and glist.Remove silently
// does nothing if the element is not in the given list, which leaks the connection forever.
func (router *ScoreBasedRouter) removeConn(ce *glist.Element[*connWrapper]) {
backend := ce.Value.physicalOwner
if backend == nil {
router.logger.Warn("unexpected nil physical owner for connection")
return
}
oldLen := backend.connList.Len()
backend.connList.Remove(ce)
setBackendConnMetrics(backend.addr, backend.connList.Len())
newLen := backend.connList.Len()
if newLen != oldLen-1 {
router.logger.Warn("the connection is not in the list", zap.String("backend", backend.addr))
}
ce.Value.physicalOwner = nil
setBackendConnMetrics(backend.addr, newLen)
router.removeBackendIfEmpty(backend)
}

func (router *ScoreBasedRouter) addConn(backend *backendWrapper, conn *connWrapper) {
if conn.physicalOwner != nil {
router.logger.Warn("unexpected non-nil physical owner for connection")
}
conn.physicalOwner = backend
ce := backend.connList.PushBack(conn)
setBackendConnMetrics(backend.addr, backend.connList.Len())
router.setConnWrapper(conn, ce)
Expand Down Expand Up @@ -243,41 +263,38 @@ func (router *ScoreBasedRouter) onRedirectFinished(from, to string, conn Redirec
fromBackend := router.ensureBackend(from)
toBackend := router.ensureBackend(to)
connWrapper := router.getConnWrapper(conn).Value
addMigrateMetrics(from, to, connWrapper.redirectReason, succeed, connWrapper.lastRedirect)
// The connection may be closed when this function is waiting for the lock.
if connWrapper.phase == phaseClosed {
return
}

addMigrateMetrics(fromBackend.addr, toBackend.addr, connWrapper.redirectReason, succeed, connWrapper.lastRedirect)
if succeed {
router.removeConn(fromBackend, router.getConnWrapper(conn))
router.removeConn(router.getConnWrapper(conn))
router.addConn(toBackend, connWrapper)
connWrapper.phase = phaseRedirectEnd
} else {
fromBackend.connScore++
toBackend.connScore--
router.removeBackendIfEmpty(toBackend)
connWrapper.transferScore(fromBackend)
connWrapper.phase = phaseRedirectFail
}
}

// OnConnClosed implements ConnEventReceiver.OnConnClosed interface.
func (router *ScoreBasedRouter) OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error {
func (router *ScoreBasedRouter) OnConnClosed(backendAddr string, conn RedirectableConn) error {
router.Lock()
defer router.Unlock()
backend := router.ensureBackend(addr)
connWrapper := router.getConnWrapper(conn)
// If this connection has not redirected yet, decrease the score of the target backend.
if redirectingAddr != "" {
redirectingBackend := router.ensureBackend(redirectingAddr)
redirectingBackend.connScore--
router.removeBackendIfEmpty(redirectingBackend)
metrics.PendingMigrateGuage.WithLabelValues(addr, redirectingAddr, connWrapper.Value.redirectReason).Dec()
} else {
backend.connScore--
}
router.removeConn(backend, connWrapper)
connWrapper.Value.phase = phaseClosed
cw := connWrapper.Value
// If the physical owner mismatches the score owner, it means the redirect result has not been processed yet.
if cw.physicalOwner != cw.scoreOwner && cw.physicalOwner != nil && cw.scoreOwner != nil {
addMigrateMetrics(cw.physicalOwner.addr, cw.scoreOwner.addr, cw.redirectReason, false, cw.lastRedirect)
}
cw.transferScore(nil)
// A redirect result may have not been processed yet, in which case the connection is still
// in the old backend's connList while `backendAddr` is the new backend, so remove the connection
// by its physicalOwner. onRedirectFinished won't touch the list once the phase is phaseClosed.
router.removeConn(connWrapper)
cw.phase = phaseClosed
return nil
}

Expand Down Expand Up @@ -432,9 +449,7 @@ func (router *ScoreBasedRouter) redirectConn(conn *connWrapper, fromBackend *bac
succeed := conn.Redirect(toBackend)
if succeed {
router.logger.Debug("begin redirect connection", fields...)
fromBackend.connScore--
router.removeBackendIfEmpty(fromBackend)
toBackend.connScore++
conn.transferScore(toBackend)
conn.phase = phaseRedirectNotify
conn.redirectReason = reason
metrics.PendingMigrateGuage.WithLabelValues(fromBackend.addr, toBackend.addr, reason).Inc()
Expand Down
142 changes: 139 additions & 3 deletions pkg/balance/router/router_score_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (tester *routerTester) closeConnections(num int, redirecting bool) {
}
}
for _, conn := range conns {
err := tester.router.OnConnClosed(conn.from.Addr(), conn.GetRedirectingAddr(), conn)
err := tester.router.OnConnClosed(conn.from.Addr(), conn)
require.NoError(tester.t, err)
delete(tester.conns, conn.connID)
}
Expand Down Expand Up @@ -247,6 +247,34 @@ func (tester *routerTester) checkBackendConnMetrics() {
}
}

// checkNoConnLeak checks that no connection wrapper leaks in any connList and all connScores are 0
// after all the connections are closed.
func (tester *routerTester) checkNoConnLeak() {
for _, backend := range tester.router.backends {
require.Equal(tester.t, 0, backend.connList.Len(), "backend %s leaks connections in connList", backend.addr)
require.Equal(tester.t, 0, backend.connScore, "backend %s has a wrong connScore", backend.addr)
}
}

// prepareRedirectingConn creates one connection on backend "1" and makes it redirect to backend "2".
func (tester *routerTester) prepareRedirectingConn() *mockRedirectableConn {
tester.addBackends(1)
tester.addConnections(1)
tester.addBackends(1)
// Make backend "1" unhealthy so that the connection is redirected to backend "2".
tester.updateBackendStatusByAddr("1", false)
tester.rebalance(1)
conn := tester.conns[1]
require.Equal(tester.t, "2", conn.GetRedirectingAddr())
return conn
}

func (tester *routerTester) readPendingGauge(from, to, reason string) float64 {
val, err := metrics.ReadGauge(metrics.PendingMigrateGuage.WithLabelValues(from, to, reason))
require.NoError(tester.t, err)
return val
}

func (tester *routerTester) clear() {
tester.backendID = 0
tester.connID = 0
Expand Down Expand Up @@ -283,6 +311,114 @@ func TestRebalance(t *testing.T) {
tester.checkRedirectingNum(20)
}

// Test the race that the connection is closed after the redirection succeeds on the connection side
// but before the router processes the redirect result. The connection must be removed from the
// original backend's connList, otherwise it leaks in the connList forever, which inflates b_conn
// metrics and distorts the balance.
func TestCloseRaceWithRedirectSucceed(t *testing.T) {
tester := newRouterTester(t, nil)
conn := tester.prepareRedirectingConn()
reason := tester.router.getConnWrapper(conn).Value.redirectReason
pendingDuringRedirect := tester.readPendingGauge("1", "2", reason)

// The redirection succeeds on the connection side, and then the connection is closed
// before the router receives OnRedirectSucceed.
conn.redirectSucceed()
require.NoError(t, tester.router.OnConnClosed(conn.from.Addr(), conn))
delete(tester.conns, conn.connID)
// Now the router processes the redirect result.
require.NoError(t, tester.router.OnRedirectSucceed("1", "2", conn))

tester.checkNoConnLeak()
tester.checkBackendConnMetrics()
require.Equal(t, pendingDuringRedirect-1, tester.readPendingGauge("1", "2", reason))
}

// Test the race that the connection is closed after the redirection fails on the connection side
// but before the router processes the redirect result. The connScores moved by the redirection
// must be moved back, otherwise they drift away permanently.
func TestCloseRaceWithRedirectFail(t *testing.T) {
tester := newRouterTester(t, nil)
conn := tester.prepareRedirectingConn()
reason := tester.router.getConnWrapper(conn).Value.redirectReason
pendingDuringRedirect := tester.readPendingGauge("1", "2", reason)

// The redirection fails on the connection side, and then the connection is closed
// before the router receives OnRedirectFail.
conn.redirectFail()
require.NoError(t, tester.router.OnConnClosed(conn.from.Addr(), conn))
delete(tester.conns, conn.connID)
// Now the router processes the redirect result.
require.NoError(t, tester.router.OnRedirectFail("1", "2", conn))

tester.checkNoConnLeak()
tester.checkBackendConnMetrics()
require.Equal(t, pendingDuringRedirect-1, tester.readPendingGauge("1", "2", reason))
}

// Test the race that the connection is closed before the connection side processes the redirect
// signal. OnConnClosed reverts the redirection, and the aborted redirect result that arrives later
// must not revert it again, otherwise the connScores and the pending gauge are decreased twice.
func TestCloseRaceWithRedirectAborted(t *testing.T) {
tester := newRouterTester(t, nil)
conn := tester.prepareRedirectingConn()
reason := tester.router.getConnWrapper(conn).Value.redirectReason
pendingDuringRedirect := tester.readPendingGauge("1", "2", reason)

// The connection is closed before the redirect signal is processed on the connection side.
require.NoError(t, tester.router.OnConnClosed(conn.from.Addr(), conn))
delete(tester.conns, conn.connID)
// The aborted redirect result arrives later and must be a no-op.
require.NoError(t, tester.router.OnRedirectFail("1", "2", conn))

tester.checkNoConnLeak()
tester.checkBackendConnMetrics()
require.Equal(t, pendingDuringRedirect-1, tester.readPendingGauge("1", "2", reason))
}

// Test that when the authenticator reconnects to another backend during the handshake,
// the registration of the previous attempt is cleaned up.
func TestReconnectDuringHandshake(t *testing.T) {
tester := newRouterTester(t, nil)
tester.addBackends(2)
conn := tester.createConn()
selector1 := tester.router.GetBackendSelector()
backend1, err := selector1.Next()
require.NoError(t, err)
require.False(t, backend1 == nil || reflect.ValueOf(backend1).IsNil())
selector1.Finish(conn, true)
// Simulate getBackendIO cleaning up before reconnecting to another backend.
require.NoError(t, tester.router.OnConnClosed(backend1.Addr(), conn))
selector2 := tester.router.GetBackendSelector()
var backend2 BackendInst
for {
backend2, err = selector2.Next()
require.NoError(t, err)
require.False(t, backend2 == nil || reflect.ValueOf(backend2).IsNil())
if backend2.Addr() != backend1.Addr() {
break
}
selector2.Finish(conn, false)
}
selector2.Finish(conn, true)
conn.from = backend2
tester.conns[conn.connID] = conn

// Only the registration of the second attempt remains.
totalConns, totalScore := 0, 0
for _, backend := range tester.router.backends {
totalConns += backend.connList.Len()
totalScore += backend.connScore
}
require.Equal(t, 1, totalConns)
require.Equal(t, 1, totalScore)
tester.checkBackendConnMetrics()

require.NoError(t, tester.router.OnConnClosed(backend2.Addr(), conn))
delete(tester.conns, conn.connID)
tester.checkNoConnLeak()
}

// Test that the connections are always balanced after rebalance and routing.
func TestConnBalanced(t *testing.T) {
tester := newRouterTester(t, nil)
Expand Down Expand Up @@ -618,7 +754,7 @@ func TestConcurrency(t *testing.T) {
from, to := conn.getAddr()
var err error
if i < 1 {
err = router.OnConnClosed(from, conn.GetRedirectingAddr(), conn)
err = router.OnConnClosed(from, conn)
conn = nil
} else if i < 3 {
conn.redirectFail()
Expand All @@ -634,7 +770,7 @@ func TestConcurrency(t *testing.T) {
if i < 2 {
// The balancer may happen to redirect it concurrently - that's exactly what may happen.
from, _ := conn.getAddr()
err := router.OnConnClosed(from, conn.GetRedirectingAddr(), conn)
err := router.OnConnClosed(from, conn)
require.NoError(t, err)
conn = nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/balance/router/router_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (r *StaticRouter) OnRedirectFail(from, to string, conn RedirectableConn) er
return nil
}

func (r *StaticRouter) OnConnClosed(addr, redirectingAddr string, conn RedirectableConn) error {
func (r *StaticRouter) OnConnClosed(backendAddr string, conn RedirectableConn) error {
r.cnt--
return nil
}
Expand Down
Loading