diff --git a/msal/authority.py b/msal/authority.py index ee16868f..7419c9e4 100644 --- a/msal/authority.py +++ b/msal/authority.py @@ -1,4 +1,5 @@ import json +import re try: from urllib.parse import urlparse except ImportError: # Fall back to Python 2 @@ -16,21 +17,82 @@ AZURE_GOV_SG = "login.sovcloud-identity.sg" WORLD_WIDE = 'login.microsoftonline.com' # There was an alias login.windows.net -WELL_KNOWN_AUTHORITY_HOSTS = frozenset([ - WORLD_WIDE, - "login.microsoft.com", - "login.windows.net", - "sts.windows.net", - DEPRECATED_AZURE_CHINA, - "login.partner.microsoftonline.cn", - "login.microsoftonline.de", # deprecated - 'login-us.microsoftonline.com', - AZURE_US_GOVERNMENT, - "login.usgovcloudapi.net", - AZURE_GOV_FR, - AZURE_GOV_DE, - AZURE_GOV_SG, - ]) + +# Sovereign-cloud sentinels. Aliases of the same cloud map to the same value +# so callers can compare clouds with simple equality. +_CLOUD_PUBLIC = "PUBLIC" +_CLOUD_CHINA = "CHINA" +_CLOUD_GERMANY = "GERMANY" +_CLOUD_US_GOV = "US_GOV" +_CLOUD_US_ALT = "US_ALT" +_CLOUD_PPE = "PPE" +_CLOUD_BLEU = "BLEU" +_CLOUD_DELOS = "DELOS" +_CLOUD_GOV_SG = "GOV_SG" + +# Single source of truth for known Microsoft authority hosts. Add an alias +# here and WELL_KNOWN_AUTHORITY_HOSTS / _KNOWN_HOST_TO_CLOUD pick it up. +_HOSTS_BY_CLOUD = { + _CLOUD_PUBLIC: ( + AZURE_PUBLIC, + "login.microsoft.com", + "login.windows.net", + "sts.windows.net", + ), + _CLOUD_CHINA: ( + "login.partner.microsoftonline.cn", + DEPRECATED_AZURE_CHINA, + ), + _CLOUD_GERMANY: ("login.microsoftonline.de",), # deprecated + _CLOUD_US_GOV: ( + AZURE_US_GOVERNMENT, + "login.usgovcloudapi.net", + ), + _CLOUD_US_ALT: ("login-us.microsoftonline.com",), + _CLOUD_BLEU: (AZURE_GOV_FR,), + _CLOUD_DELOS: (AZURE_GOV_DE,), + _CLOUD_GOV_SG: (AZURE_GOV_SG,), +} + +# Hosts that resolve to a cloud for the cross-cloud check but MUST NOT enter +# WELL_KNOWN_AUTHORITY_HOSTS (which gates instance-discovery skipping). +# - PPE: non-production. +# - ciamlogin.com: bare suffix, never a usable authority on its own; tenant +# subdomains resolve to Public via _resolve_known_cloud's regional logic. +_EXTRA_HOSTS_BY_CLOUD = { + _CLOUD_PPE: ( + "login.windows-ppe.net", + "sts.windows-ppe.net", + "login.microsoft-ppe.com", + ), + _CLOUD_PUBLIC: ("ciamlogin.com",), +} + +# Derived from _HOSTS_BY_CLOUD so a new alias cannot drift out of sync. +WELL_KNOWN_AUTHORITY_HOSTS = frozenset( + host for hosts in _HOSTS_BY_CLOUD.values() for host in hosts) + +_KNOWN_HOST_TO_CLOUD = { + host: cloud + for cloud, hosts in _HOSTS_BY_CLOUD.items() + for host in hosts +} +_KNOWN_HOST_TO_CLOUD.update({ + host: cloud + for cloud, hosts in _EXTRA_HOSTS_BY_CLOUD.items() + for host in hosts +}) + +# Catch a duplicated host at import time rather than in production. +_all_listed_hosts = [ + h for hosts in _HOSTS_BY_CLOUD.values() for h in hosts +] + [ + h for hosts in _EXTRA_HOSTS_BY_CLOUD.values() for h in hosts +] +assert len(_all_listed_hosts) == len(_KNOWN_HOST_TO_CLOUD), ( + "Duplicate host in cloud tables: {}".format( + sorted(h for h in set(_all_listed_hosts) if _all_listed_hosts.count(h) > 1))) +del _all_listed_hosts WELL_KNOWN_B2C_HOSTS = [ "b2clogin.com", @@ -41,6 +103,84 @@ ] _CIAM_DOMAIN_SUFFIX = ".ciamlogin.com" +# RFC 1035 / RFC 1123 DNS label: 1-63 chars, no leading/trailing hyphen. +# Used as a shape gate on the region prefix; not an allow-list of regions. +_REGION_PREFIX_PATTERN = re.compile(r"^[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?$") + + +def _resolve_known_cloud(host): + """Return the cloud sentinel for *host*, or None. + + Trust gate for the cross-cloud check (used by `_are_in_same_cloud`, + `_ensure_endpoint_same_cloud_as_authority`, and Rule 2 of + `has_valid_issuer`). Tightening here is safe; loosening here weakens + all three call sites at once. + + Matches an alias in :data:`_KNOWN_HOST_TO_CLOUD` or a ``{region}.{alias}`` + sub-host where ``{region}`` matches :data:`_REGION_PREFIX_PATTERN`. + Hosts are lowercased here, so callers may pass either the raw + ``urlparse(...).hostname`` (already lowercased per RFC 3986) or any + untrusted string. + """ + if not host: + return None + host = host.lower() + cloud = _KNOWN_HOST_TO_CLOUD.get(host) + if cloud is not None: + return cloud + dot = host.find(".") + if dot <= 0: + return None + prefix = host[:dot] + base = host[dot + 1:] + if _REGION_PREFIX_PATTERN.match(prefix) and base in _KNOWN_HOST_TO_CLOUD: + return _KNOWN_HOST_TO_CLOUD[base] + return None + + +def _are_in_same_cloud(host_a, host_b): + """Default-deny: True iff both hosts resolve to the same known cloud.""" + cloud_a = _resolve_known_cloud(host_a) + if cloud_a is None: + return False + cloud_b = _resolve_known_cloud(host_b) + if cloud_b is None: + return False + return cloud_a == cloud_b + + +def _ensure_endpoint_same_cloud_as_authority( + authority_url, endpoint_url, endpoint_name): + """Reject an OIDC discovery endpoint that crosses sovereign clouds. + + No-op when *authority_url* is a custom domain (custom OIDC IdPs are + unconstrained) or when *endpoint_url* is empty / not absolute. Raises + :class:`ValueError` naming the authority, endpoint kind, and offending + URL; no tokens or secrets are surfaced. + """ + if not endpoint_url: + return + endpoint_parsed = urlparse(endpoint_url) + if not endpoint_parsed.scheme or not endpoint_parsed.hostname: + return # Let downstream parsing surface a non-absolute URL + authority_host = urlparse(authority_url).hostname if authority_url else None + authority_cloud = _resolve_known_cloud(authority_host) + if authority_cloud is None: + return + endpoint_cloud = _resolve_known_cloud(endpoint_parsed.hostname) + if endpoint_cloud is None or endpoint_cloud != authority_cloud: + raise ValueError( + "OIDC discovery for authority '{authority}' returned a " + "{name} '{endpoint}' whose host is not in the same Microsoft " + "sovereign cloud as the authority. MSAL refused to use that " + "endpoint. Verify the OIDC discovery endpoint is not being " + "intercepted and that the configured authority points at the " + "correct sovereign cloud.".format( + authority=authority_url, + name=endpoint_name, + endpoint=endpoint_url, + )) + def _get_instance_discovery_host(instance): return instance if instance in WELL_KNOWN_AUTHORITY_HOSTS else WORLD_WIDE @@ -118,16 +258,29 @@ def __init__( self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint') _, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID - # Validate the issuer if using OIDC authority - if self._oidc_authority_url and not self.has_valid_issuer(): - raise ValueError(( - "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. " - "When using the 'oidc_authority' parameter in ClientApplication, the authority " - "will be validated against the issuer from {auth}/.well-known/openid-configuration ." - "If using a known Entra authority (e.g. login.microsoftonline.com) the " - "'authority' parameter should be used instead of 'oidc_authority'. " - "" - ).format(iss=self._issuer, auth=oidc_authority_url)) + # Validate the issuer and enforce same-cloud endpoints (OIDC only). + # See #5927 for the cross-cloud hardening. + if self._oidc_authority_url: + if not self.has_valid_issuer(): + raise ValueError(( + "The issuer '{iss}' does not match the authority '{auth}' or a known pattern. " + "When using the 'oidc_authority' parameter in ClientApplication, the authority " + "will be validated against the issuer from {auth}/.well-known/openid-configuration ." + "If using a known Entra authority (e.g. login.microsoftonline.com) the " + "'authority' parameter should be used instead of 'oidc_authority'. " + "" + ).format(iss=self._issuer, auth=self._oidc_authority_url)) + _ensure_endpoint_same_cloud_as_authority( + self._oidc_authority_url, self.token_endpoint, "token_endpoint") + _ensure_endpoint_same_cloud_as_authority( + self._oidc_authority_url, self.authorization_endpoint, + "authorization_endpoint") + if self.device_authorization_endpoint: + _ensure_endpoint_same_cloud_as_authority( + self._oidc_authority_url, + self.device_authorization_endpoint, + "device_authorization_endpoint") + def _initialize_oidc_authority(self, oidc_authority_url): authority, self.instance, tenant = canonicalize(oidc_authority_url) self.is_adfs = tenant.lower() == 'adfs' # As a convention @@ -201,58 +354,93 @@ def user_realm_discovery(self, username, correlation_id=None, response=None): return {} # This can guide the caller to fall back normal ROPC flow def has_valid_issuer(self): - """ - Returns True if the issuer from OIDC discovery is valid for this authority. - - An issuer is valid if one of the following is true: - - It exactly matches the authority URL (with/without trailing slash) - - It has the same scheme and host as the authority (path can be different) - - The issuer host is a well-known Microsoft authority host - - The issuer host is a regional variant of a well-known host (e.g., westus2.login.microsoft.com) - - For CIAM, hosts that end with well-known B2C hosts (e.g., tenant.b2clogin.com) are accepted as valid issuers + """True if the OIDC issuer is valid for this authority. + + Steps below are evaluated in this order; the bracketed labels are + the historical rule names retained for cross-reference with the + MSAL.NET port (#5927). Order is security-sensitive. + + Step 1 [Case 1]: Exact match. + Step 2 [Case 4]: Same scheme + netloc (paths may differ). + Step 3 [Rule 3]: CIAM tenant pattern (cross-host only). Must run + before Step 4 so a ``.ciamlogin.com`` issuer cannot bypass + tenant matching via Rule 2b (CIAM resolves to Public). + Step 4 [Rule 2]: Same Microsoft cloud. 2a accepts any known-MS + issuer under a custom-domain authority (#5927 federation); + 2b accepts a known-MS issuer under a known-MS authority only + when the two clouds are identical. + Step 5 [Case 3b]: Region-shaped prefix on the authority host. + Step 6 [Case 5]: B2C subdomain (excluding ``.ciamlogin.com``, + handled by Step 3). """ if not self._issuer or not self._oidc_authority_url: return False - # Case 1: Exact match (most common case, normalized for trailing slashes) + # Step 1 [Case 1]: exact match (trailing slash insensitive) if self._issuer.rstrip("/") == self._oidc_authority_url.rstrip("/"): return True issuer_parsed = urlparse(self._issuer) authority_parsed = urlparse(self._oidc_authority_url) issuer_host = issuer_parsed.hostname.lower() if issuer_parsed.hostname else None + authority_host = ( + authority_parsed.hostname.lower() if authority_parsed.hostname else "") if not issuer_host: return False - - # Case 2: Issuer is from a trusted Microsoft host - O(1) lookup - if issuer_host in WELL_KNOWN_AUTHORITY_HOSTS: + + # Step 2 [Case 4]: same scheme + host. Runs before Step 3 so a CIAM + # authority/issuer pair on the same host (different paths) passes. + if (authority_parsed.scheme == issuer_parsed.scheme and + authority_parsed.netloc == issuer_parsed.netloc): return True - # Case 3: Regional variant check - O(1) lookup - # e.g., westus2.login.microsoft.com -> extract "login.microsoft.com" + # Step 3 [Rule 3]: cross-host CIAM issuer. Tenant must match + # authority's first path segment (or first hostname label). Must run + # before Step 4 to block the Rule 2b CIAM bypass. + if issuer_host.endswith(_CIAM_DOMAIN_SUFFIX): + issuer_tenant = issuer_host[:-len(_CIAM_DOMAIN_SUFFIX)] + auth_path_parts = [p for p in authority_parsed.path.split("/") if p] + if auth_path_parts: + authority_tenant = auth_path_parts[0].lower() + else: + authority_tenant = authority_host.split(".", 1)[0] + if issuer_tenant and issuer_tenant == authority_tenant: + normalized_issuer_path = issuer_parsed.path.rstrip("/").lower() + if normalized_issuer_path in ( + "", + "/" + issuer_tenant, + "/" + issuer_tenant + "/v2.0"): + return True + return False # Tenant mismatch: reject. + + # Step 4 [Rule 2]: known Microsoft issuer over HTTPS. + # 2a: custom-domain authority -> accept (#5927 federation). + # 2b: known-MS authority -> accept only if same cloud. + issuer_cloud = _resolve_known_cloud(issuer_host) + if issuer_cloud is not None and issuer_parsed.scheme == "https": + authority_cloud = _resolve_known_cloud(authority_host) + if authority_cloud is None: + return True # 2a + if authority_cloud == issuer_cloud: + return True # 2b + # Cross-cloud: fall through to reject. + + # Step 5 [Case 3b]: region-shaped prefix on the authority host + # (e.g. issuer=us.someweb.com, authority=someweb.com). dot_index = issuer_host.find(".") if dot_index > 0: + prefix = issuer_host[:dot_index] potential_base = issuer_host[dot_index + 1:] - if "." not in issuer_host[:dot_index]: - # 3a: Base host is a trusted Microsoft host - if potential_base in WELL_KNOWN_AUTHORITY_HOSTS: - return True - # 3b: Issuer has a region prefix on the authority host - # e.g. issuer=us.someweb.com, authority=someweb.com - authority_host = authority_parsed.hostname.lower() if authority_parsed.hostname else "" - if potential_base == authority_host: - return True + if (_REGION_PREFIX_PATTERN.match(prefix) + and potential_base == authority_host): + return True - # Case 4: Same scheme and host (path can differ) - if (authority_parsed.scheme == issuer_parsed.scheme and - authority_parsed.netloc == issuer_parsed.netloc): - return True - - # Case 5: Check if issuer host is a subdomain of a well-known B2C host - # e.g., tenant.b2clogin.com matches .b2clogin.com - # but fakeb2clogin.com does not - if any(issuer_host.endswith("." + h) for h in WELL_KNOWN_B2C_HOSTS): + # Step 6 [Case 5]: B2C subdomain. .ciamlogin.com handled by Step 3. + if any( + issuer_host.endswith("." + h) + for h in WELL_KNOWN_B2C_HOSTS + if h != "ciamlogin.com"): return True return False diff --git a/tests/test_authority.py b/tests/test_authority.py index 2f9d1605..4f22ac16 100644 --- a/tests/test_authority.py +++ b/tests/test_authority.py @@ -8,7 +8,7 @@ from msal.authority import * from msal.authority import _CIAM_DOMAIN_SUFFIX from tests import unittest -from tests.http_client import MinimalHttpClient +from tests.http_client import MinimalHttpClient, RecordingHttpClient @unittest.skipIf(os.getenv("TRAVIS_TAG"), "Skip network io during tagged release") @@ -693,13 +693,19 @@ def test_b2c_us_gov_issuer_host(self, tenant_discovery_mock): "Issuer ending with b2clogin.us should be valid") @patch("msal.authority.tenant_discovery") - def test_ciam_issuer_host_via_b2c_check(self, tenant_discovery_mock): - """Test issuer from ciamlogin.com host is accepted via B2C check""" + def test_ciam_issuer_with_tenant_mismatch_should_be_rejected(self, tenant_discovery_mock): + """CIAM-shaped issuer whose subdomain tenant does not match the + authority's first path segment must be rejected (Rule 3 tenant match).""" authority_url = "https://custom-domain.com/tenant" issuer = "https://mytenant.ciamlogin.com/tenant" - authority = self._create_authority_with_issuer(authority_url, issuer, tenant_discovery_mock) - self.assertTrue(authority.has_valid_issuer(), - "Issuer ending with ciamlogin.com should be valid") + tenant_discovery_mock.return_value = { + "authorization_endpoint": "https://example.com/oauth2/authorize", + "token_endpoint": "https://example.com/oauth2/token", + "issuer": issuer, + } + with self.assertRaises(ValueError) as context: + Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertIn("issuer", str(context.exception).lower()) # Domain spoofing prevention tests @patch("msal.authority.tenant_discovery") @@ -763,3 +769,865 @@ def test_valid_b2c_subdomain_should_be_accepted(self, tenant_discovery_mock): self.assertTrue(authority.has_valid_issuer(), "Legitimate subdomain of b2clogin.com should be valid") + + +# ----------------------------------------------------------------------------- +# Cross-cloud OIDC discovery hardening (mirrors MSAL.NET 4.84.0 fix #5927). +# Categories: B same-cloud aliases, C #5927 federation, D regional same-cloud, +# E CIAM tenant pattern, F cross-cloud rejection (THE FIX), H helper, +# I endpoint check end-to-end, J behavior matrix. +# ----------------------------------------------------------------------------- + +from msal.authority import ( + _are_in_same_cloud, + _resolve_known_cloud, + _ensure_endpoint_same_cloud_as_authority, +) + +# Stable wording in the cross-cloud error; we assert on this rather than an +# error-code constant, matching the rest of MSAL Python. +_XCLOUD_ERROR_SUBSTRING = "sovereign cloud" + + +_PUBLIC = "login.microsoftonline.com" +_PUBLIC_ALIASES = ( + "login.microsoftonline.com", + "login.windows.net", + "login.microsoft.com", + "sts.windows.net", +) +_CHINA = "login.partner.microsoftonline.cn" +_CHINA_ALIASES = ( + "login.partner.microsoftonline.cn", + "login.chinacloudapi.cn", +) +_US_GOV = "login.microsoftonline.us" +_US_GOV_ALIASES = ( + "login.microsoftonline.us", + "login.usgovcloudapi.net", +) + + +def _mock_oidc(tenant_discovery_mock, *, issuer, authorization_endpoint=None, + token_endpoint=None): + tenant_discovery_mock.return_value = { + "authorization_endpoint": authorization_endpoint + or "https://example.com/oauth2/authorize", + "token_endpoint": token_endpoint + or "https://example.com/oauth2/token", + "issuer": issuer, + } + + +# --- (B) Rule 2 - same-cloud combinations across every alias ----------------- + +class TestIssuerValidationSameCloudAliases(unittest.TestCase): + """Authority is a known Microsoft host; issuer is a different alias of the + SAME sovereign cloud. Must accept (Rule 2b).""" + + def setUp(self): + self.http_client = MinimalHttpClient() + + def _check_pass(self, authority_host, issuer_host, tenant_discovery_mock): + authority_url = "https://{}/tenant".format(authority_host) + issuer = "https://{}/tenant".format(issuer_host) + # Same-cloud endpoints so the cross-cloud endpoint check is a no-op. + _mock_oidc( + tenant_discovery_mock, + issuer=issuer, + authorization_endpoint="https://{}/tenant/oauth2/v2.0/authorize".format(authority_host), + token_endpoint="https://{}/tenant/oauth2/v2.0/token".format(authority_host), + ) + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertTrue(a.has_valid_issuer(), + "Issuer '{}' should be accepted under same-cloud authority '{}'".format( + issuer, authority_url)) + + @patch("msal.authority.tenant_discovery") + def test_public_aliases_pairwise(self, tenant_discovery_mock): + for issuer_host in _PUBLIC_ALIASES: + if issuer_host == _PUBLIC: + continue + self._check_pass(_PUBLIC, issuer_host, tenant_discovery_mock) + + @patch("msal.authority.tenant_discovery") + def test_china_aliases_pairwise(self, tenant_discovery_mock): + for issuer_host in _CHINA_ALIASES: + if issuer_host == _CHINA: + continue + self._check_pass(_CHINA, issuer_host, tenant_discovery_mock) + + @patch("msal.authority.tenant_discovery") + def test_us_gov_aliases_pairwise(self, tenant_discovery_mock): + for issuer_host in _US_GOV_ALIASES: + if issuer_host == _US_GOV: + continue + self._check_pass(_US_GOV, issuer_host, tenant_discovery_mock) + + +# --- (C) Rule 2a - custom-domain federation (#5927 regression) --------------- + +class TestIssuerValidationCustomDomainFederation(unittest.TestCase): + """Custom-domain authority + ANY known-MS issuer in any cloud must pass. + Regression coverage for issue 5927 (parentpay.com-style federations).""" + + def setUp(self): + self.http_client = MinimalHttpClient() + + @patch("msal.authority.tenant_discovery") + def test_parentpay_style_public_cloud(self, tenant_discovery_mock): + authority_url = "https://clientlogin.test.parentpay.com/tid/v2.0" + issuer = "https://login.microsoftonline.com/tid/v2.0" + _mock_oidc(tenant_discovery_mock, issuer=issuer) + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertTrue(a.has_valid_issuer()) + + @patch("msal.authority.tenant_discovery") + def test_custom_domain_with_us_gov_issuer(self, tenant_discovery_mock): + authority_url = "https://customlogin.example.com/tenant" + issuer = "https://login.microsoftonline.us/tenant" + _mock_oidc(tenant_discovery_mock, issuer=issuer) + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertTrue(a.has_valid_issuer()) + + @patch("msal.authority.tenant_discovery") + def test_custom_domain_with_china_issuer(self, tenant_discovery_mock): + authority_url = "https://idp.contoso.com/tenant" + issuer = "https://login.partner.microsoftonline.cn/tenant" + _mock_oidc(tenant_discovery_mock, issuer=issuer) + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertTrue(a.has_valid_issuer()) + + +# --- (D) Rule 3 - regional same-cloud ---------------------------------------- + +class TestIssuerValidationRegionalSameCloud(unittest.TestCase): + """Authority is a known Microsoft host; issuer is a regional sub-host of + the SAME cloud. Must accept.""" + + def setUp(self): + self.http_client = MinimalHttpClient() + + @patch("msal.authority.tenant_discovery") + def test_regional_public(self, tenant_discovery_mock): + authority_url = "https://login.microsoftonline.com/tenant" + issuer = "https://westus2.login.microsoft.com/tenant" + _mock_oidc( + tenant_discovery_mock, issuer=issuer, + authorization_endpoint="https://login.microsoftonline.com/tenant/oauth2/v2.0/authorize", + token_endpoint="https://login.microsoftonline.com/tenant/oauth2/v2.0/token", + ) + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertTrue(a.has_valid_issuer()) + + @patch("msal.authority.tenant_discovery") + def test_regional_china(self, tenant_discovery_mock): + authority_url = "https://login.partner.microsoftonline.cn/tenant" + issuer = "https://chinaeast2.login.chinacloudapi.cn/tenant" + _mock_oidc( + tenant_discovery_mock, issuer=issuer, + authorization_endpoint="https://login.partner.microsoftonline.cn/tenant/oauth2/v2.0/authorize", + token_endpoint="https://login.partner.microsoftonline.cn/tenant/oauth2/v2.0/token", + ) + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertTrue(a.has_valid_issuer()) + + @patch("msal.authority.tenant_discovery") + def test_regional_sovereign_de(self, tenant_discovery_mock): + authority_url = "https://login.sovcloud-identity.de/tenant" + issuer = "https://germanywestcentral.login.sovcloud-identity.de/tenant" + _mock_oidc( + tenant_discovery_mock, issuer=issuer, + authorization_endpoint="https://login.sovcloud-identity.de/tenant/oauth2/v2.0/authorize", + token_endpoint="https://login.sovcloud-identity.de/tenant/oauth2/v2.0/token", + ) + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertTrue(a.has_valid_issuer()) + + +# --- (E) Rule 3 - CIAM tenant pattern (additional) --------------------------- + +class TestIssuerValidationCiamTenantPattern(unittest.TestCase): + + def setUp(self): + self.http_client = MinimalHttpClient() + + @patch("msal.authority.tenant_discovery") + def test_ciam_with_v2_path_segment(self, tenant_discovery_mock): + """https://.ciamlogin.com//v2.0 should pass when the + authority's first path segment matches .""" + authority_url = "https://customdomain.com/contoso" + issuer = "https://contoso.ciamlogin.com/contoso/v2.0" + _mock_oidc(tenant_discovery_mock, issuer=issuer) + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertTrue(a.has_valid_issuer()) + + @patch("msal.authority.tenant_discovery") + def test_ciam_pathless_authority_uses_first_label_as_tenant( + self, tenant_discovery_mock): + # Authority has no path -> tenant is the first hostname label + authority_url = "https://contoso.ciamlogin.com/contoso.onmicrosoft.com" + # Path-less authority is canonicalized differently; this test uses a + # CIAM authority where the tenant comes from path segment 1. + issuer = "https://contoso.ciamlogin.com/contoso.onmicrosoft.com/v2.0" + # CIAM is Public; mock endpoints on the CIAM authority host so the + # issuer-rule branch is what we exercise (not the endpoint guard). + _mock_oidc( + tenant_discovery_mock, + issuer=issuer, + authorization_endpoint="https://contoso.ciamlogin.com/contoso.onmicrosoft.com/oauth2/v2.0/authorize", + token_endpoint="https://contoso.ciamlogin.com/contoso.onmicrosoft.com/oauth2/v2.0/token", + ) + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertTrue(a.has_valid_issuer()) + + +# --- (F) Cross-cloud (THE FIX) - must throw --------------------------------- + +class TestIssuerValidationCrossCloudRejection(unittest.TestCase): + """Authority is in cloud A, issuer is in cloud B (A != B). + Must reject. This is the security fix.""" + + def setUp(self): + self.http_client = MinimalHttpClient() + + def _expect_reject(self, authority_url, issuer, tenant_discovery_mock): + # Match endpoints to authority cloud so the issuer mismatch is the + # one that fires (we test endpoint-cloud mismatch separately). + authority_host = urlparse(authority_url).hostname + _mock_oidc( + tenant_discovery_mock, issuer=issuer, + authorization_endpoint="https://{}/tenant/oauth2/v2.0/authorize".format(authority_host), + token_endpoint="https://{}/tenant/oauth2/v2.0/token".format(authority_host), + ) + with self.assertRaises(ValueError) as ctx: + Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertIn("issuer", str(ctx.exception).lower()) + + @patch("msal.authority.tenant_discovery") + def test_public_authority_china_issuer(self, tenant_discovery_mock): + self._expect_reject( + "https://login.microsoftonline.com/tenant", + "https://login.partner.microsoftonline.cn/tenant", + tenant_discovery_mock) + + @patch("msal.authority.tenant_discovery") + def test_public_authority_us_gov_issuer(self, tenant_discovery_mock): + self._expect_reject( + "https://login.microsoftonline.com/tenant", + "https://login.microsoftonline.us/tenant", + tenant_discovery_mock) + + @patch("msal.authority.tenant_discovery") + def test_us_gov_authority_public_issuer(self, tenant_discovery_mock): + self._expect_reject( + "https://login.microsoftonline.us/tenant", + "https://login.microsoftonline.com/tenant", + tenant_discovery_mock) + + @patch("msal.authority.tenant_discovery") + def test_china_authority_public_issuer(self, tenant_discovery_mock): + self._expect_reject( + "https://login.partner.microsoftonline.cn/tenant", + "https://login.microsoftonline.com/tenant", + tenant_discovery_mock) + + @patch("msal.authority.tenant_discovery") + def test_public_authority_regional_china_issuer(self, tenant_discovery_mock): + self._expect_reject( + "https://login.microsoftonline.com/tenant", + "https://chinaeast2.login.chinacloudapi.cn/tenant", + tenant_discovery_mock) + + @patch("msal.authority.tenant_discovery") + def test_us_gov_authority_regional_public_issuer(self, tenant_discovery_mock): + self._expect_reject( + "https://login.microsoftonline.us/tenant", + "https://westus2.login.microsoft.com/tenant", + tenant_discovery_mock) + + @patch("msal.authority.tenant_discovery") + def test_bleu_authority_delos_issuer_sovereign_to_sovereign( + self, tenant_discovery_mock): + self._expect_reject( + "https://login.sovcloud-identity.fr/tenant", + "https://login.sovcloud-identity.de/tenant", + tenant_discovery_mock) + + +# --- (G) Other rejections (multi-label region prefix, http scheme, etc.) ----- + +class TestIssuerValidationOtherRejections(unittest.TestCase): + + def setUp(self): + self.http_client = MinimalHttpClient() + + @patch("msal.authority.tenant_discovery") + def test_http_scheme_issuer_under_known_authority_is_rejected( + self, tenant_discovery_mock): + # http:// under a known-MS authority must be rejected: Rule 2 + # requires HTTPS so an MITM cannot mint a known-MS-looking issuer. + authority_url = "https://login.microsoftonline.com/tenant" + issuer = "http://login.microsoftonline.com/tenant" + _mock_oidc( + tenant_discovery_mock, issuer=issuer, + authorization_endpoint="https://login.microsoftonline.com/tenant/oauth2/v2.0/authorize", + token_endpoint="https://login.microsoftonline.com/tenant/oauth2/v2.0/token", + ) + with self.assertRaises(ValueError): + Authority(None, self.http_client, oidc_authority_url=authority_url) + + @patch("msal.authority.tenant_discovery") + def test_multi_label_regional_prefix_is_rejected(self, tenant_discovery_mock): + # attacker.evil.login.microsoft.com has TWO labels before the alias + # "login.microsoft.com", so it is not a valid regional sub-host. + authority_url = "https://customlogin.example.com/tenant" + issuer = "https://attacker.evil.login.microsoft.com/tenant" + _mock_oidc(tenant_discovery_mock, issuer=issuer) + with self.assertRaises(ValueError): + Authority(None, self.http_client, oidc_authority_url=authority_url) + + +# --- (H) _are_in_same_cloud helper - direct ---------------------------------- + +class TestAreInSameCloudHelper(unittest.TestCase): + """Direct test of the cloud-resolution helper.""" + + # PASS: same cloud + def test_public_aliases_are_same_cloud(self): + self.assertTrue(_are_in_same_cloud( + "login.microsoftonline.com", "login.windows.net")) + + def test_public_with_regional_public(self): + self.assertTrue(_are_in_same_cloud( + "login.microsoftonline.com", "westus2.login.microsoft.com")) + + def test_us_gov_aliases_are_same_cloud(self): + self.assertTrue(_are_in_same_cloud( + "login.microsoftonline.us", "login.usgovcloudapi.net")) + + def test_china_with_regional_china(self): + self.assertTrue(_are_in_same_cloud( + "login.partner.microsoftonline.cn", + "chinaeast2.login.chinacloudapi.cn")) + + def test_bleu_with_regional_bleu(self): + self.assertTrue(_are_in_same_cloud( + "login.sovcloud-identity.fr", + "francecentral.login.sovcloud-identity.fr")) + + # REJECT: different / unknown + def test_public_vs_china_different_cloud(self): + self.assertFalse(_are_in_same_cloud( + "login.microsoftonline.com", "login.partner.microsoftonline.cn")) + + def test_public_vs_us_gov_different_cloud(self): + self.assertFalse(_are_in_same_cloud( + "login.microsoftonline.com", "login.microsoftonline.us")) + + def test_us_gov_vs_china(self): + self.assertFalse(_are_in_same_cloud( + "login.microsoftonline.us", "login.chinacloudapi.cn")) + + def test_public_vs_germany_legacy(self): + self.assertFalse(_are_in_same_cloud( + "login.microsoftonline.com", "login.microsoftonline.de")) + + def test_bleu_vs_delos(self): + self.assertFalse(_are_in_same_cloud( + "login.sovcloud-identity.fr", "login.sovcloud-identity.de")) + + def test_ppe_vs_public(self): + self.assertFalse(_are_in_same_cloud( + "login.windows-ppe.net", "login.microsoftonline.com")) + + def test_known_vs_custom(self): + self.assertFalse(_are_in_same_cloud( + "login.microsoftonline.com", "custom.example.com")) + + def test_two_custom_hosts(self): + self.assertFalse(_are_in_same_cloud( + "custom.example.com", "another.example.org")) + + def test_custom_vs_none(self): + self.assertFalse(_are_in_same_cloud("custom.example.com", None)) + + def test_none_vs_known(self): + self.assertFalse(_are_in_same_cloud(None, "login.microsoftonline.com")) + + # REJECT: regional-shape failures (base is Public; prefix should NOT match) + def test_multi_label_prefix_rejected(self): + self.assertFalse(_are_in_same_cloud( + "attacker.evil.login.microsoft.com", "login.microsoftonline.com")) + + def test_underscore_prefix_rejected(self): + self.assertFalse(_are_in_same_cloud( + "weird_prefix.login.microsoft.com", "login.microsoftonline.com")) + + def test_uppercase_prefix_normalized(self): + # _resolve_known_cloud lowercases the host, so PREFIX -> prefix + # matches the regex and login.microsoft.com resolves to Public. + # Live SDK Uri.Host is already lowercased; this is synthetic. + self.assertTrue(_are_in_same_cloud( + "PREFIX.login.microsoft.com", "login.microsoftonline.com")) + + def test_leading_hyphen_prefix_rejected(self): + self.assertFalse(_are_in_same_cloud( + "-leading.login.microsoft.com", "login.microsoftonline.com")) + + def test_trailing_hyphen_prefix_rejected(self): + self.assertFalse(_are_in_same_cloud( + "trailing-.login.microsoft.com", "login.microsoftonline.com")) + + def test_64_char_prefix_rejected(self): + long_prefix = "a" * 64 # one over the 63-char DNS-label cap + self.assertFalse(_are_in_same_cloud( + "{}.login.microsoft.com".format(long_prefix), + "login.microsoftonline.com")) + + # ACCEPT: real Azure region prefixes - must continue to work + def test_real_region_westus2(self): + self.assertTrue(_are_in_same_cloud( + "westus2.login.microsoft.com", "login.microsoftonline.com")) + + def test_real_region_eastus2euap(self): + self.assertTrue(_are_in_same_cloud( + "eastus2euap.login.microsoft.com", "login.microsoftonline.com")) + + def test_real_region_chinaeast2(self): + self.assertTrue(_are_in_same_cloud( + "chinaeast2.login.chinacloudapi.cn", + "login.partner.microsoftonline.cn")) + + def test_real_region_usgovvirginia(self): + self.assertTrue(_are_in_same_cloud( + "usgovvirginia.login.microsoftonline.us", + "login.usgovcloudapi.net")) + + def test_real_region_francecentral(self): + self.assertTrue(_are_in_same_cloud( + "francecentral.login.sovcloud-identity.fr", + "login.sovcloud-identity.fr")) + + def test_real_region_southafricanorth(self): + self.assertTrue(_are_in_same_cloud( + "southafricanorth.login.microsoft.com", + "login.microsoftonline.com")) + + # CIAM is registered as Public via _EXTRA_HOSTS_BY_CLOUD so the + # cross-cloud endpoint guard covers it. Make that explicit here. + def test_ciam_subdomain_resolves_to_public(self): + self.assertEqual( + _resolve_known_cloud("contoso.ciamlogin.com"), "PUBLIC") + self.assertTrue(_are_in_same_cloud( + "contoso.ciamlogin.com", "login.microsoftonline.com")) + + def test_ciam_subdomain_vs_china_is_different_cloud(self): + self.assertFalse(_are_in_same_cloud( + "contoso.ciamlogin.com", "login.partner.microsoftonline.cn")) + + def test_well_known_authority_hosts_excludes_ciam(self): + # CIAM lives in _EXTRA_HOSTS_BY_CLOUD specifically so it does NOT + # leak into WELL_KNOWN_AUTHORITY_HOSTS, which gates + # _get_instance_discovery_host. Regression guard. + from msal.authority import WELL_KNOWN_AUTHORITY_HOSTS + self.assertNotIn("ciamlogin.com", WELL_KNOWN_AUTHORITY_HOSTS) + self.assertNotIn("contoso.ciamlogin.com", WELL_KNOWN_AUTHORITY_HOSTS) + + +# --- (I) End-to-end: endpoint same-cloud check (mocked HTTP) ----------------- + +class TestEndpointCrossCloudCheck(unittest.TestCase): + """End-to-end coverage: cross-cloud endpoints are refused before any + token POST; custom OIDC IdPs remain unconstrained.""" + + def setUp(self): + self.http_client = MinimalHttpClient() + + @patch("msal.authority.tenant_discovery") + def test_cross_cloud_token_endpoint_is_refused(self, tenant_discovery_mock): + """Public-cloud authority + China-cloud token_endpoint must throw + the cross-cloud error before any token POST happens.""" + authority_url = "https://login.microsoftonline.com/contoso" + # Issuer matches authority (Rule 1) so issuer validation passes; + # the failure must come from the endpoint check. + tenant_discovery_mock.return_value = { + "issuer": authority_url, + "authorization_endpoint": + "https://login.microsoftonline.com/contoso/oauth2/v2.0/authorize", + "token_endpoint": + "https://login.partner.microsoftonline.cn/contoso/oauth2/v2.0/token", + } + # Use a recording client so we can assert no POST occurred. + client = RecordingHttpClient() + with self.assertRaises(ValueError) as ctx: + Authority(None, client, oidc_authority_url=authority_url) + message = str(ctx.exception) + self.assertIn(_XCLOUD_ERROR_SUBSTRING, message) + self.assertIn( + "https://login.partner.microsoftonline.cn/contoso/oauth2/v2.0/token", + message) + # Authority construction does not POST. The credentials POST that the + # token-acquisition path would have issued must NOT have happened. + self.assertEqual(client.post_calls, []) + + @patch("msal.authority.tenant_discovery") + def test_custom_endpoint_under_known_authority_is_refused( + self, tenant_discovery_mock): + """Public-cloud authority + attacker-domain token_endpoint must throw.""" + authority_url = "https://login.microsoftonline.com/contoso" + tenant_discovery_mock.return_value = { + "issuer": authority_url, + "authorization_endpoint": + "https://login.microsoftonline.com/contoso/oauth2/v2.0/authorize", + "token_endpoint": + "https://attacker.example.com/oauth2/v2.0/token", + } + with self.assertRaises(ValueError) as ctx: + Authority(None, self.http_client, oidc_authority_url=authority_url) + message = str(ctx.exception) + self.assertIn(_XCLOUD_ERROR_SUBSTRING, message) + self.assertIn("https://attacker.example.com/oauth2/v2.0/token", message) + + @patch("msal.authority.tenant_discovery") + def test_cross_cloud_authorization_endpoint_is_refused( + self, tenant_discovery_mock): + """authorization_endpoint also enforced (token_endpoint is OK here).""" + authority_url = "https://login.microsoftonline.com/contoso" + tenant_discovery_mock.return_value = { + "issuer": authority_url, + "authorization_endpoint": + "https://login.partner.microsoftonline.cn/contoso/oauth2/v2.0/authorize", + "token_endpoint": + "https://login.microsoftonline.com/contoso/oauth2/v2.0/token", + } + with self.assertRaises(ValueError) as ctx: + Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertIn(_XCLOUD_ERROR_SUBSTRING, str(ctx.exception)) + self.assertIn("authorization_endpoint", str(ctx.exception)) + + @patch("msal.authority.tenant_discovery") + def test_custom_idp_endpoint_is_allowed(self, tenant_discovery_mock): + """Custom OIDC IdP authority: endpoint check is a no-op even on an + unrelated host. Per-test unique host to isolate from any future + OIDC-discovery caching.""" + authority_url = "https://idp.allowed-custom-{}.example.com/tenant".format( + id(self)) + tenant_discovery_mock.return_value = { + "issuer": authority_url, + "authorization_endpoint": + "https://idp.allowed-custom-{}.example.com/connect/authorize".format(id(self)), + "token_endpoint": + "https://idp.allowed-custom-{}.example.com/connect/token".format(id(self)), + } + # Should construct without raising the cross-cloud error. + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertEqual( + a.token_endpoint, + "https://idp.allowed-custom-{}.example.com/connect/token".format(id(self))) + + @patch("msal.authority.tenant_discovery") + def test_cross_cloud_device_authorization_endpoint_is_refused( + self, tenant_discovery_mock): + """device_authorization_endpoint is enforced too: a cross-cloud + device endpoint must be rejected before any device-code POST.""" + authority_url = "https://login.microsoftonline.com/contoso" + tenant_discovery_mock.return_value = { + "issuer": authority_url, + "authorization_endpoint": + "https://login.microsoftonline.com/contoso/oauth2/v2.0/authorize", + "token_endpoint": + "https://login.microsoftonline.com/contoso/oauth2/v2.0/token", + "device_authorization_endpoint": + "https://login.partner.microsoftonline.cn/contoso/oauth2/v2.0/devicecode", + } + with self.assertRaises(ValueError) as ctx: + Authority(None, self.http_client, oidc_authority_url=authority_url) + message = str(ctx.exception) + self.assertIn(_XCLOUD_ERROR_SUBSTRING, message) + self.assertIn("device_authorization_endpoint", message) + self.assertIn( + "https://login.partner.microsoftonline.cn/contoso/oauth2/v2.0/devicecode", + message) + + @patch("msal.authority.tenant_discovery") + def test_missing_device_authorization_endpoint_is_ok( + self, tenant_discovery_mock): + """device_authorization_endpoint is optional in the OIDC discovery + document. Absence must not trip the new guard.""" + authority_url = "https://login.microsoftonline.com/contoso" + tenant_discovery_mock.return_value = { + "issuer": authority_url, + "authorization_endpoint": + "https://login.microsoftonline.com/contoso/oauth2/v2.0/authorize", + "token_endpoint": + "https://login.microsoftonline.com/contoso/oauth2/v2.0/token", + # No device_authorization_endpoint key at all. + } + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertIsNone(a.device_authorization_endpoint) + + @patch("msal.authority.tenant_discovery") + def test_ciam_authority_with_cross_cloud_token_endpoint_is_refused( + self, tenant_discovery_mock): + """CIAM resolves to Public, so a China token_endpoint under a CIAM + authority must be rejected.""" + authority_url = "https://contoso.ciamlogin.com/contoso.onmicrosoft.com" + tenant_discovery_mock.return_value = { + # Same-host issuer so issuer validation passes via Case 4 and the + # failure comes from the endpoint check. + "issuer": authority_url + "/v2.0", + "authorization_endpoint": + "https://contoso.ciamlogin.com/contoso.onmicrosoft.com/oauth2/v2.0/authorize", + "token_endpoint": + "https://login.partner.microsoftonline.cn/contoso/oauth2/v2.0/token", + } + with self.assertRaises(ValueError) as ctx: + Authority(None, self.http_client, oidc_authority_url=authority_url) + message = str(ctx.exception) + self.assertIn(_XCLOUD_ERROR_SUBSTRING, message) + self.assertIn("token_endpoint", message) + + @patch("msal.authority.tenant_discovery") + def test_ciam_authority_with_same_cloud_public_endpoint_is_allowed( + self, tenant_discovery_mock): + """CIAM resolves to Public, so a Public-cloud endpoint under a CIAM + authority must NOT trip the new guard.""" + authority_url = "https://contoso.ciamlogin.com/contoso.onmicrosoft.com" + tenant_discovery_mock.return_value = { + "issuer": authority_url + "/v2.0", + "authorization_endpoint": + "https://contoso.ciamlogin.com/contoso.onmicrosoft.com/oauth2/v2.0/authorize", + # Public token endpoint under a CIAM authority is unusual but + # in-cloud and therefore allowed. + "token_endpoint": + "https://login.microsoftonline.com/contoso/oauth2/v2.0/token", + } + a = Authority(None, self.http_client, oidc_authority_url=authority_url) + self.assertEqual( + a.token_endpoint, + "https://login.microsoftonline.com/contoso/oauth2/v2.0/token") + + @patch("msal.authority.tenant_discovery") + def test_attacker_ciam_subdomain_under_public_authority_is_refused( + self, tenant_discovery_mock): + """Regression guard for the Rule 2b CIAM bypass: an attacker + '.ciamlogin.com' issuer under a Public authority must be rejected + by Rule 3 before Rule 2b can short-circuit on same-cloud.""" + authority_url = "https://login.microsoftonline.com/contoso" + # Issuer subdomain ('attacker') does NOT match the authority's first + # path segment ('contoso'). Rule 3 must reject. + issuer = "https://attacker.ciamlogin.com/attacker/v2.0" + tenant_discovery_mock.return_value = { + "issuer": issuer, + "authorization_endpoint": + "https://login.microsoftonline.com/contoso/oauth2/v2.0/authorize", + "token_endpoint": + "https://login.microsoftonline.com/contoso/oauth2/v2.0/token", + } + with self.assertRaises(ValueError) as ctx: + Authority(None, self.http_client, oidc_authority_url=authority_url) + # The failure must come from issuer validation, not the endpoint + # check (endpoints are same-cloud here). + self.assertIn("issuer", str(ctx.exception).lower()) + + +class TestEnsureEndpointSameCloudAsAuthorityHelper(unittest.TestCase): + """Direct unit coverage for _ensure_endpoint_same_cloud_as_authority.""" + + def test_no_op_for_custom_authority(self): + # Should NOT raise for any endpoint when authority is custom-domain. + _ensure_endpoint_same_cloud_as_authority( + "https://idp.example.com/tenant", + "https://login.partner.microsoftonline.cn/tenant/oauth2/token", + "token_endpoint") + + def test_no_op_for_empty_endpoint(self): + _ensure_endpoint_same_cloud_as_authority( + "https://login.microsoftonline.com/tenant", "", "token_endpoint") + _ensure_endpoint_same_cloud_as_authority( + "https://login.microsoftonline.com/tenant", None, "token_endpoint") + + def test_raises_for_cross_cloud_endpoint(self): + with self.assertRaises(ValueError) as ctx: + _ensure_endpoint_same_cloud_as_authority( + "https://login.microsoftonline.com/tenant", + "https://login.partner.microsoftonline.cn/tenant/oauth2/token", + "token_endpoint") + message = str(ctx.exception) + self.assertIn(_XCLOUD_ERROR_SUBSTRING, message) + self.assertIn("token_endpoint", message) + + def test_raises_for_unknown_endpoint_under_known_authority(self): + with self.assertRaises(ValueError) as ctx: + _ensure_endpoint_same_cloud_as_authority( + "https://login.microsoftonline.com/tenant", + "https://attacker.example.com/oauth2/token", + "token_endpoint") + self.assertIn(_XCLOUD_ERROR_SUBSTRING, str(ctx.exception)) + + def test_passes_for_same_cloud_regional_endpoint(self): + _ensure_endpoint_same_cloud_as_authority( + "https://login.microsoftonline.com/tenant", + "https://westus2.login.microsoft.com/tenant/oauth2/token", + "token_endpoint") + + def test_passes_for_alias_endpoint(self): + _ensure_endpoint_same_cloud_as_authority( + "https://login.microsoftonline.us/tenant", + "https://login.usgovcloudapi.net/tenant/oauth2/token", + "token_endpoint") + + def test_ciam_authority_is_treated_as_public(self): + # CIAM authorities resolve to Public, so a Public-cloud endpoint + # passes and a cross-cloud one fails. + _ensure_endpoint_same_cloud_as_authority( + "https://contoso.ciamlogin.com/contoso.onmicrosoft.com", + "https://login.microsoftonline.com/contoso/oauth2/token", + "token_endpoint") + with self.assertRaises(ValueError): + _ensure_endpoint_same_cloud_as_authority( + "https://contoso.ciamlogin.com/contoso.onmicrosoft.com", + "https://login.partner.microsoftonline.cn/contoso/oauth2/token", + "token_endpoint") + + def test_endpoint_name_appears_in_error_message(self): + # Any endpoint_name string the caller supplies is surfaced verbatim, + # so callers can attribute a failure to the specific OIDC field. + for name in ( + "token_endpoint", + "authorization_endpoint", + "device_authorization_endpoint"): + with self.assertRaises(ValueError) as ctx: + _ensure_endpoint_same_cloud_as_authority( + "https://login.microsoftonline.com/tenant", + "https://login.partner.microsoftonline.cn/tenant/x", + name) + self.assertIn(name, str(ctx.exception)) + + def test_error_message_does_not_leak_authority_url_format(self): + """The error message lists exactly: authority URL, endpoint name, + offending endpoint URL. No tokens or secrets.""" + try: + _ensure_endpoint_same_cloud_as_authority( + "https://login.microsoftonline.com/tenant", + "https://login.microsoftonline.us/tenant/oauth2/token", + "token_endpoint") + except ValueError as e: + msg = str(e) + self.assertIn(_XCLOUD_ERROR_SUBSTRING, msg) + self.assertIn("https://login.microsoftonline.com/tenant", msg) + self.assertIn("https://login.microsoftonline.us/tenant/oauth2/token", msg) + self.assertIn("token_endpoint", msg) + + +# --- (J) Behavior matrix: BEFORE vs AFTER ------------------------------------ + +class TestCrossCloudBehaviorMatrix(unittest.TestCase): + """Table-driven BEFORE-vs-AFTER matrix; a future regression is visible + at a glance. Categories: pass-pass (unchanged accept), throw-throw + (unchanged reject), pass-throw (the cross-cloud fix).""" + + # Each row: (label, authority, issuer, expected, category) + _MATRIX = [ + # --- pass-pass: unchanged accepts --- + ("rule1_exact", + "https://login.microsoftonline.com/tenant/v2.0", + "https://login.microsoftonline.com/tenant/v2.0", + True, "pass-pass"), + ("rule2_same_cloud_alias", + "https://login.microsoftonline.com/tenant", + "https://login.windows.net/tenant", + True, "pass-pass"), + ("rule3_regional_same_cloud", + "https://login.microsoftonline.com/tenant", + "https://westus2.login.microsoft.com/tenant", + True, "pass-pass"), + ("rule2a_5927_federation_public", + "https://clientlogin.test.parentpay.com/tid/v2.0", + "https://login.microsoftonline.com/tid/v2.0", + True, "pass-pass"), + ("rule2a_5927_federation_us_gov", + "https://customlogin.example.com/tenant", + "https://login.microsoftonline.us/tenant", + True, "pass-pass"), + ("ciam_path_segment_match", + "https://customdomain.com/contoso", + "https://contoso.ciamlogin.com/contoso/v2.0", + True, "pass-pass"), + # --- pass-throw: the cross-cloud fix --- + ("xcloud_public_authority_china_issuer", + "https://login.microsoftonline.com/tenant", + "https://login.partner.microsoftonline.cn/tenant", + False, "pass-throw"), + ("xcloud_public_authority_us_gov_issuer", + "https://login.microsoftonline.com/tenant", + "https://login.microsoftonline.us/tenant", + False, "pass-throw"), + ("xcloud_us_gov_authority_public_issuer", + "https://login.microsoftonline.us/tenant", + "https://login.microsoftonline.com/tenant", + False, "pass-throw"), + ("xcloud_china_authority_public_issuer", + "https://login.partner.microsoftonline.cn/tenant", + "https://login.microsoftonline.com/tenant", + False, "pass-throw"), + ("xcloud_public_authority_regional_china_issuer", + "https://login.microsoftonline.com/tenant", + "https://chinaeast2.login.chinacloudapi.cn/tenant", + False, "pass-throw"), + ("xcloud_us_gov_authority_regional_public_issuer", + "https://login.microsoftonline.us/tenant", + "https://westus2.login.microsoft.com/tenant", + False, "pass-throw"), + ("xcloud_bleu_authority_delos_issuer", + "https://login.sovcloud-identity.fr/tenant", + "https://login.sovcloud-identity.de/tenant", + False, "pass-throw"), + ("xcloud_china_authority_us_gov_issuer", + "https://login.partner.microsoftonline.cn/tenant", + "https://login.microsoftonline.us/tenant", + False, "pass-throw"), + ("ciam_tenant_mismatch", + "https://customdomain.com/tenantA", + "https://tenantB.ciamlogin.com/tenantB/v2.0", + False, "pass-throw"), + # --- throw-throw: unchanged rejects --- + ("http_scheme_under_known_authority", + "https://login.microsoftonline.com/tenant", + "http://login.microsoftonline.com/tenant", + False, "throw-throw"), + ("custom_authority_unknown_issuer", + "https://customdomain.com/tenant", + "https://malicious-site.com/tenant", + False, "throw-throw"), + ("multi_label_regional_lookalike", + "https://customlogin.example.com/tenant", + "https://attacker.evil.login.microsoft.com/tenant", + False, "throw-throw"), + ] + + @patch("msal.authority.tenant_discovery") + def test_matrix(self, tenant_discovery_mock): + for label, authority_url, issuer, expected_pass, category in self._MATRIX: + with self.subTest(label=label, category=category): + # Use endpoints that match the authority host so the + # cross-cloud endpoint check is not what trips the matrix. + authority_host = urlparse(authority_url).hostname + tenant_discovery_mock.return_value = { + "issuer": issuer, + "authorization_endpoint": + "https://{}/oauth2/v2.0/authorize".format(authority_host), + "token_endpoint": + "https://{}/oauth2/v2.0/token".format(authority_host), + } + if expected_pass: + a = Authority(None, MinimalHttpClient(), + oidc_authority_url=authority_url) + self.assertTrue( + a.has_valid_issuer(), + "{}: expected accept".format(label)) + else: + with self.assertRaises(ValueError, msg=label): + Authority(None, MinimalHttpClient(), + oidc_authority_url=authority_url)