Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions backend/app/api/gateway_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,41 @@
import httpx
from fastapi import APIRouter, HTTPException, Request

from app.auth import ensure_https
from app.models import GatewayConfig, GatewayCreateRequest, GatewayUpdateRequest
from app.api.auth_helpers import extract_bearer_token
from app.api.config_store import get_effective_setting, get_overrides, update_overrides
from app.config import get_settings
import app.services.database as _db
from app.services.rbac import resolve_role, role_gte

logger = logging.getLogger(__name__)
gateway_router = APIRouter()
settings = get_settings()


def _get_token(req: Request) -> str:
"""Extract bearer token from request headers."""
token = req.headers.get("X-Forwarded-Access-Token")
if not token:
auth = req.headers.get("Authorization", "")
if auth.startswith("Bearer "):
token = auth[7:]
if not token:
token = get_effective_setting("lakebase_service_token") or settings.databricks_token
if not token:
raise HTTPException(status_code=401, detail="No authentication token available")
return token

async def _require_role(req: Request, min_role: str):
"""Resolve caller's effective role and raise 403 if below min_role.
Uses extract_bearer_token (user OBO token only — no service-token fallback).
"""
token = extract_bearer_token(req)
identity = req.headers.get("X-Forwarded-Email", "")
host = _get_host()
role = await resolve_role(identity, token, host)
if not role_gte(role, min_role):
raise HTTPException(
status_code=403,
detail=f"Role '{min_role}' required. You have '{role}'."
)


def _get_host() -> str:
"""Get Databricks workspace host with https:// prefix."""
host = get_effective_setting("databricks_host") or settings.databricks_host
if not host:
raise HTTPException(status_code=500, detail="DATABRICKS_HOST not configured")
if not host.startswith("http"):
host = f"https://{host}"
return host
return ensure_https(host)


# --- Gateway CRUD ---
Expand All @@ -69,7 +72,8 @@ async def list_gateways():

@gateway_router.post("/gateways", status_code=201)
async def create_gateway(body: GatewayCreateRequest, req: Request):
"""Create a new gateway configuration."""
"""Create a new gateway configuration. Owner only."""
await _require_role(req, "owner")
try:
now = datetime.now(timezone.utc)
user_email = req.headers.get("X-Forwarded-Email")
Expand Down Expand Up @@ -132,8 +136,9 @@ async def get_gateway(gateway_id: str):


@gateway_router.put("/gateways/{gateway_id}")
async def update_gateway(gateway_id: str, body: GatewayUpdateRequest):
"""Update gateway fields."""
async def update_gateway(gateway_id: str, body: GatewayUpdateRequest, req: Request):
"""Update gateway fields. Manage or above."""
await _require_role(req, "manage")
try:
updates = body.model_dump(exclude_none=True)
if not updates:
Expand All @@ -153,8 +158,9 @@ async def update_gateway(gateway_id: str, body: GatewayUpdateRequest):


@gateway_router.delete("/gateways/{gateway_id}")
async def delete_gateway(gateway_id: str):
"""Delete a gateway."""
async def delete_gateway(gateway_id: str, req: Request):
"""Delete a gateway. Owner only."""
await _require_role(req, "owner")
try:
deleted = await _db.db_service.delete_gateway(gateway_id)
if not deleted:
Expand Down Expand Up @@ -218,8 +224,9 @@ async def get_gateway_cache(gateway_id: str):


@gateway_router.delete("/gateways/{gateway_id}/cache")
async def clear_gateway_cache(gateway_id: str):
"""Clear all cached entries for a specific gateway."""
async def clear_gateway_cache(gateway_id: str, req: Request):
"""Clear all cached entries for a specific gateway. Manage or above."""
await _require_role(req, "manage")
try:
gw = await _db.db_service.get_gateway(gateway_id)
if not gw:
Expand Down Expand Up @@ -256,7 +263,7 @@ async def get_gateway_logs(gateway_id: str, limit: int = 50):
async def list_genie_spaces(req: Request):
"""List available Genie Spaces from the workspace."""
try:
token = _get_token(req)
token = extract_bearer_token(req)
host = _get_host()

url = f"{host}/api/2.0/genie/spaces"
Expand All @@ -280,7 +287,7 @@ async def list_genie_spaces(req: Request):
async def list_warehouses(req: Request):
"""List available SQL warehouses from the workspace."""
try:
token = _get_token(req)
token = extract_bearer_token(req)
host = _get_host()

url = f"{host}/api/2.0/sql/warehouses"
Expand All @@ -304,7 +311,7 @@ async def list_warehouses(req: Request):
async def list_serving_endpoints(req: Request):
"""List available serving endpoints from the workspace."""
try:
token = _get_token(req)
token = extract_bearer_token(req)
host = _get_host()

