diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index cb689baeaf1e..c02c5f62a8cb 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -17,6 +17,7 @@ ################################################################################ import logging import os +import re import subprocess import uuid from datetime import datetime, timezone @@ -40,11 +41,15 @@ from pypaimon.write.blob_format_writer import BlobFormatWriter +def _pyarrow_lt_7(): + return parse(pyarrow.__version__) < parse("7.0.0") + + class PyArrowFileIO(FileIO): def __init__(self, path: str, catalog_options: Options): self.properties = catalog_options self.logger = logging.getLogger(__name__) - self._pyarrow_gte_7 = parse(pyarrow.__version__) >= parse("7.0.0") + self._pyarrow_gte_7 = not _pyarrow_lt_7() self._pyarrow_gte_8 = parse(pyarrow.__version__) >= parse("8.0.0") scheme, netloc, _ = self.parse_location(path) self.uri_reader_factory = UriReaderFactory(catalog_options) @@ -194,10 +199,21 @@ def new_output_stream(self, path: str): return self.filesystem.open_output_stream(path_str) + def _get_file_info(self, path_str: str): + try: + file_infos = self.filesystem.get_file_info([path_str]) + return file_infos[0] + except OSError as e: + # Compat with pyarrow < 7: a missing path may raise OSError instead of yielding NotFound. + msg = str(e).lower() + if ("does not exist" in msg or "not exist" in msg or "nosuchkey" in msg + or re.search(r'\b133\b', msg) or "notfound" in msg): + return pafs.FileInfo(path_str, pafs.FileType.NotFound) + raise + def get_file_status(self, path: str): path_str = self.to_filesystem_path(path) - file_infos = self.filesystem.get_file_info([path_str]) - file_info = file_infos[0] + file_info = self._get_file_info(path_str) if file_info.type == pafs.FileType.NotFound: raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") @@ -215,12 +231,11 @@ def 
list_directories(self, path: str): def exists(self, path: str) -> bool: path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - return file_info.type != pafs.FileType.NotFound + return self._get_file_info(path_str).type != pafs.FileType.NotFound def delete(self, path: str, recursive: bool = False) -> bool: path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] + file_info = self._get_file_info(path_str) if file_info.type == pafs.FileType.NotFound: return False @@ -242,8 +257,11 @@ def delete(self, path: str, recursive: bool = False) -> bool: def mkdirs(self, path: str) -> bool: path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] + file_info = self._get_file_info(path_str) + if file_info.type == pafs.FileType.NotFound: + self.filesystem.create_dir(path_str, recursive=True) + return True if file_info.type == pafs.FileType.Directory: return True elif file_info.type == pafs.FileType.File: @@ -264,7 +282,7 @@ def rename(self, src: str, dst: str) -> bool: if hasattr(self.filesystem, 'rename'): return self.filesystem.rename(src_str, dst_str) - dst_file_info = self.filesystem.get_file_info([dst_str])[0] + dst_file_info = self._get_file_info(dst_str) if dst_file_info.type != pafs.FileType.NotFound: if dst_file_info.type == pafs.FileType.File: return False @@ -272,7 +290,7 @@ def rename(self, src: str, dst: str) -> bool: # dst=dst/srcFileName src_name = Path(src_str).name dst_str = str(Path(dst_str) / src_name) - final_dst_info = self.filesystem.get_file_info([dst_str])[0] + final_dst_info = self._get_file_info(dst_str) if final_dst_info.type != pafs.FileType.NotFound: return False @@ -310,7 +328,7 @@ def delete_directory_quietly(self, directory: str): def try_to_write_atomic(self, path: str, content: str) -> bool: if self.exists(path): path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] + 
file_info = self._get_file_info(path_str) if file_info.type == pafs.FileType.Directory: return False @@ -508,13 +526,11 @@ def to_filesystem_path(self, path: str) -> str: if parsed.scheme: if parsed.netloc: path_part = normalized_path.lstrip('/') + # OSS + PyArrow < 7: endpoint_override already contains the bucket, so pass the key only. if self._is_oss and not self._pyarrow_gte_7: - # For PyArrow 6.x + OSS, endpoint_override already contains bucket, - result = path_part if path_part else '.' - return result - else: - result = f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc - return result + return path_part if path_part else '.' + result = f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc + return result else: result = normalized_path.lstrip('/') return result if result else '.' diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py index ae9abeff238e..ba91f94a18e6 100644 --- a/paimon-python/pypaimon/tests/file_io_test.py +++ b/paimon-python/pypaimon/tests/file_io_test.py @@ -22,19 +22,18 @@ from pathlib import Path from unittest.mock import MagicMock, patch -import pyarrow import pyarrow.fs as pafs from pypaimon.common.options import Options from pypaimon.common.options.config import OssOptions from pypaimon.filesystem.local_file_io import LocalFileIO -from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO +from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO, _pyarrow_lt_7 class FileIOTest(unittest.TestCase): """Test cases for FileIO.to_filesystem_path method.""" - def test_s3_filesystem_path_conversion(self): + def test_filesystem_path_conversion(self): """Test S3FileSystem path conversion with various formats.""" file_io = PyArrowFileIO("s3://bucket/warehouse", Options({})) self.assertIsInstance(file_io.filesystem, pafs.S3FileSystem) @@ -66,18 +65,31 @@ def test_s3_filesystem_path_conversion(self): parent_str = str(Path(converted_path).parent) 
self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str) - from packaging.version import parse as parse_version + lt7 = _pyarrow_lt_7() oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({ OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com' })) - lt7 = parse_version(pyarrow.__version__) < parse_version("7.0.0") got = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") - expected_path = ( - "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt") - self.assertEqual(got, expected_path) + self.assertEqual(got, "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt") + if lt7: + self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx/data.parquet"), + "db-xxx.db/tbl-xxx/data.parquet") + self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx"), "db-xxx.db/tbl-xxx") + manifest_uri = "oss://test-bucket/warehouse/db.db/table/manifest/manifest-list-abc-0" + manifest_key = oss_io.to_filesystem_path(manifest_uri) + self.assertEqual(manifest_key, "warehouse/db.db/table/manifest/manifest-list-abc-0", + "OSS+PyArrow6 must pass key only to PyArrow so manifest is written to correct bucket") + self.assertFalse(manifest_key.startswith("test-bucket/"), + "path must not start with bucket name or PyArrow 6 writes to wrong bucket") nf = MagicMock(type=pafs.FileType.NotFound) + get_file_info_calls = [] + + def record_get_file_info(paths): + get_file_info_calls.append(list(paths)) + return [MagicMock(type=pafs.FileType.NotFound) for _ in paths] + mock_fs = MagicMock() - mock_fs.get_file_info.side_effect = [[nf], [nf]] + mock_fs.get_file_info.side_effect = record_get_file_info if lt7 else [[nf], [nf]] mock_fs.create_dir = MagicMock() mock_fs.open_output_stream.return_value = MagicMock() oss_io.filesystem = mock_fs @@ -87,8 +99,42 @@ def test_s3_filesystem_path_conversion(self): if lt7: expected_parent = '/'.join(path_str.split('/')[:-1]) if '/' in path_str else '' else: - expected_parent = str(Path(path_str).parent) + 
expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else str(Path(path_str).parent) self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent) + if lt7: + for call_paths in get_file_info_calls: + for p in call_paths: + self.assertFalse( + p.startswith("test-bucket/"), + "OSS+PyArrow<7 must pass key only to get_file_info, not bucket/key. Got: %r" % (p,) + ) + + def test_exists(self): + lt7 = _pyarrow_lt_7() + with tempfile.TemporaryDirectory(prefix="file_io_nonexistent_") as tmpdir: + file_io = LocalFileIO("file://" + tmpdir, Options({})) + missing_uri = "file://" + os.path.join(tmpdir, "nonexistent_xyz") + path_str = file_io.to_filesystem_path(missing_uri) + raised = None + infos = None + try: + infos = file_io.filesystem.get_file_info([path_str]) + except OSError as e: + raised = e + if lt7: + if raised is not None: + err = str(raised).lower() + self.assertTrue("133" in err or "does not exist" in err or "not exist" in err, str(raised)) + else: + self.assertEqual(len(infos), 1) + self.assertEqual(infos[0].type, pafs.FileType.NotFound) + else: + self.assertIsNone(raised) + self.assertEqual(len(infos), 1) + self.assertEqual(infos[0].type, pafs.FileType.NotFound) + self.assertFalse(file_io.exists(missing_uri)) + with self.assertRaises(FileNotFoundError): + file_io.get_file_status(missing_uri) def test_local_filesystem_path_conversion(self): file_io = LocalFileIO("file:///tmp/warehouse", Options({}))