diff --git a/docs/features/auth-broker.md b/docs/features/auth-broker.md index 96b44a662..335ed2982 100644 --- a/docs/features/auth-broker.md +++ b/docs/features/auth-broker.md @@ -90,6 +90,21 @@ On each proxied request the broker resolves the per-user credential to inject, i Concurrent requests for the same `(user, upstream)` are coalesced (single-flight) so a burst does not trigger duplicate upstream token flows. A policy-decision hook is evaluated per call immediately before the credential is returned; no policy engine ships yet, so it permits every injection by default. +## Header injection + +The resolved per-user credential is injected into the configured outbound header (`header`, default `Authorization`) using the value template (`header_format`, default `Bearer {token}`), then the request is forwarded to the upstream. + +Injection is a **replacement**, not a merge: + +- Any header on the upstream config whose name matches `header` (case-insensitively) is **removed** before the resolved credential is set, so a brokered upstream presents exactly one value for that header. +- The inbound gateway/IdP token is **never forwarded** to the upstream. Brokering exists precisely so the upstream sees a credential minted *for it*, scoped to the calling user — not the token the user presented to the gateway. + +Injection applies only to **HTTP-family** upstreams (`http`, `sse`, `streamable-http`). Brokering on a `stdio` upstream is rejected — at config validation, and again as a runtime guard at the injection boundary — with a clear "unsupported in this phase" message. + +## Per-(user, server) connection keying + +A **shared** upstream that is brokered per-user must carry **each user's own** credential. Brokered upstream connections are therefore keyed by `(user, server)`, never by server alone: one user's connection (and the credential injected on it) is never reused for another user. The server-component of the key reuses the same `name + URL` scheme as the credential store, so a connection and its cached credential stay in lockstep. + ## See also - [OAuth Authentication](./oauth-authentication.md) — upstream OAuth for the personal edition. diff --git a/internal/serveredition/broker/injector.go b/internal/serveredition/broker/injector.go new file mode 100644 index 000000000..129a599a9 --- /dev/null +++ b/internal/serveredition/broker/injector.go @@ -0,0 +1,108 @@ +//go:build server + +package broker + +import ( + "context" + "errors" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/config" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/oauth" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/transport" +) + +// ErrBrokerStdioUnsupported is returned when brokering is requested for a +// non-HTTP-family (stdio) upstream. Credential injection only works over +// HTTP/SSE/streamable-HTTP transports in this phase (spec 074 FR-002). This is a +// runtime defense-in-depth check; config validation already rejects such blocks +// at load time. +var ErrBrokerStdioUnsupported = errors.New("auth broker: credential injection is only supported on HTTP-family upstreams (http, sse, streamable-http); stdio brokering is unsupported in this phase") + +// Fallback header/format used when a brokered server's config has not had its +// defaults applied. These mirror config.AuthBrokerConfig.ApplyDefaults (FR-016); +// config validation normally applies them at load time. +const ( + fallbackBrokerHeader = "Authorization" + fallbackBrokerHeaderFormat = "Bearer {token}" +) + +// resolver is the subset of *CredentialResolver the injector depends on. It is +// an interface so tests can substitute a fake without a real store/exchanger. +type resolver interface { + Resolve(ctx context.Context, userID string, server *config.ServerConfig) (*UpstreamCredential, error) +} + +// HeaderInjector turns a per-user resolved upstream credential into the +// transport-layer BrokeredAuth injected on a proxied request. It is the bridge +// between the credential broker (server edition) and the edition-neutral +// transport layer: the transport never imports the broker, it only receives the +// resolved credential as plain data. +// +// The injector enforces the spec-074 brokering invariants at the injection +// boundary: +// - per-user only: an empty userID is rejected (FR-014); +// - HTTP-family only: stdio brokering is rejected (FR-002); +// - replacement, not forwarding: the produced BrokeredAuth replaces any +// configured/inbound auth header (FR-016/FR-017, enforced in transport). +type HeaderInjector struct { + resolver resolver +} + +// NewHeaderInjector builds an injector over a credential resolver. *CredentialResolver +// satisfies the resolver interface. +func NewHeaderInjector(r resolver) *HeaderInjector { + return &HeaderInjector{resolver: r} +} + +// InjectFor resolves the per-user credential for (userID, server) and returns +// the transport.BrokeredAuth to inject. It returns: +// - ErrUnauthenticated if userID is empty (FR-014); +// - ErrBrokerNotConfigured if the server has no auth_broker block; +// - ErrBrokerStdioUnsupported if the server is not HTTP-family (FR-002); +// - any resolver error (e.g. *NotConnectedError carrying a connect URL). +func (h *HeaderInjector) InjectFor(ctx context.Context, userID string, server *config.ServerConfig) (*transport.BrokeredAuth, error) { + if userID == "" { + return nil, ErrUnauthenticated + } + if server == nil || server.AuthBroker == nil { + return nil, ErrBrokerNotConfigured + } + // Defense-in-depth: reject brokering on stdio/non-HTTP upstreams (FR-002). + if transport.DetermineTransportType(server) == transport.TransportStdio { + return nil, ErrBrokerStdioUnsupported + } + + cred, err := h.resolver.Resolve(ctx, userID, server) + if err != nil { + return nil, err + } + if cred == nil || cred.AccessToken == "" { + return nil, ErrNoCredential + } + + header := server.AuthBroker.Header + if header == "" { + header = fallbackBrokerHeader + } + format := server.AuthBroker.HeaderFormat + if format == "" { + format = fallbackBrokerHeaderFormat + } + return &transport.BrokeredAuth{ + Header: header, + Format: format, + Token: cred.AccessToken, + }, nil +} + +// ConnectionKey derives the pooling key for a brokered upstream connection. It +// binds the connection to a single (user, server) pair so a shared upstream +// brokered per-user never reuses one user's credential/connection for another +// (FR-018). The server component reuses the existing oauth.GenerateServerKey +// scheme (name + URL) so it matches the credential store's keying. +func ConnectionKey(userID string, server *config.ServerConfig) string { + if server == nil { + return userID + "\x00" + } + return userID + "\x00" + oauth.GenerateServerKey(server.Name, server.URL) +} diff --git a/internal/serveredition/broker/injector_test.go b/internal/serveredition/broker/injector_test.go new file mode 100644 index 000000000..3a25f5a68 --- /dev/null +++ b/internal/serveredition/broker/injector_test.go @@ -0,0 +1,163 @@ +//go:build server + +package broker + +import ( + "context" + "errors" + "testing" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/config" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/transport" +) + +// fakeResolver returns a per-user token so the injector can be exercised without +// a real store/exchanger. It records the per-user resolution so tests can assert +// one user's credential is never produced for another (FR-018). +type fakeResolver struct { + tokens map[string]string // userID -> access token + err error +} + +func (f *fakeResolver) Resolve(_ context.Context, userID string, _ *config.ServerConfig) (*UpstreamCredential, error) { + if f.err != nil { + return nil, f.err + } + tok, ok := f.tokens[userID] + if !ok { + return nil, ErrNoCredential + } + return &UpstreamCredential{AccessToken: tok, TokenType: "Bearer"}, nil +} + +func httpBrokerServer() *config.ServerConfig { + s := &config.ServerConfig{ + Name: "ghe", + URL: "https://ghe.example/mcp", + Protocol: "streamable-http", + AuthBroker: &config.AuthBrokerConfig{ + Mode: config.AuthBrokerModeTokenExchange, + TokenEndpoint: "https://idp.example/token", + }, + } + s.AuthBroker.ApplyDefaults() + return s +} + +// FR-016: the resolved per-user credential is rendered into the configured +// header/format (default Authorization: Bearer {token}). +func TestInjector_InjectFor_ProducesBrokeredAuth(t *testing.T) { + inj := NewHeaderInjector(&fakeResolver{tokens: map[string]string{"alice": "alice-tok"}}) + + ba, err := inj.InjectFor(context.Background(), "alice", httpBrokerServer()) + if err != nil { + t.Fatalf("InjectFor: %v", err) + } + if ba.Header != "Authorization" { + t.Fatalf("header = %q, want Authorization", ba.Header) + } + if ba.HeaderValue() != "Bearer alice-tok" { + t.Fatalf("header value = %q, want %q", ba.HeaderValue(), "Bearer alice-tok") + } +} + +// FR-018 + SC-002/003: two users brokered against the SAME shared upstream get +// two distinct outbound tokens; one user's credential is never reused for the +// other. +func TestInjector_TwoUsers_TwoTokens(t *testing.T) { + inj := NewHeaderInjector(&fakeResolver{tokens: map[string]string{ + "alice": "alice-tok", + "bob": "bob-tok", + }}) + server := httpBrokerServer() + + aliceBA, err := inj.InjectFor(context.Background(), "alice", server) + if err != nil { + t.Fatalf("alice: %v", err) + } + bobBA, err := inj.InjectFor(context.Background(), "bob", server) + if err != nil { + t.Fatalf("bob: %v", err) + } + + if aliceBA.HeaderValue() == bobBA.HeaderValue() { + t.Fatalf("two users produced the same outbound token %q (FR-018 violation)", aliceBA.HeaderValue()) + } + + // And the effective outbound headers must carry each user's own token. + aliceHdr := transport.EffectiveHeaders(nil, aliceBA) + bobHdr := transport.EffectiveHeaders(nil, bobBA) + if aliceHdr["Authorization"] != "Bearer alice-tok" || bobHdr["Authorization"] != "Bearer bob-tok" { + t.Fatalf("cross-user token leak: alice=%q bob=%q", aliceHdr["Authorization"], bobHdr["Authorization"]) + } + + // Per-(user,server) connection keys must differ so connections are never + // pooled across users (FR-018). + if ConnectionKey("alice", server) == ConnectionKey("bob", server) { + t.Fatalf("connection key collided across users (FR-018 violation)") + } +} + +// FR-018: ConnectionKey is stable for the same (user, server) and distinct per +// user and per server so a brokered connection is never reused across either. +func TestConnectionKey_StableAndDistinct(t *testing.T) { + s1 := httpBrokerServer() + s2 := httpBrokerServer() + s2.Name = "other" + s2.URL = "https://other.example/mcp" + + k1 := ConnectionKey("alice", s1) + k1again := ConnectionKey("alice", s1) + if k1 != k1again { + t.Fatal("ConnectionKey must be stable for the same (user, server)") + } + if ConnectionKey("alice", s1) == ConnectionKey("alice", s2) { + t.Fatal("ConnectionKey must differ per server") + } + if ConnectionKey("alice", s1) == ConnectionKey("bob", s1) { + t.Fatal("ConnectionKey must differ per user") + } +} + +// FR-002: brokering on a stdio upstream is rejected with a clear, actionable +// message — never silently injected. +func TestInjector_RejectsStdio(t *testing.T) { + inj := NewHeaderInjector(&fakeResolver{tokens: map[string]string{"alice": "x"}}) + stdio := &config.ServerConfig{ + Name: "local", + Protocol: "stdio", + Command: "my-mcp", + AuthBroker: &config.AuthBrokerConfig{ + Mode: config.AuthBrokerModeTokenExchange, + TokenEndpoint: "https://idp.example/token", + }, + } + stdio.AuthBroker.ApplyDefaults() + + _, err := inj.InjectFor(context.Background(), "alice", stdio) + if !errors.Is(err, ErrBrokerStdioUnsupported) { + t.Fatalf("stdio brokering must be rejected with ErrBrokerStdioUnsupported, got %v", err) + } +} + +// A server with no auth_broker block is not brokered: the injector returns +// ErrBrokerNotConfigured and the caller proceeds with today's behaviour. +func TestInjector_NotConfigured(t *testing.T) { + inj := NewHeaderInjector(&fakeResolver{}) + plain := &config.ServerConfig{Name: "plain", URL: "https://x/mcp", Protocol: "streamable-http"} + + _, err := inj.InjectFor(context.Background(), "alice", plain) + if !errors.Is(err, ErrBrokerNotConfigured) { + t.Fatalf("non-brokered server must return ErrBrokerNotConfigured, got %v", err) + } +} + +// An empty userID is rejected before any resolution — brokering is strictly +// per-user (FR-014). +func TestInjector_RejectsAnonymous(t *testing.T) { + inj := NewHeaderInjector(&fakeResolver{tokens: map[string]string{"alice": "x"}}) + _, err := inj.InjectFor(context.Background(), "", httpBrokerServer()) + if !errors.Is(err, ErrUnauthenticated) { + t.Fatalf("anonymous caller must be rejected with ErrUnauthenticated, got %v", err) + } +} diff --git a/internal/serveredition/multiuser/router.go b/internal/serveredition/multiuser/router.go index bff27e015..de1399a67 100644 --- a/internal/serveredition/multiuser/router.go +++ b/internal/serveredition/multiuser/router.go @@ -12,6 +12,7 @@ import ( "github.com/smart-mcp-proxy/mcpproxy-go/internal/auth" "github.com/smart-mcp-proxy/mcpproxy-go/internal/config" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/serveredition/broker" "github.com/smart-mcp-proxy/mcpproxy-go/internal/serveredition/workspace" ) @@ -149,6 +150,29 @@ func (r *Router) GetServerForUser(ctx context.Context, serverName string) (*Serv return nil, fmt.Errorf("server %q not found or not accessible", serverName) } +// BrokeredConnectionKey returns the per-(user, server) pooling key for a +// brokered upstream connection (spec 074 FR-018). It resolves the calling user +// and the named server (shared or personal), then keys the connection by both +// so a shared upstream brokered per-user never reuses one user's +// credential/connection for another. The connection pool MUST use this key when +// caching brokered upstream clients. +// +// It errors if there is no auth context or the server is not accessible to the +// user; it does not require the server to actually declare an auth_broker block, +// so callers can key uniformly and decide per server whether to broker. +func (r *Router) BrokeredConnectionKey(ctx context.Context, serverName string) (string, error) { + ac := auth.AuthContextFromContext(ctx) + if ac == nil { + return "", fmt.Errorf("no authentication context") + } + + info, err := r.GetServerForUser(ctx, serverName) + if err != nil { + return "", err + } + return broker.ConnectionKey(ac.GetUserID(), info.Config), nil +} + // IsServerAccessible returns true if the user from the context can access // the named server. This is a convenience method that does not return // detailed error information. diff --git a/internal/serveredition/multiuser/router_broker_test.go b/internal/serveredition/multiuser/router_broker_test.go new file mode 100644 index 000000000..e11895a23 --- /dev/null +++ b/internal/serveredition/multiuser/router_broker_test.go @@ -0,0 +1,45 @@ +//go:build server + +package multiuser + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Spec 074 T7 / FR-018: a shared upstream brokered per-user must key its +// connection by (user, server) so one user's credential/connection is never +// reused for another. The router exposes the per-(user,server) connection key +// for the connection pool to use. + +func TestRouter_BrokeredConnectionKey_DistinctPerUser(t *testing.T) { + router, _ := setupRouter(t, []string{"shared-ghe"}, nil) + + aliceKey, err := router.BrokeredConnectionKey(userCtx("alice"), "shared-ghe") + require.NoError(t, err) + bobKey, err := router.BrokeredConnectionKey(userCtx("bob"), "shared-ghe") + require.NoError(t, err) + + assert.NotEmpty(t, aliceKey) + assert.NotEqual(t, aliceKey, bobKey, + "the same shared brokered upstream must key a distinct connection per user (FR-018)") + + // Stable for the same (user, server). + aliceKey2, err := router.BrokeredConnectionKey(userCtx("alice"), "shared-ghe") + require.NoError(t, err) + assert.Equal(t, aliceKey, aliceKey2, "connection key must be stable for the same (user, server)") +} + +func TestRouter_BrokeredConnectionKey_RequiresAuth(t *testing.T) { + router, _ := setupRouter(t, []string{"shared-ghe"}, nil) + _, err := router.BrokeredConnectionKey(noAuthCtx(), "shared-ghe") + assert.Error(t, err, "no auth context must be rejected") +} + +func TestRouter_BrokeredConnectionKey_UnknownServer(t *testing.T) { + router, _ := setupRouter(t, []string{"shared-ghe"}, nil) + _, err := router.BrokeredConnectionKey(userCtx("alice"), "nope") + assert.Error(t, err, "unknown/inaccessible server must error") +} diff --git a/internal/transport/broker_auth.go b/internal/transport/broker_auth.go new file mode 100644 index 000000000..a71512b06 --- /dev/null +++ b/internal/transport/broker_auth.go @@ -0,0 +1,56 @@ +package transport + +import "strings" + +// BrokeredAuth carries a per-user resolved upstream credential to inject into an +// outbound HTTP/SSE request (spec 074, FR-016/FR-017). The server edition's +// credential broker resolves the per-user token and hands it down as plain data +// so the edition-neutral transport layer can inject it without importing any +// server-only package. +// +// Injection REPLACES any inbound or statically-configured header of the same +// name: the gateway/IdP token is never forwarded to the upstream (FR-017). +type BrokeredAuth struct { + // Header is the outbound header the credential is injected into + // (default "Authorization"). + Header string + // Format is the value template; the literal substring "{token}" is replaced + // with Token (default "Bearer {token}"). + Format string + // Token is the resolved per-user credential. + Token string +} + +// tokenPlaceholder is the substring in Format replaced with the resolved token. +const tokenPlaceholder = "{token}" + +// HeaderValue renders the outbound header value from Format, substituting the +// resolved token for the "{token}" placeholder. +func (b *BrokeredAuth) HeaderValue() string { + return strings.ReplaceAll(b.Format, tokenPlaceholder, b.Token) +} + +// EffectiveHeaders returns the outbound header set for a request, injecting the +// brokered per-user credential when one is supplied. +// +// The returned map is always a fresh copy — callers must never mutate the +// server config's header map. When brokered is non-nil, any header in base +// whose name matches brokered.Header case-insensitively is dropped before the +// resolved credential is set, so the configured/inbound auth is REPLACED rather +// than merged or forwarded (FR-017). When brokered is nil, base is returned +// unchanged (as a copy). +func EffectiveHeaders(base map[string]string, brokered *BrokeredAuth) map[string]string { + out := make(map[string]string, len(base)+1) + for k, v := range base { + if brokered != nil && strings.EqualFold(k, brokered.Header) { + // Drop the inbound/configured auth header — replaced below so the + // gateway/IdP token is never forwarded to the upstream (FR-017). + continue + } + out[k] = v + } + if brokered != nil { + out[brokered.Header] = brokered.HeaderValue() + } + return out +} diff --git a/internal/transport/broker_auth_integration_test.go b/internal/transport/broker_auth_integration_test.go new file mode 100644 index 000000000..be2438f82 --- /dev/null +++ b/internal/transport/broker_auth_integration_test.go @@ -0,0 +1,140 @@ +package transport + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/mark3labs/mcp-go/mcp" +) + +// newCapturingMCPServer stands up a minimal streamable-HTTP MCP endpoint that +// records the inbound Authorization header and answers initialize so the +// mcp-go client completes a real request — proving the brokered credential +// reaches the wire (spec 074 FR-016/FR-017). +func newCapturingMCPServer(t *testing.T, gotAuth *string) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + *gotAuth = r.Header.Get("Authorization") + + var req struct { + ID json.RawMessage `json:"id"` + Method string `json:"method"` + } + _ = json.NewDecoder(r.Body).Decode(&req) + + w.Header().Set("Content-Type", "application/json") + resp := map[string]any{ + "jsonrpc": "2.0", + "id": json.RawMessage(req.ID), + "result": map[string]any{ + "protocolVersion": mcp.LATEST_PROTOCOL_VERSION, + "serverInfo": map[string]any{"name": "cap", "version": "1"}, + "capabilities": map[string]any{}, + }, + } + _ = json.NewEncoder(w).Encode(resp) + })) +} + +func doInitialize(t *testing.T, cfg *HTTPTransportConfig, sse bool) { + t.Helper() + var ( + c interface{ Start(context.Context) error } + err error + ) + if sse { + cl, e := CreateSSEClient(cfg) + c, err = cl, e + } else { + cl, e := CreateHTTPClient(cfg) + c, err = cl, e + } + if err != nil { + t.Fatalf("create client: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := c.Start(ctx); err != nil { + t.Fatalf("start: %v", err) + } + // Initialize triggers the first POST carrying the headers. + type initer interface { + Initialize(context.Context, mcp.InitializeRequest) (*mcp.InitializeResult, error) + } + if i, ok := c.(initer); ok { + _, _ = i.Initialize(ctx, mcp.InitializeRequest{}) + } +} + +// FR-017 across the SSE path: the brokered per-user token is on the wire of the +// initial SSE GET stream and the inbound/configured gateway token is not. +func TestCreateSSEClient_InjectsBrokeredAuthOnWire(t *testing.T) { + var gotAuth string + mux := http.NewServeMux() + var srv *httptest.Server + mux.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("Authorization") + w.Header().Set("Content-Type", "text/event-stream") + fl, ok := w.(http.Flusher) + if !ok { + t.Errorf("ResponseWriter is not a Flusher") + return + } + // Tell the client where to POST messages, then hold the stream open. + _, _ = w.Write([]byte("event: endpoint\ndata: " + srv.URL + "/message\n\n")) + fl.Flush() + <-r.Context().Done() + }) + mux.HandleFunc("/message", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusAccepted) + }) + srv = httptest.NewServer(mux) + defer srv.Close() + + cfg := &HTTPTransportConfig{ + URL: srv.URL + "/sse", + Headers: map[string]string{"Authorization": "Bearer INBOUND-GATEWAY"}, + BrokeredAuth: &BrokeredAuth{ + Header: "Authorization", Format: "Bearer {token}", Token: "per-user-SSE", + }, + } + sseClient, err := CreateSSEClient(cfg) + if err != nil { + t.Fatalf("create SSE client: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := sseClient.Start(ctx); err != nil { + t.Fatalf("start SSE client: %v", err) + } + defer sseClient.Close() + + if gotAuth != "Bearer per-user-SSE" { + t.Fatalf("SSE outbound Authorization = %q, want brokered per-user token (inbound must be replaced)", gotAuth) + } +} + +// FR-017 across the HTTP path: the brokered per-user token is on the wire and +// the inbound/configured gateway token is not. +func TestCreateHTTPClient_InjectsBrokeredAuthOnWire(t *testing.T) { + var gotAuth string + srv := newCapturingMCPServer(t, &gotAuth) + defer srv.Close() + + cfg := &HTTPTransportConfig{ + URL: srv.URL, + Headers: map[string]string{"Authorization": "Bearer INBOUND-GATEWAY"}, + BrokeredAuth: &BrokeredAuth{ + Header: "Authorization", Format: "Bearer {token}", Token: "per-user-HTTP", + }, + } + doInitialize(t, cfg, false) + + if gotAuth != "Bearer per-user-HTTP" { + t.Fatalf("HTTP outbound Authorization = %q, want brokered per-user token (inbound must be replaced)", gotAuth) + } +} diff --git a/internal/transport/broker_auth_test.go b/internal/transport/broker_auth_test.go new file mode 100644 index 000000000..84c95bf35 --- /dev/null +++ b/internal/transport/broker_auth_test.go @@ -0,0 +1,116 @@ +package transport + +import "testing" + +// Spec 074 T7 (FR-016/FR-017): the per-user resolved credential is injected into +// the configured outbound header, REPLACING any inbound/configured auth header. +// The inbound gateway/IdP token must never be forwarded. + +func TestBrokeredAuth_HeaderValue(t *testing.T) { + cases := []struct { + name string + format string + token string + want string + }{ + {name: "default bearer", format: "Bearer {token}", token: "u1-tok", want: "Bearer u1-tok"}, + {name: "raw token", format: "{token}", token: "abc", want: "abc"}, + {name: "custom prefix", format: "token {token}", token: "xyz", want: "token xyz"}, + {name: "no placeholder", format: "static-value", token: "ignored", want: "static-value"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + b := &BrokeredAuth{Header: "Authorization", Format: tc.format, Token: tc.token} + if got := b.HeaderValue(); got != tc.want { + t.Fatalf("HeaderValue() = %q, want %q", got, tc.want) + } + }) + } +} + +func TestEffectiveHeaders_InjectsBrokeredCredential(t *testing.T) { + base := map[string]string{"X-Trace": "on"} + b := &BrokeredAuth{Header: "Authorization", Format: "Bearer {token}", Token: "user-A-token"} + + got := EffectiveHeaders(base, b) + + if got["Authorization"] != "Bearer user-A-token" { + t.Fatalf("outbound Authorization = %q, want %q", got["Authorization"], "Bearer user-A-token") + } + if got["X-Trace"] != "on" { + t.Fatalf("non-auth header should be preserved, got %q", got["X-Trace"]) + } +} + +// FR-017: the inbound gateway/IdP token configured on the server must be +// REPLACED, never merged or forwarded — even if cased differently. +func TestEffectiveHeaders_ReplacesInboundAuthHeader(t *testing.T) { + base := map[string]string{ + "authorization": "Bearer INBOUND-GATEWAY-TOKEN", // lowercase, different casing + "X-Other": "keep", + } + b := &BrokeredAuth{Header: "Authorization", Format: "Bearer {token}", Token: "per-user-token"} + + got := EffectiveHeaders(base, b) + + // Exactly one auth header, carrying the per-user token, never the inbound one. + authCount := 0 + for k, v := range got { + if equalFoldHeader(k, "Authorization") { + authCount++ + if v != "Bearer per-user-token" { + t.Fatalf("auth header = %q, want per-user token, must not forward inbound", v) + } + } + if v == "Bearer INBOUND-GATEWAY-TOKEN" { + t.Fatalf("inbound gateway token was forwarded on outbound header %q (FR-017 violation)", k) + } + } + if authCount != 1 { + t.Fatalf("expected exactly 1 auth header after replacement, got %d", authCount) + } + if got["X-Other"] != "keep" { + t.Fatalf("unrelated header dropped: %q", got["X-Other"]) + } +} + +func TestEffectiveHeaders_NilBrokeredReturnsBaseCopy(t *testing.T) { + base := map[string]string{"Authorization": "Bearer static"} + got := EffectiveHeaders(base, nil) + if got["Authorization"] != "Bearer static" { + t.Fatalf("nil broker must leave base headers intact, got %q", got["Authorization"]) + } + // Must be a copy, not the same map (callers must not mutate the server config). + got["Authorization"] = "mutated" + if base["Authorization"] != "Bearer static" { + t.Fatalf("EffectiveHeaders must not alias/mutate the base map") + } +} + +func TestEffectiveHeaders_InjectsIntoEmptyBase(t *testing.T) { + b := &BrokeredAuth{Header: "Authorization", Format: "Bearer {token}", Token: "t"} + got := EffectiveHeaders(nil, b) + if got["Authorization"] != "Bearer t" { + t.Fatalf("brokered auth must inject even with no base headers, got %q", got["Authorization"]) + } +} + +// equalFoldHeader is a test helper mirroring the case-insensitive header match. +func equalFoldHeader(a, b string) bool { + if len(a) != len(b) { + return false + } + for i := 0; i < len(a); i++ { + ca, cb := a[i], b[i] + if 'A' <= ca && ca <= 'Z' { + ca += 'a' - 'A' + } + if 'A' <= cb && cb <= 'Z' { + cb += 'a' - 'A' + } + if ca != cb { + return false + } + } + return true +} diff --git a/internal/transport/http.go b/internal/transport/http.go index d5c50e22b..b4a931299 100644 --- a/internal/transport/http.go +++ b/internal/transport/http.go @@ -127,13 +127,27 @@ func NewEndpointDeprecatedError(url, message, migrationGuide, newEndpoint string // HTTPTransportConfig holds configuration for HTTP transport type HTTPTransportConfig struct { - URL string - Headers map[string]string - OAuthConfig *client.OAuthConfig - UseOAuth bool + URL string + Headers map[string]string + OAuthConfig *client.OAuthConfig + UseOAuth bool + // BrokeredAuth, when set, injects a per-user resolved credential into the + // outbound headers, replacing any configured/inbound auth header (spec 074 + // FR-016/FR-017). It is edition-neutral plain data so the server-edition + // credential broker can drive injection without this package importing it. + BrokeredAuth *BrokeredAuth TraceEnabled bool // Enable detailed HTTP/SSE frame tracing } +// effectiveHeaders returns the outbound header set, applying brokered per-user +// auth injection when configured (spec 074 FR-016/FR-017). +func (cfg *HTTPTransportConfig) effectiveHeaders() map[string]string { + if cfg.BrokeredAuth == nil { + return cfg.Headers + } + return EffectiveHeaders(cfg.Headers, cfg.BrokeredAuth) +} + // CreateHTTPClient creates a new MCP client using HTTP transport func CreateHTTPClient(cfg *HTTPTransportConfig) (*client.Client, error) { logger := zap.L().Named("transport") @@ -200,6 +214,11 @@ func CreateHTTPClient(cfg *HTTPTransportConfig) (*client.Client, error) { logger.Debug("Creating regular HTTP client", zap.String("url", cfg.URL)) + // Apply brokered per-user auth injection (spec 074): replaces any configured + // auth header with the resolved per-user credential and never forwards the + // inbound gateway/IdP token (FR-017). + headers := cfg.effectiveHeaders() + // If tracing is enabled, create HTTP client with logging transport if cfg.TraceEnabled { logger.Info("🔍 HTTP TRACE MODE ENABLED - All HTTP traffic will be logged") @@ -216,9 +235,9 @@ func CreateHTTPClient(cfg *HTTPTransportConfig) (*client.Client, error) { var httpTransport transport.Interface var err error - if len(cfg.Headers) > 0 { + if len(headers) > 0 { httpTransport, err = transport.NewStreamableHTTP(cfg.URL, - transport.WithHTTPHeaders(cfg.Headers), + transport.WithHTTPHeaders(headers), transport.WithHTTPBasicClient(httpClient)) } else { httpTransport, err = transport.NewStreamableHTTP(cfg.URL, @@ -231,10 +250,10 @@ func CreateHTTPClient(cfg *HTTPTransportConfig) (*client.Client, error) { } // Use regular HTTP client - if len(cfg.Headers) > 0 { - logger.Debug("Adding HTTP headers", zap.Int("header_count", len(cfg.Headers))) + if len(headers) > 0 { + logger.Debug("Adding HTTP headers", zap.Int("header_count", len(headers))) httpTransport, err := transport.NewStreamableHTTP(cfg.URL, - transport.WithHTTPHeaders(cfg.Headers)) + transport.WithHTTPHeaders(headers)) if err != nil { return nil, fmt.Errorf("failed to create HTTP transport: %w", err) } @@ -305,9 +324,15 @@ func CreateSSEClient(cfg *HTTPTransportConfig) (*client.Client, error) { } logger.Debug("Creating regular SSE client", zap.String("url", cfg.URL)) + + // Apply brokered per-user auth injection (spec 074): replaces any configured + // auth header with the resolved per-user credential and never forwards the + // inbound gateway/IdP token (FR-017). + headers := cfg.effectiveHeaders() + // Use regular SSE client - if len(cfg.Headers) > 0 { - logger.Debug("Adding SSE headers", zap.Int("header_count", len(cfg.Headers))) + if len(headers) > 0 { + logger.Debug("Adding SSE headers", zap.Int("header_count", len(headers))) // Create custom HTTP client for SSE - NO Timeout field to allow indefinite streaming // The Timeout field covers the entire request duration, which kills long-lived SSE streams // Instead, we rely on IdleConnTimeout to detect dead connections @@ -335,12 +360,12 @@ func CreateSSEClient(cfg *HTTPTransportConfig) (*client.Client, error) { zap.String("url", cfg.URL), zap.Duration("idle_timeout", 300*time.Second), zap.Duration("header_timeout", 30*time.Second), - zap.Int("header_count", len(cfg.Headers)), + zap.Int("header_count", len(headers)), zap.String("note", "Removed http.Client.Timeout to allow SSE streams longer than 3 minutes")) sseClient, err := client.NewSSEMCPClient(cfg.URL, client.WithHTTPClient(httpClient), - client.WithHeaders(cfg.Headers)) + client.WithHeaders(headers)) if err != nil { return nil, fmt.Errorf("failed to create SSE client: %w", err) } diff --git a/internal/upstream/core/client.go b/internal/upstream/core/client.go index 4051e50ed..93524737e 100644 --- a/internal/upstream/core/client.go +++ b/internal/upstream/core/client.go @@ -19,6 +19,7 @@ import ( "github.com/smart-mcp-proxy/mcpproxy-go/internal/secret" "github.com/smart-mcp-proxy/mcpproxy-go/internal/secureenv" "github.com/smart-mcp-proxy/mcpproxy-go/internal/storage" + proxytransport "github.com/smart-mcp-proxy/mcpproxy-go/internal/transport" "github.com/smart-mcp-proxy/mcpproxy-go/internal/upstream/launcher" "github.com/smart-mcp-proxy/mcpproxy-go/internal/upstream/types" @@ -68,6 +69,14 @@ type Client struct { // when multiple requests are in-flight simultaneously sseRequestMu sync.Mutex + // brokeredAuth, when set, is the per-user upstream credential the gateway + // resolved for this (user, server) connection. The headers-auth strategy + // injects it into the configured outbound header, replacing any inbound or + // statically-configured auth — the gateway/IdP token is never forwarded + // (spec 074 FR-016/FR-017). nil for non-brokered upstreams (unchanged + // behaviour). + brokeredAuth *proxytransport.BrokeredAuth + // Transport type and stderr access (for stdio) transportType string stderr io.Reader diff --git a/internal/upstream/core/connection_http.go b/internal/upstream/core/connection_http.go index ea0cf0d9a..4869fa9d6 100644 --- a/internal/upstream/core/connection_http.go +++ b/internal/upstream/core/connection_http.go @@ -9,27 +9,60 @@ import ( "go.uber.org/zap" ) +// authStrategy pairs an auth strategy's display name with its attempt function. +type authStrategy struct { + name string + fn func(context.Context) error +} + +// httpAuthStrategies returns the ordered HTTP auth strategies to attempt. +// +// A per-user brokered connection is FAIL-CLOSED (spec 074, security-critical): +// the ONLY permitted strategy is the brokered headers. It must never fall back +// to no-auth or shared OAuth — either would connect with the wrong identity and +// defeat per-user isolation (FR-014/FR-017). Non-brokered connections keep the +// historical headers -> no-auth -> OAuth chain unchanged. +func (c *Client) httpAuthStrategies() []authStrategy { + if c.brokeredAuth != nil { + return []authStrategy{{"headers", c.tryHeadersAuth}} + } + return []authStrategy{ + {"headers", c.tryHeadersAuth}, + {"no-auth", c.tryNoAuth}, + {"OAuth", c.tryOAuthAuth}, + } +} + +// sseAuthStrategies is the SSE counterpart of httpAuthStrategies, with the same +// fail-closed guarantee for brokered connections. +func (c *Client) sseAuthStrategies() []authStrategy { + if c.brokeredAuth != nil { + return []authStrategy{{"headers", c.trySSEHeadersAuth}} + } + return []authStrategy{ + {"headers", c.trySSEHeadersAuth}, + {"no-auth", c.trySSENoAuth}, + {"OAuth", c.trySSEOAuthAuth}, + } +} + // connectHTTP establishes HTTP transport connection with auth fallback func (c *Client) connectHTTP(ctx context.Context) error { - // Try authentication strategies in order: headers -> no-auth -> OAuth - authStrategies := []func(context.Context) error{ - c.tryHeadersAuth, - c.tryNoAuth, - c.tryOAuthAuth, - } + // Strategy order (and, for brokered connections, the fail-closed single + // strategy) is decided by httpAuthStrategies. + authStrategies := c.httpAuthStrategies() var lastErr error - for i, authFunc := range authStrategies { - strategyName := []string{"headers", "no-auth", "OAuth"}[i] + for i, strategy := range authStrategies { c.logger.Debug("🔐 Trying authentication strategy", zap.Int("strategy_index", i), - zap.String("strategy", strategyName)) + zap.String("strategy", strategy.name)) - if err := authFunc(ctx); err != nil { + if err := strategy.fn(ctx); err != nil { lastErr = err c.logger.Debug("🚫 Auth strategy failed", zap.Int("strategy_index", i), - zap.String("strategy", strategyName), + zap.String("strategy", strategy.name), zap.Error(err)) // For configuration errors (like no headers), always try next strategy @@ -50,7 +83,7 @@ func (c *Client) connectHTTP(ctx context.Context) error { } c.logger.Info("✅ Authentication successful", zap.Int("strategy_index", i), - zap.String("strategy", strategyName)) + zap.String("strategy", strategy.name)) // Register notification handler for tools/list_changed c.registerNotificationHandler() @@ -63,21 +96,18 @@ func (c *Client) connectHTTP(ctx context.Context) error { // connectSSE establishes SSE transport connection with auth fallback func (c *Client) connectSSE(ctx context.Context) error { - // Try authentication strategies in order: headers -> no-auth -> OAuth - authStrategies := []func(context.Context) error{ - c.trySSEHeadersAuth, - c.trySSENoAuth, - c.trySSEOAuthAuth, - } + // Strategy order (and, for brokered connections, the fail-closed single + // strategy) is decided by sseAuthStrategies. + authStrategies := c.sseAuthStrategies() var lastErr error - for i, authFunc := range authStrategies { - strategyName := []string{"headers", "no-auth", "OAuth"}[i] + for i, strategy := range authStrategies { + strategyName := strategy.name c.logger.Debug("🔐 Trying SSE authentication strategy", zap.Int("strategy_index", i), zap.String("strategy", strategyName)) - if err := authFunc(ctx); err != nil { + if err := strategy.fn(ctx); err != nil { lastErr = err c.logger.Debug("🚫 SSE auth strategy failed", zap.Int("strategy_index", i), @@ -113,13 +143,37 @@ func (c *Client) connectSSE(ctx context.Context) error { return fmt.Errorf("all SSE authentication strategies failed, last error: %w", lastErr) } +// SetBrokeredAuth sets the per-user resolved upstream credential for this +// connection. When set, the headers-auth strategy injects it into the configured +// outbound header, replacing any inbound/configured auth (spec 074 +// FR-016/FR-017). Pass nil to clear it (non-brokered behaviour). +func (c *Client) SetBrokeredAuth(b *transport.BrokeredAuth) { + c.brokeredAuth = b +} + +// canUseHeadersStrategy reports whether the headers-auth strategy can run: it +// needs either statically-configured headers or a per-user brokered credential +// to inject. A brokered upstream commonly carries no static headers (FR-016). +func (c *Client) canUseHeadersStrategy() bool { + return len(c.config.Headers) > 0 || c.brokeredAuth != nil +} + +// brokeredHTTPConfig builds the HTTP transport config for the headers-auth +// strategy, threading the per-user brokered credential through so the transport +// layer injects it (spec 074 FR-016/FR-017). +func (c *Client) brokeredHTTPConfig() *transport.HTTPTransportConfig { + httpConfig := transport.CreateHTTPTransportConfig(c.config, nil) + httpConfig.BrokeredAuth = c.brokeredAuth + return httpConfig +} + // tryHeadersAuth attempts authentication using configured headers func (c *Client) tryHeadersAuth(ctx context.Context) error { - if len(c.config.Headers) == 0 { + if !c.canUseHeadersStrategy() { return fmt.Errorf("no headers configured") } - httpConfig := transport.CreateHTTPTransportConfig(c.config, nil) + httpConfig := c.brokeredHTTPConfig() httpClient, err := transport.CreateHTTPClient(httpConfig) if err != nil { return fmt.Errorf("failed to create HTTP client with headers: %w", err) @@ -171,11 +225,11 @@ func (c *Client) tryNoAuth(ctx context.Context) error { // trySSEHeadersAuth attempts SSE authentication using configured headers func (c *Client) trySSEHeadersAuth(ctx context.Context) error { - if len(c.config.Headers) == 0 { + if !c.canUseHeadersStrategy() { return fmt.Errorf("no headers configured") } - httpConfig := transport.CreateHTTPTransportConfig(c.config, nil) + httpConfig := c.brokeredHTTPConfig() sseClient, err := transport.CreateSSEClient(httpConfig) if err != nil { return fmt.Errorf("failed to create SSE client with headers: %w", err) diff --git a/internal/upstream/core/connection_http_broker_test.go b/internal/upstream/core/connection_http_broker_test.go new file mode 100644 index 000000000..464a62c04 --- /dev/null +++ b/internal/upstream/core/connection_http_broker_test.go @@ -0,0 +1,86 @@ +package core + +import ( + "reflect" + "testing" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/config" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/transport" +) + +func strategyNames(strategies []authStrategy) []string { + names := make([]string, len(strategies)) + for i, s := range strategies { + names[i] = s.name + } + return names +} + +// Spec 074 T7: a per-user brokered credential set on the client drives outbound +// header injection on the headers-auth strategy, replacing any configured auth +// header (FR-016/FR-017). + +func TestClient_BrokeredHTTPConfig_ReplacesConfiguredAuth(t *testing.T) { + c := &Client{ + config: &config.ServerConfig{ + URL: "https://upstream.example/mcp", + Headers: map[string]string{"Authorization": "Bearer INBOUND-GATEWAY"}, + }, + } + c.SetBrokeredAuth(&transport.BrokeredAuth{ + Header: "Authorization", Format: "Bearer {token}", Token: "user-1-token", + }) + + cfg := c.brokeredHTTPConfig() + if cfg.BrokeredAuth == nil || cfg.BrokeredAuth.Token != "user-1-token" { + t.Fatalf("brokered auth not threaded into transport config: %+v", cfg.BrokeredAuth) + } + + eff := transport.EffectiveHeaders(cfg.Headers, cfg.BrokeredAuth) + if eff["Authorization"] != "Bearer user-1-token" { + t.Fatalf("outbound auth = %q, want per-user token (inbound must be replaced)", eff["Authorization"]) + } +} + +// FR-016: a brokered server may carry no static headers; the headers-auth +// strategy must still be usable so the resolved credential gets injected. +func TestClient_CanUseHeadersStrategy_WithBrokeredAuthOnly(t *testing.T) { + c := &Client{config: &config.ServerConfig{URL: "https://upstream.example/mcp"}} + + if c.canUseHeadersStrategy() { + t.Fatalf("no headers and no brokered auth: headers strategy must be skipped") + } + + c.SetBrokeredAuth(&transport.BrokeredAuth{Header: "Authorization", Format: "Bearer {token}", Token: "t"}) + if !c.canUseHeadersStrategy() { + t.Fatalf("brokered auth present: headers strategy must be usable even with no static headers") + } +} + +// Spec 074 T7 (security-critical): a per-user brokered connection must be +// FAIL-CLOSED. The only permitted auth strategy is the brokered headers; on +// failure the connection must be refused — it must NEVER fall back to no-auth +// (would connect unauthenticated) or shared OAuth (would borrow another +// identity), either of which defeats per-user isolation (FR-014/FR-017). +func TestClient_BrokeredConnection_FailsClosed_OnlyHeadersStrategy(t *testing.T) { + c := &Client{config: &config.ServerConfig{URL: "https://upstream.example/mcp"}} + + // Non-brokered: the historical full fallback chain is preserved. + wantFull := []string{"headers", "no-auth", "OAuth"} + if got := strategyNames(c.httpAuthStrategies()); !reflect.DeepEqual(got, wantFull) { + t.Fatalf("non-brokered HTTP strategies = %v, want %v", got, wantFull) + } + if got := strategyNames(c.sseAuthStrategies()); !reflect.DeepEqual(got, wantFull) { + t.Fatalf("non-brokered SSE strategies = %v, want %v", got, wantFull) + } + + // Brokered: ONLY the headers strategy — no no-auth, no OAuth fallback. + c.SetBrokeredAuth(&transport.BrokeredAuth{Header: "Authorization", Format: "Bearer {token}", Token: "user-1"}) + wantBrokered := []string{"headers"} + if got := strategyNames(c.httpAuthStrategies()); !reflect.DeepEqual(got, wantBrokered) { + t.Fatalf("brokered HTTP strategies = %v, want %v (fail-closed: no no-auth/OAuth fallback)", got, wantBrokered) + } + if got := strategyNames(c.sseAuthStrategies()); !reflect.DeepEqual(got, wantBrokered) { + t.Fatalf("brokered SSE strategies = %v, want %v (fail-closed: no no-auth/OAuth fallback)", got, wantBrokered) + } +}