Conversation
- all_files - all_data_files - all_delete_files
kevinjqliu
left a comment
There was a problem hiding this comment.
Thanks for the PR, I added a few comments
pyiceberg/table/inspect.py
Outdated
| all_manifest_files_by_snapshot: Iterator[List[ManifestFile]] = executor.map( | ||
| lambda args: args[0].manifests(self.tbl.io), [(snapshot,) for snapshot in snapshots] | ||
| ) | ||
| all_manifest_files = list( | ||
| {(manifest.manifest_path, manifest) for manifest_list in all_manifest_files_by_snapshot for manifest in manifest_list} | ||
| ) | ||
| all_files_by_manifest: Iterator[List[Dict[str, Any]]] = executor.map( | ||
| lambda args: self._files_by_manifest(*args), [(manifest, data_file_filter) for _, manifest in all_manifest_files] | ||
| ) | ||
| all_files_list = [file for files in all_files_by_manifest for file in files] | ||
| return pa.Table.from_pylist( | ||
| all_files_list, | ||
| schema=self._get_files_schema(), | ||
| ) |
There was a problem hiding this comment.
WDYT about something like this?
Also i would rename _files_by_manifest and have it return pa.Table, so we can skip the flatten and just concat the tables.
| all_manifest_files_by_snapshot: Iterator[List[ManifestFile]] = executor.map( | |
| lambda args: args[0].manifests(self.tbl.io), [(snapshot,) for snapshot in snapshots] | |
| ) | |
| all_manifest_files = list( | |
| {(manifest.manifest_path, manifest) for manifest_list in all_manifest_files_by_snapshot for manifest in manifest_list} | |
| ) | |
| all_files_by_manifest: Iterator[List[Dict[str, Any]]] = executor.map( | |
| lambda args: self._files_by_manifest(*args), [(manifest, data_file_filter) for _, manifest in all_manifest_files] | |
| ) | |
| all_files_list = [file for files in all_files_by_manifest for file in files] | |
| return pa.Table.from_pylist( | |
| all_files_list, | |
| schema=self._get_files_schema(), | |
| ) | |
| manifest_lists = executor.map( | |
| lambda snapshot: snapshot.manifests(self.tbl.io), | |
| snapshots | |
| ) | |
| unique_manifests = { | |
| (manifest.manifest_path, manifest) | |
| for manifest_list in manifest_lists | |
| for manifest in manifest_list | |
| } | |
| file_lists = executor.map( | |
| self._files_by_manifest, | |
| [(manifest, data_file_filter) for _, manifest in unique_manifests] | |
| ) | |
| all_files = [ | |
| file | |
| for file_list in file_lists | |
| for file in file_list | |
| ] | |
| return pa.Table.from_pylist( | |
| all_files, | |
| schema=self._get_files_schema() | |
| ) |
There was a problem hiding this comment.
I agree with this, the impl of the _files_by_manifest enforces uniqueness which wasn't clear
| self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None | ||
| ) -> List[Dict[str, Any]]: | ||
| files: list[dict[str, Any]] = [] | ||
| schema = self.tbl.metadata.schema() |
There was a problem hiding this comment.
when time traveling with different snapshots, we shouldnt just use the current table schema
for context #1053 (comment)
There was a problem hiding this comment.
@kevinjqliu updated code as per comments.
|
@soumya-ghosh Gentle ping, would you be interested in contributing this? Would be great to get this in 🚀 |
pyiceberg/table/inspect.py
Outdated
| return pa.Table.from_pylist( | ||
| files, | ||
| schema=files_schema, | ||
| [], | ||
| schema=self._get_files_schema(), | ||
| ) |
There was a problem hiding this comment.
Nice one, this can be further simplified to:
return self._get_files_schema().empty_table()Less is more :)
pyiceberg/table/inspect.py
Outdated
| def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": | ||
| import pyarrow as pa | ||
|
|
||
| files_table: list[pa.Table] = [] |
There was a problem hiding this comment.
nit: we can move this one down, we don't need to create the error when we return on line 642
|
@soumya-ghosh I see that you incorporated the feedback by @kevinjqliu directly, instead of accepting the suggestion. That also works, thanks for working on this. I think we're pretty close 👍 |
|
Yes @Fokko, there is an open discussion that was happening in #1053 (comment). I will raise another PR for docs about the inspect operations. |
| return self._all_files({DataFileContent.DATA}) | ||
|
|
||
| def all_delete_files(self) -> "pa.Table": | ||
| return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) |
There was a problem hiding this comment.
We have a Spark table to test this:
iceberg-python/dev/provision.py
Lines 121 to 138 in 05f07ee
There was a problem hiding this comment.
Okay, will check this. If this requires changes, it will also need changes in files and delete_files table.
There was a problem hiding this comment.
@Fokko Added an integration test for table with format version 3, used Spark to write through pyiceberg to V3 table were failing.
Note that, the outputs of files metadata (and all other related tables) do not completely match with Spark counterparts due to additional columns in like first_row_id, referenced_data_file, content_offset, content_size_in_bytes. This needs to added first in DataFile class then propagated as required. Should be addressed in different issue, will it part of V3 tracking issue?
Fokko
left a comment
There was a problem hiding this comment.
Left one minor comment for partition, apart from that, this looks great to me. Thanks @soumya-ghosh for working on this 🙌
| "content": data_file.content, | ||
| "file_path": data_file.file_path, | ||
| "file_format": data_file.file_format, | ||
| "spec_id": data_file.spec_id, |
There was a problem hiding this comment.
In Spark we also have the partition column, I think it would be good to add that one here as well:
iceberg-python/pyiceberg/table/inspect.py
Lines 124 to 125 in 9fff025
There was a problem hiding this comment.
@Fokko Added partition column in files metadata table schema and added a test for the same
There was a problem hiding this comment.
order of spec_id and partition column fixed.
| return self._all_files({DataFileContent.DATA}) | ||
|
|
||
| def all_delete_files(self) -> "pa.Table": | ||
| return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) |
Fokko
left a comment
There was a problem hiding this comment.
One minor remark, apart from that it looks good.
Pinging @geruh @kevinjqliu to see if they have any further comments
| "content": data_file.content, | ||
| "file_path": data_file.file_path, | ||
| "file_format": data_file.file_format, | ||
| "spec_id": data_file.spec_id, |
|
Let's merge this to unblock #1958. Thanks @soumya-ghosh for working on this, and thanks @kevinjqliu and @geruh for the reviews 🙌 |
Implements below metadata table from - apache#1053 - `all_files` - `all_data_files` - `all_delete_files` Refactored code for files metadata for better reusability


Implements below metadata table from - #1053
all_filesall_data_filesall_delete_filesRefactored code for files metadata for better reusability