From aee3f68caa48fa081c8a8f0e9effe7a9f9cbe1f7 Mon Sep 17 00:00:00 2001 From: Mrutunjay Kinagi Date: Tue, 17 Feb 2026 22:01:36 +0530 Subject: [PATCH 1/4] fix(table): validate snapshot timestamp drift on add snapshot (#2938) --- pyiceberg/table/update/__init__.py | 14 ++++++++++++ tests/table/test_init.py | 34 ++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 455b46953c..c394529467 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -57,6 +57,7 @@ from pyiceberg.table import Transaction U = TypeVar("U") +ONE_MINUTE_MS = 60_000 class UpdateTableMetadata(ABC, Generic[U]): @@ -442,6 +443,19 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} " f"older than last sequence number {base_metadata.last_sequence_number}" ) + elif ( + base_metadata.snapshot_log + and update.snapshot.timestamp_ms - base_metadata.snapshot_log[-1].timestamp_ms < -ONE_MINUTE_MS + ): + raise ValueError( + f"Invalid snapshot timestamp {update.snapshot.timestamp_ms}: " + f"before last snapshot timestamp {base_metadata.snapshot_log[-1].timestamp_ms}" + ) + elif update.snapshot.timestamp_ms - base_metadata.last_updated_ms < -ONE_MINUTE_MS: + raise ValueError( + f"Invalid snapshot timestamp {update.snapshot.timestamp_ms}: " + f"before last updated timestamp {base_metadata.last_updated_ms}" + ) elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None: raise ValueError("Cannot add snapshot without first row id") elif ( diff --git a/tests/table/test_init.py b/tests/table/test_init.py index aef6e3cc5f..21ef1a1dfe 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -828,6 +828,40 @@ def test_update_metadata_add_snapshot(table_v2: Table) -> None: assert new_metadata.last_updated_ms == new_snapshot.timestamp_ms +def test_update_metadata_add_snapshot_rejects_old_timestamp_vs_snapshot_log(table_v2: Table) -> None: + oldest_allowed_snapshot_ts = table_v2.metadata.snapshot_log[-1].timestamp_ms - 60_000 + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=oldest_allowed_snapshot_ts - 1, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + ) + + with pytest.raises(ValueError, match="before last snapshot timestamp"): + update_table_metadata(table_v2.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + +def test_update_metadata_add_snapshot_rejects_old_timestamp_vs_last_updated(table_v2: Table) -> None: + # Clear snapshot-log to isolate the last_updated_ms guard. + base_metadata = table_v2.metadata.model_copy(update={"snapshot_log": []}) + oldest_allowed_snapshot_ts = base_metadata.last_updated_ms - 60_000 + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=oldest_allowed_snapshot_ts - 1, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + ) + + with pytest.raises(ValueError, match="before last updated timestamp"): + update_table_metadata(base_metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + def test_update_metadata_set_ref_snapshot(table_v2: Table) -> None: update, _ = table_v2.transaction()._set_ref_snapshot( snapshot_id=3051729675574597004, From 62d4a1f6659fe34e48d9c8bbfacfab93a22a147f Mon Sep 17 00:00:00 2001 From: Mrutunjay Kinagi Date: Wed, 18 Feb 2026 12:46:34 +0530 Subject: [PATCH 2/4] style: apply ruff formatting for snapshot timestamp validation --- pyiceberg/table/update/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index c394529467..7372cb3d0d 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -444,8 +444,7 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe f"older than last sequence number {base_metadata.last_sequence_number}" ) elif ( - base_metadata.snapshot_log - and update.snapshot.timestamp_ms - base_metadata.snapshot_log[-1].timestamp_ms < -ONE_MINUTE_MS + base_metadata.snapshot_log and update.snapshot.timestamp_ms - base_metadata.snapshot_log[-1].timestamp_ms < -ONE_MINUTE_MS ): raise ValueError( f"Invalid snapshot timestamp {update.snapshot.timestamp_ms}: " From 7a31887168622a601ee794d7c47375a086608bc5 Mon Sep 17 00:00:00 2001 From: Mrutunjay Kinagi Date: Thu, 19 Feb 2026 12:03:26 +0530 Subject: [PATCH 3/4] fix(table): validate snapshot-log timestamps in metadata parsing --- pyiceberg/table/metadata.py | 36 ++++++++++++++++++++++++++++++++++++ tests/table/test_metadata.py | 22 +++++++++++++++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 26b6e3d3ad..4be088d428 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -67,6 +67,7 @@ DEFAULT_SCHEMA_ID = 0 SUPPORTED_TABLE_FORMAT_VERSION = 2 +ONE_MINUTE_MS = 60_000 def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]: @@ -125,6 +126,29 @@ def construct_refs(table_metadata: TableMetadata) -> TableMetadata: return table_metadata +def check_snapshot_timestamps(table_metadata: TableMetadata) -> TableMetadata: + """Validate snapshot and snapshot-log timestamps with small clock skew tolerance.""" + last_snapshot_log_entry: SnapshotLogEntry | None = None + for snapshot_log_entry in table_metadata.snapshot_log: + if ( + last_snapshot_log_entry is not None + and snapshot_log_entry.timestamp_ms - last_snapshot_log_entry.timestamp_ms < -ONE_MINUTE_MS + ): + raise ValidationError("[BUG] Expected sorted snapshot log entries.") + last_snapshot_log_entry = snapshot_log_entry + + if ( + last_snapshot_log_entry is not None + and table_metadata.last_updated_ms - last_snapshot_log_entry.timestamp_ms < -ONE_MINUTE_MS + ): + raise ValidationError( + f"Invalid update timestamp {table_metadata.last_updated_ms}: " + f"before last snapshot log entry at {last_snapshot_log_entry.timestamp_ms}" + ) + + return table_metadata + + class TableMetadataCommonFields(IcebergBaseModel): """Metadata for an Iceberg table as specified in the Apache Iceberg spec. @@ -378,6 +402,10 @@ def cleanup_snapshot_id(cls, data: dict[str, Any]) -> dict[str, Any]: def construct_refs(self) -> TableMetadataV1: return construct_refs(self) + @model_validator(mode="after") + def check_snapshot_timestamps(self) -> TableMetadata: + return check_snapshot_timestamps(self) + @model_validator(mode="before") def set_v2_compatible_defaults(cls, data: dict[str, Any]) -> dict[str, Any]: """Set default values to be compatible with the format v2. @@ -519,6 +547,10 @@ def check_sort_orders(self) -> TableMetadata: def construct_refs(self) -> TableMetadata: return construct_refs(self) + @model_validator(mode="after") + def check_snapshot_timestamps(self) -> TableMetadata: + return check_snapshot_timestamps(self) + format_version: Literal[2] = Field(alias="format-version", default=2) """An integer version number for the format. Implementations must throw an exception if a table’s version is higher than the supported version.""" @@ -563,6 +595,10 @@ def check_sort_orders(self) -> TableMetadata: def construct_refs(self) -> TableMetadata: return construct_refs(self) + @model_validator(mode="after") + def check_snapshot_timestamps(self) -> TableMetadata: + return check_snapshot_timestamps(self) + format_version: Literal[3] = Field(alias="format-version", default=3) """An integer version number for the format. Implementations must throw an exception if a table’s version is higher than the supported version.""" diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py index c163c90626..9732bbd3bc 100644 --- a/tests/table/test_metadata.py +++ b/tests/table/test_metadata.py @@ -18,7 +18,7 @@ import io import json -from copy import copy +from copy import copy, deepcopy from typing import Any from unittest.mock import MagicMock, patch from uuid import UUID @@ -415,6 +415,26 @@ def test_sort_order_unsorted() -> None: assert len(table_metadata.sort_orders) == 0 +def test_snapshot_log_entries_are_sorted_with_tolerance(example_table_metadata_v2: dict[str, Any]) -> None: + table_metadata = deepcopy(example_table_metadata_v2) + table_metadata["snapshot-log"][1]["timestamp-ms"] = table_metadata["snapshot-log"][0]["timestamp-ms"] - 60_001 + + with pytest.raises(ValidationError) as exc_info: + TableMetadataUtil.parse_raw(json.dumps(table_metadata)) + + assert "Expected sorted snapshot log entries" in str(exc_info.value) + + +def test_last_updated_ms_not_before_last_snapshot_log_entry(example_table_metadata_v2: dict[str, Any]) -> None: + table_metadata = deepcopy(example_table_metadata_v2) + table_metadata["last-updated-ms"] = table_metadata["snapshot-log"][-1]["timestamp-ms"] - 60_001 + + with pytest.raises(ValidationError) as exc_info: + TableMetadataUtil.parse_raw(json.dumps(table_metadata)) + + assert "before last snapshot log entry at" in str(exc_info.value) + + def test_invalid_partition_spec() -> None: table_metadata_spec_not_found = { "format-version": 2, From c8eca78befa022fd21d83061a256bfaee93d8dcc Mon Sep 17 00:00:00 2001 From: Mrutunjay Kinagi Date: Thu, 19 Feb 2026 13:44:11 +0530 Subject: [PATCH 4/4] test: make extensive snapshot fixture timestamps monotonic --- tests/conftest.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 5c85f49a77..7ca285fae4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,7 +34,7 @@ from collections.abc import Generator from datetime import date, datetime, timezone from pathlib import Path -from random import choice, randint +from random import choice from tempfile import TemporaryDirectory from typing import ( TYPE_CHECKING, @@ -815,19 +815,22 @@ def generate_snapshot( snapshot_log = [] initial_snapshot_id = 3051729675574597004 + base_timestamp_ms = 1602638573590 for i in range(2000): snapshot_id = initial_snapshot_id + i parent_snapshot_id = snapshot_id - 1 if i > 0 else None - timestamp_ms = int(time.time() * 1000) - randint(0, 1000000) + timestamp_ms = base_timestamp_ms + i snapshots.append(generate_snapshot(snapshot_id, parent_snapshot_id, timestamp_ms, i)) snapshot_log.append({"snapshot-id": snapshot_id, "timestamp-ms": timestamp_ms}) + last_updated_ms = snapshot_log[-1]["timestamp-ms"] + return { "format-version": 2, "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", "location": "s3://bucket/test/location", "last-sequence-number": 34, - "last-updated-ms": 1602638573590, + "last-updated-ms": last_updated_ms, "last-column-id": 3, "current-schema-id": 1, "schemas": [