url = f"{host}/api/2.0/serving-endpoints"
Expand Down Expand Up @@ -439,8 +446,9 @@ class SettingsUpdateRequest(GatewayUpdateRequest):


@gateway_router.put("/settings")
async def update_settings_endpoint(body: SettingsUpdateRequest):
"""Update server configuration."""
async def update_settings_endpoint(body: SettingsUpdateRequest, req: Request):
"""Update server configuration. Owner only."""
await _require_role(req, "owner")
batch = {}
updated = {}
for field, value in body.model_dump(exclude_none=True).items():
Expand Down
84 changes: 84 additions & 0 deletions backend/app/api/rbac_routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""RBAC management endpoints for user/role administration."""

import logging
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel

from app.auth import ensure_https
from app.api.auth_helpers import extract_bearer_token
from app.api.config_store import get_effective_setting
from app.config import get_settings
from app.services.rbac import resolve_role, role_gte, ROLES, invalidate_role_cache

logger = logging.getLogger(__name__)
rbac_router = APIRouter()
_settings = get_settings()


def _get_host() -> str:
host = get_effective_setting("databricks_host") or _settings.databricks_host or ""
return ensure_https(host) if host else host


async def _resolve_caller(req: Request):
"""Extract and resolve the calling user's identity and effective role."""
token = extract_bearer_token(req)
identity = req.headers.get("X-Forwarded-Email", "")
role = await resolve_role(identity, token, _get_host())
return identity, token, role


async def _require_role(req: Request, min_role: str):
identity, token, role = await _resolve_caller(req)
if not role_gte(role, min_role):
raise HTTPException(
status_code=403,
detail=f"Role '{min_role}' required. You have '{role}'."
)
return identity, token, role


@rbac_router.get("/users/me")
async def get_my_role(req: Request):
"""Return the current user's identity and effective role."""
identity, _, role = await _resolve_caller(req)
return {"identity": identity, "role": role}


@rbac_router.get("/users")
async def list_users(req: Request):
"""List all explicit role assignments. Manage or above."""
await _require_role(req, "manage")
import app.services.database as _db
return await _db.db_service.list_user_roles()


class RoleAssignment(BaseModel):
role: str


@rbac_router.post("/users/{email}/role", status_code=200)
async def assign_role(email: str, body: RoleAssignment, req: Request):
"""Assign a role to a user. Manage or above."""
identity, _, _ = await _require_role(req, "manage")
if body.role not in ROLES:
raise HTTPException(
status_code=400,
detail=f"Invalid role '{body.role}'. Valid roles: {ROLES}"
)
import app.services.database as _db
await _db.db_service.set_user_role(email, body.role, granted_by=identity)
invalidate_role_cache(email)
logger.info("Role assigned: %s → %s by %s", email, body.role, identity)
return {"identity": email, "role": body.role}


@rbac_router.delete("/users/{email}")
async def remove_user_role(email: str, req: Request):
"""Remove explicit role assignment (reverts to default 'use'). Manage or above."""
identity, _, _ = await _require_role(req, "manage")
import app.services.database as _db
await _db.db_service.delete_user_role(email)
invalidate_role_cache(email)
logger.info("Role removed: %s by %s", email, identity)
return {"success": True}
2 changes: 2 additions & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from app.api.genie_clone_routes import genie_clone_router
from app.api.gateway_routes import gateway_router
from app.api.mcp_routes import mcp_router
from app.api.rbac_routes import rbac_router
from app.config import get_settings

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -78,6 +79,7 @@ async def _token_refresh_loop():

app.include_router(router, prefix="/api")
app.include_router(gateway_router, prefix="/api")
app.include_router(rbac_router, prefix="/api")
app.include_router(genie_clone_router, prefix="/api/2.0/genie")
app.include_router(mcp_router, prefix="/api/2.0/mcp")

Expand Down
14 changes: 14 additions & 0 deletions backend/app/services/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,17 @@ async def delete_gateway(self, gateway_id: str) -> bool:

async def get_gateway_stats(self, gateway_id: str) -> dict:
return await self.backend.get_gateway_stats(gateway_id)

# --- User roles ---

async def get_user_role(self, identity: str):
return await self.backend.get_user_role(identity)

async def set_user_role(self, identity: str, role: str, granted_by: str = None):
return await self.backend.set_user_role(identity, role, granted_by)

async def list_user_roles(self) -> list:
return await self.backend.list_user_roles()

