Skip to content

Commit 576fd03

Browse files
committed
perf: Speed up purge_table by deduplicating manifest reads and parallelizing file deletion
Three changes reduce purge_table wall time from ~7 s to ~0.13 s (54x) on a table with 200 snapshots:

1. Deduplicate manifests by path before iterating in delete_data_files(). The same manifest appears across many snapshots' manifest lists; for 200 snapshots this reduces 20,100 manifest opens to 200.

2. Parallelize file deletion using the existing ExecutorFactory ThreadPoolExecutor, matching the pattern already used for manifest reading in plan_files() and data-file reading in to_arrow(). This also aligns with the Java reference implementation (CatalogUtil.dropTableData), which deletes files concurrently via a worker thread pool.

3. Cache Avro-to-Iceberg schema conversion and reader-tree resolution. All manifests of the same type share the same Avro schema, but it was previously JSON-parsed, converted, and resolved into a reader tree on every manifest open.

The caches are guarded by an explicit threading.Lock rather than relying on the GIL, so they remain thread-safe on all Python implementations.
1 parent 1a54e9c commit 576fd03

File tree

2 files changed

+68
-14
lines changed

2 files changed

+68
-14
lines changed

pyiceberg/avro/file.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io
2323
import json
2424
import os
25+
import threading
2526
from collections.abc import Callable
2627
from dataclasses import dataclass
2728
from enum import Enum
@@ -31,6 +32,8 @@
3132
TypeVar,
3233
)
3334

35+
from cachetools import LRUCache
36+
3437
from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
3538
from pyiceberg.avro.codecs.codec import Codec
3639
from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
@@ -68,6 +71,48 @@
6871
_SCHEMA_KEY = "avro.schema"
6972

7073

# Cache for Avro-to-Iceberg schema conversion, keyed by raw schema JSON string.
# Manifests of the same type share the same Avro schema, so this avoids
# redundant JSON parsing and schema conversion on every manifest open.
_schema_cache: LRUCache[str, Schema] = LRUCache(maxsize=32)
_schema_cache_lock = threading.Lock()

# Cache for resolved reader trees. Keys are the id()s of (file_schema,
# read_schema, read_types, read_enums). CPython may recycle an id() once an
# object is garbage-collected, so each cache value also keeps strong references
# to the four key objects — pinning them alive guarantees a stored key can
# never alias a different, later object with a reused id.
# Reader objects are stateless — read() takes a decoder and returns decoded
# data without mutating self — so sharing cached readers across threads and
# calls is safe.
_reader_cache: LRUCache[tuple[int, int, int, int], tuple[object, object, object, object, Reader]] = LRUCache(maxsize=32)
_reader_cache_lock = threading.Lock()


def _cached_avro_to_iceberg(avro_schema_string: str) -> Schema:
    """Convert an Avro schema JSON string to an Iceberg Schema, with caching.

    The conversion runs outside the lock, so two threads that miss
    concurrently may both convert the same schema; the second store simply
    overwrites the first with an equivalent result. This benign race is
    preferred over holding the lock across the (relatively slow) JSON parse
    and conversion.
    """
    with _schema_cache_lock:
        try:
            return _schema_cache[avro_schema_string]
        except KeyError:
            pass
    schema = AvroSchemaConversion().avro_to_iceberg(json.loads(avro_schema_string))
    with _schema_cache_lock:
        _schema_cache[avro_schema_string] = schema
    return schema


def _cached_resolve_reader(
    file_schema: Schema,
    read_schema: Schema,
    read_types: dict[int, Callable[..., StructProtocol]],
    read_enums: dict[int, Callable[..., Enum]],
) -> Reader:
    """Resolve a reader tree for the given schema pair, with caching.

    Keyed by object identity of the four arguments. The cache entry holds
    strong references to those objects so their id()s cannot be recycled by
    the garbage collector while the entry is alive, which would otherwise let
    an unrelated object silently hit a stale entry.
    """
    key = (id(file_schema), id(read_schema), id(read_types), id(read_enums))
    with _reader_cache_lock:
        entry = _reader_cache.get(key)
        if entry is not None:
            return entry[4]
    reader = resolve_reader(file_schema, read_schema, read_types, read_enums)
    with _reader_cache_lock:
        # Store the key objects alongside the reader to pin their identities.
        _reader_cache[key] = (file_schema, read_schema, read_types, read_enums, reader)
    return reader
