From d1e0c47b63b7022d3a85d6c02f62275e8862f4ca Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Fri, 29 May 2026 04:36:08 +0530 Subject: [PATCH 1/3] Extract SPOG org-id from cluster http_path for non-Thrift requests For all-purpose-compute Thrift connections on SPOG (custom-URL) hosts, the http_path is /sql/protocolv1/o/<workspace-id>/<cluster-id> and the workspace ID is encoded in the path itself. PoPP routes the Thrift request correctly off the /o/<workspace-id>/ segment, so the connection succeeds without an explicit ?o=<workspace-id> query parameter. Other requests on the same connection (telemetry uploads to /telemetry-ext, feature-flag fetches, SEA REST calls) hit different paths that don't carry the workspace ID. Previously _extract_spog_headers only looked at ?o= in the http_path, so the x-databricks-org-id header was never set for cluster URLs without ?o=. On SPOG hosts PoPP then had no workspace context for these requests and redirected them to /login, silently dropping telemetry. Extend _extract_spog_headers to also extract the workspace ID from the cluster path segment as a fallback when ?o= is absent. Priority order: explicit caller header > ?o=<workspace-id> query param > /o/<workspace-id>/ path segment. Adds five unit tests covering the new cluster-path extraction, leading slash, query-param-wins priority, explicit-header-wins priority, and a warehouse-path regression guard.
Signed-off-by: Madhavendra Rathore --- src/databricks/sql/session.py | 68 ++++++++++++++++++++++++++--------- tests/unit/test_session.py | 38 ++++++++++++++++++++ 2 files changed, 89 insertions(+), 17 deletions(-) diff --git a/src/databricks/sql/session.py b/src/databricks/sql/session.py index 2910576f8..33e02a231 100644 --- a/src/databricks/sql/session.py +++ b/src/databricks/sql/session.py @@ -1,4 +1,5 @@ import logging +import re from typing import Dict, Tuple, List, Optional, Any, Type from databricks.sql.thrift_api.TCLIService import ttypes @@ -170,37 +171,70 @@ def _create_backend( } return databricks_client_class(**common_args) + # All-purpose-compute Thrift http_path: + # [/]sql/protocolv1/o//[/...][?...] + _CLUSTER_PATH_ORG_ID_RE = re.compile(r"(?:^|/)sql/protocolv1/o/(\d+)/[^/?]+") + @staticmethod def _extract_spog_headers(http_path, existing_headers): - """Extract ?o= from http_path and return as a header dict for SPOG routing.""" - if not http_path or "?" not in http_path: + """Extract the workspace ID from http_path for SPOG routing and return it + as an ``x-databricks-org-id`` header dict. + + Two sources are inspected, in priority order: + 1. ``?o=`` query parameter in http_path (warehouse paths + typically encode the workspace this way on SPOG). + 2. ``/sql/protocolv1/o//`` path segment + (all-purpose compute paths embed the workspace in the path itself). + + An explicit ``x-databricks-org-id`` already set by the caller wins over + both. Returns an empty dict when no workspace ID can be determined. + + On SPOG (Custom URL) hosts this header is required for non-Thrift + endpoints — telemetry, feature flags, SEA — to be routed to the right + workspace. Without it, PoPP falls back to default routing and + workspace-scoped requests are redirected to ``/login``. 
+ """ + if not http_path: return {} - from urllib.parse import parse_qs - - query_string = http_path.split("?", 1)[1] - params = parse_qs(query_string) - org_id = params.get("o", [None])[0] - if not org_id: + # Caller already set the header — never override. + if any(k == "x-databricks-org-id" for k, _ in existing_headers): logger.debug( - "SPOG header extraction: http_path has query string but no ?o= param, " - "skipping x-databricks-org-id injection" + "SPOG header extraction: x-databricks-org-id already set by caller, " + "not extracting from http_path" ) return {} - # Don't override if explicitly set - if any(k == "x-databricks-org-id" for k, _ in existing_headers): + org_id = None + source = None + + if "?" in http_path: + from urllib.parse import parse_qs + + query_string = http_path.split("?", 1)[1] + params = parse_qs(query_string) + value = params.get("o", [None])[0] + if value: + org_id = value + source = "?o= in http_path" + + if org_id is None: + cluster_match = Session._CLUSTER_PATH_ORG_ID_RE.search(http_path) + if cluster_match: + org_id = cluster_match.group(1) + source = "cluster path segment" + + if org_id is None: logger.debug( - "SPOG header extraction: x-databricks-org-id already set by caller, " - "not overriding with ?o=%s from http_path", - org_id, + "SPOG header extraction: no workspace ID found in http_path, " + "skipping x-databricks-org-id injection" ) return {} logger.debug( - "SPOG header extraction: injecting x-databricks-org-id=%s " - "(extracted from ?o= in http_path)", + "SPOG header extraction: injecting x-databricks-org-id=%s (extracted from %s)", org_id, + source, ) return {"x-databricks-org-id": org_id} diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 5d37cd9a5..f31c785db 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -273,3 +273,41 @@ def test_multiple_query_params(self): "/sql/1.0/warehouses/abc123?o=12345&extra=val", [] ) assert result == {"x-databricks-org-id": "12345"} 
+ + def test_extracts_org_id_from_cluster_path_segment(self): + # All-purpose-compute path embeds workspace ID in /o//. + # Without ?o=, the driver must still set x-databricks-org-id so that + # telemetry and other non-Thrift requests route to the right workspace + # on SPOG hosts. + result = Session._extract_spog_headers( + "sql/protocolv1/o/6051921418418893/0528-220959-uzmcn1qt", [] + ) + assert result == {"x-databricks-org-id": "6051921418418893"} + + def test_extracts_org_id_from_cluster_path_with_leading_slash(self): + result = Session._extract_spog_headers( + "/sql/protocolv1/o/6051921418418893/0528-220959-uzmcn1qt", [] + ) + assert result == {"x-databricks-org-id": "6051921418418893"} + + def test_query_param_wins_over_cluster_path_segment(self): + # When both forms are present, ?o= takes precedence. + result = Session._extract_spog_headers( + "sql/protocolv1/o/111/0528-220959-uzmcn1qt?o=222", [] + ) + assert result == {"x-databricks-org-id": "222"} + + def test_explicit_header_wins_over_cluster_path_segment(self): + existing = [("x-databricks-org-id", "from-caller")] + result = Session._extract_spog_headers( + "sql/protocolv1/o/111/0528-220959-uzmcn1qt", existing + ) + assert result == {} + + def test_warehouse_path_without_query_param_returns_empty(self): + # Regression guard: the new cluster-path regex must not accidentally + # match warehouse paths (which never embed the workspace ID). 
+ result = Session._extract_spog_headers( + "/sql/1.0/warehouses/abc123", [] + ) + assert result == {} From 3a665972963b1c1efdecda5664048bc860968e03 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sat, 30 May 2026 19:43:14 +0530 Subject: [PATCH 2/3] Respect mixed-case SPOG org-id headers Signed-off-by: Madhavendra Rathore --- src/databricks/sql/session.py | 14 +++++++------- tests/unit/test_session.py | 7 +++++++ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/databricks/sql/session.py b/src/databricks/sql/session.py index 33e02a231..7f52872ec 100644 --- a/src/databricks/sql/session.py +++ b/src/databricks/sql/session.py @@ -73,10 +73,10 @@ def __init__( base_headers = [("User-Agent", self.useragent_header)] all_headers = (http_headers or []) + base_headers - # Extract ?o= from http_path for SPOG routing. - # On SPOG hosts, the httpPath contains ?o= which routes Thrift - # requests via the URL. For SEA, telemetry, and feature flags (which use - # separate endpoints), we inject x-databricks-org-id as an HTTP header. + # Extract workspace context from http_path for SPOG routing. + # On SPOG hosts, the http_path can contain either ?o= or an + # all-purpose-compute /o// path segment. For SEA, telemetry, + # and feature flags, we inject x-databricks-org-id as an HTTP header. self._spog_headers = self._extract_spog_headers(http_path, all_headers) if self._spog_headers: all_headers = all_headers + list(self._spog_headers.items()) @@ -197,8 +197,8 @@ def _extract_spog_headers(http_path, existing_headers): if not http_path: return {} - # Caller already set the header — never override. - if any(k == "x-databricks-org-id" for k, _ in existing_headers): + # Caller already set the header; never override. Header names are case-insensitive. 
+ if any(k.lower() == "x-databricks-org-id" for k, _ in existing_headers): logger.debug( "SPOG header extraction: x-databricks-org-id already set by caller, " "not extracting from http_path" @@ -239,7 +239,7 @@ def _extract_spog_headers(http_path, existing_headers): return {"x-databricks-org-id": org_id} def get_spog_headers(self): - """Returns SPOG routing headers (x-databricks-org-id) if ?o= was in http_path.""" + """Returns extracted SPOG routing headers (x-databricks-org-id), if any.""" return dict(self._spog_headers) def open(self): diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index f31c785db..f0a98393f 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -268,6 +268,13 @@ def test_explicit_header_takes_precedence(self): ) assert result == {} + def test_explicit_header_takes_precedence_case_insensitively(self): + existing = [("X-Databricks-Org-Id", "explicit-value")] + result = Session._extract_spog_headers( + "/sql/1.0/warehouses/abc123?o=6051921418418893", existing + ) + assert result == {} + def test_multiple_query_params(self): result = Session._extract_spog_headers( "/sql/1.0/warehouses/abc123?o=12345&extra=val", [] From b35a5bf1ac95f5693ae05230579d3a97617da9e9 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Mon, 1 Jun 2026 18:39:59 +0530 Subject: [PATCH 3/3] Validate SPOG org id extraction Signed-off-by: Madhavendra Rathore --- src/databricks/sql/session.py | 7 ++--- tests/unit/test_session.py | 50 ++++++++++++++++++++++++++++------- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/src/databricks/sql/session.py b/src/databricks/sql/session.py index 7f52872ec..f642dbf7e 100644 --- a/src/databricks/sql/session.py +++ b/src/databricks/sql/session.py @@ -173,7 +173,8 @@ def _create_backend( # All-purpose-compute Thrift http_path: # [/]sql/protocolv1/o//[/...][?...] 
- _CLUSTER_PATH_ORG_ID_RE = re.compile(r"(?:^|/)sql/protocolv1/o/(\d+)/[^/?]+") + _ORG_ID_RE = re.compile(r"^[0-9]+$") + _CLUSTER_PATH_ORG_ID_RE = re.compile(r"^/?sql/protocolv1/o/([0-9]+)/[^/?]+") @staticmethod def _extract_spog_headers(http_path, existing_headers): @@ -214,12 +215,12 @@ def _extract_spog_headers(http_path, existing_headers): query_string = http_path.split("?", 1)[1] params = parse_qs(query_string) value = params.get("o", [None])[0] - if value: + if value and Session._ORG_ID_RE.fullmatch(value): org_id = value source = "?o= in http_path" if org_id is None: - cluster_match = Session._CLUSTER_PATH_ORG_ID_RE.search(http_path) + cluster_match = Session._CLUSTER_PATH_ORG_ID_RE.match(http_path) if cluster_match: org_id = cluster_match.group(1) source = "cluster path segment" diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index f0a98393f..bab2a7125 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -162,7 +162,10 @@ def test_socket_timeout_passthrough(self, mock_client_class): @patch("%s.session.ThriftDatabricksClient" % PACKAGE_NAME) def test_configuration_passthrough(self, mock_client_class): - mock_session_config = {"ANSI_MODE": "FALSE", "QUERY_TAGS": "team:engineering,project:data-pipeline"} + mock_session_config = { + "ANSI_MODE": "FALSE", + "QUERY_TAGS": "team:engineering,project:data-pipeline", + } databricks.sql.connect( session_configuration=mock_session_config, **self.DUMMY_CONNECTION_ARGS ) @@ -218,10 +221,15 @@ def test_query_tags_dict_sets_session_config(self, mock_client_class): ) call_kwargs = mock_client_class.return_value.open_session.call_args[1] - assert call_kwargs["session_configuration"]["QUERY_TAGS"] == "team:data-eng,project:etl" + assert ( + call_kwargs["session_configuration"]["QUERY_TAGS"] + == "team:data-eng,project:etl" + ) @patch("%s.session.ThriftDatabricksClient" % PACKAGE_NAME) - def test_query_tags_dict_takes_precedence_over_session_config(self, mock_client_class): + def 
test_query_tags_dict_takes_precedence_over_session_config( + self, mock_client_class + ): databricks.sql.connect( query_tags={"team": "new-team"}, session_configuration={"QUERY_TAGS": "team:old-team,other:value"}, @@ -242,9 +250,7 @@ def test_extracts_org_id_from_query_param(self): assert result == {"x-databricks-org-id": "6051921418418893"} def test_no_query_param_returns_empty(self): - result = Session._extract_spog_headers( - "/sql/1.0/warehouses/abc123", [] - ) + result = Session._extract_spog_headers("/sql/1.0/warehouses/abc123", []) assert result == {} def test_no_o_param_returns_empty(self): @@ -281,6 +287,24 @@ def test_multiple_query_params(self): ) assert result == {"x-databricks-org-id": "12345"} + def test_non_numeric_query_param_returns_empty(self): + result = Session._extract_spog_headers( + "/sql/1.0/warehouses/abc123?o=abc123", [] + ) + assert result == {} + + def test_control_char_query_param_returns_empty(self): + result = Session._extract_spog_headers( + "/sql/1.0/warehouses/abc123?o=123%0D%0AX-Injected:%20yes", [] + ) + assert result == {} + + def test_empty_query_param_returns_empty(self): + result = Session._extract_spog_headers( + "/sql/1.0/warehouses/abc123?o=", [] + ) + assert result == {} + def test_extracts_org_id_from_cluster_path_segment(self): # All-purpose-compute path embeds workspace ID in /o//. 
# Without ?o=, the driver must still set x-databricks-org-id so that @@ -311,10 +335,18 @@ def test_explicit_header_wins_over_cluster_path_segment(self): ) assert result == {} + def test_nested_cluster_path_prefix_returns_empty(self): + result = Session._extract_spog_headers( + "evil/sql/protocolv1/o/999/0528-220959-uzmcn1qt", [] + ) + assert result == {} + + def test_incomplete_cluster_path_returns_empty(self): + result = Session._extract_spog_headers("sql/protocolv1/o/999/", []) + assert result == {} + def test_warehouse_path_without_query_param_returns_empty(self): # Regression guard: the new cluster-path regex must not accidentally # match warehouse paths (which never embed the workspace ID). - result = Session._extract_spog_headers( - "/sql/1.0/warehouses/abc123", [] - ) + result = Session._extract_spog_headers("/sql/1.0/warehouses/abc123", []) assert result == {}