From 55187a073180bda1b69992cfe93e5413d0abdb66 Mon Sep 17 00:00:00 2001 From: KirtiRamchandani Date: Sun, 14 Jun 2026 06:55:05 +0000 Subject: [PATCH] Validate DCR redirect_uris with HTTPS-or-loopback policy Validate DCR redirect_uris with HTTPS-or-loopback policy Fix CI: import order and reject non-loopback redirect test Mark redirect URI scheme requirement as implemented fix(auth): always validate required redirect_uris at registration fix(auth): guard required redirect_uris for pyright and coverage --- src/mcp/server/auth/handlers/register.py | 16 +++++++++ src/mcp/server/auth/routes.py | 23 +------------ src/mcp/server/auth/validation.py | 38 ++++++++++++++++++++++ tests/interaction/_requirements.py | 7 ---- tests/interaction/auth/test_as_handlers.py | 19 ++++------- tests/server/auth/test_routes.py | 34 +++++++++++++++++-- 6 files changed, 94 insertions(+), 43 deletions(-) create mode 100644 src/mcp/server/auth/validation.py diff --git a/src/mcp/server/auth/handlers/register.py b/src/mcp/server/auth/handlers/register.py index 79eb0fb0c1..bede1c2020 100644 --- a/src/mcp/server/auth/handlers/register.py +++ b/src/mcp/server/auth/handlers/register.py @@ -12,6 +12,7 @@ from mcp.server.auth.json_response import PydanticJSONResponse from mcp.server.auth.provider import OAuthAuthorizationServerProvider, RegistrationError, RegistrationErrorCode from mcp.server.auth.settings import ClientRegistrationOptions +from mcp.server.auth.validation import validate_registered_redirect_uri from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata # this alias is a no-op; it's just to separate out the types exposed to the @@ -90,6 +91,21 @@ async def handle(self, request: Request) -> Response: status_code=400, ) + redirect_uris = client_metadata.redirect_uris + assert redirect_uris is not None # enforced by OAuthClientMetadata + + for redirect_uri in redirect_uris: + try: + validate_registered_redirect_uri(redirect_uri) + except ValueError as error: + return PydanticJSONResponse( + content=RegistrationErrorResponse( + error="invalid_client_metadata", + error_description=str(error), + ), + status_code=400, + ) + client_id_issued_at = int(time.time()) client_secret_expires_at = ( client_id_issued_at + self.options.client_secret_expiry_seconds diff --git a/src/mcp/server/auth/routes.py b/src/mcp/server/auth/routes.py index a72e819477..05305e8ea5 100644 --- a/src/mcp/server/auth/routes.py +++ b/src/mcp/server/auth/routes.py @@ -17,31 +17,10 @@ from mcp.server.auth.middleware.client_auth import ClientAuthenticator from mcp.server.auth.provider import OAuthAuthorizationServerProvider from mcp.server.auth.settings import ClientRegistrationOptions, RevocationOptions +from mcp.server.auth.validation import validate_issuer_url from mcp.server.streamable_http import MCP_PROTOCOL_VERSION_HEADER from mcp.shared.auth import OAuthMetadata, ProtectedResourceMetadata - -def validate_issuer_url(url: AnyHttpUrl): - """Validate that the issuer URL meets OAuth 2.0 requirements. - - Args: - url: The issuer URL to validate. - - Raises: - ValueError: If the issuer URL is invalid. - """ - - # RFC 8414 requires HTTPS, but we allow loopback/localhost HTTP for testing - if url.scheme != "https" and url.host not in ("localhost", "127.0.0.1", "[::1]"): - raise ValueError("Issuer URL must be HTTPS") - - # No fragments or query parameters allowed - if url.fragment: - raise ValueError("Issuer URL must not have a fragment") - if url.query: - raise ValueError("Issuer URL must not have a query string") - - AUTHORIZATION_PATH = "/authorize" TOKEN_PATH = "/token" REGISTRATION_PATH = "/register" diff --git a/src/mcp/server/auth/validation.py b/src/mcp/server/auth/validation.py new file mode 100644 index 0000000000..2a27dbafb6 --- /dev/null +++ b/src/mcp/server/auth/validation.py @@ -0,0 +1,38 @@ +from pydantic import AnyHttpUrl, AnyUrl + + +def validate_issuer_url(url: AnyHttpUrl): + """Validate that the issuer URL meets OAuth 2.0 requirements. + + Args: + url: The issuer URL to validate. + + Raises: + ValueError: If the issuer URL is invalid. + """ + + # RFC 8414 requires HTTPS, but we allow loopback/localhost HTTP for testing + if url.scheme != "https" and url.host not in ("localhost", "127.0.0.1", "[::1]"): + raise ValueError("Issuer URL must be HTTPS") + + # No fragments or query parameters allowed + if url.fragment: + raise ValueError("Issuer URL must not have a fragment") + if url.query: + raise ValueError("Issuer URL must not have a query string") + + +def validate_registered_redirect_uri(url: AnyUrl): + """Validate that a dynamically registered redirect URI is safe to use. + + Mirrors the HTTPS-or-loopback policy used for issuer URLs and rejects + dangerous schemes such as javascript:, data:, and file:. + """ + if url.scheme not in ("https", "http"): + raise ValueError("Redirect URI must use HTTPS or HTTP") + + if url.scheme != "https" and url.host not in ("localhost", "127.0.0.1", "[::1]"): + raise ValueError("Redirect URI must be HTTPS unless loopback") + + if url.fragment: + raise ValueError("Redirect URI must not have a fragment") diff --git a/tests/interaction/_requirements.py b/tests/interaction/_requirements.py index caed8905d0..07263efb85 100644 --- a/tests/interaction/_requirements.py +++ b/tests/interaction/_requirements.py @@ -2053,13 +2053,6 @@ def __post_init__(self) -> None: "The bundled registration endpoint accepts only redirect URIs that use HTTPS or target a loopback host." ), transports=("streamable-http",), - divergence=Divergence( - note=( - "Not enforced: the registration handler models redirect_uris as AnyUrl with no scheme or " - "host check, so http://evil.example/callback is accepted and registered. The spec's " - "localhost-or-HTTPS rule is left to the provider implementation." - ), - ), ), "hosting:auth:as:token-cache-headers": Requirement( source="sdk", diff --git a/tests/interaction/auth/test_as_handlers.py b/tests/interaction/auth/test_as_handlers.py index 5cb4e92d86..8eb6883f1e 100644 --- a/tests/interaction/auth/test_as_handlers.py +++ b/tests/interaction/auth/test_as_handlers.py @@ -279,22 +279,17 @@ async def test_authorize_with_an_unregistered_redirect_uri_is_rejected_directly( @requirement("hosting:auth:as:redirect-uri-scheme") -async def test_a_non_loopback_http_redirect_uri_is_accepted_at_registration( +async def test_a_non_loopback_http_redirect_uri_is_rejected_at_registration( as_app: tuple[httpx.AsyncClient, InMemoryAuthorizationServerProvider], ) -> None: - """A registration carrying a non-HTTPS, non-loopback redirect URI is accepted. - - The spec requires every redirect URI to be either HTTPS or a loopback host; the bundled - registration handler does not enforce this and registers `http://evil.example/callback` - successfully. See the divergence on the requirement. - """ - http, provider = as_app + """Non-loopback HTTP redirect URIs must be rejected during DCR.""" + http, _provider = as_app body = oauth_client_metadata().model_dump(mode="json", exclude_none=True) body["redirect_uris"] = ["http://evil.example/callback"] response = await http.post("/register", json=body) - assert response.status_code == 201 - info = OAuthClientInformationFull.model_validate_json(response.content) - assert [str(u) for u in (info.redirect_uris or [])] == ["http://evil.example/callback"] - assert info.client_id in provider.clients + assert response.status_code == 400 + error = response.json() + assert error["error"] == "invalid_client_metadata" + assert "unless loopback" in error["error_description"] diff --git a/tests/server/auth/test_routes.py b/tests/server/auth/test_routes.py index 3d13b5ba53..082deb4e35 100644 --- a/tests/server/auth/test_routes.py +++ b/tests/server/auth/test_routes.py @@ -1,7 +1,7 @@ import pytest -from pydantic import AnyHttpUrl +from pydantic import AnyHttpUrl, AnyUrl -from mcp.server.auth.routes import validate_issuer_url +from mcp.server.auth.validation import validate_issuer_url, validate_registered_redirect_uri def test_validate_issuer_url_https_allowed(): @@ -45,3 +45,33 @@ def test_validate_issuer_url_fragment_rejected(): def test_validate_issuer_url_query_rejected(): with pytest.raises(ValueError, match="query"): validate_issuer_url(AnyHttpUrl("https://example.com/path?q=1")) + + +@pytest.mark.parametrize( + "redirect_uri", + [ + "https://example.com/callback", + "http://localhost:8080/callback", + "http://127.0.0.1:8080/callback", + "http://[::1]:8080/callback", + ], +) +def test_validate_registered_redirect_uri_allowed(redirect_uri: str): + validate_registered_redirect_uri(AnyUrl(redirect_uri)) + + +@pytest.mark.parametrize( + "redirect_uri,message", + [ + ("javascript:alert(1)", "HTTPS or HTTP"), + ("data:text/html,", "HTTPS or HTTP"), + ("file:///etc/passwd", "HTTPS or HTTP"), + ("vbscript:msgbox(1)", "HTTPS or HTTP"), + ("ftp://attacker.example/cb", "HTTPS or HTTP"), + ("http://attacker.example/cb", "unless loopback"), + ("https://example.com/cb#frag", "fragment"), + ], +) +def test_validate_registered_redirect_uri_rejected(redirect_uri: str, message: str): + with pytest.raises(ValueError, match=message): + validate_registered_redirect_uri(AnyUrl(redirect_uri))