Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/mcp/server/auth/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from mcp.server.auth.json_response import PydanticJSONResponse
from mcp.server.auth.provider import OAuthAuthorizationServerProvider, RegistrationError, RegistrationErrorCode
from mcp.server.auth.settings import ClientRegistrationOptions
from mcp.server.auth.validation import validate_registered_redirect_uri
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata

# this alias is a no-op; it's just to separate out the types exposed to the
Expand Down Expand Up @@ -90,6 +91,21 @@ async def handle(self, request: Request) -> Response:
status_code=400,
)

redirect_uris = client_metadata.redirect_uris
assert redirect_uris is not None # enforced by OAuthClientMetadata

for redirect_uri in redirect_uris:
try:
validate_registered_redirect_uri(redirect_uri)
except ValueError as error:
return PydanticJSONResponse(
content=RegistrationErrorResponse(
error="invalid_client_metadata",
error_description=str(error),
),
status_code=400,
)

client_id_issued_at = int(time.time())
client_secret_expires_at = (
client_id_issued_at + self.options.client_secret_expiry_seconds
Expand Down
23 changes: 1 addition & 22 deletions src/mcp/server/auth/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,10 @@
from mcp.server.auth.middleware.client_auth import ClientAuthenticator
from mcp.server.auth.provider import OAuthAuthorizationServerProvider
from mcp.server.auth.settings import ClientRegistrationOptions, RevocationOptions
from mcp.server.auth.validation import validate_issuer_url
from mcp.server.streamable_http import MCP_PROTOCOL_VERSION_HEADER
from mcp.shared.auth import OAuthMetadata, ProtectedResourceMetadata


def validate_issuer_url(url: AnyHttpUrl):
"""Validate that the issuer URL meets OAuth 2.0 requirements.

Args:
url: The issuer URL to validate.

Raises:
ValueError: If the issuer URL is invalid.
"""

# RFC 8414 requires HTTPS, but we allow loopback/localhost HTTP for testing
if url.scheme != "https" and url.host not in ("localhost", "127.0.0.1", "[::1]"):
raise ValueError("Issuer URL must be HTTPS")

# No fragments or query parameters allowed
if url.fragment:
raise ValueError("Issuer URL must not have a fragment")
if url.query:
raise ValueError("Issuer URL must not have a query string")


AUTHORIZATION_PATH = "/authorize"
TOKEN_PATH = "/token"
REGISTRATION_PATH = "/register"
Expand Down
38 changes: 38 additions & 0 deletions src/mcp/server/auth/validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from pydantic import AnyHttpUrl, AnyUrl


def validate_issuer_url(url: AnyHttpUrl):
"""Validate that the issuer URL meets OAuth 2.0 requirements.

Args:
url: The issuer URL to validate.

Raises:
ValueError: If the issuer URL is invalid.
"""

# RFC 8414 requires HTTPS, but we allow loopback/localhost HTTP for testing
if url.scheme != "https" and url.host not in ("localhost", "127.0.0.1", "[::1]"):
raise ValueError("Issuer URL must be HTTPS")

# No fragments or query parameters allowed
if url.fragment:
raise ValueError("Issuer URL must not have a fragment")
if url.query:
raise ValueError("Issuer URL must not have a query string")


def validate_registered_redirect_uri(url: AnyUrl):
"""Validate that a dynamically registered redirect URI is safe to use.

Mirrors the HTTPS-or-loopback policy used for issuer URLs and rejects
dangerous schemes such as javascript:, data:, and file:.
"""
if url.scheme not in ("https", "http"):
raise ValueError("Redirect URI must use HTTPS or HTTP")

if url.scheme != "https" and url.host not in ("localhost", "127.0.0.1", "[::1]"):
raise ValueError("Redirect URI must be HTTPS unless loopback")

if url.fragment:
raise ValueError("Redirect URI must not have a fragment")
7 changes: 0 additions & 7 deletions tests/interaction/_requirements.py
Original file line number Diff line number Diff line change
Expand Up @@ -2053,13 +2053,6 @@ def __post_init__(self) -> None:
"The bundled registration endpoint accepts only redirect URIs that use HTTPS or target a loopback host."
),
transports=("streamable-http",),
divergence=Divergence(
note=(
"Not enforced: the registration handler models redirect_uris as AnyUrl with no scheme or "
"host check, so http://evil.example/callback is accepted and registered. The spec's "
"localhost-or-HTTPS rule is left to the provider implementation."
),
),
),
"hosting:auth:as:token-cache-headers": Requirement(
source="sdk",
Expand Down
19 changes: 7 additions & 12 deletions tests/interaction/auth/test_as_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,22 +279,17 @@ async def test_authorize_with_an_unregistered_redirect_uri_is_rejected_directly(


@requirement("hosting:auth:as:redirect-uri-scheme")
async def test_a_non_loopback_http_redirect_uri_is_accepted_at_registration(
async def test_a_non_loopback_http_redirect_uri_is_rejected_at_registration(
as_app: tuple[httpx.AsyncClient, InMemoryAuthorizationServerProvider],
) -> None:
"""A registration carrying a non-HTTPS, non-loopback redirect URI is accepted.

The spec requires every redirect URI to be either HTTPS or a loopback host; the bundled
registration handler does not enforce this and registers `http://evil.example/callback`
successfully. See the divergence on the requirement.
"""
http, provider = as_app
"""Non-loopback HTTP redirect URIs must be rejected during DCR."""
http, _provider = as_app
body = oauth_client_metadata().model_dump(mode="json", exclude_none=True)
body["redirect_uris"] = ["http://evil.example/callback"]

response = await http.post("/register", json=body)

assert response.status_code == 201
info = OAuthClientInformationFull.model_validate_json(response.content)
assert [str(u) for u in (info.redirect_uris or [])] == ["http://evil.example/callback"]
assert info.client_id in provider.clients
assert response.status_code == 400
error = response.json()
assert error["error"] == "invalid_client_metadata"
assert "unless loopback" in error["error_description"]
34 changes: 32 additions & 2 deletions tests/server/auth/test_routes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest
from pydantic import AnyHttpUrl
from pydantic import AnyHttpUrl, AnyUrl

from mcp.server.auth.routes import validate_issuer_url
from mcp.server.auth.validation import validate_issuer_url, validate_registered_redirect_uri


def test_validate_issuer_url_https_allowed():
Expand Down Expand Up @@ -45,3 +45,33 @@ def test_validate_issuer_url_fragment_rejected():
def test_validate_issuer_url_query_rejected():
with pytest.raises(ValueError, match="query"):
validate_issuer_url(AnyHttpUrl("https://example.com/path?q=1"))


@pytest.mark.parametrize(
"redirect_uri",
[
"https://example.com/callback",
"http://localhost:8080/callback",
"http://127.0.0.1:8080/callback",
"http://[::1]:8080/callback",
],
)
def test_validate_registered_redirect_uri_allowed(redirect_uri: str):
validate_registered_redirect_uri(AnyUrl(redirect_uri))


@pytest.mark.parametrize(
"redirect_uri,message",
[
("javascript:alert(1)", "HTTPS or HTTP"),
("data:text/html,<script>alert(1)</script>", "HTTPS or HTTP"),
("file:///etc/passwd", "HTTPS or HTTP"),
("vbscript:msgbox(1)", "HTTPS or HTTP"),
("ftp://attacker.example/cb", "HTTPS or HTTP"),
("http://attacker.example/cb", "unless loopback"),
("https://example.com/cb#frag", "fragment"),
],
)
def test_validate_registered_redirect_uri_rejected(redirect_uri: str, message: str):
with pytest.raises(ValueError, match=message):
validate_registered_redirect_uri(AnyUrl(redirect_uri))
Loading