|
17 | 17 | from __future__ import annotations |
18 | 18 |
|
19 | 19 | import itertools |
| 20 | +import os |
20 | 21 | import uuid |
21 | 22 | from abc import abstractmethod |
22 | 23 | from collections import defaultdict |
@@ -591,88 +592,77 @@ class _OverwriteFiles(_MergingSnapshotProducer["_OverwriteFiles"]): |
591 | 592 | Data and delete files were added and removed in a logical overwrite operation. |
592 | 593 | """ |
593 | 594 |
|
594 | | - def _existing_manifests(self) -> list[ManifestFile]: |
595 | | - """Determine if there are any existing manifest files.""" |
596 | | - existing_files = [] |
| 595 | + @cached_property |
| 596 | + def _cache_manifests(self) -> tuple[list[ManifestFile], list[ManifestEntry]]: |
| 597 | + """Iterate over manifests once, returning (existing_manifests, deleted_entries). |
597 | 598 |
|
598 | | - manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) |
599 | | - if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch): |
600 | | - for manifest_file in snapshot.manifests(io=self._io): |
601 | | - # Manifest does not contain rows that match the files to delete partitions |
602 | | - if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file): |
603 | | - existing_files.append(manifest_file) |
604 | | - continue |
605 | | - |
606 | | - entries_to_write: set[ManifestEntry] = set() |
607 | | - found_deleted_entries: set[ManifestEntry] = set() |
608 | | - |
609 | | - for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): |
610 | | - if entry.data_file in self._deleted_data_files: |
611 | | - found_deleted_entries.add(entry) |
612 | | - else: |
613 | | - entries_to_write.add(entry) |
| 599 | + Avoids the double manifest read that occurs when _existing_manifests() and |
| 600 | + _deleted_entries() are called independently from separate futures. |
| 601 | + """ |
| 602 | + print(f"CACHE MISS: executing _cache_manifests PID:{os.getpid()}") |
614 | 603 |
|
615 | | - # Is the intercept the empty set? |
616 | | - if len(found_deleted_entries) == 0: |
617 | | - existing_files.append(manifest_file) |
618 | | - continue |
| 604 | + if not (snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch)): |
| 605 | + return [], [] |
619 | 606 |
|
620 | | - # Delete all files from manifest |
621 | | - if len(entries_to_write) == 0: |
622 | | - continue |
| 607 | + manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) |
623 | 608 |
|
624 | | - # We have to rewrite the manifest file without the deleted data files |
625 | | - with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer: |
626 | | - for entry in entries_to_write: |
627 | | - writer.add_entry( |
628 | | - ManifestEntry.from_args( |
629 | | - status=ManifestEntryStatus.EXISTING, |
630 | | - snapshot_id=entry.snapshot_id, |
631 | | - sequence_number=entry.sequence_number, |
632 | | - file_sequence_number=entry.file_sequence_number, |
633 | | - data_file=entry.data_file, |
634 | | - ) |
| 609 | + def _process_manifest(manifest: ManifestFile) -> tuple[ManifestFile | None, list[ManifestEntry]]: |
| 610 | + if not manifest_evaluators[manifest.partition_spec_id](manifest): |
| 611 | + return manifest, [] # unaffected partition — keep as-is, no deleted entries |
| 612 | + |
| 613 | + surviving: list[ManifestEntry] = [] |
| 614 | + deleted: list[ManifestEntry] = [] |
| 615 | + |
| 616 | + for entry in manifest.fetch_manifest_entry(io=self._io, discard_deleted=True): |
| 617 | + if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files: |
| 618 | + deleted.append( |
| 619 | + ManifestEntry.from_args( |
| 620 | + status=ManifestEntryStatus.DELETED, |
| 621 | + snapshot_id=entry.snapshot_id, |
| 622 | + sequence_number=entry.sequence_number, |
| 623 | + file_sequence_number=entry.file_sequence_number, |
| 624 | + data_file=entry.data_file, |
635 | 625 | ) |
636 | | - existing_files.append(writer.to_manifest_file()) |
| 626 | + ) |
| 627 | + else: |
| 628 | + surviving.append(entry) |
| 629 | + |
| 630 | + if not deleted: |
| 631 | + return manifest, [] # nothing deleted in this manifest |
| 632 | + |
| 633 | + if not surviving: |
| 634 | + return None, deleted # all entries deleted — drop manifest |
| 635 | + |
| 636 | + # Partial deletion — rewrite manifest with surviving entries only |
| 637 | + with self.new_manifest_writer(self.spec(manifest.partition_spec_id)) as writer: |
| 638 | + for entry in surviving: |
| 639 | + writer.add_entry( |
| 640 | + ManifestEntry.from_args( |
| 641 | + status=ManifestEntryStatus.EXISTING, |
| 642 | + snapshot_id=entry.snapshot_id, |
| 643 | + sequence_number=entry.sequence_number, |
| 644 | + file_sequence_number=entry.file_sequence_number, |
| 645 | + data_file=entry.data_file, |
| 646 | + ) |
| 647 | + ) |
| 648 | + return writer.to_manifest_file(), deleted |
637 | 649 |
|
638 | | - return existing_files |
| 650 | + executor = ExecutorFactory.get_or_create() |
| 651 | + results = list(executor.map(_process_manifest, snapshot.manifests(io=self._io))) |
639 | 652 |
|
640 | | - def _deleted_entries(self) -> list[ManifestEntry]: |
641 | | - """To determine if we need to record any deleted entries. |
| 653 | + existing = [m for m, _ in results if m is not None] |
| 654 | + deleted_entries = [e for _, entries in results for e in entries] |
| 655 | + return existing, deleted_entries |
642 | 656 |
|
643 | | - With a full overwrite all the entries are considered deleted. |
644 | | - With partial overwrites we have to use the predicate to evaluate |
645 | | - which entries are affected. |
646 | | - """ |
647 | | - if self._parent_snapshot_id is not None: |
648 | | - previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) |
649 | | - if previous_snapshot is None: |
650 | | - # This should never happen since you cannot overwrite an empty table |
651 | | - raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") |
652 | | - |
653 | | - executor = ExecutorFactory.get_or_create() |
654 | | - manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) |
655 | | - |
656 | | - def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]: |
657 | | - if not manifest_evaluators[manifest.partition_spec_id](manifest): |
658 | | - return [] |
659 | | - |
660 | | - return [ |
661 | | - ManifestEntry.from_args( |
662 | | - status=ManifestEntryStatus.DELETED, |
663 | | - snapshot_id=entry.snapshot_id, |
664 | | - sequence_number=entry.sequence_number, |
665 | | - file_sequence_number=entry.file_sequence_number, |
666 | | - data_file=entry.data_file, |
667 | | - ) |
668 | | - for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True) |
669 | | - if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files |
670 | | - ] |
| 657 | + def _existing_manifests(self) -> list[ManifestFile]: |
| 658 | + hit = "_cache_manifests" in self.__dict__ |
| 659 | + print(f"[_existing_manifests] cache {'HIT' if hit else 'MISS'}") |
| 660 | + return self._cache_manifests[0] |
671 | 661 |
|
672 | | - list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io)) |
673 | | - return list(itertools.chain(*list_of_entries)) |
674 | | - else: |
675 | | - return [] |
| 662 | + def _deleted_entries(self) -> list[ManifestEntry]: |
| 663 | + hit = "_cache_manifests" in self.__dict__ |
| 664 | + print(f"[_deleted_entries] cache {'HIT' if hit else 'MISS'}") |
| 665 | + return self._cache_manifests[1] |
676 | 666 |
|
677 | 667 |
|
678 | 668 | class UpdateSnapshot: |
|
0 commit comments