Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
110 commits
Select commit Hold shift + click to select a range
56fda78
buf plugin draft
pompon0 Dec 10, 2025
9e34cae
plugin has been placed
pompon0 Dec 10, 2025
e4ecde6
fmt
pompon0 Dec 11, 2025
29364aa
WIP
pompon0 Dec 11, 2025
e74e728
separate module
pompon0 Dec 12, 2025
3f5798b
added canonical encoding
pompon0 Dec 12, 2025
f8753b7
IsHashable
pompon0 Dec 12, 2025
510c3b9
fix and test
pompon0 Dec 12, 2025
d1b30ff
test updates
pompon0 Dec 12, 2025
9ad06cf
everything in place
pompon0 Dec 12, 2025
117563c
go fmt
pompon0 Dec 12, 2025
c27f7e6
Merge remote-tracking branch 'origin/main' into gprusak-proto
pompon0 Dec 12, 2025
77dc3a7
Merge branch 'main' into gprusak-proto
pompon0 Dec 12, 2025
4c7826a
changed location of the plugin
pompon0 Dec 12, 2025
24a2e25
Merge branch 'gprusak-proto' of https://github.com/sei-protocol/sei-c…
pompon0 Dec 12, 2025
118d566
nits
pompon0 Dec 12, 2025
0189653
panic msg
pompon0 Dec 12, 2025
c0a0ec5
moved all hashable stuff together
pompon0 Dec 15, 2025
15faaa5
moved testonly proto to internal
pompon0 Dec 15, 2025
9794896
fmt
pompon0 Dec 15, 2025
a82ac92
moved under internal
pompon0 Dec 15, 2025
50ae626
comment fix
pompon0 Dec 15, 2025
6160058
Merge branch 'main' into gprusak-proto
pompon0 Dec 15, 2025
51c83d6
ed25519 cleanup wip
pompon0 Dec 15, 2025
e571075
missing changes
pompon0 Dec 15, 2025
bf6e999
compilation fix
pompon0 Dec 15, 2025
f435bee
Merge branch 'gprusak-proto' of https://github.com/sei-protocol/sei-c…
pompon0 Dec 15, 2025
ac44467
Merge branch 'gprusak-proto' into gprusak-validator-proto
pompon0 Dec 15, 2025
9a3edcb
wip - what about non-validator mode?
pompon0 Dec 15, 2025
28bd678
wip
pompon0 Dec 15, 2025
f007432
wip; json is shit
pompon0 Dec 16, 2025
111028e
crypto test
pompon0 Dec 17, 2025
788f2c7
cleanup of typeRegistry
pompon0 Dec 17, 2025
09a5926
test for sig
pompon0 Dec 17, 2025
45766d7
fmt
pompon0 Dec 17, 2025
b0ce59f
copied from sei-v3
pompon0 Dec 17, 2025
ba9aedd
test
pompon0 Dec 17, 2025
afcacab
codex
pompon0 Dec 17, 2025
025fd15
codex tests
pompon0 Dec 18, 2025
72a985f
optional sig
pompon0 Dec 18, 2025
5b90ba9
codex tests
pompon0 Dec 18, 2025
cd7ed89
tests wip
pompon0 Dec 18, 2025
efd3576
Merge remote-tracking branch 'origin/main' into gprusak-validator-proto
pompon0 Dec 18, 2025
a9975f7
tendermint tests pass
pompon0 Dec 18, 2025
e866fc2
all tests compile
pompon0 Dec 18, 2025
e0043d0
fmt
pompon0 Dec 18, 2025
50d521b
Merge remote-tracking branch 'origin/main' into gprusak-validator-proto
pompon0 Dec 18, 2025
7aacf5b
moved roles to separate pr
pompon0 Dec 18, 2025
408f6b4
gofmt
pompon0 Dec 18, 2025
469c57d
stupid encoder
pompon0 Dec 18, 2025
b9c22b9
cachingVerifier
pompon0 Dec 18, 2025
adfabe9
Revert "moved roles to separate pr"
pompon0 Dec 19, 2025
b2ef6a7
partial refactor of SecretConnection
pompon0 Dec 19, 2025
1cd257d
slight refactor of secret connection
pompon0 Dec 22, 2025
cf932f1
dst for ed25519
pompon0 Dec 22, 2025
2fd8910
tm tests compile
pompon0 Dec 22, 2025
124731f
fmt
pompon0 Dec 22, 2025
2b6d4e2
regenerated
pompon0 Jan 1, 2026
4cf6982
Merge remote-tracking branch 'origin/main' into gprusak-validator-proto
pompon0 Jan 1, 2026
2a4d507
Merge branch 'gprusak-validator-proto' into gprusak-validator-proto2
pompon0 Jan 1, 2026
531b712
lint
pompon0 Jan 1, 2026
27e9f94
reverted validator
pompon0 Jan 1, 2026
4de17dc
generate
pompon0 Jan 1, 2026
5298da1
wip
pompon0 Jan 5, 2026
026297c
wip
pompon0 Jan 6, 2026
920f05e
mux draft completed
pompon0 Jan 7, 2026
5bf491d
wip
pompon0 Jan 8, 2026
837e5d2
wip
pompon0 Jan 8, 2026
4a645fc
compiles
pompon0 Jan 8, 2026
c2d3d4d
moved buffering to secret connection
pompon0 Jan 8, 2026
18da48d
fixed tests
pompon0 Jan 8, 2026
5c6f996
fixed stream
pompon0 Jan 8, 2026
b645476
fmt
pompon0 Jan 8, 2026
2e69555
flush in transport
pompon0 Jan 8, 2026
68f5a8e
race in test
pompon0 Jan 8, 2026
8ce00ec
fuzz fix
pompon0 Jan 9, 2026
ce1ad2f
test fix
pompon0 Jan 9, 2026
a5f71d9
mux fixes
pompon0 Jan 9, 2026
386af59
moved buffering to secret conn
pompon0 Jan 9, 2026
51e5092
Merge branch 'gprusak-secret-buf' into gprusak-mux
pompon0 Jan 9, 2026
29dfe38
happy path test draft
pompon0 Jan 9, 2026
22dd8ae
zeroizing the frame
pompon0 Jan 9, 2026
be2ce75
debugging test
pompon0 Jan 12, 2026
28db43c
debugging test
pompon0 Jan 12, 2026
f468f2c
wip
pompon0 Jan 12, 2026
1ec6e27
wip
pompon0 Jan 12, 2026
d473747
happy path works
pompon0 Jan 12, 2026
5591e7a
mux protocol test left
pompon0 Jan 12, 2026
7c19ef3
Merge remote-tracking branch 'origin/main' into gprusak-validator-proto2
pompon0 Jan 12, 2026
37e8e6e
Merge branch 'gprusak-validator-proto2' into gprusak-secret-buf
pompon0 Jan 12, 2026
6ce3309
more tests
pompon0 Jan 13, 2026
5779960
fmt
pompon0 Jan 13, 2026
304e42d
Merge branch 'gprusak-secret-buf' into gprusak-mux
pompon0 Jan 13, 2026
650faba
Merge remote-tracking branch 'origin/main' into gprusak-mux
pompon0 Jan 13, 2026
8ce8e0d
fixed flake
pompon0 Jan 13, 2026
723d44e
buf format -w
pompon0 Jan 13, 2026
49ceed7
mux tests
pompon0 Jan 20, 2026
d8e099f
fixed mux tests
pompon0 Jan 20, 2026
dc8c897
missing files
pompon0 Jan 20, 2026
03ae930
removed unused proto message
pompon0 Jan 20, 2026
bb17648
moved p2p messages
pompon0 Jan 20, 2026
1315244
fixed fuzz targets
pompon0 Jan 20, 2026
660d2d5
gofmt
pompon0 Jan 20, 2026
1e3551b
compilation fix
pompon0 Jan 20, 2026
f3e33d2
reduced gogoproto usage
pompon0 Jan 20, 2026
b7d617a
applied comments
pompon0 Jan 21, 2026
be8e4f1
Merge remote-tracking branch 'origin/main' into gprusak-mux
pompon0 Jan 21, 2026
94c5e09
flake fix
pompon0 Jan 21, 2026
e71f82c
Merge remote-tracking branch 'origin/main' into gprusak-mux
pompon0 Jan 21, 2026
70e2a97
Merge branch 'gprusak-mux' into gprusak-tcp
pompon0 Jan 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 29 additions & 50 deletions sei-tendermint/internal/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/tendermint/tendermint/libs/utils"
"github.com/tendermint/tendermint/libs/utils/scope"
"google.golang.org/protobuf/proto"
"io"
)

