Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/bin/ampctl/src/cmd/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub enum Commands {
#[command(after_help = include_str!("table/get__after_help.md"))]
Get(get::Args),

/// List all table revisions
/// List table revisions
#[command(alias = "ls")]
#[command(after_help = include_str!("table/list__after_help.md"))]
List(list::Args),
Expand Down
14 changes: 10 additions & 4 deletions crates/bin/ampctl/src/cmd/table/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,32 @@ pub struct Args {
/// Maximum number of revisions to return (default: 100)
#[arg(long, short = 'l')]
pub limit: Option<i64>,

/// Cursor for pagination: ID of the last revision from the previous page
#[arg(long)]
pub last_id: Option<i64>,
}

/// List all table revisions by retrieving them from the admin API.
/// List table revisions by retrieving them from the admin API.
///
/// Retrieves table revisions with optional filtering and displays them based on
/// the output format.
///
/// # Errors
///
/// Returns [`Error`] for API errors (400/500) or network failures.
#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, active = ?active, limit = ?limit))]
#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, active = ?active, limit = ?limit, last_id = ?last_id))]
pub async fn run(
Args {
global,
active,
limit,
last_id,
}: Args,
) -> Result<(), Error> {
tracing::debug!("listing table revisions");

let revisions = get_revisions(&global, active, limit).await?;
let revisions = get_revisions(&global, active, limit, last_id).await?;
let result = ListResult { revisions };
global.print(&result).map_err(Error::JsonFormattingError)?;

Expand All @@ -65,12 +70,13 @@ async fn get_revisions(
global: &GlobalArgs,
active: Option<bool>,
limit: Option<i64>,
last_id: Option<i64>,
) -> Result<Vec<client::revisions::RevisionInfo>, Error> {
let client = global.build_client().map_err(Error::ClientBuildError)?;

let revisions = client
.revisions()
.list(active, limit)
.list(active, limit, last_id)
.await
.map_err(|err| {
tracing::error!(
Expand Down
5 changes: 4 additions & 1 deletion crates/bin/ampctl/src/cmd/table/list__after_help.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Examples

List all table revisions:
List table revisions:
$ ampctl table list

Filter by active status:
Expand All @@ -12,6 +12,9 @@ Limit the number of results:
Combine filters:
$ ampctl table list --active true --limit 5

Paginate through results:
$ ampctl table list --limit 10 --last-id 42

With JSON output:
$ ampctl table list --json

Expand Down
15 changes: 10 additions & 5 deletions crates/clients/admin/src/revisions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@ fn revision_get_by_id(id: i64) -> String {

/// Build URL path for listing all revisions.
///
/// GET `/revisions` with optional `?active=true|false` and `?limit=x` query parameters
fn revisions_list(active: Option<bool>, limit: Option<i64>) -> String {
/// GET `/revisions` with optional `?active=true|false`, `?limit=x`, and `?last_id=x`
/// query parameters
fn revisions_list(active: Option<bool>, limit: Option<i64>, last_id: Option<i64>) -> String {
let mut params = Vec::new();
if let Some(v) = active {
params.push(format!("active={v}"));
}
if let Some(v) = limit {
params.push(format!("limit={v}"));
}
if let Some(v) = last_id {
params.push(format!("last_id={v}"));
}
if params.is_empty() {
"revisions".to_owned()
} else {
Expand Down Expand Up @@ -220,8 +224,8 @@ impl<'a> RevisionsClient<'a> {

/// List all revisions, optionally filtered by active status.
///
/// Sends GET to `/revisions` endpoint with optional `?active=true|false`
/// and `?limit=x` query parameters.
/// Sends GET to `/revisions` endpoint with optional `?active=true|false`,
/// `?limit=x`, and `?last_id=x` query parameters.
///
/// # Errors
///
Expand All @@ -232,11 +236,12 @@ impl<'a> RevisionsClient<'a> {
&self,
active: Option<bool>,
limit: Option<i64>,
last_id: Option<i64>,
) -> Result<Vec<RevisionInfo>, ListError> {
let url = self
.client
.base_url()
.join(&revisions_list(active, limit))
.join(&revisions_list(active, limit, last_id))
.expect("valid URL");

tracing::debug!(url = %url, "Sending GET request to list revisions");
Expand Down
7 changes: 5 additions & 2 deletions crates/core/data-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,16 +329,19 @@ impl DataStore {
.map_err(GetRevisionByLocationIdError)
}

/// Lists physical table revisions from the metadata database.
/// Lists physical table revisions from the metadata database with cursor-based pagination.
///
/// Uses cursor-based pagination where `last_id` is the ID of the last revision
/// from the previous page. For the first page, pass `None` for `last_id`.
/// When `active` is `None`, returns all revisions. When `Some(true)` or
/// `Some(false)`, returns only revisions matching that active status.
pub async fn list_all_table_revisions(
&self,
active: Option<bool>,
limit: i64,
last_id: Option<LocationId>,
) -> Result<Vec<PhysicalTableRevision>, ListAllTableRevisionsError> {
metadata_db::physical_table_revision::list_all(&self.metadata_db, active, limit)
metadata_db::physical_table_revision::list(&self.metadata_db, limit, last_id, active)
.await
.map_err(ListAllTableRevisionsError)
}
Expand Down
31 changes: 8 additions & 23 deletions crates/core/metadata-db/src/physical_table_revision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,44 +148,29 @@ where
.map_err(Error::Database)
}

/// List physical table locations with cursor-based pagination
/// List physical table revisions with cursor-based pagination, optionally filtered by
/// active status
///
/// This function provides an ergonomic interface for paginated listing that automatically
/// handles first page vs subsequent page logic based on the cursor parameter.
/// Uses cursor-based pagination where `last_id` is the ID of the last revision
/// from the previous page. For the first page, pass `None` for `last_id`.
/// If `active` is provided, only revisions matching that active status are returned.
#[tracing::instrument(skip(exe), err)]
pub async fn list<'c, E>(
exe: E,
limit: i64,
last_id: Option<impl Into<LocationId> + std::fmt::Debug>,
active: Option<bool>,
) -> Result<Vec<PhysicalTableRevision>, Error>
where
E: Executor<'c>,
{
match last_id {
None => sql::list_first_page(exe, limit).await,
Some(id) => sql::list_next_page(exe, limit, id.into()).await,
None => sql::list_first_page(exe, limit, active).await,
Some(id) => sql::list_next_page(exe, limit, id.into(), active).await,
}
.map_err(Error::Database)
}

/// List all physical table revisions with an optional active status filter
///
/// When `active` is `None`, returns all revisions. When `Some(true)` or `Some(false)`,
/// returns only revisions matching that active status.
#[tracing::instrument(skip(exe), err)]
pub async fn list_all<'c, E>(
exe: E,
active: Option<bool>,
limit: i64,
) -> Result<Vec<PhysicalTableRevision>, Error>
where
E: Executor<'c>,
{
sql::list_all(exe, active, limit)
.await
.map_err(Error::Database)
}

/// A specific storage revision (location) of a physical table
///
/// Each revision has its own storage path and an optional writer job that
Expand Down
152 changes: 85 additions & 67 deletions crates/core/metadata-db/src/physical_table_revision/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,98 +246,116 @@ where
Ok(result.rows_affected() > 0)
}

/// List the first page of physical table revisions
/// List the first page of physical table revisions, optionally filtered by active status
///
/// Returns a paginated list of revisions ordered by ID in descending order (newest first).
/// This function is used to fetch the initial page when no cursor is available.
/// If `active` is provided, only revisions matching that active status are returned.
pub async fn list_first_page<'c, E>(
exe: E,
limit: i64,
active: Option<bool>,
) -> Result<Vec<PhysicalTableRevision>, sqlx::Error>
where
E: Executor<'c, Database = Postgres>,
{
let query = indoc::indoc! {r#"
SELECT
ptr.id,
ptr.path,
EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) AS active,
ptr.writer,
ptr.metadata
FROM physical_table_revisions ptr
ORDER BY ptr.id DESC
LIMIT $1
"#};
match active {
None => {
let query = indoc::indoc! {r#"
SELECT
ptr.id,
ptr.path,
EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) AS active,
ptr.writer,
ptr.metadata
FROM physical_table_revisions ptr
ORDER BY ptr.id DESC
LIMIT $1
"#};

sqlx::query_as(query).bind(limit).fetch_all(exe).await
sqlx::query_as(query).bind(limit).fetch_all(exe).await
}
Some(active) => {
let query = indoc::indoc! {r#"
SELECT
ptr.id,
ptr.path,
$2 AS active,
ptr.writer,
ptr.metadata
FROM physical_table_revisions ptr
WHERE EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) = $2
ORDER BY ptr.id DESC
LIMIT $1
"#};

sqlx::query_as(query)
.bind(limit)
.bind(active)
.fetch_all(exe)
.await
}
}
}

/// List subsequent pages of physical table revisions using cursor-based pagination
/// List subsequent pages of physical table revisions using cursor-based pagination,
/// optionally filtered by active status
///
/// Returns a paginated list of revisions with IDs less than the provided cursor,
/// ordered by ID in descending order (newest first). This implements cursor-based
/// pagination for efficient traversal of large revision lists.
/// If `active` is provided, only revisions matching that active status are returned.
pub async fn list_next_page<'c, E>(
exe: E,
limit: i64,
last_id: LocationId,
) -> Result<Vec<PhysicalTableRevision>, sqlx::Error>
where
E: Executor<'c, Database = Postgres>,
{
let query = indoc::indoc! {r#"
SELECT
ptr.id,
ptr.path,
EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) AS active,
ptr.writer,
ptr.metadata
FROM physical_table_revisions ptr
WHERE ptr.id < $2
ORDER BY ptr.id DESC
LIMIT $1
"#};

sqlx::query_as(query)
.bind(limit)
.bind(last_id)
.fetch_all(exe)
.await
}

/// List all physical table revisions with an optional active status filter
///
/// Returns all revisions ordered by ID in descending order (newest first).
/// When `active` is `None`, all revisions are returned. When `Some(true)` or
/// `Some(false)`, only revisions matching that active status are returned.
pub async fn list_all<'c, E>(
exe: E,
active: Option<bool>,
limit: i64,
) -> Result<Vec<PhysicalTableRevision>, sqlx::Error>
where
E: Executor<'c, Database = Postgres>,
{
let query = indoc::indoc! {r#"
SELECT
ptr.id,
ptr.path,
pt.active_revision_id IS NOT NULL AS active,
ptr.writer,
ptr.metadata
FROM physical_table_revisions AS ptr
LEFT JOIN physical_tables pt ON pt.active_revision_id = ptr.id
WHERE (
$1::boolean IS NULL
OR (pt.active_revision_id IS NOT NULL) = $1
)
ORDER BY ptr.id DESC
LIMIT $2
"#};
match active {
None => {
let query = indoc::indoc! {r#"
SELECT
ptr.id,
ptr.path,
EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) AS active,
ptr.writer,
ptr.metadata
FROM physical_table_revisions ptr
WHERE ptr.id < $2
ORDER BY ptr.id DESC
LIMIT $1
"#};

sqlx::query_as(query)
.bind(active)
.bind(limit)
.fetch_all(exe)
.await
sqlx::query_as(query)
.bind(limit)
.bind(last_id)
.fetch_all(exe)
.await
}
Some(active) => {
let query = indoc::indoc! {r#"
SELECT
ptr.id,
ptr.path,
$3 AS active,
ptr.writer,
ptr.metadata
FROM physical_table_revisions ptr
WHERE ptr.id < $2
AND EXISTS(SELECT 1 FROM physical_tables WHERE active_revision_id = ptr.id) = $3
ORDER BY ptr.id DESC
LIMIT $1
"#};

sqlx::query_as(query)
.bind(limit)
.bind(last_id)
.bind(active)
.fetch_all(exe)
.await
}
}
}
Loading
Loading