Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
DEFAULT_SCHEMA_ID = 0

SUPPORTED_TABLE_FORMAT_VERSION = 2
ONE_MINUTE_MS = 60_000


def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]:
Expand Down Expand Up @@ -125,6 +126,29 @@ def construct_refs(table_metadata: TableMetadata) -> TableMetadata:
return table_metadata


def check_snapshot_timestamps(table_metadata: TableMetadata) -> TableMetadata:
"""Validate snapshot and snapshot-log timestamps with small clock skew tolerance."""
last_snapshot_log_entry: SnapshotLogEntry | None = None
for snapshot_log_entry in table_metadata.snapshot_log:
if (
last_snapshot_log_entry is not None
and snapshot_log_entry.timestamp_ms - last_snapshot_log_entry.timestamp_ms < -ONE_MINUTE_MS
):
raise ValidationError("[BUG] Expected sorted snapshot log entries.")
last_snapshot_log_entry = snapshot_log_entry

if (
last_snapshot_log_entry is not None
and table_metadata.last_updated_ms - last_snapshot_log_entry.timestamp_ms < -ONE_MINUTE_MS
):
raise ValidationError(
f"Invalid update timestamp {table_metadata.last_updated_ms}: "
f"before last snapshot log entry at {last_snapshot_log_entry.timestamp_ms}"
)

return table_metadata


class TableMetadataCommonFields(IcebergBaseModel):
"""Metadata for an Iceberg table as specified in the Apache Iceberg spec.

Expand Down Expand Up @@ -378,6 +402,10 @@ def cleanup_snapshot_id(cls, data: dict[str, Any]) -> dict[str, Any]:
def construct_refs(self) -> TableMetadataV1:
return construct_refs(self)

@model_validator(mode="after")
def check_snapshot_timestamps(self) -> TableMetadata:
return check_snapshot_timestamps(self)

@model_validator(mode="before")
def set_v2_compatible_defaults(cls, data: dict[str, Any]) -> dict[str, Any]:
"""Set default values to be compatible with the format v2.
Expand Down Expand Up @@ -519,6 +547,10 @@ def check_sort_orders(self) -> TableMetadata:
def construct_refs(self) -> TableMetadata:
return construct_refs(self)

@model_validator(mode="after")
def check_snapshot_timestamps(self) -> TableMetadata:
return check_snapshot_timestamps(self)

format_version: Literal[2] = Field(alias="format-version", default=2)
"""An integer version number for the format. Implementations must throw
an exception if a table’s version is higher than the supported version."""
Expand Down Expand Up @@ -563,6 +595,10 @@ def check_sort_orders(self) -> TableMetadata:
def construct_refs(self) -> TableMetadata:
return construct_refs(self)

@model_validator(mode="after")
def check_snapshot_timestamps(self) -> TableMetadata:
return check_snapshot_timestamps(self)

format_version: Literal[3] = Field(alias="format-version", default=3)
"""An integer version number for the format. Implementations must throw
an exception if a table’s version is higher than the supported version."""
Expand Down
13 changes: 13 additions & 0 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from pyiceberg.table import Transaction

U = TypeVar("U")
ONE_MINUTE_MS = 60_000


class UpdateTableMetadata(ABC, Generic[U]):
Expand Down Expand Up @@ -442,6 +443,18 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe
f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} "
f"older than last sequence number {base_metadata.last_sequence_number}"
)
elif (
base_metadata.snapshot_log and update.snapshot.timestamp_ms - base_metadata.snapshot_log[-1].timestamp_ms < -ONE_MINUTE_MS
):
raise ValueError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe CommitFailedException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I kept ValueError for consistency with the existing validation branches in this same method (sequence/row-id checks all currently raise ValueError). If you prefer, I can switch this validation path to CommitFailedException in a follow-up commit.