const handshakeMaxSize = 10 * 1024 // 10kB
Expand All @@ -46,8 +45,6 @@ var errTooManyMsgs = errors.New("too many messages")
var errTooLargeMsg = errors.New("message too large")
var errUnknownKind = errors.New("unknown kind")

type errConn struct{ error }

type Config struct {
// Maximal number of bytes in a frame (excluding header).
FrameSize uint64
Expand Down Expand Up @@ -127,15 +124,13 @@ func newRunner(mux *Mux) *runner {
// If the stream does not exist yet, it tries to create it as an accept (inbound) stream.
// In that case the inbound stream limit for the given kind is checked.
func (r *runner) getOrAccept(id streamID, kind StreamKind) (*streamState, error) {
fmt.Printf("getOrAccept(%v)\n", id)
for inner := range r.inner.RLock() {
s, ok := inner.streams[id]
if ok {
return s, nil
}
}
for inner := range r.inner.Lock() {
fmt.Printf("accepting stream %v\n", id)
if id.isConnect() {
return nil, errUnknownStream
}
Expand Down Expand Up @@ -220,36 +215,36 @@ func (r *runner) runSend(ctx context.Context, conn conn.Conn) error {
if err != nil {
panic(err)
}
if _, err := conn.Write([]byte{byte(len(headerRaw))}); err != nil {
return errConn{err}
if err := conn.Write(ctx, []byte{byte(len(headerRaw))}); err != nil {
return err
}
if _, err := conn.Write(headerRaw); err != nil {
return errConn{err}
if err := conn.Write(ctx, headerRaw); err != nil {
return err
}
if _, err := conn.Write(f.Payload); err != nil {
return errConn{err}
if err := conn.Write(ctx, f.Payload); err != nil {
return err
}
}
if flush {
if err := conn.Flush(); err != nil {
return errConn{err}
if err := conn.Flush(ctx); err != nil {
return err
}
}
}
}

// runRecv receives and processes the incoming frames sequentially.
func (r *runner) runRecv(conn conn.Conn) error {
func (r *runner) runRecv(ctx context.Context, conn conn.Conn) error {
for {
// frame size is hard capped here at 255B.
// Currently we have 7 varint fields (up to 77B)
var headerSize [1]byte
if _, err := conn.Read(headerSize[:]); err != nil {
return errConn{err}
if err := conn.Read(ctx, headerSize[:]); err != nil {
return err
}
headerRaw := make([]byte, headerSize[0])
if _, err := io.ReadFull(conn, headerRaw[:]); err != nil {
return errConn{err}
if err := conn.Read(ctx, headerRaw[:]); err != nil {
return err
}
var h pb.Header
if err := proto.Unmarshal(headerRaw, &h); err != nil {
Expand All @@ -263,7 +258,6 @@ func (r *runner) runRecv(conn conn.Conn) error {
return err
}
for sInner := range s.inner.Lock() {
fmt.Printf("sInner.closed.remote[%v] = %v, %v\n", s.id, sInner.closed.remote, &h)
if sInner.closed.remote {
return errFrameAfterClose
}
Expand All @@ -286,7 +280,7 @@ func (r *runner) runRecv(conn conn.Conn) error {
}
// Read the payload.
payload := make([]byte, ps)
if _, err := io.ReadFull(conn, payload[:]); err != nil {
if err := conn.Read(ctx, payload[:]); err != nil {
return err
}
s.RemotePayload(payload)
Expand All @@ -307,52 +301,37 @@ func (r *runner) runRecv(conn conn.Conn) error {

// Run runs the multiplexer for the given connection.
// It closes the connection before return.
func (m *Mux) Run(ctx context.Context, conn conn.Conn) error {
return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error {
// Close on cancel.
s.Spawn(func() error {
<-ctx.Done()
s.Cancel(ctx.Err())
conn.Close()
return nil
})

func (m *Mux) Run(ctx context.Context, conn conn.Conn) (err error) {
return utils.IgnoreCancel(scope.Run(ctx, func(ctx context.Context, s scope.Scope) error {
// Handshake exchange.
handshake, err := scope.Run1(ctx, func(ctx context.Context, s scope.Scope) (*handshake, error) {
s.Spawn(func() error {
handshakeRaw, err := proto.Marshal(handshakeConv.Encode(&handshake{Kinds: m.cfg.Kinds}))
if err != nil {
panic(err)
}
handshakeRaw := handshakeConv.Marshal(&handshake{Kinds: m.cfg.Kinds})
sizeRaw := binary.LittleEndian.AppendUint32(nil, uint32(len(handshakeRaw)))
if _, err := conn.Write(sizeRaw); err != nil {
return errConn{err}
if err := conn.Write(ctx, sizeRaw); err != nil {
return err
}
if _, err := conn.Write(handshakeRaw); err != nil {
return errConn{err}
if err := conn.Write(ctx, handshakeRaw); err != nil {
return err
}
if err := conn.Flush(); err != nil {
return errConn{err}
if err := conn.Flush(ctx); err != nil {
return err
}
return nil
})
var sizeRaw [4]byte
if _, err := io.ReadFull(conn, sizeRaw[:]); err != nil {
return nil, errConn{err}
if err := conn.Read(ctx, sizeRaw[:]); err != nil {
return nil, err
}
size := binary.LittleEndian.Uint32(sizeRaw[:])
if size > handshakeMaxSize {
return nil, fmt.Errorf("handshake too large")
}
handshakeRaw := make([]byte, size)
if _, err := io.ReadFull(conn, handshakeRaw[:]); err != nil {
return nil, errConn{err}
}
var handshakeProto pb.Handshake
if err := proto.Unmarshal(handshakeRaw, &handshakeProto); err != nil {
if err := conn.Read(ctx, handshakeRaw[:]); err != nil {
return nil, err
}
return handshakeConv.Decode(&handshakeProto)
return handshakeConv.Unmarshal(handshakeRaw)
})
if err != nil {
return err
Expand All @@ -374,9 +353,9 @@ func (m *Mux) Run(ctx context.Context, conn conn.Conn) error {
}
// Run the tasks.
s.Spawn(func() error { return r.runSend(ctx, conn) })
s.Spawn(func() error { return r.runRecv(conn) })
s.Spawn(func() error { return r.runRecv(ctx, conn) })
return nil
})
}))
}

// queue is a queue of frames to send, consumed by runSend.
Expand Down
69 changes: 32 additions & 37 deletions sei-tendermint/internal/mux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/libs/utils"
"github.com/tendermint/tendermint/libs/utils/require"
Expand All @@ -14,25 +13,21 @@ import (
"testing"
)

// Ignores cancellation and connection errors.
// Wrap Mux.Run() calls in this - in tests we manage both ends of the connection,
// so there is a race condition between disconnects and cancellation, when test
// is shutting down.
func ignoreDisconnect(err error) error {
if utils.ErrorAs[errConn](err).IsPresent() {
return nil
}
return utils.IgnoreCancel(err)
}

func testConn(t *testing.T) (*conn.SecretConnection, *conn.SecretConnection) {
func spawnBgPipe(ctx context.Context, s scope.Scope) (*conn.SecretConnection, *conn.SecretConnection) {
c1, c2 := tcp.TestPipe()
s.SpawnBg(func() error {
_ = c1.Run(ctx)
return nil
})
s.SpawnBg(func() error {
_ = c2.Run(ctx)
return nil
})
var scs [2]*conn.SecretConnection
utils.OrPanic(scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
utils.OrPanic(scope.Run(ctx, func(ctx context.Context, s scope.Scope) error {
for i, c := range utils.Slice(c1, c2) {
t.Cleanup(func() { c.Close() })
s.Spawn(func() error {
scs[i] = utils.OrPanic1(conn.MakeSecretConnection(ctx, c, ed25519.GenerateSecretKey()))
scs[i] = utils.OrPanic1(conn.MakeSecretConnection(ctx, c))
return nil
})
}
Expand All @@ -53,7 +48,7 @@ func transform(msg []byte) []byte {
}

func runServer(ctx context.Context, rng utils.Rng, mux *Mux) error {
return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error {
return utils.IgnoreCancel(scope.Run(ctx, func(ctx context.Context, s scope.Scope) error {
for kind := range mux.cfg.Kinds {
_ = rng.Split()
s.Spawn(func() error {
Expand Down Expand Up @@ -94,7 +89,7 @@ func runServer(ctx context.Context, rng utils.Rng, mux *Mux) error {
})
}
return nil
})
}))
}

type clientSet struct {
Expand Down Expand Up @@ -230,13 +225,13 @@ func runClients(ctx context.Context, rng utils.Rng, mux *Mux) error {
func TestHappyPath(t *testing.T) {
rng := utils.TestRng()
kindCount := 5
c1, c2 := testConn(t)
err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
c1, c2 := spawnBgPipe(ctx, s)
for _, c := range utils.Slice(c1, c2) {
mux := makeMux(rng, kindCount)
serverRng := rng.Split()
s.SpawnBgNamed("mux", func() error { return ignoreDisconnect(mux.Run(ctx, c)) })
s.SpawnBgNamed("server", func() error { return utils.IgnoreCancel(runServer(ctx, serverRng, mux)) })
s.SpawnBgNamed("mux", func() error { return mux.Run(ctx, c) })
s.SpawnBgNamed("server", func() error { return runServer(ctx, serverRng, mux) })
clientRng := rng.Split()
s.SpawnNamed("client", func() error { return runClients(ctx, clientRng, mux) })
}
Expand Down Expand Up @@ -281,15 +276,15 @@ func makeConfig(kinds ...StreamKind) *Config {
}

func TestStreamKindsMismatch(t *testing.T) {
c1, c2 := testConn(t)
err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
c1, c2 := spawnBgPipe(ctx, s)
var k0, k1, k2 StreamKind = 0, 1, 2
muxs := utils.Slice(
NewMux(makeConfig(k0, k1)),
NewMux(makeConfig(k1, k2)),
)
for i, c := range utils.Slice(c1, c2) {
s.SpawnBg(func() error { return ignoreDisconnect(muxs[i].Run(ctx, c)) })
s.SpawnBg(func() error { return muxs[i].Run(ctx, c) })
}
// Connecting/accepting of unconfigured kind should error.
if _, err := muxs[0].Connect(ctx, k2, 10, 10); !errors.Is(err, errUnknownKind) {
Expand Down Expand Up @@ -346,8 +341,8 @@ func TestStreamKindsMismatch(t *testing.T) {
// Test checking that closing a stream does not drop messages on the floor:
// sending and receiving still works as long as messages fit into a window.
func TestClosedStream(t *testing.T) {
c1, c2 := testConn(t)
err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
c1, c2 := spawnBgPipe(ctx, s)
kind := StreamKind(0)
window := uint64(4)
msg := []byte("hello")
Expand All @@ -356,7 +351,7 @@ func TestClosedStream(t *testing.T) {
NewMux(makeConfig(kind)),
)
for i, c := range utils.Slice(c1, c2) {
s.SpawnBg(func() error { return ignoreDisconnect(muxs[i].Run(ctx, c)) })
s.SpawnBg(func() error { return muxs[i].Run(ctx, c) })
}
s.Spawn(func() error {
// Just accept a single stream and close immediately.
Expand Down Expand Up @@ -407,14 +402,14 @@ func TestClosedStream(t *testing.T) {
}

func TestProtocol_TooLargeMsg(t *testing.T) {
c1, c2 := testConn(t)
err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
c1, c2 := spawnBgPipe(ctx, s)
kind := StreamKind(0)
maxMsgSize := uint64(10)

// Bad mux.
badMux := NewMux(makeConfig(kind))
s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) })
s.SpawnBg(func() error { return badMux.Run(ctx, c1) })
s.SpawnBg(func() error {
t.Log("Connect stream.")
stream, err := badMux.Connect(ctx, kind, 0, 0)
Expand Down Expand Up @@ -455,15 +450,15 @@ func TestProtocol_TooLargeMsg(t *testing.T) {
}

func TestProtocol_TooManyMsgs(t *testing.T) {
c1, c2 := testConn(t)
err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
c1, c2 := spawnBgPipe(ctx, s)
kind := StreamKind(0)
maxMsgSize := uint64(10)
window := uint64(3)

// Bad mux.
badMux := NewMux(makeConfig(kind))
s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) })
s.SpawnBg(func() error { return badMux.Run(ctx, c1) })
s.SpawnBg(func() error {
t.Log("Connect stream.")
stream, err := badMux.Connect(ctx, kind, 0, 0)
Expand Down Expand Up @@ -508,14 +503,14 @@ func TestProtocol_TooManyMsgs(t *testing.T) {
}

func TestProtocol_FrameAfterClose(t *testing.T) {
c1, c2 := testConn(t)
err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
c1, c2 := spawnBgPipe(ctx, s)
kind := StreamKind(0)

// Bad mux.
badMux := NewMux(makeConfig(kind))
maxMsgSize := uint64(10)
s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) })
s.SpawnBg(func() error { return badMux.Run(ctx, c1) })
s.SpawnBg(func() error {
t.Log("Connect stream.")
stream, err := badMux.Connect(ctx, kind, 0, 0)
Expand Down Expand Up @@ -566,13 +561,13 @@ func TestProtocol_FrameAfterClose(t *testing.T) {
}

func TestProtocol_TooManyAccepts(t *testing.T) {
c1, c2 := testConn(t)
err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
c1, c2 := spawnBgPipe(ctx, s)
kind := StreamKind(0)

// Bad mux.
badMux := NewMux(makeConfig(kind))
s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) })
s.SpawnBg(func() error { return badMux.Run(ctx, c1) })
s.SpawnBg(func() error {
// Artificially connect too many streams.
for queue, ctrl := range badMux.queue.Lock() {
Expand All @@ -599,13 +594,13 @@ func TestProtocol_TooManyAccepts(t *testing.T) {
}

func TestProtocol_UnknownStream(t *testing.T) {
c1, c2 := testConn(t)
err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
c1, c2 := spawnBgPipe(ctx, s)
kind := StreamKind(0)

// Bad mux.
badMux := NewMux(makeConfig(kind))
s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) })
s.SpawnBg(func() error { return badMux.Run(ctx, c1) })
s.SpawnBg(func() error {
// Artificially accept a stream without connect.
for queue, ctrl := range badMux.queue.Lock() {
Expand All @@ -630,14 +625,14 @@ func TestProtocol_UnknownStream(t *testing.T) {
}

func TestProtocol_UnknownKind(t *testing.T) {
c1, c2 := testConn(t)
err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
c1, c2 := spawnBgPipe(ctx, s)
kind := StreamKind(0)
badKind := StreamKind(1)

// Bad mux.
badMux := NewMux(makeConfig(kind))
s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) })
s.SpawnBg(func() error { return badMux.Run(ctx, c1) })
s.SpawnBg(func() error {
// Artificially connect a stream of unknown kind.
for queue, ctrl := range badMux.queue.Lock() {
Expand Down
Loading
Loading