From 7477c7fd165d94f27e95ced06343c7dbed86d18a Mon Sep 17 00:00:00 2001 From: Shiyas Mohammed Date: Sat, 14 Mar 2026 15:56:29 +0530 Subject: [PATCH] refactor(admin-api): add cursor-based pagination to list revisions endpoint --- crates/bin/ampctl/src/cmd/table.rs | 2 +- crates/bin/ampctl/src/cmd/table/list.rs | 14 +- .../ampctl/src/cmd/table/list__after_help.md | 5 +- crates/clients/admin/src/revisions.rs | 15 +- crates/core/data-store/src/lib.rs | 7 +- .../src/physical_table_revision.rs | 31 +-- .../src/physical_table_revision/sql.rs | 152 ++++++++------- .../tests/it_pagination.rs | 178 +++++++++++++++++- .../admin-api/src/handlers/revisions.rs | 2 +- .../revisions/{list_all.rs => list.rs} | 17 +- crates/services/admin-api/src/lib.rs | 4 +- docs/feat/admin-table.md | 12 +- docs/schemas/openapi/admin.spec.json | 12 +- tests/src/tests/it_admin_api_revisions.rs | 7 +- 14 files changed, 335 insertions(+), 123 deletions(-) rename crates/services/admin-api/src/handlers/revisions/{list_all.rs => list.rs} (86%) diff --git a/crates/bin/ampctl/src/cmd/table.rs b/crates/bin/ampctl/src/cmd/table.rs index 5c44b9669..4676c1c95 100644 --- a/crates/bin/ampctl/src/cmd/table.rs +++ b/crates/bin/ampctl/src/cmd/table.rs @@ -30,7 +30,7 @@ pub enum Commands { #[command(after_help = include_str!("table/get__after_help.md"))] Get(get::Args), - /// List all table revisions + /// List table revisions #[command(alias = "ls")] #[command(after_help = include_str!("table/list__after_help.md"))] List(list::Args), diff --git a/crates/bin/ampctl/src/cmd/table/list.rs b/crates/bin/ampctl/src/cmd/table/list.rs index 4f74f0f85..d8ace034d 100644 --- a/crates/bin/ampctl/src/cmd/table/list.rs +++ b/crates/bin/ampctl/src/cmd/table/list.rs @@ -30,9 +30,13 @@ pub struct Args { /// Maximum number of revisions to return (default: 100) #[arg(long, short = 'l')] pub limit: Option, + + /// Cursor for pagination: ID of the last revision from the previous page + #[arg(long)] + pub last_id: Option, } -/// List 
all table revisions by retrieving them from the admin API. +/// List table revisions by retrieving them from the admin API. /// /// Retrieves table revisions with optional filtering and displays them based on /// the output format. @@ -40,17 +44,18 @@ pub struct Args { /// # Errors /// /// Returns [`Error`] for API errors (400/500) or network failures. -#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, active = ?active, limit = ?limit))] +#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, active = ?active, limit = ?limit, last_id = ?last_id))] pub async fn run( Args { global, active, limit, + last_id, }: Args, ) -> Result<(), Error> { tracing::debug!("listing table revisions"); - let revisions = get_revisions(&global, active, limit).await?; + let revisions = get_revisions(&global, active, limit, last_id).await?; let result = ListResult { revisions }; global.print(&result).map_err(Error::JsonFormattingError)?; @@ -65,12 +70,13 @@ async fn get_revisions( global: &GlobalArgs, active: Option, limit: Option, + last_id: Option, ) -> Result, Error> { let client = global.build_client().map_err(Error::ClientBuildError)?; let revisions = client .revisions() - .list(active, limit) + .list(active, limit, last_id) .await .map_err(|err| { tracing::error!( diff --git a/crates/bin/ampctl/src/cmd/table/list__after_help.md b/crates/bin/ampctl/src/cmd/table/list__after_help.md index 15da1fc27..d8463d4f4 100644 --- a/crates/bin/ampctl/src/cmd/table/list__after_help.md +++ b/crates/bin/ampctl/src/cmd/table/list__after_help.md @@ -1,6 +1,6 @@ ## Examples -List all table revisions: +List table revisions: $ ampctl table list Filter by active status: @@ -12,6 +12,9 @@ Limit the number of results: Combine filters: $ ampctl table list --active true --limit 5 +Paginate through results: + $ ampctl table list --limit 10 --last-id 42 + With JSON output: $ ampctl table list --json diff --git a/crates/clients/admin/src/revisions.rs 
b/crates/clients/admin/src/revisions.rs index 8b70309eb..ee2524efa 100644 --- a/crates/clients/admin/src/revisions.rs +++ b/crates/clients/admin/src/revisions.rs @@ -25,8 +25,9 @@ fn revision_get_by_id(id: i64) -> String { /// Build URL path for listing all revisions. /// -/// GET `/revisions` with optional `?active=true|false` and `?limit=x` query parameters -fn revisions_list(active: Option, limit: Option) -> String { +/// GET `/revisions` with optional `?active=true|false`, `?limit=x`, and `?last_id=x` +/// query parameters +fn revisions_list(active: Option, limit: Option, last_id: Option) -> String { let mut params = Vec::new(); if let Some(v) = active { params.push(format!("active={v}")); @@ -34,6 +35,9 @@ fn revisions_list(active: Option, limit: Option) -> String { if let Some(v) = limit { params.push(format!("limit={v}")); } + if let Some(v) = last_id { + params.push(format!("last_id={v}")); + } if params.is_empty() { "revisions".to_owned() } else { @@ -220,8 +224,8 @@ impl<'a> RevisionsClient<'a> { /// List all revisions, optionally filtered by active status. /// - /// Sends GET to `/revisions` endpoint with optional `?active=true|false` - /// and `?limit=x` query parameters. + /// Sends GET to `/revisions` endpoint with optional `?active=true|false`, + /// `?limit=x`, and `?last_id=x` query parameters. 
/// /// # Errors /// @@ -232,11 +236,12 @@ impl<'a> RevisionsClient<'a> { &self, active: Option, limit: Option, + last_id: Option, ) -> Result, ListError> { let url = self .client .base_url() - .join(&revisions_list(active, limit)) + .join(&revisions_list(active, limit, last_id)) .expect("valid URL"); tracing::debug!(url = %url, "Sending GET request to list revisions"); diff --git a/crates/core/data-store/src/lib.rs b/crates/core/data-store/src/lib.rs index 89be71769..f08eee7fa 100644 --- a/crates/core/data-store/src/lib.rs +++ b/crates/core/data-store/src/lib.rs @@ -329,16 +329,19 @@ impl DataStore { .map_err(GetRevisionByLocationIdError) } - /// Lists physical table revisions from the metadata database. + /// Lists physical table revisions from the metadata database with cursor-based pagination. /// + /// Uses cursor-based pagination where `last_id` is the ID of the last revision + /// from the previous page. For the first page, pass `None` for `last_id`. /// When `active` is `None`, returns all revisions. When `Some(true)` or /// `Some(false)`, returns only revisions matching that active status. 
pub async fn list_all_table_revisions( &self, active: Option, limit: i64, + last_id: Option, ) -> Result, ListAllTableRevisionsError> { - metadata_db::physical_table_revision::list_all(&self.metadata_db, active, limit) + metadata_db::physical_table_revision::list(&self.metadata_db, limit, last_id, active) .await .map_err(ListAllTableRevisionsError) } diff --git a/crates/core/metadata-db/src/physical_table_revision.rs b/crates/core/metadata-db/src/physical_table_revision.rs index cdbf13a4f..aca6c7e59 100644 --- a/crates/core/metadata-db/src/physical_table_revision.rs +++ b/crates/core/metadata-db/src/physical_table_revision.rs @@ -148,44 +148,29 @@ where .map_err(Error::Database) } -/// List physical table locations with cursor-based pagination +/// List physical table revisions with cursor-based pagination, optionally filtered by +/// active status /// -/// This function provides an ergonomic interface for paginated listing that automatically -/// handles first page vs subsequent page logic based on the cursor parameter. +/// Uses cursor-based pagination where `last_id` is the ID of the last revision +/// from the previous page. For the first page, pass `None` for `last_id`. +/// If `active` is provided, only revisions matching that active status are returned. #[tracing::instrument(skip(exe), err)] pub async fn list<'c, E>( exe: E, limit: i64, last_id: Option + std::fmt::Debug>, + active: Option, ) -> Result, Error> where E: Executor<'c>, { match last_id { - None => sql::list_first_page(exe, limit).await, - Some(id) => sql::list_next_page(exe, limit, id.into()).await, + None => sql::list_first_page(exe, limit, active).await, + Some(id) => sql::list_next_page(exe, limit, id.into(), active).await, } .map_err(Error::Database) } -/// List all physical table revisions with an optional active status filter -/// -/// When `active` is `None`, returns all revisions. When `Some(true)` or `Some(false)`, -/// returns only revisions matching that active status. 
-#[tracing::instrument(skip(exe), err)] -pub async fn list_all<'c, E>( - exe: E, - active: Option, - limit: i64, -) -> Result, Error> -where - E: Executor<'c>, -{ - sql::list_all(exe, active, limit) - .await - .map_err(Error::Database) -} - /// A specific storage revision (location) of a physical table /// /// Each revision has its own storage path and an optional writer job that diff --git a/crates/core/metadata-db/src/physical_table_revision/sql.rs b/crates/core/metadata-db/src/physical_table_revision/sql.rs index b7542299e..c6b437d52 100644 --- a/crates/core/metadata-db/src/physical_table_revision/sql.rs +++ b/crates/core/metadata-db/src/physical_table_revision/sql.rs @@ -246,98 +246,116 @@ where Ok(result.rows_affected() > 0) } -/// List the first page of physical table revisions +/// List the first page of physical table revisions, optionally filtered by active status /// /// Returns a paginated list of revisions ordered by ID in descending order (newest first). /// This function is used to fetch the initial page when no cursor is available. +/// If `active` is provided, only revisions matching that active status are returned. pub async fn list_first_page<'c, E>( exe: E, limit: i64, + active: Option, ) -> Result, sqlx::Error> where E: Executor<'c, Database = Postgres>, { - let query = indoc::indoc! {r#" - SELECT - ptr.id, - ptr.path, - EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) AS active, - ptr.writer, - ptr.metadata - FROM physical_table_revisions ptr - ORDER BY ptr.id DESC - LIMIT $1 - "#}; + match active { + None => { + let query = indoc::indoc! 
{r#" + SELECT + ptr.id, + ptr.path, + EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) AS active, + ptr.writer, + ptr.metadata + FROM physical_table_revisions ptr + ORDER BY ptr.id DESC + LIMIT $1 + "#}; - sqlx::query_as(query).bind(limit).fetch_all(exe).await + sqlx::query_as(query).bind(limit).fetch_all(exe).await + } + Some(active) => { + let query = indoc::indoc! {r#" + SELECT + ptr.id, + ptr.path, + $2 AS active, + ptr.writer, + ptr.metadata + FROM physical_table_revisions ptr + WHERE EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) = $2 + ORDER BY ptr.id DESC + LIMIT $1 + "#}; + + sqlx::query_as(query) + .bind(limit) + .bind(active) + .fetch_all(exe) + .await + } + } } -/// List subsequent pages of physical table revisions using cursor-based pagination +/// List subsequent pages of physical table revisions using cursor-based pagination, +/// optionally filtered by active status /// /// Returns a paginated list of revisions with IDs less than the provided cursor, /// ordered by ID in descending order (newest first). This implements cursor-based /// pagination for efficient traversal of large revision lists. +/// If `active` is provided, only revisions matching that active status are returned. pub async fn list_next_page<'c, E>( exe: E, limit: i64, last_id: LocationId, -) -> Result, sqlx::Error> -where - E: Executor<'c, Database = Postgres>, -{ - let query = indoc::indoc! {r#" - SELECT - ptr.id, - ptr.path, - EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) AS active, - ptr.writer, - ptr.metadata - FROM physical_table_revisions ptr - WHERE ptr.id < $2 - ORDER BY ptr.id DESC - LIMIT $1 - "#}; - - sqlx::query_as(query) - .bind(limit) - .bind(last_id) - .fetch_all(exe) - .await -} - -/// List all physical table revisions with an optional active status filter -/// -/// Returns all revisions ordered by ID in descending order (newest first). -/// When `active` is `None`, all revisions are returned. 
When `Some(true)` or -/// `Some(false)`, only revisions matching that active status are returned. -pub async fn list_all<'c, E>( - exe: E, active: Option, - limit: i64, ) -> Result, sqlx::Error> where E: Executor<'c, Database = Postgres>, { - let query = indoc::indoc! {r#" - SELECT - ptr.id, - ptr.path, - pt.active_revision_id IS NOT NULL AS active, - ptr.writer, - ptr.metadata - FROM physical_table_revisions AS ptr - LEFT JOIN physical_tables pt ON pt.active_revision_id = ptr.id - WHERE ( - $1::boolean IS NULL - OR (pt.active_revision_id IS NOT NULL) = $1 - ) - ORDER BY ptr.id DESC - LIMIT $2 - "#}; + match active { + None => { + let query = indoc::indoc! {r#" + SELECT + ptr.id, + ptr.path, + EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) AS active, + ptr.writer, + ptr.metadata + FROM physical_table_revisions ptr + WHERE ptr.id < $2 + ORDER BY ptr.id DESC + LIMIT $1 + "#}; - sqlx::query_as(query) - .bind(active) - .bind(limit) - .fetch_all(exe) - .await + sqlx::query_as(query) + .bind(limit) + .bind(last_id) + .fetch_all(exe) + .await + } + Some(active) => { + let query = indoc::indoc! {r#" + SELECT + ptr.id, + ptr.path, + $3 AS active, + ptr.writer, + ptr.metadata + FROM physical_table_revisions ptr + WHERE ptr.id < $2 + AND EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) = $3 + ORDER BY ptr.id DESC + LIMIT $1 + "#}; + + sqlx::query_as(query) + .bind(limit) + .bind(last_id) + .bind(active) + .fetch_all(exe) + .await + } + } } diff --git a/crates/core/metadata-db/src/physical_table_revision/tests/it_pagination.rs b/crates/core/metadata-db/src/physical_table_revision/tests/it_pagination.rs index 5f52408ff..a60f2748e 100644 --- a/crates/core/metadata-db/src/physical_table_revision/tests/it_pagination.rs +++ b/crates/core/metadata-db/src/physical_table_revision/tests/it_pagination.rs @@ -1,4 +1,4 @@ -//! Pagination tests for location listing +//! 
Pagination and filtering tests for location listing use crate::{ datasets::{DatasetName, DatasetNamespace}, @@ -14,7 +14,7 @@ async fn list_locations_first_page_when_empty() { let (_db, conn) = setup_test_db().await; //* When - let locations = physical_table_revision::list(&conn, 10, None as Option) + let locations = physical_table_revision::list(&conn, 10, None as Option, None) .await .expect("Failed to list locations"); @@ -63,7 +63,7 @@ async fn list_locations_first_page_respects_limit() { } //* When - let locations = physical_table_revision::list(&conn, 3, None as Option) + let locations = physical_table_revision::list(&conn, 3, None as Option, None) .await .expect("Failed to list locations"); @@ -138,7 +138,7 @@ async fn list_locations_next_page_uses_cursor() { } // Get the first page to establish cursor - let first_page = physical_table_revision::list(&conn, 3, None as Option) + let first_page = physical_table_revision::list(&conn, 3, None as Option, None) .await .expect("Failed to list first page"); let cursor = first_page @@ -147,7 +147,7 @@ async fn list_locations_next_page_uses_cursor() { .id; //* When - let second_page = physical_table_revision::list(&conn, 3, Some(cursor)) + let second_page = physical_table_revision::list(&conn, 3, Some(cursor), None) .await .expect("Failed to list second page"); @@ -180,3 +180,171 @@ async fn list_locations_next_page_uses_cursor() { "list should use cursor to exclude locations with ID >= cursor" ); } + +#[tokio::test] +async fn list_locations_filters_by_active_status() { + //* Given + let (_db, conn) = setup_test_db().await; + + let namespace = DatasetNamespace::from_ref_unchecked("test-namespace"); + let name = DatasetName::from_ref_unchecked("test-dataset"); + let hash = ManifestHash::from_ref_unchecked( + "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", + ); + + // Create 4 revisions: 2 active, 2 inactive + for i in 0..4 { + let table_name = 
TableName::from_owned_unchecked(format!("test_table_filter_{}", i)); + let path = TablePath::from_owned_unchecked(format!( + "test-dataset/test_table_filter_{}/revision-{}", + i, i + )); + let location_id = + register_table_and_revision(&conn, &namespace, &name, &hash, &table_name, &path) + .await + .expect("Failed to insert location"); + + // Only mark even-indexed revisions as active + if i % 2 == 0 { + physical_table::mark_active_by_id( + &conn, + location_id, + &namespace, + &name, + &hash, + &table_name, + ) + .await + .expect("Failed to mark location active"); + } + } + + //* When + let active_only = + physical_table_revision::list(&conn, 10, None as Option, Some(true)) + .await + .expect("Failed to list active locations"); + let inactive_only = + physical_table_revision::list(&conn, 10, None as Option, Some(false)) + .await + .expect("Failed to list inactive locations"); + let all = physical_table_revision::list(&conn, 10, None as Option, None) + .await + .expect("Failed to list all locations"); + + //* Then + assert_eq!( + active_only.len(), + 2, + "list with active=true should return only active revisions" + ); + for rev in &active_only { + assert!( + rev.active, + "all revisions should be active when filtered by active=true" + ); + } + + assert_eq!( + inactive_only.len(), + 2, + "list with active=false should return only inactive revisions" + ); + for rev in &inactive_only { + assert!( + !rev.active, + "all revisions should be inactive when filtered by active=false" + ); + } + + assert_eq!( + all.len(), + 4, + "list with active=None should return all revisions" + ); +} + +#[tokio::test] +async fn list_locations_combines_cursor_and_active_filter() { + //* Given + let (_db, conn) = setup_test_db().await; + + let namespace = DatasetNamespace::from_ref_unchecked("test-namespace"); + let name = DatasetName::from_ref_unchecked("test-dataset"); + let hash = ManifestHash::from_ref_unchecked( + "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", + 
); + + // Create 6 revisions: 3 active (even indices), 3 inactive (odd indices) + for i in 0..6 { + let table_name = TableName::from_owned_unchecked(format!("test_table_combo_{}", i)); + let path = TablePath::from_owned_unchecked(format!( + "test-dataset/test_table_combo_{}/revision-{}", + i, i + )); + let location_id = + register_table_and_revision(&conn, &namespace, &name, &hash, &table_name, &path) + .await + .expect("Failed to insert location"); + + if i % 2 == 0 { + physical_table::mark_active_by_id( + &conn, + location_id, + &namespace, + &name, + &hash, + &table_name, + ) + .await + .expect("Failed to mark location active"); + } + + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + + // Get first page of active revisions (limit 2) + let first_page = + physical_table_revision::list(&conn, 2, None as Option, Some(true)) + .await + .expect("Failed to list first page"); + + assert_eq!( + first_page.len(), + 2, + "first page should return 2 active revisions" + ); + for rev in &first_page { + assert!( + rev.active, + "first page should only contain active revisions" + ); + } + + let cursor = first_page + .last() + .expect("First page should not be empty") + .id; + + //* When — get second page of active revisions using cursor + let second_page = physical_table_revision::list(&conn, 2, Some(cursor), Some(true)) + .await + .expect("Failed to list second page"); + + //* Then + assert_eq!( + second_page.len(), + 1, + "second page should return remaining 1 active revision" + ); + for rev in &second_page { + assert!( + rev.active, + "second page should only contain active revisions" + ); + } + assert!( + cursor > second_page[0].id, + "cursor-based pagination should return revisions with IDs less than cursor" + ); +} diff --git a/crates/services/admin-api/src/handlers/revisions.rs b/crates/services/admin-api/src/handlers/revisions.rs index 95758d30b..2f4e1bbfa 100644 --- a/crates/services/admin-api/src/handlers/revisions.rs +++ 
b/crates/services/admin-api/src/handlers/revisions.rs @@ -3,7 +3,7 @@ pub mod create; pub mod deactivate; pub mod delete; pub mod get_by_id; -pub mod list_all; +pub mod list; pub mod prune; pub mod restore; pub mod truncate; diff --git a/crates/services/admin-api/src/handlers/revisions/list_all.rs b/crates/services/admin-api/src/handlers/revisions/list.rs similarity index 86% rename from crates/services/admin-api/src/handlers/revisions/list_all.rs rename to crates/services/admin-api/src/handlers/revisions/list.rs index 12e8b5a0f..b0e68f1a9 100644 --- a/crates/services/admin-api/src/handlers/revisions/list_all.rs +++ b/crates/services/admin-api/src/handlers/revisions/list.rs @@ -4,6 +4,7 @@ use axum::{ extract::{Query, State, rejection::QueryRejection}, http::StatusCode, }; +use metadata_db::physical_table_revision::LocationId; use monitoring::logging; use crate::{ @@ -16,11 +17,14 @@ use crate::{ /// Handler for the `GET /revisions` endpoint /// -/// Returns all physical table revisions, with an optional active status filter. +/// Returns physical table revisions with cursor-based pagination and optional active +/// status filtering. /// /// ## Query Parameters /// - `active` (optional): Filter by active status (`true` or `false`) /// - `limit` (optional): Maximum number of revisions to return (default: 100) +/// - `last_id` (optional): Cursor for pagination — ID of the last revision from the +/// previous page. Omit for the first page. 
/// /// ## Response /// - **200 OK**: Successfully retrieved revisions @@ -32,8 +36,8 @@ use crate::{ /// - `LIST_ALL_TABLE_REVISIONS_ERROR`: Failed to list table revisions /// /// This handler: -/// - Validates and extracts optional `active` and `limit` query parameters -/// - Calls the data store to list table revisions with the given filters +/// - Validates and extracts optional `active`, `limit`, and `last_id` query parameters +/// - Calls the data store to list table revisions with the given filters and cursor /// - Returns revision information as a JSON array #[tracing::instrument(skip_all, err)] #[cfg_attr( @@ -45,7 +49,8 @@ use crate::{ operation_id = "list_revisions", params( ("active" = Option, Query, description = "Filter by active status"), - ("limit" = Option, Query, description = "Maximum number of revisions to return (default: 100)") + ("limit" = Option, Query, description = "Maximum number of revisions to return (default: 100)"), + ("last_id" = Option, Query, description = "Cursor for pagination: ID of the last revision from the previous page") ), responses( (status = 200, description = "Successfully retrieved revisions", body = Vec), @@ -72,7 +77,7 @@ pub async fn handler( } let revisions = - ctx.data_store.list_all_table_revisions(query.active, query.limit) + ctx.data_store.list_all_table_revisions(query.active, query.limit, query.last_id) .await .map_err(|err| { tracing::debug!(error = %err, error_source = logging::error_source(&err), "failed to list revisions"); @@ -93,6 +98,8 @@ pub struct QueryParams { /// Maximum number of revisions to return (default: 100) #[serde(default = "default_limit")] pub limit: i64, + /// Cursor for pagination: ID of the last revision from the previous page + pub last_id: Option, } fn default_limit() -> i64 { diff --git a/crates/services/admin-api/src/lib.rs b/crates/services/admin-api/src/lib.rs index 3137620f1..f5323fab9 100644 --- a/crates/services/admin-api/src/lib.rs +++ b/crates/services/admin-api/src/lib.rs 
@@ -56,7 +56,7 @@ pub fn router(ctx: Ctx) -> Router<()> { ) .route( "/revisions", - get(revisions::list_all::handler).post(revisions::create::handler), + get(revisions::list::handler).post(revisions::create::handler), ) .route( "/revisions/{id}", @@ -167,7 +167,7 @@ pub fn router(ctx: Ctx) -> Router<()> { // Schema endpoints handlers::schema::handler, // Revision endpoints - handlers::revisions::list_all::handler, + handlers::revisions::list::handler, handlers::revisions::restore::handler, handlers::revisions::activate::handler, handlers::revisions::deactivate::handler, diff --git a/docs/feat/admin-table.md b/docs/feat/admin-table.md index be992ca21..cc6dd7b0f 100644 --- a/docs/feat/admin-table.md +++ b/docs/feat/admin-table.md @@ -47,7 +47,7 @@ Table revision management controls which physical version of a table's data is s | **`DataStore`** | Single-operation get revision by location ID | | **`DataStore`** | Single-operation delete revision by location ID (CASCADE deletes file metadata) | | **`DataStore`** | Multi-step truncate revision (stream files, delete from object store + metadata, verify, delete revision) | -| **`metadata_db`** | SQL operations on `physical_table_revisions` (register, list_all, get_by_location_id, delete_by_id) and `physical_tables` (mark_inactive_by_table_name, mark_active_by_id) | +| **`metadata_db`** | SQL operations on `physical_table_revisions` (register, list, get_by_location_id, delete_by_id) and `physical_tables` (mark_inactive_by_table_name, mark_active_by_id) | **Key Principle**: Admin API handlers do not interact with `metadata_db` directly. All database access is encapsulated in `DataStore` methods, keeping handlers as pure orchestration and presentation logic. 
@@ -67,7 +67,7 @@ Each handler follows the same pattern: parse request, resolve dataset reference, | Endpoint | Method | Description | | ----------------------------- | ------ | ------------------------------------------------- | -| `/revisions` | GET | List revisions (optional `?active=true\|false` filter, `?limit=N` default 100) | +| `/revisions` | GET | List revisions (optional `?active=true\|false` filter, `?limit=N` default 100, `?last_id=N` cursor) | | `/revisions` | POST | Register a new inactive table revision | | `/revisions/{id}` | GET | Retrieve a specific revision by location ID | | `/revisions/{id}` | DELETE | Delete an inactive table revision by location ID | @@ -102,7 +102,7 @@ curl -X POST http://localhost:1610/revisions \ ### List table revisions -View revisions with optional active status filter and result limit. +View revisions with optional active status filter, result limit, and cursor-based pagination. ```bash # Via ampctl @@ -118,6 +118,9 @@ ampctl table list --limit 10 # Combine filters ampctl table list --active true --limit 5 +# Cursor-based pagination (use last_id from previous page) +ampctl table list --limit 10 --last-id 42 + # JSON output for scripting ampctl table list --json @@ -125,6 +128,7 @@ ampctl table list --json curl http://localhost:1610/revisions curl http://localhost:1610/revisions?active=true curl http://localhost:1610/revisions?limit=10 +curl "http://localhost:1610/revisions?limit=10&last_id=42" ``` ### Get a specific revision @@ -232,7 +236,7 @@ curl -X POST http://localhost:1610/revisions/42/restore - `crates/services/admin-api/src/handlers/revisions/restore.rs` - Restore endpoint handler and error types - `crates/core/data-store/src/lib.rs` - `register_table_revision`, `activate_table_revision` (transactional), `deactivate_table_revision`, `delete_table_revision`, and `truncate_revision` methods - `crates/core/metadata-db/src/physical_table.rs` - `mark_inactive_by_table_name` and `mark_active_by_id` SQL operations on 
`physical_tables` -- `crates/core/metadata-db/src/physical_table_revision.rs` - `register`, `list_all`, `get_by_location_id`, and `delete_by_id` SQL operations on `physical_table_revisions` +- `crates/core/metadata-db/src/physical_table_revision.rs` - `register`, `list`, `get_by_location_id`, and `delete_by_id` SQL operations on `physical_table_revisions` ## References diff --git a/docs/schemas/openapi/admin.spec.json b/docs/schemas/openapi/admin.spec.json index dd0fe1862..39dfc7686 100644 --- a/docs/schemas/openapi/admin.spec.json +++ b/docs/schemas/openapi/admin.spec.json @@ -1797,7 +1797,7 @@ "revisions" ], "summary": "Handler for the `GET /revisions` endpoint", - "description": "Returns all physical table revisions, with an optional active status filter.\n\n## Query Parameters\n- `active` (optional): Filter by active status (`true` or `false`)\n- `limit` (optional): Maximum number of revisions to return (default: 100)\n\n## Response\n- **200 OK**: Successfully retrieved revisions\n- **400 Bad Request**: Invalid query parameters\n- **500 Internal Server Error**: Database error during listing\n\n## Error Codes\n- `INVALID_QUERY_PARAMETERS`: Invalid query parameters\n- `LIST_ALL_TABLE_REVISIONS_ERROR`: Failed to list table revisions\n\nThis handler:\n- Validates and extracts optional `active` and `limit` query parameters\n- Calls the data store to list table revisions with the given filters\n- Returns revision information as a JSON array", + "description": "Returns physical table revisions with cursor-based pagination and optional active\nstatus filtering.\n\n## Query Parameters\n- `active` (optional): Filter by active status (`true` or `false`)\n- `limit` (optional): Maximum number of revisions to return (default: 100)\n- `last_id` (optional): Cursor for pagination — ID of the last revision from the\n previous page. 
Omit for the first page.\n\n## Response\n- **200 OK**: Successfully retrieved revisions\n- **400 Bad Request**: Invalid query parameters\n- **500 Internal Server Error**: Database error during listing\n\n## Error Codes\n- `INVALID_QUERY_PARAMETERS`: Invalid query parameters\n- `LIST_ALL_TABLE_REVISIONS_ERROR`: Failed to list table revisions\n\nThis handler:\n- Validates and extracts optional `active`, `limit`, and `last_id` query parameters\n- Calls the data store to list table revisions with the given filters and cursor\n- Returns revision information as a JSON array", "operationId": "list_revisions", "parameters": [ { @@ -1818,6 +1818,16 @@ "type": "integer", "format": "int64" } + }, + { + "name": "last_id", + "in": "query", + "description": "Cursor for pagination: ID of the last revision from the previous page", + "required": false, + "schema": { + "type": "integer", + "format": "int64" + } } ], "responses": { diff --git a/tests/src/tests/it_admin_api_revisions.rs b/tests/src/tests/it_admin_api_revisions.rs index d6ec75669..570481712 100644 --- a/tests/src/tests/it_admin_api_revisions.rs +++ b/tests/src/tests/it_admin_api_revisions.rs @@ -1224,7 +1224,7 @@ async fn prune_revision_with_reorg_schedules_non_canonical_files() { // Get the revision for the blocks table let revisions_after_first_deploy = ampctl_client .revisions() - .list(None, None) + .list(None, None, None) .await .expect("failed to list revisions"); let blocks_revision = revisions_after_first_deploy @@ -1410,7 +1410,10 @@ impl TestCtx { active: Option, limit: Option, ) -> Result, ListError> { - self.ampctl_client.revisions().list(active, limit).await + self.ampctl_client + .revisions() + .list(active, limit, None) + .await } /// Fetches a revision by its location ID, returning `None` if not found.