f"Invalid snapshot timestamp {update.snapshot.timestamp_ms}: "
f"before last snapshot timestamp {base_metadata.snapshot_log[-1].timestamp_ms}"
)
elif update.snapshot.timestamp_ms - base_metadata.last_updated_ms < -ONE_MINUTE_MS:
raise ValueError(
f"Invalid snapshot timestamp {update.snapshot.timestamp_ms}: "
f"before last updated timestamp {base_metadata.last_updated_ms}"
)
elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None:
raise ValueError("Cannot add snapshot without first row id")
elif (
Expand Down
9 changes: 6 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from collections.abc import Generator
from datetime import date, datetime, timezone
from pathlib import Path
from random import choice, randint
from random import choice
from tempfile import TemporaryDirectory
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -815,19 +815,22 @@ def generate_snapshot(
snapshot_log = []
initial_snapshot_id = 3051729675574597004

base_timestamp_ms = 1602638573590
for i in range(2000):
snapshot_id = initial_snapshot_id + i
parent_snapshot_id = snapshot_id - 1 if i > 0 else None
timestamp_ms = int(time.time() * 1000) - randint(0, 1000000)
timestamp_ms = base_timestamp_ms + i
snapshots.append(generate_snapshot(snapshot_id, parent_snapshot_id, timestamp_ms, i))
snapshot_log.append({"snapshot-id": snapshot_id, "timestamp-ms": timestamp_ms})

last_updated_ms = snapshot_log[-1]["timestamp-ms"]

return {
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "s3://bucket/test/location",
"last-sequence-number": 34,
"last-updated-ms": 1602638573590,
"last-updated-ms": last_updated_ms,
"last-column-id": 3,
"current-schema-id": 1,
"schemas": [
Expand Down
34 changes: 34 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,40 @@ def test_update_metadata_add_snapshot(table_v2: Table) -> None:
assert new_metadata.last_updated_ms == new_snapshot.timestamp_ms


def test_update_metadata_add_snapshot_rejects_old_timestamp_vs_snapshot_log(table_v2: Table) -> None:
oldest_allowed_snapshot_ts = table_v2.metadata.snapshot_log[-1].timestamp_ms - 60_000
new_snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
sequence_number=200,
timestamp_ms=oldest_allowed_snapshot_ts - 1,
manifest_list="s3:/a/b/c.avro",
summary=Summary(Operation.APPEND),
schema_id=3,
)

with pytest.raises(ValueError, match="before last snapshot timestamp"):
update_table_metadata(table_v2.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),))


def test_update_metadata_add_snapshot_rejects_old_timestamp_vs_last_updated(table_v2: Table) -> None:
# Clear snapshot-log to isolate the last_updated_ms guard.
base_metadata = table_v2.metadata.model_copy(update={"snapshot_log": []})
oldest_allowed_snapshot_ts = base_metadata.last_updated_ms - 60_000
new_snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
sequence_number=200,
timestamp_ms=oldest_allowed_snapshot_ts - 1,
manifest_list="s3:/a/b/c.avro",
summary=Summary(Operation.APPEND),
schema_id=3,
)

with pytest.raises(ValueError, match="before last updated timestamp"):
update_table_metadata(base_metadata, (AddSnapshotUpdate(snapshot=new_snapshot),))


def test_update_metadata_set_ref_snapshot(table_v2: Table) -> None:
update, _ = table_v2.transaction()._set_ref_snapshot(
snapshot_id=3051729675574597004,
Expand Down
22 changes: 21 additions & 1 deletion tests/table/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import io
import json
from copy import copy
from copy import copy, deepcopy
from typing import Any
from unittest.mock import MagicMock, patch
from uuid import UUID
Expand Down Expand Up @@ -415,6 +415,26 @@ def test_sort_order_unsorted() -> None:
assert len(table_metadata.sort_orders) == 0


def test_snapshot_log_entries_are_sorted_with_tolerance(example_table_metadata_v2: dict[str, Any]) -> None:
table_metadata = deepcopy(example_table_metadata_v2)
table_metadata["snapshot-log"][1]["timestamp-ms"] = table_metadata["snapshot-log"][0]["timestamp-ms"] - 60_001

with pytest.raises(ValidationError) as exc_info:
TableMetadataUtil.parse_raw(json.dumps(table_metadata))

assert "Expected sorted snapshot log entries" in str(exc_info.value)


def test_last_updated_ms_not_before_last_snapshot_log_entry(example_table_metadata_v2: dict[str, Any]) -> None:
table_metadata = deepcopy(example_table_metadata_v2)
table_metadata["last-updated-ms"] = table_metadata["snapshot-log"][-1]["timestamp-ms"] - 60_001

with pytest.raises(ValidationError) as exc_info:
TableMetadataUtil.parse_raw(json.dumps(table_metadata))

assert "before last snapshot log entry at" in str(exc_info.value)


def test_invalid_partition_spec() -> None:
table_metadata_spec_not_found = {
"format-version": 2,
Expand Down