async def delete_user_role(self, identity: str):
return await self.backend.delete_user_role(identity)
106 changes: 106 additions & 0 deletions backend/app/services/rbac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
Role-based access control for Genie Cache Gateway.

Roles (lowest → highest privilege):
use — query only: submit questions, view results
manage — configure gateways, view/clear cache, manage users
owner — full control: create/delete gateways, configure settings

Workspace admins are always treated as owner regardless of the user_roles table.
Unassigned users default to 'use'.
"""

import logging
import time

import httpx

from app.auth import ensure_https

logger = logging.getLogger(__name__)

ROLES = ['use', 'manage', 'owner']
ROLE_HIERARCHY = {'use': 1, 'manage': 2, 'owner': 3}
DEFAULT_ROLE = 'use'

# Shared HTTP client — avoids per-call TCP+TLS handshake overhead
_http_client = httpx.AsyncClient(timeout=5.0)

# Short-lived in-process caches to avoid hammering SCIM and DB on every request.
# Keys: token (admin check) and identity (role lookup). TTLs are conservative —
# role changes take effect within the TTL window without a restart.
_ADMIN_CACHE_TTL = 60.0 # seconds
_ROLE_CACHE_TTL = 120.0 # seconds
_admin_cache: dict[str, tuple[bool, float]] = {} # token → (is_admin, expires_at)
_role_cache: dict[str, tuple[str, float]] = {} # identity → (role, expires_at)


def role_gte(a: str, b: str) -> bool:
"""Return True if role a >= role b in the privilege hierarchy."""
return ROLE_HIERARCHY.get(a, 0) >= ROLE_HIERARCHY.get(b, 0)


def invalidate_role_cache(identity: str) -> None:
"""Evict a cached role so the next request re-reads from the database.
Call this immediately after any set_user_role / delete_user_role write.
"""
_role_cache.pop(identity, None)


async def is_workspace_admin(token: str, host: str) -> bool:
"""Check if the token owner is a Databricks workspace admin via SCIM /Me.
Result is cached for _ADMIN_CACHE_TTL seconds to avoid per-request SCIM calls.
"""
if not token or not host:
return False
host = ensure_https(host)

now = time.monotonic()
cached = _admin_cache.get(token)
if cached is not None:
result, expires_at = cached
if now < expires_at:
return result

result = False
try:
resp = await _http_client.get(
f"{host}/api/2.0/preview/scim/v2/Me",
headers={"Authorization": f"Bearer {token}"}
)
if resp.status_code == 200:
groups = resp.json().get("groups", [])
result = any(g.get("display") == "admins" for g in groups)
except Exception as e:
logger.debug("Workspace admin check failed: %s", e)

_admin_cache[token] = (result, now + _ADMIN_CACHE_TTL)
return result


async def resolve_role(identity: str, token: str, host: str) -> str:
"""
Resolve the effective role for a user:
1. Workspace admins → 'owner' (checked via Databricks SCIM API, cached 60 s)
2. Explicit assignment in user_roles table (cached 120 s, invalidated on write)
3. Default → 'use'
"""
import app.services.database as _db

if await is_workspace_admin(token, host):
return 'owner'

now = time.monotonic()
cached = _role_cache.get(identity)
if cached is not None:
role, expires_at = cached
if now < expires_at:
return role

assigned = None
if _db.db_service and identity:
assigned = await _db.db_service.get_user_role(identity)

role = assigned or DEFAULT_ROLE
_role_cache[identity] = (role, now + _ROLE_CACHE_TTL)
return role
26 changes: 26 additions & 0 deletions backend/app/services/storage_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,29 @@ async def get_gateway_stats(self, gateway_id: str) -> dict:
if hasattr(backend, 'pool'):
return await backend.get_gateway_stats(gateway_id)
return backend.get_gateway_stats(gateway_id)

# --- User roles CRUD (delegates to default backend) ---

async def get_user_role(self, identity: str):
backend = self.default_backend
if hasattr(backend, 'pool'):
return await backend.get_user_role(identity)
return backend.get_user_role(identity)

async def set_user_role(self, identity: str, role: str, granted_by: str = None):
backend = self.default_backend
if hasattr(backend, 'pool'):
return await backend.set_user_role(identity, role, granted_by)
return backend.set_user_role(identity, role, granted_by)

async def list_user_roles(self) -> list:
backend = self.default_backend
if hasattr(backend, 'pool'):
return await backend.list_user_roles()
return backend.list_user_roles()

async def delete_user_role(self, identity: str):
backend = self.default_backend
if hasattr(backend, 'pool'):
return await backend.delete_user_role(identity)
return backend.delete_user_role(identity)
Loading