71116
class AvroFileHeader(Record):
72117
@property
73118
def magic(self) -> bytes:
@@ -97,9 +142,7 @@ def compression_codec(self) -> type[Codec] | None:
97142

98143
def get_schema(self) -> Schema:
99144
if _SCHEMA_KEY in self.meta:
100-
avro_schema_string = self.meta[_SCHEMA_KEY]
101-
avro_schema = json.loads(avro_schema_string)
102-
return AvroSchemaConversion().avro_to_iceberg(avro_schema)
145+
return _cached_avro_to_iceberg(self.meta[_SCHEMA_KEY])
103146
else:
104147
raise ValueError("No schema found in Avro file headers")
105148

@@ -178,7 +221,7 @@ def __enter__(self) -> AvroFile[D]:
178221
if not self.read_schema:
179222
self.read_schema = self.schema
180223

181-
self.reader = resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)
224+
self.reader = _cached_resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)
182225

183226
return self
184227

pyiceberg/catalog/__init__.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
MANIFEST_LIST = "manifest list"
9191
PREVIOUS_METADATA = "previous metadata"
9292
METADATA = "metadata"
93+
DATA_FILE = "data"
9394
URI = "uri"
9495
LOCATION = "location"
9596
EXTERNAL_TABLE = "EXTERNAL_TABLE"
@@ -284,7 +285,7 @@ def list_catalogs() -> list[str]:
284285

285286

286287
def delete_files(io: FileIO, files_to_delete: set[str], file_type: str) -> None:
287-
"""Delete files.
288+
"""Delete files in parallel.
288289
289290
Log warnings if failing to delete any file.
290291
@@ -293,32 +294,42 @@ def delete_files(io: FileIO, files_to_delete: set[str], file_type: str) -> None:
293294
files_to_delete: A set of file paths to be deleted.
294295
file_type: The type of the file.
295296
"""
296-
for file in files_to_delete:
297+
from pyiceberg.utils.concurrent import ExecutorFactory
298+
299+
def _delete_file(file: str) -> None:
297300
try:
298301
io.delete(file)
299302
except OSError:
300303
logger.warning(f"Failed to delete {file_type} file {file}", exc_info=logger.isEnabledFor(logging.DEBUG))
301304

305+
executor = ExecutorFactory.get_or_create()
306+
list(executor.map(_delete_file, files_to_delete))
307+
302308

303309
def delete_data_files(io: FileIO, manifests_to_delete: list[ManifestFile]) -> None:
304310
"""Delete data files linked to given manifests.
305311
312+
Deduplicates manifests by path before reading entries, since the same manifest
313+
appears across multiple snapshots' manifest lists. Deletes data files in parallel.
314+
306315
Log warnings if failing to delete any file.
307316
308317
Args:
309318
io: The FileIO used to delete the object.
310319
manifests_to_delete: A list of manifest contains paths of data files to be deleted.
311320
"""
312-
deleted_files: dict[str, bool] = {}
321+
unique_manifests: dict[str, ManifestFile] = {}
313322
for manifest_file in manifests_to_delete:
323+
unique_manifests.setdefault(manifest_file.manifest_path, manifest_file)
324+
325+
# Collect all unique data file paths
326+
data_file_paths: set[str] = set()
327+
for manifest_file in unique_manifests.values():
314328
for entry in manifest_file.fetch_manifest_entry(io, discard_deleted=False):
315-
path = entry.data_file.file_path
316-
if not deleted_files.get(path, False):
317-
try:
318-
io.delete(path)
319-
except OSError:
320-
logger.warning(f"Failed to delete data file {path}", exc_info=logger.isEnabledFor(logging.DEBUG))
321-
deleted_files[path] = True
329+
data_file_paths.add(entry.data_file.file_path)
330+
331+
# Delete in parallel
332+
delete_files(io, data_file_paths, DATA_FILE)
322333

323334

324335
def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Catalog | None:

0 commit comments

Comments
 (0)