diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..01de66d --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,29 @@ +# AGENTS.md + +someguy is a server implementing the [Delegated Routing V1 HTTP API](https://specs.ipfs.tech/routing/http-routing-v1/). +It proxies requests to the Amino DHT and other delegated routing endpoints. It +is a caching proxy, not a libp2p node. + +## Build and test + +```bash +go build ./... +go test ./... +``` + +Run `gofmt` and `go vet ./...` before committing. + +## Code map + +- `main.go`, `server.go`: CLI entry point, host and router wiring. +- `server_routers.go`: router composition (`composableRouter`, `parallelRouter`, `libp2pRouter`, `sanitizeRouter`). +- `server_cached_router.go`, `cached_addr_book.go`: address caching layer. +- `server_dht.go`: DHT setup (standard and accelerated). +- `server_delegated_routing.go`: delegated HTTP routing clients. + +## Documentation + +- [environment-variables.md](docs/environment-variables.md): all config flags and environment variables +- [peer-address-caching.md](docs/peer-address-caching.md): how `/providers` and `/peers` cache and refresh peer addresses +- [metrics.md](docs/metrics.md): Prometheus metrics +- [tracing.md](docs/tracing.md): OpenTelemetry tracing diff --git a/CHANGELOG.md b/CHANGELOG.md index c7bf962..e0983ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,10 @@ The following emojis are used to highlight certain changes: ### Fixed +- `/routing/v1/peers/{peerid}` now serves addresses cache-first, the same way `/routing/v1/providers/{cid}` does. It answers from the cached address book and host peerstore before falling back to a DHT lookup, so a relay-dependent peer that is absent from peer routing but recently seen as a provider is no longer answered with an empty result. See [`docs/peer-address-caching.md`](https://github.com/ipfs/someguy/blob/main/docs/peer-address-caching.md). 
+- A completed identify now prunes a peer's cached addresses down to its current advertised set (signed peer record or identify listen addresses) plus any live-connection address, instead of unioning forever. This stops stale certhashes, dead relay circuits, and rotated NAT ports from accumulating across provider lookups and gossip. +- Multiaddrs in `/routing/v1` responses are returned in a stable sorted order. They previously came back in nondeterministic order, so repeated requests for the same peer or provider returned the same addresses shuffled differently. + ### Security ## [v0.13.0] - 2026-05-26 diff --git a/README.md b/README.md index 63a2172..46fd2fe 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ [![GitHub Release](https://img.shields.io/github/v/release/ipfs/someguy?filter=!*rc*)](https://github.com/ipfs/someguy/releases) [![Go Reference](https://pkg.go.dev/badge/github.com/ipfs/someguy.svg)](https://pkg.go.dev/github.com/ipfs/someguy) -Someguy is an [HTTP Delegated Routing V1](https://specs.ipfs.tech/routing/http-routing-v1/) server that proxies requests to the [Amino DHT](https://docs.ipfs.tech/concepts/glossary/#amino) and other Delegated Routing servers such as the [Network Indexer](https://cid.contact). +Someguy is an [HTTP Delegated Routing V1](https://specs.ipfs.tech/routing/http-routing-v1/) server that proxies requests to the [Amino DHT](https://docs.ipfs.tech/concepts/glossary/#amino) and other [delegated routing servers](https://specs.ipfs.tech/routing/http-routing-v1/). [Shipyard](https://ipshipyard.com/) also runs a [public Someguy instance](https://docs.ipfs.tech/concepts/public-utilities/#delegated-routing-endpoint) at `https://delegated-ipfs.dev/routing/v1`. @@ -53,7 +53,7 @@ Run `someguy` as a client or as a server. ### Server -Start the server with `someguy start`. 
By default it proxies requests to the [IPFS Amino DHT](https://blog.ipfs.tech/2023-09-amino-refactoring/) and the [cid.contact](https://cid.contact) indexer (IPNI) node. +Start the server with `someguy start`. By default it proxies requests to the [IPFS Amino DHT](https://blog.ipfs.tech/2023-09-amino-refactoring/) and other [Delegated Routing V1](https://specs.ipfs.tech/routing/http-routing-v1/) servers. For more details, run `someguy start --help`. @@ -84,6 +84,13 @@ someguy start --ipns-endpoints https://example.com See [environment-variables.md](docs/environment-variables.md) for URL formats and configuration details. +## Documentation + +- [environment-variables.md](docs/environment-variables.md): all config flags and environment variables +- [peer-address-caching.md](docs/peer-address-caching.md): how `/providers` and `/peers` cache and refresh peer addresses +- [metrics.md](docs/metrics.md): Prometheus metrics +- [tracing.md](docs/tracing.md): OpenTelemetry tracing + ## Deployment For self-hosting, run the [prebuilt Docker image](#docker). 
diff --git a/cached_addr_book.go b/cached_addr_book.go index b68d2ca..8b37a22 100644 --- a/cached_addr_book.go +++ b/cached_addr_book.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/record" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" ma "github.com/multiformats/go-multiaddr" @@ -92,7 +93,8 @@ type peerState struct { } type cachedAddrBook struct { - addrBook peerstore.AddrBook // memory address book + addrBook peerstore.AddrBook // someguy's own address book: durable, probed, written here + hostPeerstore peerstore.AddrBook // libp2p host peerstore, DHT-populated, read-only fallback peerCache *lru.Cache[peer.ID, peerState] // LRU cache with additional metadata about peer probingEnabled bool isProbing atomic.Bool @@ -102,6 +104,18 @@ type cachedAddrBook struct { type AddrBookOption func(*cachedAddrBook) error +// WithHostPeerstore lets GetCachedAddrs fall back to the libp2p host peerstore, +// which go-libp2p-kad-dht populates with provider addresses during +// FindProviders (under a short TempAddrTTL). This catches peers seen very +// recently as providers that have not yet been copied into someguy's own +// longer-lived address book. 
+func WithHostPeerstore(ps peerstore.AddrBook) AddrBookOption { + return func(cab *cachedAddrBook) error { + cab.hostPeerstore = ps + return nil + } +} + func WithAllowPrivateIPs() AddrBookOption { return func(cab *cachedAddrBook) error { cab.allowPrivateIPs = true @@ -185,20 +199,24 @@ func (cab *cachedAddrBook) background(ctx context.Context, host host.Host) { peerStateSize.Set(float64(cab.peerCache.Len())) // update metric ttl := cab.getTTL(host.Network().Connectedness(ev.Peer)) - if ev.SignedPeerRecord != nil { - logger.Debug("Caching signed peer record") - cab, ok := peerstore.GetCertifiedAddrBook(cab.addrBook) - if ok { - _, err := cab.ConsumePeerRecord(ev.SignedPeerRecord, ttl) - if err != nil { - logger.Warnf("failed to consume signed peer record: %v", err) - } + + // A completed identify reports the peer's current advertised + // addresses, which supersede the set accumulated from provider + // records, DHT gossip, and earlier identifies. Replace the + // stored set instead of unioning so stale certhashes, dead + // relay circuits, and rotated NAT ports do not pile up. + // + // Drop the remote addresses of inbound connections: that is the + // peer's ephemeral source port, which nobody can dial back to, + // so caching it would reintroduce exactly the junk this prune + // removes. Outbound (and direction-unknown) remotes are kept. 
+ var connAddrs []ma.Multiaddr + for _, c := range host.Network().ConnsToPeer(ev.Peer) { + if c.Stat().Direction != network.DirInbound { + connAddrs = append(connAddrs, c.RemoteMultiaddr()) } - } else { - logger.Debug("No signed peer record, caching listen addresses") - // We don't have a signed peer record, so we use the listen addresses - cab.addrBook.AddAddrs(ev.Peer, ev.ListenAddrs, ttl) } + cab.replacePeerAddrs(ev.Peer, ev.SignedPeerRecord, ev.ListenAddrs, connAddrs, ttl) case event.EvtPeerConnectednessChanged: // If the peer is not connected or limited, we update the TTL if !hasValidConnectedness(ev.Connectedness) { @@ -221,6 +239,52 @@ func (cab *cachedAddrBook) background(ctx context.Context, host host.Host) { } } +// replacePeerAddrs replaces p's stored addresses with the authoritative set +// from a completed identify: the signed peer record, else the identify listen +// addresses, plus any live-connection address so an active session is kept. +// +// Clearing first drops addresses absent from the current set (stale certhashes, +// dead relay circuits, rotated NAT ports) instead of letting them linger to TTL. +// +// libp2p/go-libp2p#3487 does the same prune inside ConsumePeerRecord (not yet +// in the pinned version); re-adding the same set here stays correct once it +// lands, so a dependency bump will not regress this. +func (cab *cachedAddrBook) replacePeerAddrs(p peer.ID, signed *record.Envelope, listenAddrs, connAddrs []ma.Multiaddr, ttl time.Duration) { + // Nothing authoritative to apply. Return before clearing so an identify that + // carried no usable addresses never wipes a peer's existing cached set. + if signed == nil && len(listenAddrs) == 0 && len(connAddrs) == 0 { + return + } + + // Drop the accumulated set so addresses absent from the current advertised + // set are removed instead of unioned. 
+ cab.addrBook.ClearAddrs(p) + + accepted := false + if signed != nil { + if certBook, ok := peerstore.GetCertifiedAddrBook(cab.addrBook); ok { + ok, err := certBook.ConsumePeerRecord(signed, ttl) + if err != nil { + logger.Warnf("failed to consume signed peer record: %v", err) + } + accepted = ok + } + } + if !accepted { + // No signed record, no certified addr book, or the record was rejected + // (e.g. a sequence-number check in some go-libp2p version). Fall back to + // the identify listen addresses, if any were reported, so the clear does + // not also discard the peer's advertised listen set. + cab.addrBook.AddAddrs(p, listenAddrs, ttl) + } + + // Preserve live-connection addresses at the connected TTL even when absent + // from the advertised set, so an active session is never dropped. + if len(connAddrs) > 0 { + cab.addrBook.AddAddrs(p, connAddrs, ConnectedAddrTTL) + } +} + // Loops over all peers with addresses and probes them if they haven't been probed recently func (cab *cachedAddrBook) probePeers(ctx context.Context, host host.Host) { defer cab.isProbing.Store(false) @@ -286,6 +350,13 @@ func (cab *cachedAddrBook) probePeers(ctx context.Context, host host.Host) { func (cab *cachedAddrBook) GetCachedAddrs(p peer.ID) []types.Multiaddr { cachedAddrs := cab.addrBook.Addrs(p) + // Fall back to the host peerstore, which the DHT fills with provider + // addresses during FindProviders (short TempAddrTTL). Lets peer routing + // serve a peer seen as a provider moments ago but absent from peer routing. + if len(cachedAddrs) == 0 && cab.hostPeerstore != nil { + cachedAddrs = cab.hostPeerstore.Addrs(p) + } + if len(cachedAddrs) == 0 { return nil } @@ -297,6 +368,32 @@ func (cab *cachedAddrBook) GetCachedAddrs(p peer.ID) []types.Multiaddr { return result } +// CacheAddrs stores addresses observed for a peer outside of a direct +// connection (e.g. embedded in a provider record returned by FindProviders) so +// that later peer-routing lookups can serve them from the same peerbook. 
+// Private addresses are dropped unless explicitly allowed. These addresses are +// unverified, so they are stored with the recently-connected TTL and will be +// confirmed or evicted by the probe loop. +func (cab *cachedAddrBook) CacheAddrs(p peer.ID, addrs []types.Multiaddr) { + if len(addrs) == 0 { + return + } + + maddrs := make([]ma.Multiaddr, 0, len(addrs)) + for _, addr := range addrs { + if !cab.allowPrivateIPs && !manet.IsPublicAddr(addr.Multiaddr) { + continue + } + maddrs = append(maddrs, addr.Multiaddr) + } + + if len(maddrs) == 0 { + return + } + + cab.addrBook.AddAddrs(p, maddrs, cab.recentlyConnectedTTL) +} + // Update the peer cache with information about a failed connection // This should be called when a connection attempt to a peer fails func (cab *cachedAddrBook) RecordFailedConnection(p peer.ID) { diff --git a/cached_addr_book_test.go b/cached_addr_book_test.go index 004f54a..2278d30 100644 --- a/cached_addr_book_test.go +++ b/cached_addr_book_test.go @@ -10,7 +10,9 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/p2p/host/eventbus" + "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -25,6 +27,114 @@ func TestCachedAddrBook(t *testing.T) { require.NotNil(t, cab.addrBook) } +func TestGetCachedAddrsHostPeerstoreFallback(t *testing.T) { + testPeer, err := peer.Decode("12D3KooWCZ67sU8oCvKd82Y6c9NgpqgoZYuZEUcg4upHCjK3n1aj") + require.NoError(t, err) + addr := ma.StringCast("/ip4/137.21.14.12/tcp/4001") + + t.Run("falls back to host peerstore when addrBook is empty", func(t *testing.T) { + hostPeerstore := pstoremem.NewAddrBook() + hostPeerstore.AddAddrs(testPeer, []ma.Multiaddr{addr}, peerstore.TempAddrTTL) + + cab, err := newCachedAddrBook(WithAllowPrivateIPs(), 
WithHostPeerstore(hostPeerstore)) + require.NoError(t, err) + + got := cab.GetCachedAddrs(testPeer) + require.Len(t, got, 1) + require.Equal(t, addr.String(), got[0].String()) + }) + + t.Run("prefers addrBook over host peerstore", func(t *testing.T) { + ownAddr := ma.StringCast("/ip4/1.2.3.4/tcp/4001") + hostPeerstore := pstoremem.NewAddrBook() + hostPeerstore.AddAddrs(testPeer, []ma.Multiaddr{addr}, peerstore.TempAddrTTL) + + cab, err := newCachedAddrBook(WithAllowPrivateIPs(), WithHostPeerstore(hostPeerstore)) + require.NoError(t, err) + cab.addrBook.AddAddrs(testPeer, []ma.Multiaddr{ownAddr}, time.Hour) + + got := cab.GetCachedAddrs(testPeer) + require.Len(t, got, 1) + require.Equal(t, ownAddr.String(), got[0].String()) + }) + + t.Run("returns nil when both are empty", func(t *testing.T) { + cab, err := newCachedAddrBook(WithAllowPrivateIPs(), WithHostPeerstore(pstoremem.NewAddrBook())) + require.NoError(t, err) + require.Nil(t, cab.GetCachedAddrs(testPeer)) + }) +} + +func TestReplacePeerAddrsPrunesStaleAddrs(t *testing.T) { + cab, err := newCachedAddrBook(WithAllowPrivateIPs()) + require.NoError(t, err) + + p := peer.ID("test-peer") + + // Seed an accumulated set, as if learned from provider records and gossip. + stale := []ma.Multiaddr{ + ma.StringCast("/ip4/1.1.1.1/tcp/4001"), + ma.StringCast("/ip4/2.2.2.2/udp/4001/quic-v1"), + } + cab.addrBook.AddAddrs(p, stale, time.Hour) + require.Len(t, cab.addrBook.Addrs(p), 2) + + // A completed identify reports a different current set (no signed record), + // plus one address held by a live connection. + current := []ma.Multiaddr{ma.StringCast("/ip4/3.3.3.3/tcp/4001")} + connAddr := ma.StringCast("/ip4/4.4.4.4/tcp/4001") + + cab.replacePeerAddrs(p, nil, current, []ma.Multiaddr{connAddr}, time.Hour) + + got := make([]string, 0) + for _, a := range cab.addrBook.Addrs(p) { + got = append(got, a.String()) + } + + // Stale addrs are gone; the current advertised addr and the live-connection + // addr remain. 
+ require.ElementsMatch(t, []string{ + "/ip4/3.3.3.3/tcp/4001", + "/ip4/4.4.4.4/tcp/4001", + }, got) +} + +func TestReplacePeerAddrsKeepsLiveConnWhenAdvertisedSetEmpty(t *testing.T) { + cab, err := newCachedAddrBook(WithAllowPrivateIPs()) + require.NoError(t, err) + + p := peer.ID("test-peer") + connAddr := ma.StringCast("/ip4/4.4.4.4/tcp/4001") + + // Identify reported no usable listen addrs, but we hold a live connection. + cab.replacePeerAddrs(p, nil, nil, []ma.Multiaddr{connAddr}, time.Hour) + + got := make([]string, 0) + for _, a := range cab.addrBook.Addrs(p) { + got = append(got, a.String()) + } + require.Equal(t, []string{"/ip4/4.4.4.4/tcp/4001"}, got) +} + +func TestReplacePeerAddrsEmptyInputKeepsExistingAddrs(t *testing.T) { + cab, err := newCachedAddrBook(WithAllowPrivateIPs()) + require.NoError(t, err) + + p := peer.ID("test-peer") + existing := ma.StringCast("/ip4/1.1.1.1/tcp/4001") + cab.addrBook.AddAddrs(p, []ma.Multiaddr{existing}, time.Hour) + + // An identify with no signed record, no listen addrs, and no live + // connection must not wipe the peer's existing cached addresses. 
+ cab.replacePeerAddrs(p, nil, nil, nil, time.Hour) + + got := make([]string, 0) + for _, a := range cab.addrBook.Addrs(p) { + got = append(got, a.String()) + } + require.Equal(t, []string{"/ip4/1.1.1.1/tcp/4001"}, got) +} + func TestBackground(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -229,6 +339,11 @@ func (mn *mockNetwork) Connectedness(p peer.ID) network.Connectedness { return network.NotConnected } +func (mn *mockNetwork) ConnsToPeer(p peer.ID) []network.Conn { + // No live connections in tests + return nil +} + func (mh *mockHost) EventBus() event.Bus { return mh.eventBus } diff --git a/docs/peer-address-caching.md b/docs/peer-address-caching.md new file mode 100644 index 0000000..33249fd --- /dev/null +++ b/docs/peer-address-caching.md @@ -0,0 +1,141 @@ +# Peer Address Caching + +someguy is a caching delegated routing proxy, not a libp2p node. It answers +`/routing/v1` requests by querying backends (the Amino DHT and delegated HTTP +routers) and caching what it learns. Because it serves clients rather than +participating in the network itself, it prioritizes two things over a fresh +lookup on every request: + +- **Latency.** A cached answer returns in microseconds; a DHT walk takes + seconds. +- **Stable, reachable peers.** The cache holds peers that someguy has recently + seen and actively probes, so cached addresses skew toward peers that are + online right now. + +This document describes how the address cache is filled, how it is kept fresh, +and how `/providers` and `/peers` read from it. + +> [!NOTE] +> Address caching requires a DHT-backed instance and the `--cached-addr-book` +> flag (on by default). With `--dht=disabled`, someguy is a plain proxy: +> `/providers` and `/peers` forward to the delegated HTTP endpoints and no +> address caching takes place. 
+ +## The two stores + +someguy keeps peer addresses in two places, both consulted through one read +path (`cachedAddrBook.GetCachedAddrs`): + +| Store | Lifetime | Filled by | +| --- | --- | --- | +| **Cached address book** (`cachedAddrBook`) | 48h (`DefaultProvideValidity`), or permanent while connected | identify events, the probe loop, and addresses observed in provider records | +| **Host peerstore** (`host.Peerstore()`) | 2 minutes (`TempAddrTTL`) | the DHT, which records provider and peer addresses during its own lookups | + +The cached address book is the durable, probed store. The host peerstore is a +short-lived window onto whatever the DHT touched in the last two minutes, read +as a secondary fallback. + +## How the cache is filled and kept fresh + +```mermaid +flowchart TD + subgraph sources [Address sources] + ID[identify on connect] + PROBE[probe loop every 15m] + PROV[addrs seen in provider records] + DHT[DHT lookups] + end + + ID -->|signed peer record or listen addrs| CAB[(cached address book
TTL 48h / permanent while connected)] + PROBE -->|on success: refresh + extend TTL| CAB + PROV -->|CacheAddrs| CAB + DHT -->|TempAddrTTL 2m| PS[(host peerstore)] + + PROBE -.->|dials known addrs hourly| CAB + PROBE -.->|on repeated failure past 48h: evict| CAB +``` + +Addresses enter the cached address book whenever someguy sees a peer: a +successful connect and identify, an address embedded in a provider record +(`CacheAddrs`), or a successful probe. Each sighting resets the entry's TTL to +48 hours, so a frequently requested peer never expires. + +The probe loop runs every 15 minutes. For every cached peer not contacted in +the last hour, it dials the known addresses: + +- **Success** extends the addresses toward a permanent TTL and clears the + failure counter. +- **Failure** doubles a backoff (1h, 2h, 4h, ...). After repeated failures past + 48 hours, someguy evicts the peer. + +This keeps the cache self-healing. Online peers are reverified about hourly; +dead peers are purged. A cached answer is therefore at most about an hour stale +in the common case, which is an acceptable trade for avoiding a DHT walk per +request. + +A completed identify also prunes. Addresses otherwise only accumulate: provider +records, DHT gossip, and successive identifies each add to the union, so a peer +can collect outdated certhashes, dead relay circuits, and rotated NAT ports. +When an identify completes (organically or via a probe), someguy replaces the +peer's stored set with its current advertised addresses, taken from the signed +peer record when present and otherwise from the identify listen addresses, kept +together with any live-connection address so an active session is never dropped. +A reachable peer therefore collapses back to its current advertised set on each +refresh instead of growing without bound. + +## How each endpoint reads the cache + +Both endpoints are cache-first and share the same read path. 
They differ only +in shape: `/providers` streams many records, so it resolves missing addresses in +the background; `/peers` resolves a single peer, so it falls back inline. + +### `/routing/v1/providers/{cid}` + +```mermaid +flowchart TD + REQ[GET /providers/cid] --> FP[FindProviders on backends] + FP --> REC{provider record
has addrs?} + REC -->|yes| EMIT[return record] + REC -->|no| CACHE{cache hit?
book then peerstore} + CACHE -->|yes| EMIT + CACHE -->|no| DISPATCH[dispatch FindPeer
in background] + DISPATCH --> WAIT{addrs found
before stream ends?} + WAIT -->|yes| EMIT + WAIT -->|no| DROP[omit record] + REC -.->|yes| SEEN[source-supplied addrs cached
via CacheAddrs] +``` + +A provider record often arrives with addresses already attached, in which case +someguy returns it as is and caches the observed addresses. When a record has no +addresses, someguy consults the cache; on a miss it dispatches a background +`FindPeer` so the stream keeps flowing. Records still missing addresses when the +stream ends are dropped. + +### `/routing/v1/peers/{peerid}` + +```mermaid +flowchart TD + REQ[GET /peers/peerid] --> CACHE{cache hit?
book then peerstore} + CACHE -->|yes| EMIT[return cached record] + CACHE -->|no| FP[FindPeers on backends] + FP --> FOUND{found?} + FOUND -->|yes| ENRICH[fill missing addrs
from cache] --> EMIT2[return record] + FOUND -->|no| FAIL[record failed connection] --> NF[404 not found] +``` + +someguy checks the cache first and returns immediately on a hit, without +touching the DHT. Only on a miss does it fall back to peer routing; a record +that comes back without addresses is enriched from the cache, and a peer that is +not found is recorded for backoff before returning a 404. + +## Why cache-first + +As a proxy, someguy returns a fast answer built from peers it knows are +reachable rather than blocking every request on a DHT walk. The cache already +favors online peers through active probing, so a cache-first answer is both +faster and biased toward peers a client can actually dial. Worst-case staleness +of roughly an hour is an acceptable price, as most stable peers keep stable +addresses over such a window. + +Both endpoints follow this rule: they read the cache first and fall back to a +DHT lookup only when the cache cannot answer. diff --git a/go.mod b/go.mod index bade58d..fbb1f5a 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-flow-metrics v0.3.0 // indirect diff --git a/server.go b/server.go index 315f4e1..812fa3d 100644 --- a/server.go +++ b/server.go @@ -174,6 +174,10 @@ func start(ctx context.Context, cfg *config) error { opts = append(opts, WithActiveProbing(cfg.cachedAddrBookActiveProbing)) + // Let the cache fall back to the host peerstore, which the DHT + // populates with provider addresses during FindProviders. + opts = append(opts, WithHostPeerstore(h.Peerstore())) + cachedAddrBook, err = newCachedAddrBook(opts...) 
if err != nil { return err diff --git a/server_cached_router.go b/server_cached_router.go index 7edc0b7..917558e 100644 --- a/server_cached_router.go +++ b/server_cached_router.go @@ -69,14 +69,28 @@ func (r cachedRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) return iter, nil } -// FindPeers uses a simpler approach than FindProviders because we're dealing with a single PeerRecord, and there's -// no point in trying to dispatch an additional FindPeer call. +// FindPeers serves addresses cache-first, the same way FindProviders does. +// someguy is a caching routing proxy, not a libp2p node, so it favors a +// low-latency answer drawn from recently seen, actively probed peers over a +// fresh DHT walk on every request. It consults the cache first and falls back +// to peer routing only on a miss. See docs/peer-address-caching.md. func (r cachedRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + // Cache-first: answer from the peerbook when we already hold addresses. + if cachedAddrs := r.withAddrsFromCache(addrQueryOriginPeers, pid, nil); len(cachedAddrs) > 0 { + rec := &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Addrs: cachedAddrs, + } + return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{rec})), nil + } + + // Cache miss: fall back to the underlying peer routing (DHT). it, err := r.router.FindPeers(ctx, pid, limit) if err == routing.ErrNotFound { - // ErrNotFound will be returned if either dialing the peer failed or the peer was not found - r.cachedAddrBook.RecordFailedConnection(pid) // record the failure used for probing/backoff purposes + // Record the failure used for probing/backoff purposes. 
+ r.cachedAddrBook.RecordFailedConnection(pid) return nil, routing.ErrNotFound } @@ -84,9 +98,19 @@ func (r cachedRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (it return nil, err } - // update the metrics to indicate that we didn't look up the cache for this lookup - peerAddrLookups.WithLabelValues(addrCacheStateUnused, addrQueryOriginPeers).Inc() - return it, nil + // Enrich records that came back without addresses from the peerbook. + // Read the cache directly rather than via withAddrsFromCache: FindPeers + // already recorded this request's outcome through the cache-first lookup + // above, so recording here would double-count peer_addr_lookups. + return iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[*types.PeerRecord] { + if v.Err != nil || v.Val == nil || v.Val.ID == nil { + return v + } + if len(v.Val.Addrs) == 0 { + v.Val.Addrs = r.cachedAddrBook.GetCachedAddrs(*v.Val.ID) + } + return v + }), nil } func (r cachedRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { @@ -187,8 +211,16 @@ func (it *cacheFallbackIter) Next() bool { switch val.Val.GetSchema() { case types.SchemaPeer: if record, ok := val.Val.(*types.PeerRecord); ok { + hadSourceAddrs := len(record.Addrs) > 0 record.Addrs = it.router.withAddrsFromCache(it.queryOrigin, *record.ID, record.Addrs) if len(record.Addrs) > 0 { + // Remember addresses observed in provider records so a later + // FindPeers for this peer can serve them from the peerbook. + // Only cache addrs that came from the source (not ones we + // just read back from cache) to avoid pointless re-adds. 
+ if hadSourceAddrs && it.queryOrigin == addrQueryOriginProviders { + it.router.cachedAddrBook.CacheAddrs(*record.ID, record.Addrs) + } it.current = iter.Result[types.Record]{Val: record} return true } diff --git a/server_cached_router_test.go b/server_cached_router_test.go index e43b703..8b9bf0f 100644 --- a/server_cached_router_test.go +++ b/server_cached_router_test.go @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "github.com/multiformats/go-multiaddr" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -88,15 +89,15 @@ func TestCachedRouter(t *testing.T) { require.Equal(t, publicAddr.String(), peerRecord.Addrs[0].String()) }) - t.Run("Failed FindPeers with cached addresses does not return cached addresses", func(t *testing.T) { + t.Run("FindPeers serves cached addresses without consulting peer routing", func(t *testing.T) { ctx := context.Background() pid := peer.ID("test-peer") - // Create mock router that returns error + // Mock router with no FindPeers expectation: a cache hit must not reach it mr := &mockRouter{} - mr.On("FindPeers", mock.Anything, pid, 10).Return(nil, routing.ErrNotFound) - // Create cached address book with test addresses + // Create cached address book with test addresses (e.g. 
learned from a + // prior provider lookup), the same peerbook FindProviders consults cab, err := newCachedAddrBook() require.NoError(t, err) @@ -106,6 +107,109 @@ func TestCachedRouter(t *testing.T) { // Create cached router cr := NewCachedRouter(mr, cab) + it, err := cr.FindPeers(ctx, pid, 10) + require.NoError(t, err) + + results, err := iter.ReadAllResults(it) + require.NoError(t, err) + require.Len(t, results, 1) + + // Verify cached addresses were returned cache-first + require.Equal(t, pid, *results[0].ID) + require.Len(t, results[0].Addrs, 1) + require.Equal(t, publicAddr.String(), results[0].Addrs[0].String()) + + // Peer routing must not be consulted on a cache hit + mr.AssertNotCalled(t, "FindPeers", mock.Anything, pid, 10) + }) + + t.Run("FindProviders caches observed addrs so FindPeers can serve them", func(t *testing.T) { + ctx := context.Background() + c := makeCID() + pid := peer.ID("test-peer") + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // FindProviders returns a provider record with addrs embedded (as the + // DHT does), while peer routing reports the peer as not found. 
+ mr := &mockRouter{} + provIter := newMockResultIter([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: []types.Multiaddr{publicAddr}}}, + }) + mr.On("FindProviders", mock.Anything, c, 10).Return(provIter, nil) + mr.On("FindPeers", mock.Anything, pid, 10).Return(nil, routing.ErrNotFound) + + cab, err := newCachedAddrBook(WithAllowPrivateIPs()) + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + // Drain FindProviders so the observed addrs get cached + provResults, err := cr.FindProviders(ctx, c, 10) + require.NoError(t, err) + _, err = iter.ReadAllResults(provResults) + require.NoError(t, err) + + // FindPeers misses peer routing but should now serve cached addrs + it, err := cr.FindPeers(ctx, pid, 10) + require.NoError(t, err) + results, err := iter.ReadAllResults(it) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, pid, *results[0].ID) + require.Len(t, results[0].Addrs, 1) + require.Equal(t, publicAddr.String(), results[0].Addrs[0].String()) + }) + + t.Run("FindPeers enrich step does not double-count peer_addr_lookups", func(t *testing.T) { + ctx := context.Background() + pid := peer.ID("test-peer") + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // Cache is empty, so FindPeers falls through to peer routing, which + // returns a record carrying its own addresses. + mr := &mockRouter{} + dhtIter := newMockResultIter([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: []types.Multiaddr{publicAddr}}}, + }) + mr.On("FindPeers", mock.Anything, pid, 10).Return(dhtIter, nil) + + cab, err := newCachedAddrBook() + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + // The {unused, peers} series is written by exactly one line: the + // cache-first lookup passes nil addrs and can never hit it, and no other + // origin uses "peers". 
So this delta isolates the old enrich double-count + // without being polluted by the process-global counter under -count or + // parallel sibling tests (which only touch hit/miss). + unusedBefore := testutil.ToFloat64(peerAddrLookups.WithLabelValues(addrCacheStateUnused, addrQueryOriginPeers)) + + it, err := cr.FindPeers(ctx, pid, 10) + require.NoError(t, err) + _, err = iter.ReadAllResults(it) + require.NoError(t, err) + + unusedAfter := testutil.ToFloat64(peerAddrLookups.WithLabelValues(addrCacheStateUnused, addrQueryOriginPeers)) + + // The post-DHT enrich step must not record a second lookup for a record + // the DHT already supplied addresses for. + require.Equal(t, 0.0, unusedAfter-unusedBefore, "enrich step must not record an unused lookup") + }) + + t.Run("FindPeers not found with empty cache returns ErrNotFound", func(t *testing.T) { + ctx := context.Background() + pid := peer.ID("test-peer") + + // Create mock router that reports the peer as not found via peer routing + mr := &mockRouter{} + mr.On("FindPeers", mock.Anything, pid, 10).Return(nil, routing.ErrNotFound) + + // Create empty cached address book + cab, err := newCachedAddrBook() + require.NoError(t, err) + + // Create cached router + cr := NewCachedRouter(mr, cab) + _, err = cr.FindPeers(ctx, pid, 10) require.ErrorIs(t, err, routing.ErrNotFound) }) diff --git a/server_routers.go b/server_routers.go index ce7806f..347ac86 100644 --- a/server_routers.go +++ b/server_routers.go @@ -1,10 +1,12 @@ package main import ( + "bytes" "context" "errors" "fmt" "reflect" + "slices" "sync" "time" @@ -602,5 +604,12 @@ func filterPrivateMultiaddr(a []types.Multiaddr) []types.Multiaddr { b = append(b, addr) } + // Sort for a stable response across requests. Addresses can arrive in + // nondeterministic order (e.g. the peerstore stores them in a map), and + // this runs on every record from every router, so all sources are covered. 
+ slices.SortFunc(b, func(x, y types.Multiaddr) int { + return bytes.Compare(x.Multiaddr.Bytes(), y.Multiaddr.Bytes()) + }) + return b } diff --git a/server_routers_test.go b/server_routers_test.go index 3523ab8..127045c 100644 --- a/server_routers_test.go +++ b/server_routers_test.go @@ -717,3 +717,44 @@ func TestManyIter(t *testing.T) { require.NoError(t, manyIter.Close()) }) } + +func TestFilterPrivateMultiaddrSortsAndFilters(t *testing.T) { + mustAddr := func(s string) types.Multiaddr { + m, err := multiaddr.NewMultiaddr(s) + require.NoError(t, err) + return types.Multiaddr{Multiaddr: m} + } + + priv := mustAddr("/ip4/192.168.1.5/tcp/4001") + input := []types.Multiaddr{ + mustAddr("/ip4/9.9.9.9/tcp/4001"), + priv, // private, must be filtered out + mustAddr("/ip4/1.1.1.1/udp/4001/quic-v1"), + mustAddr("/ip4/5.5.5.5/tcp/4001"), + } + + // Shuffle into several orders; the output must be identical every time and + // must never contain the private address. + var want []string + for _, order := range [][]int{{0, 1, 2, 3}, {3, 2, 1, 0}, {2, 0, 3, 1}, {1, 3, 0, 2}} { + in := make([]types.Multiaddr, 0, len(order)) + for _, i := range order { + in = append(in, input[i]) + } + + out := filterPrivateMultiaddr(in) + + got := make([]string, 0, len(out)) + for _, a := range out { + got = append(got, a.String()) + require.NotEqual(t, priv.String(), a.String(), "private addr leaked") + } + require.Len(t, got, 3) + + if want == nil { + want = got + } else { + require.Equal(t, want, got, "output order is not stable across input orders") + } + } +}