From b79a08cc7a3fd2bcc37249f31fbf02a7dff72d71 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 13 Feb 2026 21:58:13 +0800 Subject: [PATCH 01/12] fix py --- .../pypaimon/filesystem/pyarrow_file_io.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index cb689baeaf1e..6e80abf0729b 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -215,8 +215,14 @@ def list_directories(self, path: str): def exists(self, path: str) -> bool: path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - return file_info.type != pafs.FileType.NotFound + try: + file_info = self.filesystem.get_file_info([path_str])[0] + return file_info.type != pafs.FileType.NotFound + except OSError: + # OSS + PyArrow 6: get_file_info on a key prefix (no object) raises OSError. + if self._is_oss and not self._pyarrow_gte_7: + return False + raise def delete(self, path: str, recursive: bool = False) -> bool: path_str = self.to_filesystem_path(path) @@ -241,6 +247,9 @@ def delete(self, path: str, recursive: bool = False) -> bool: return True def mkdirs(self, path: str) -> bool: + if self._is_oss and not self._pyarrow_gte_7: + # OSS has no real directories; writing the object key is enough. + return True path_str = self.to_filesystem_path(path) file_info = self.filesystem.get_file_info([path_str])[0] @@ -509,9 +518,8 @@ def to_filesystem_path(self, path: str) -> str: if parsed.netloc: path_part = normalized_path.lstrip('/') if self._is_oss and not self._pyarrow_gte_7: - # For PyArrow 6.x + OSS, endpoint_override already contains bucket, - result = path_part if path_part else '.' - return result + # PyArrow 6 splits on first '/'; return netloc/path_part so bucket=OSS bucket, key=path_part. + return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc else: result = f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc return result From 2979bae2a49f664df05639943250f3b5e21f3828 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 20:11:26 +0800 Subject: [PATCH 02/12] Revert "fix py" This reverts commit b79a08cc7a3fd2bcc37249f31fbf02a7dff72d71. --- .../pypaimon/filesystem/pyarrow_file_io.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index 6e80abf0729b..cb689baeaf1e 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -215,14 +215,8 @@ def list_directories(self, path: str): def exists(self, path: str) -> bool: path_str = self.to_filesystem_path(path) - try: - file_info = self.filesystem.get_file_info([path_str])[0] - return file_info.type != pafs.FileType.NotFound - except OSError: - # OSS + PyArrow 6: get_file_info on a key prefix (no object) raises OSError. - if self._is_oss and not self._pyarrow_gte_7: - return False - raise + file_info = self.filesystem.get_file_info([path_str])[0] + return file_info.type != pafs.FileType.NotFound def delete(self, path: str, recursive: bool = False) -> bool: path_str = self.to_filesystem_path(path) @@ -247,9 +241,6 @@ def delete(self, path: str, recursive: bool = False) -> bool: return True def mkdirs(self, path: str) -> bool: - if self._is_oss and not self._pyarrow_gte_7: - # OSS has no real directories; writing the object key is enough. - return True path_str = self.to_filesystem_path(path) file_info = self.filesystem.get_file_info([path_str])[0] @@ -518,8 +509,9 @@ def to_filesystem_path(self, path: str) -> str: if parsed.netloc: path_part = normalized_path.lstrip('/') if self._is_oss and not self._pyarrow_gte_7: - # PyArrow 6 splits on first '/'; return netloc/path_part so bucket=OSS bucket, key=path_part. - return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc + # For PyArrow 6.x + OSS, endpoint_override already contains bucket, + result = path_part if path_part else '.' + return result else: result = f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc return result From 2f06edc80f2ef27d2f64b129f4f4a05d7d149692 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 20:08:33 +0800 Subject: [PATCH 03/12] [python] Fix pyArrow 6 compatibility for S3/OSS FileIO (cherry picked from commit 159d1aa2bd440b0786f93bd6c6289680797348ab) --- .../pypaimon/filesystem/pyarrow_file_io.py | 93 +++++++++++-------- paimon-python/pypaimon/tests/file_io_test.py | 74 ++++++++++++--- 2 files changed, 113 insertions(+), 54 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index cb689baeaf1e..fcad08786bc3 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -40,18 +40,26 @@ from pypaimon.write.blob_format_writer import BlobFormatWriter +def _pyarrow_lt_7(): + return parse(pyarrow.__version__) < parse("7.0.0") + + class PyArrowFileIO(FileIO): def __init__(self, path: str, catalog_options: Options): self.properties = catalog_options self.logger = logging.getLogger(__name__) - self._pyarrow_gte_7 = parse(pyarrow.__version__) >= parse("7.0.0") + self._pyarrow_gte_7 = not _pyarrow_lt_7() self._pyarrow_gte_8 = parse(pyarrow.__version__) >= parse("8.0.0") scheme, netloc, _ = self.parse_location(path) self.uri_reader_factory = UriReaderFactory(catalog_options) self._is_oss = scheme in {"oss"} self._oss_bucket = None + self._oss_base_key = "" if self._is_oss: self._oss_bucket = self._extract_oss_bucket(path) + uri = urlparse(path) + if uri.scheme and uri.netloc and uri.path: + self._oss_base_key = uri.path.strip("/") self.filesystem = self._initialize_oss_fs(path) elif scheme in {"s3", "s3a", "s3n"}: self.filesystem = self._initialize_s3_fs() @@ -177,31 +185,33 @@ def new_input_stream(self, path: str): def new_output_stream(self, path: str): path_str = self.to_filesystem_path(path) - - if self._is_oss and not self._pyarrow_gte_7: - # For PyArrow 6.x + OSS, path_str is already just the key part - if '/' in path_str: - parent_dir = '/'.join(path_str.split('/')[:-1]) - else: - parent_dir = '' - - if parent_dir and not self.exists(parent_dir): - self.mkdirs(parent_dir) - else: + if not (self._is_oss and not self._pyarrow_gte_7): parent_dir = Path(path_str).parent if str(parent_dir) and not self.exists(str(parent_dir)): self.mkdirs(str(parent_dir)) return self.filesystem.open_output_stream(path_str) + @staticmethod + def _is_key_not_found_error(e: OSError) -> bool: + msg = str(e).lower() + return ("does not exist" in msg or "not exist" in msg or "nosuchkey" in msg or "133" in msg) + + def _get_file_info(self, path_str: str): + try: + file_infos = self.filesystem.get_file_info([path_str]) + file_info = file_infos[0] + return file_info if file_info.type != pafs.FileType.NotFound else None + except OSError as e: + if self._is_key_not_found_error(e): + return None + raise + def get_file_status(self, path: str): path_str = self.to_filesystem_path(path) - file_infos = self.filesystem.get_file_info([path_str]) - file_info = file_infos[0] - - if file_info.type == pafs.FileType.NotFound: + file_info = self._get_file_info(path_str) + if file_info is None: raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") - return file_info def list_status(self, path: str): @@ -215,14 +225,12 @@ def list_directories(self, path: str): def exists(self, path: str) -> bool: path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - return file_info.type != pafs.FileType.NotFound + return self._get_file_info(path_str) is not None def delete(self, path: str, recursive: bool = False) -> bool: path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - - if file_info.type == pafs.FileType.NotFound: + file_info = self._get_file_info(path_str) + if file_info is None: return False if file_info.type == pafs.FileType.Directory: @@ -242,13 +250,14 @@ def delete(self, path: str, recursive: bool = False) -> bool: def mkdirs(self, path: str) -> bool: path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - + file_info = self._get_file_info(path_str) + if file_info is None: + self.filesystem.create_dir(path_str, recursive=True) + return True if file_info.type == pafs.FileType.Directory: return True elif file_info.type == pafs.FileType.File: raise FileExistsError(f"Path exists but is not a directory: {path}") - self.filesystem.create_dir(path_str, recursive=True) return True @@ -263,19 +272,19 @@ def rename(self, src: str, dst: str) -> bool: try: if hasattr(self.filesystem, 'rename'): return self.filesystem.rename(src_str, dst_str) - - dst_file_info = self.filesystem.get_file_info([dst_str])[0] - if dst_file_info.type != pafs.FileType.NotFound: + + dst_file_info = self._get_file_info(dst_str) + if dst_file_info is not None: if dst_file_info.type == pafs.FileType.File: return False # Make it compatible with HadoopFileIO: if dst is an existing directory, # dst=dst/srcFileName src_name = Path(src_str).name dst_str = str(Path(dst_str) / src_name) - final_dst_info = self.filesystem.get_file_info([dst_str])[0] - if final_dst_info.type != pafs.FileType.NotFound: + final_dst_info = self._get_file_info(dst_str) + if final_dst_info is not None: return False - + self.filesystem.move(src_str, dst_str) return True except FileNotFoundError: @@ -310,8 +319,8 @@ def delete_directory_quietly(self, directory: str): def try_to_write_atomic(self, path: str, content: str) -> bool: if self.exists(path): path_str = self.to_filesystem_path(path) - file_info = self.filesystem.get_file_info([path_str])[0] - if file_info.type == pafs.FileType.Directory: + file_info = self._get_file_info(path_str) + if file_info is None or file_info.type == pafs.FileType.Directory: return False temp_path = path + str(uuid.uuid4()) + ".tmp" @@ -504,17 +513,21 @@ def to_filesystem_path(self, path: str) -> str: path_part = normalized_path.lstrip('/') return f"{drive_letter}:/{path_part}" if path_part else f"{drive_letter}:" + # OSS+PyArrow<7: endpoint_override already contains bucket (bucket.endpoint), so pass key only. + if self._is_oss and not self._pyarrow_gte_7: + if parsed.scheme and parsed.netloc: + path_part = normalized_path.lstrip('/') + return path_part if path_part else '.' + else: + # No scheme: path is already the key part (e.g. from Path.parent). Use as-is. + return str(path) + if isinstance(self.filesystem, S3FileSystem): if parsed.scheme: if parsed.netloc: path_part = normalized_path.lstrip('/') - if self._is_oss and not self._pyarrow_gte_7: - # For PyArrow 6.x + OSS, endpoint_override already contains bucket, - result = path_part if path_part else '.' - return result - else: - result = f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc - return result + result = f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc + return result else: result = normalized_path.lstrip('/') return result if result else '.' diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py index ae9abeff238e..4344521b6d2d 100644 --- a/paimon-python/pypaimon/tests/file_io_test.py +++ b/paimon-python/pypaimon/tests/file_io_test.py @@ -22,19 +22,18 @@ from pathlib import Path from unittest.mock import MagicMock, patch -import pyarrow import pyarrow.fs as pafs from pypaimon.common.options import Options from pypaimon.common.options.config import OssOptions from pypaimon.filesystem.local_file_io import LocalFileIO -from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO +from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO, _pyarrow_lt_7 class FileIOTest(unittest.TestCase): """Test cases for FileIO.to_filesystem_path method.""" - def test_s3_filesystem_path_conversion(self): + def test_filesystem_path_conversion(self): """Test S3FileSystem path conversion with various formats.""" file_io = PyArrowFileIO("s3://bucket/warehouse", Options({})) self.assertIsInstance(file_io.filesystem, pafs.S3FileSystem) @@ -66,29 +65,76 @@ def test_s3_filesystem_path_conversion(self): parent_str = str(Path(converted_path).parent) self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str) - from packaging.version import parse as parse_version + lt7 = _pyarrow_lt_7() oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({ OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com' })) - lt7 = parse_version(pyarrow.__version__) < parse_version("7.0.0") got = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") - expected_path = ( - "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt") - self.assertEqual(got, expected_path) + self.assertEqual(got, "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt") + if lt7: + self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx/data.parquet"), + "db-xxx.db/tbl-xxx/data.parquet") + self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx"), "db-xxx.db/tbl-xxx") + manifest_uri = "oss://test-bucket/warehouse/db.db/table/manifest/manifest-list-abc-0" + manifest_key = oss_io.to_filesystem_path(manifest_uri) + self.assertEqual(manifest_key, "warehouse/db.db/table/manifest/manifest-list-abc-0", + "OSS+PyArrow6 must pass key only to PyArrow so manifest is written to correct bucket") + self.assertFalse(manifest_key.startswith("test-bucket/"), + "path must not start with bucket name or PyArrow 6 writes to wrong bucket") nf = MagicMock(type=pafs.FileType.NotFound) + get_file_info_calls = [] + + def record_get_file_info(paths): + get_file_info_calls.append(list(paths)) + return [MagicMock(type=pafs.FileType.NotFound) for _ in paths] + mock_fs = MagicMock() - mock_fs.get_file_info.side_effect = [[nf], [nf]] + mock_fs.get_file_info.side_effect = record_get_file_info if lt7 else [[nf], [nf]] mock_fs.create_dir = MagicMock() mock_fs.open_output_stream.return_value = MagicMock() oss_io.filesystem = mock_fs oss_io.new_output_stream("oss://test-bucket/path/to/file.txt") - mock_fs.create_dir.assert_called_once() - path_str = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") if lt7: - expected_parent = '/'.join(path_str.split('/')[:-1]) if '/' in path_str else '' + mock_fs.create_dir.assert_not_called() else: - expected_parent = str(Path(path_str).parent) - self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent) + mock_fs.create_dir.assert_called_once() + path_str = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") + expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else str(Path(path_str).parent) + self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent) + if lt7: + for call_paths in get_file_info_calls: + for p in call_paths: + self.assertFalse( + p.startswith("test-bucket/"), + "OSS+PyArrow<7 must pass key only to get_file_info, not bucket/key. Got: %r" % (p,) + ) + + def test_exists(self): + lt7 = _pyarrow_lt_7() + with tempfile.TemporaryDirectory(prefix="file_io_nonexistent_") as tmpdir: + file_io = LocalFileIO("file://" + tmpdir, Options({})) + missing_uri = "file://" + os.path.join(tmpdir, "nonexistent_xyz") + path_str = file_io.to_filesystem_path(missing_uri) + raised = None + infos = None + try: + infos = file_io.filesystem.get_file_info([path_str]) + except OSError as e: + raised = e + if lt7: + if raised is not None: + err = str(raised).lower() + self.assertTrue("133" in err or "does not exist" in err or "not exist" in err, str(raised)) + else: + self.assertEqual(len(infos), 1) + self.assertEqual(infos[0].type, pafs.FileType.NotFound) + else: + self.assertIsNone(raised) + self.assertEqual(len(infos), 1) + self.assertEqual(infos[0].type, pafs.FileType.NotFound) + self.assertFalse(file_io.exists(missing_uri)) + with self.assertRaises(FileNotFoundError): + file_io.get_file_status(missing_uri) def test_local_filesystem_path_conversion(self): file_io = LocalFileIO("file:///tmp/warehouse", Options({})) From add8134ed284d2de164131128ab82e1e8e7c3b99 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 20:30:12 +0800 Subject: [PATCH 04/12] refactor _get_file_info --- .../pypaimon/filesystem/pyarrow_file_io.py | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index fcad08786bc3..495fc0f9e80f 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -185,32 +185,35 @@ def new_input_stream(self, path: str): def new_output_stream(self, path: str): path_str = self.to_filesystem_path(path) - if not (self._is_oss and not self._pyarrow_gte_7): + if self._is_oss and not self._pyarrow_gte_7: + # For PyArrow 6.x + OSS, path_str is already just the key part. + if '/' in path_str: + parent_dir = '/'.join(path_str.split('/')[:-1]) + else: + parent_dir = '' + if parent_dir and not self.exists(parent_dir): + self.mkdirs(parent_dir) + else: parent_dir = Path(path_str).parent if str(parent_dir) and not self.exists(str(parent_dir)): self.mkdirs(str(parent_dir)) - return self.filesystem.open_output_stream(path_str) - @staticmethod - def _is_key_not_found_error(e: OSError) -> bool: - msg = str(e).lower() - return ("does not exist" in msg or "not exist" in msg or "nosuchkey" in msg or "133" in msg) - def _get_file_info(self, path_str: str): try: file_infos = self.filesystem.get_file_info([path_str]) - file_info = file_infos[0] - return file_info if file_info.type != pafs.FileType.NotFound else None + return file_infos[0] except OSError as e: - if self._is_key_not_found_error(e): - return None + # this is for compatible with pyarrow < 7 + msg = str(e).lower() + if "does not exist" in msg or "not exist" in msg or "nosuchkey" in msg or "133" in msg: + return pafs.FileInfo(path_str, pafs.FileType.NotFound) raise def get_file_status(self, path: str): path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - if file_info is None: + if file_info.type == pafs.FileType.NotFound: raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") return file_info @@ -225,14 +228,13 @@ def list_directories(self, path: str): def exists(self, path: str) -> bool: path_str = self.to_filesystem_path(path) - return self._get_file_info(path_str) is not None + return self._get_file_info(path_str).type != pafs.FileType.NotFound def delete(self, path: str, recursive: bool = False) -> bool: path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - if file_info is None: + if file_info.type == pafs.FileType.NotFound: return False - if file_info.type == pafs.FileType.Directory: if not recursive: selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True) @@ -251,7 +253,7 @@ def delete(self, path: str, recursive: bool = False) -> bool: def mkdirs(self, path: str) -> bool: path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - if file_info is None: + if file_info.type == pafs.FileType.NotFound: self.filesystem.create_dir(path_str, recursive=True) return True if file_info.type == pafs.FileType.Directory: @@ -274,7 +276,7 @@ def rename(self, src: str, dst: str) -> bool: return self.filesystem.rename(src_str, dst_str) dst_file_info = self._get_file_info(dst_str) - if dst_file_info is not None: + if dst_file_info.type != pafs.FileType.NotFound: if dst_file_info.type == pafs.FileType.File: return False # Make it compatible with HadoopFileIO: if dst is an existing directory, @@ -282,7 +284,7 @@ def rename(self, src: str, dst: str) -> bool: src_name = Path(src_str).name dst_str = str(Path(dst_str) / src_name) final_dst_info = self._get_file_info(dst_str) - if final_dst_info is not None: + if final_dst_info.type != pafs.FileType.NotFound: return False self.filesystem.move(src_str, dst_str) @@ -320,7 +322,7 @@ def try_to_write_atomic(self, path: str, content: str) -> bool: if self.exists(path): path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - if file_info is None or file_info.type == pafs.FileType.Directory: + if file_info.type == pafs.FileType.Directory: return False temp_path = path + str(uuid.uuid4()) + ".tmp" From 08fdc2a538a02a53501fae5bdaeca452749f40a7 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 20:38:59 +0800 Subject: [PATCH 05/12] clean code --- .../pypaimon/filesystem/pyarrow_file_io.py | 37 +++++++++---------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index 495fc0f9e80f..03ce2f6f7ca8 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -54,12 +54,8 @@ def __init__(self, path: str, catalog_options: Options): self.uri_reader_factory = UriReaderFactory(catalog_options) self._is_oss = scheme in {"oss"} self._oss_bucket = None - self._oss_base_key = "" if self._is_oss: self._oss_bucket = self._extract_oss_bucket(path) - uri = urlparse(path) - if uri.scheme and uri.netloc and uri.path: - self._oss_base_key = uri.path.strip("/") self.filesystem = self._initialize_oss_fs(path) elif scheme in {"s3", "s3a", "s3n"}: self.filesystem = self._initialize_s3_fs() @@ -185,18 +181,21 @@ def new_input_stream(self, path: str): def new_output_stream(self, path: str): path_str = self.to_filesystem_path(path) + if self._is_oss and not self._pyarrow_gte_7: - # For PyArrow 6.x + OSS, path_str is already just the key part. + # For PyArrow 6.x + OSS, path_str is already just the key part if '/' in path_str: parent_dir = '/'.join(path_str.split('/')[:-1]) else: parent_dir = '' + if parent_dir and not self.exists(parent_dir): self.mkdirs(parent_dir) else: parent_dir = Path(path_str).parent if str(parent_dir) and not self.exists(str(parent_dir)): self.mkdirs(str(parent_dir)) + return self.filesystem.open_output_stream(path_str) def _get_file_info(self, path_str: str): @@ -215,6 +214,7 @@ def get_file_status(self, path: str): file_info = self._get_file_info(path_str) if file_info.type == pafs.FileType.NotFound: raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") + return file_info def list_status(self, path: str): @@ -233,6 +233,7 @@ def exists(self, path: str) -> bool: def delete(self, path: str, recursive: bool = False) -> bool: path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) + if file_info.type == pafs.FileType.NotFound: return False if file_info.type == pafs.FileType.Directory: @@ -253,6 +254,7 @@ def delete(self, path: str, recursive: bool = False) -> bool: def mkdirs(self, path: str) -> bool: path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) + if file_info.type == pafs.FileType.NotFound: self.filesystem.create_dir(path_str, recursive=True) return True @@ -260,6 +262,7 @@ def mkdirs(self, path: str) -> bool: return True elif file_info.type == pafs.FileType.File: raise FileExistsError(f"Path exists but is not a directory: {path}") + self.filesystem.create_dir(path_str, recursive=True) return True @@ -268,9 +271,9 @@ def rename(self, src: str, dst: str) -> bool: dst_parent = Path(dst_str).parent if str(dst_parent) and not self.exists(str(dst_parent)): self.mkdirs(str(dst_parent)) - + src_str = self.to_filesystem_path(src) - + try: if hasattr(self.filesystem, 'rename'): return self.filesystem.rename(src_str, dst_str) @@ -324,7 +327,7 @@ def try_to_write_atomic(self, path: str, content: str) -> bool: file_info = self._get_file_info(path_str) if file_info.type == pafs.FileType.Directory: return False - + temp_path = path + str(uuid.uuid4()) + ".tmp" success = False try: @@ -342,7 +345,7 @@ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False) source_str = self.to_filesystem_path(source_path) target_str = self.to_filesystem_path(target_path) target_parent = Path(target_str).parent - + if str(target_parent) and not self.exists(str(target_parent)): self.mkdirs(str(target_parent)) @@ -366,7 +369,7 @@ def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', zstd_level: int = 1, **kwargs): try: """Write ORC file using PyArrow ORC writer. - + Note: PyArrow's ORC writer doesn't support compression_level parameter. ORC files will use zstd compression with default level (which is 3, see https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c) @@ -425,7 +428,7 @@ def record_generator(): 'zstd': 'zstandard', # zstd is commonly used in Paimon } compression_lower = compression.lower() - + codec = codec_map.get(compression_lower) if codec is None: raise ValueError( @@ -515,19 +518,13 @@ def to_filesystem_path(self, path: str) -> str: path_part = normalized_path.lstrip('/') return f"{drive_letter}:/{path_part}" if path_part else f"{drive_letter}:" - # OSS+PyArrow<7: endpoint_override already contains bucket (bucket.endpoint), so pass key only. - if self._is_oss and not self._pyarrow_gte_7: - if parsed.scheme and parsed.netloc: - path_part = normalized_path.lstrip('/') - return path_part if path_part else '.' - else: - # No scheme: path is already the key part (e.g. from Path.parent). Use as-is. - return str(path) - if isinstance(self.filesystem, S3FileSystem): if parsed.scheme: if parsed.netloc: path_part = normalized_path.lstrip('/') + # OSS+PyArrow<7: endpoint_override has bucket, pass key only. + if self._is_oss and not self._pyarrow_gte_7: + return path_part if path_part else '.' result = f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc return result else: From 523923bdc2f69e7d6a2a2a129f44dab96629d101 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 20:49:07 +0800 Subject: [PATCH 06/12] clean code format --- paimon-python/pypaimon/filesystem/pyarrow_file_io.py | 2 ++ paimon-python/pypaimon/tests/file_io_test.py | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index 03ce2f6f7ca8..8497b160c1e3 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -212,6 +212,7 @@ def _get_file_info(self, path_str: str): def get_file_status(self, path: str): path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) + if file_info.type == pafs.FileType.NotFound: raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") @@ -236,6 +237,7 @@ def delete(self, path: str, recursive: bool = False) -> bool: if file_info.type == pafs.FileType.NotFound: return False + if file_info.type == pafs.FileType.Directory: if not recursive: selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True) diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py index 4344521b6d2d..dd6d9a41cf81 100644 --- a/paimon-python/pypaimon/tests/file_io_test.py +++ b/paimon-python/pypaimon/tests/file_io_test.py @@ -94,13 +94,13 @@ def record_get_file_info(paths): mock_fs.open_output_stream.return_value = MagicMock() oss_io.filesystem = mock_fs oss_io.new_output_stream("oss://test-bucket/path/to/file.txt") + mock_fs.create_dir.assert_called_once() + path_str = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") if lt7: - mock_fs.create_dir.assert_not_called() + expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else "" else: - mock_fs.create_dir.assert_called_once() - path_str = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else str(Path(path_str).parent) - self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent) + self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent) if lt7: for call_paths in get_file_info_calls: for p in call_paths: From 79d5f25ab2184437f40dac52dff59d57a421f354 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 20:58:52 +0800 Subject: [PATCH 07/12] clean code format --- .../pypaimon/filesystem/pyarrow_file_io.py | 150 +++++++++--------- 1 file changed, 75 insertions(+), 75 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index 8497b160c1e3..ba39ad404aeb 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -63,7 +63,7 @@ def __init__(self, path: str, catalog_options: Options): self.filesystem = self._initialize_hdfs_fs(scheme, netloc) else: raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") - + @staticmethod def parse_location(location: str): uri = urlparse(location) @@ -73,7 +73,7 @@ def parse_location(location: str): return uri.scheme, uri.netloc, uri.path else: return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" - + def _create_s3_retry_config( self, max_attempts: int = 10, @@ -93,19 +93,19 @@ def _create_s3_retry_config( return config else: return {} - + def _extract_oss_bucket(self, location) -> str: uri = urlparse(location) if uri.scheme and uri.scheme != "oss": raise ValueError("Not an OSS URI: {}".format(location)) - + netloc = uri.netloc or "" if (getattr(uri, "username", None) or getattr(uri, "password", None)) or ("@" in netloc): first_segment = uri.path.lstrip("/").split("/", 1)[0] if not first_segment: raise ValueError("Invalid OSS URI without bucket: {}".format(location)) return first_segment - + host = getattr(uri, "hostname", None) or netloc if not host: raise ValueError("Invalid OSS URI without host: {}".format(location)) @@ -113,7 +113,7 @@ def _extract_oss_bucket(self, location) -> str: if not bucket: raise ValueError("Invalid OSS URI without bucket: {}".format(location)) return bucket - + def _initialize_oss_fs(self, path) -> FileSystem: client_kwargs = { "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID), @@ -121,19 +121,19 @@ def _initialize_oss_fs(self, path) -> FileSystem: "session_token": self.properties.get(OssOptions.OSS_SECURITY_TOKEN), "region": self.properties.get(OssOptions.OSS_REGION), } - + if self._pyarrow_gte_7: client_kwargs['force_virtual_addressing'] = True client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT) else: client_kwargs['endpoint_override'] = (self._oss_bucket + "." + self.properties.get(OssOptions.OSS_ENDPOINT)) - + retry_config = self._create_s3_retry_config() client_kwargs.update(retry_config) - + return pafs.S3FileSystem(**client_kwargs) - + def _initialize_s3_fs(self) -> FileSystem: client_kwargs = { "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT), @@ -144,22 +144,22 @@ def _initialize_s3_fs(self) -> FileSystem: } if self._pyarrow_gte_7: client_kwargs["force_virtual_addressing"] = True - + retry_config = self._create_s3_retry_config() client_kwargs.update(retry_config) - + return pafs.S3FileSystem(**client_kwargs) - + def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: if 'HADOOP_HOME' not in os.environ: raise RuntimeError("HADOOP_HOME environment variable is not set.") if 'HADOOP_CONF_DIR' not in os.environ: raise RuntimeError("HADOOP_CONF_DIR environment variable is not set.") - + hadoop_home = os.environ.get("HADOOP_HOME") native_lib_path = f"{hadoop_home}/lib/native" os.environ['LD_LIBRARY_PATH'] = f"{native_lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}" - + class_paths = subprocess.run( [f'{hadoop_home}/bin/hadoop', 'classpath', '--glob'], capture_output=True, @@ -167,37 +167,37 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: check=True ) os.environ['CLASSPATH'] = class_paths.stdout.strip() - + host, port_str = splitport(netloc) return pafs.HadoopFileSystem( host=host, port=int(port_str), user=os.environ.get('HADOOP_USER_NAME', 'hadoop') ) - + def new_input_stream(self, path: str): path_str = self.to_filesystem_path(path) return self.filesystem.open_input_file(path_str) - + def new_output_stream(self, path: str): path_str = self.to_filesystem_path(path) - + if self._is_oss and not self._pyarrow_gte_7: # For PyArrow 6.x + OSS, path_str is already just the key part if '/' in path_str: parent_dir = '/'.join(path_str.split('/')[:-1]) else: parent_dir = '' - + if parent_dir and not self.exists(parent_dir): self.mkdirs(parent_dir) else: parent_dir = Path(path_str).parent if str(parent_dir) and not self.exists(str(parent_dir)): self.mkdirs(str(parent_dir)) - + return self.filesystem.open_output_stream(path_str) - + def _get_file_info(self, path_str: str): try: file_infos = self.filesystem.get_file_info([path_str]) @@ -208,36 +208,36 @@ def _get_file_info(self, path_str: str): if "does not exist" in msg or "not exist" in msg or "nosuchkey" in msg or "133" in msg: return pafs.FileInfo(path_str, pafs.FileType.NotFound) raise - + def get_file_status(self, path: str): path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - + if file_info.type == pafs.FileType.NotFound: raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") - + return file_info - + def list_status(self, path: str): path_str = self.to_filesystem_path(path) selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True) return self.filesystem.get_file_info(selector) - + def list_directories(self, path: str): file_infos = self.list_status(path) return [info for info in file_infos if info.type == pafs.FileType.Directory] - + def exists(self, path: str) -> bool: path_str = self.to_filesystem_path(path) return self._get_file_info(path_str).type != pafs.FileType.NotFound - + def delete(self, path: str, recursive: bool = False) -> bool: path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - + if file_info.type == pafs.FileType.NotFound: return False - + if file_info.type == pafs.FileType.Directory: if not recursive: selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True) @@ -252,11 +252,11 @@ def delete(self, path: str, recursive: bool = False) -> bool: else: self.filesystem.delete_file(path_str) return True - + def mkdirs(self, path: str) -> bool: path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - + if file_info.type == pafs.FileType.NotFound: self.filesystem.create_dir(path_str, recursive=True) return True @@ -264,22 +264,22 @@ def mkdirs(self, path: str) -> bool: return True elif file_info.type == pafs.FileType.File: raise FileExistsError(f"Path exists but is not a directory: {path}") - + self.filesystem.create_dir(path_str, recursive=True) return True - + def rename(self, src: str, dst: str) -> bool: dst_str = self.to_filesystem_path(dst) dst_parent = Path(dst_str).parent if str(dst_parent) and not self.exists(str(dst_parent)): self.mkdirs(str(dst_parent)) - + src_str = self.to_filesystem_path(src) - + try: if hasattr(self.filesystem, 'rename'): return self.filesystem.rename(src_str, dst_str) - + dst_file_info = self._get_file_info(dst_str) if dst_file_info.type != pafs.FileType.NotFound: if dst_file_info.type == pafs.FileType.File: @@ -291,45 +291,45 @@ def rename(self, src: str, dst: str) -> bool: final_dst_info = self._get_file_info(dst_str) if final_dst_info.type != pafs.FileType.NotFound: return False - + self.filesystem.move(src_str, dst_str) return True except FileNotFoundError: return False except (PermissionError, OSError): return False - + def delete_quietly(self, path: str): if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug(f"Ready to delete {path}") - + try: if not self.delete(path, False) and self.exists(path): self.logger.warning(f"Failed to delete file {path}") except Exception: self.logger.warning(f"Exception occurs when deleting file {path}", exc_info=True) - + def delete_files_quietly(self, files: List[str]): for file_path in files: self.delete_quietly(file_path) - + def delete_directory_quietly(self, directory: str): if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug(f"Ready to delete {directory}") - + try: if not self.delete(directory, True) and self.exists(directory): self.logger.warning(f"Failed to delete directory {directory}") except Exception: self.logger.warning(f"Exception occurs when deleting directory {directory}", exc_info=True) - + def try_to_write_atomic(self, path: str, content: str) -> bool: if self.exists(path): path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) if file_info.type == pafs.FileType.Directory: return False - + temp_path = path + str(uuid.uuid4()) + ".tmp" success = False try: @@ -339,39 +339,39 @@ def try_to_write_atomic(self, path: str, content: str) -> bool: if not success: self.delete_quietly(temp_path) return success - + def copy_file(self, source_path: str, target_path: str, overwrite: bool = False): if not overwrite and self.exists(target_path): raise FileExistsError(f"Target file {target_path} already exists and overwrite=False") - + source_str = self.to_filesystem_path(source_path) target_str = self.to_filesystem_path(target_path) target_parent = Path(target_str).parent - + if str(target_parent) and not self.exists(str(target_parent)): self.mkdirs(str(target_parent)) - + self.filesystem.copy_file(source_str, target_str) - + def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'zstd', zstd_level: int = 1, **kwargs): try: import pyarrow.parquet as pq - + with self.new_output_stream(path) as output_stream: if compression.lower() == 'zstd': kwargs['compression_level'] = zstd_level pq.write_table(data, output_stream, compression=compression, **kwargs) - + except Exception as e: self.delete_quietly(path) raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e - + def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', zstd_level: int = 1, **kwargs): try: """Write ORC file using PyArrow ORC writer. - + Note: PyArrow's ORC writer doesn't support compression_level parameter. ORC files will use zstd compression with default level (which is 3, see https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c) @@ -379,7 +379,7 @@ def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', """ import sys import pyarrow.orc as orc - + with self.new_output_stream(path) as output_stream: # Check Python version - if 3.6, don't use compression parameter if sys.version_info[:2] == (3, 6): @@ -391,11 +391,11 @@ def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', compression=compression, **kwargs ) - + except Exception as e: self.delete_quietly(path) raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e - + def write_avro( self, path: str, data: pyarrow.Table, avro_schema: Optional[Dict[str, Any]] = None, @@ -404,9 +404,9 @@ def write_avro( if avro_schema is None: from pypaimon.schema.data_types import PyarrowFieldParser avro_schema = PyarrowFieldParser.to_avro_schema(data.schema) - + records_dict = data.to_pydict() - + def record_generator(): num_rows = len(list(records_dict.values())[0]) for i in range(num_rows): @@ -417,9 +417,9 @@ def record_generator(): value = value.replace(tzinfo=timezone.utc) record[col] = value yield record - + records = record_generator() - + codec_map = { 'null': 'null', 'deflate': 'deflate', @@ -430,25 +430,25 @@ def record_generator(): 'zstd': 'zstandard', # zstd is commonly used in Paimon } compression_lower = compression.lower() - + codec = codec_map.get(compression_lower) if codec is None: raise ValueError( f"Unsupported compression '{compression}' for Avro format. " f"Supported compressions: {', '.join(sorted(codec_map.keys()))}." ) - + with self.new_output_stream(path) as output_stream: if codec == 'zstandard': kwargs['codec_compression_level'] = zstd_level fastavro.writer(output_stream, avro_schema, records, codec=codec, **kwargs) - + def write_lance(self, path: str, data: pyarrow.Table, **kwargs): try: import lance from pypaimon.read.reader.lance_utils import to_lance_specified file_path_for_lance, storage_options = to_lance_specified(self, path) - + writer = lance.file.LanceFileWriter( file_path_for_lance, data.schema, storage_options=storage_options, **kwargs) try: @@ -460,7 +460,7 @@ def write_lance(self, path: str, data: pyarrow.Table, **kwargs): except Exception as e: self.delete_quietly(path) raise RuntimeError(f"Failed to write Lance file {path}: {e}") from e - + def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs): try: if data.num_columns != 1: @@ -500,26 +500,26 @@ def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, * row = GenericRow(row_values, fields, RowKind.INSERT) writer.add_element(row) writer.close() - + except Exception as e: self.delete_quietly(path) raise RuntimeError(f"Failed to write blob file {path}: {e}") from e - + def to_filesystem_path(self, path: str) -> str: from pyarrow.fs import S3FileSystem import re - + parsed = urlparse(path) normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else '' - + if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc: return str(path) - + if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'): drive_letter = parsed.netloc.rstrip(':') path_part = normalized_path.lstrip('/') return f"{drive_letter}:/{path_part}" if path_part else f"{drive_letter}:" - + if isinstance(self.filesystem, S3FileSystem): if parsed.scheme: if parsed.netloc: @@ -534,10 +534,10 @@ def to_filesystem_path(self, path: str) -> str: return result if result else '.' else: return str(path) - + if parsed.scheme: if not normalized_path: return '.' return normalized_path - + return str(path) From 747a0223748c79dabb8c5b3c70807482e783f0be Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 21:01:18 +0800 Subject: [PATCH 08/12] clean code format --- .../pypaimon/filesystem/pyarrow_file_io.py | 150 +++++++++--------- 1 file changed, 75 insertions(+), 75 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index ba39ad404aeb..8497b160c1e3 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -63,7 +63,7 @@ def __init__(self, path: str, catalog_options: Options): self.filesystem = self._initialize_hdfs_fs(scheme, netloc) else: raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") - + @staticmethod def parse_location(location: str): uri = urlparse(location) @@ -73,7 +73,7 @@ def parse_location(location: str): return uri.scheme, uri.netloc, uri.path else: return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" - + def _create_s3_retry_config( self, max_attempts: int = 10, @@ -93,19 +93,19 @@ def _create_s3_retry_config( return config else: return {} - + def _extract_oss_bucket(self, location) -> str: uri = urlparse(location) if uri.scheme and uri.scheme != "oss": raise ValueError("Not an OSS URI: {}".format(location)) - + netloc = uri.netloc or "" if (getattr(uri, "username", None) or getattr(uri, "password", None)) or ("@" in netloc): first_segment = uri.path.lstrip("/").split("/", 1)[0] if not first_segment: raise ValueError("Invalid OSS URI without bucket: {}".format(location)) return first_segment - + host = getattr(uri, "hostname", None) or netloc if not host: raise ValueError("Invalid OSS URI without host: {}".format(location)) @@ -113,7 +113,7 @@ def _extract_oss_bucket(self, location) -> str: if not bucket: raise ValueError("Invalid OSS URI without bucket: {}".format(location)) return bucket - + def _initialize_oss_fs(self, path) -> FileSystem: client_kwargs = { "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID), @@ -121,19 +121,19 @@ def _initialize_oss_fs(self, path) -> FileSystem: "session_token": self.properties.get(OssOptions.OSS_SECURITY_TOKEN), "region": self.properties.get(OssOptions.OSS_REGION), } - + if self._pyarrow_gte_7: client_kwargs['force_virtual_addressing'] = True client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT) else: client_kwargs['endpoint_override'] = (self._oss_bucket + "." + self.properties.get(OssOptions.OSS_ENDPOINT)) - + retry_config = self._create_s3_retry_config() client_kwargs.update(retry_config) - + return pafs.S3FileSystem(**client_kwargs) - + def _initialize_s3_fs(self) -> FileSystem: client_kwargs = { "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT), @@ -144,22 +144,22 @@ def _initialize_s3_fs(self) -> FileSystem: } if self._pyarrow_gte_7: client_kwargs["force_virtual_addressing"] = True - + retry_config = self._create_s3_retry_config() client_kwargs.update(retry_config) - + return pafs.S3FileSystem(**client_kwargs) - + def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: if 'HADOOP_HOME' not in os.environ: raise RuntimeError("HADOOP_HOME environment variable is not set.") if 'HADOOP_CONF_DIR' not in os.environ: raise RuntimeError("HADOOP_CONF_DIR environment variable is not set.") - + hadoop_home = os.environ.get("HADOOP_HOME") native_lib_path = f"{hadoop_home}/lib/native" os.environ['LD_LIBRARY_PATH'] = f"{native_lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}" - + class_paths = subprocess.run( [f'{hadoop_home}/bin/hadoop', 'classpath', '--glob'], capture_output=True, @@ -167,37 +167,37 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: check=True ) os.environ['CLASSPATH'] = class_paths.stdout.strip() - + host, port_str = splitport(netloc) return pafs.HadoopFileSystem( host=host, port=int(port_str), user=os.environ.get('HADOOP_USER_NAME', 'hadoop') ) - + def new_input_stream(self, path: str): path_str = self.to_filesystem_path(path) return self.filesystem.open_input_file(path_str) - + def new_output_stream(self, path: str): path_str = self.to_filesystem_path(path) - + if self._is_oss and not self._pyarrow_gte_7: # For PyArrow 6.x + OSS, path_str is already just the key part if '/' in path_str: parent_dir = '/'.join(path_str.split('/')[:-1]) else: parent_dir = '' - + if parent_dir and not self.exists(parent_dir): self.mkdirs(parent_dir) else: parent_dir = Path(path_str).parent if str(parent_dir) and not self.exists(str(parent_dir)): self.mkdirs(str(parent_dir)) - + return self.filesystem.open_output_stream(path_str) - + def _get_file_info(self, path_str: str): try: file_infos = self.filesystem.get_file_info([path_str]) @@ -208,36 +208,36 @@ def _get_file_info(self, path_str: str): if "does not exist" in msg or "not exist" in msg or "nosuchkey" in msg or "133" in msg: return pafs.FileInfo(path_str, pafs.FileType.NotFound) raise - + def get_file_status(self, path: str): path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - + if file_info.type == pafs.FileType.NotFound: raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") - + return file_info - + def list_status(self, path: str): path_str = self.to_filesystem_path(path) selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True) return self.filesystem.get_file_info(selector) - + def list_directories(self, path: str): file_infos = self.list_status(path) return [info for info in file_infos if info.type == pafs.FileType.Directory] - + def exists(self, path: str) -> bool: path_str = self.to_filesystem_path(path) return self._get_file_info(path_str).type != pafs.FileType.NotFound - + def delete(self, path: str, recursive: bool = False) -> bool: path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - + if file_info.type == pafs.FileType.NotFound: return False - + if file_info.type == pafs.FileType.Directory: if not recursive: selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True) @@ -252,11 +252,11 @@ def delete(self, path: str, recursive: bool = False) -> bool: else: self.filesystem.delete_file(path_str) return True - + def mkdirs(self, path: str) -> bool: path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - + if file_info.type == pafs.FileType.NotFound: self.filesystem.create_dir(path_str, recursive=True) return True @@ -264,22 +264,22 @@ def mkdirs(self, path: str) -> bool: return True elif file_info.type == pafs.FileType.File: raise FileExistsError(f"Path exists but is not a directory: {path}") - + self.filesystem.create_dir(path_str, recursive=True) return True - + def rename(self, src: str, dst: str) -> bool: dst_str = self.to_filesystem_path(dst) dst_parent = Path(dst_str).parent if str(dst_parent) and not self.exists(str(dst_parent)): self.mkdirs(str(dst_parent)) - + src_str = self.to_filesystem_path(src) - + try: if hasattr(self.filesystem, 'rename'): return self.filesystem.rename(src_str, dst_str) - + dst_file_info = self._get_file_info(dst_str) if dst_file_info.type != pafs.FileType.NotFound: if dst_file_info.type == pafs.FileType.File: @@ -291,45 +291,45 @@ def rename(self, src: str, dst: str) -> bool: final_dst_info = self._get_file_info(dst_str) if final_dst_info.type != pafs.FileType.NotFound: return False - + self.filesystem.move(src_str, dst_str) return True except FileNotFoundError: return False except (PermissionError, OSError): return False - + def delete_quietly(self, path: str): if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug(f"Ready to delete {path}") - + try: if not self.delete(path, False) and self.exists(path): self.logger.warning(f"Failed to delete file {path}") except Exception: self.logger.warning(f"Exception occurs when deleting file {path}", exc_info=True) - + def delete_files_quietly(self, files: List[str]): for file_path in files: self.delete_quietly(file_path) - + def delete_directory_quietly(self, directory: str): if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug(f"Ready to delete {directory}") - + try: if not self.delete(directory, True) and self.exists(directory): self.logger.warning(f"Failed to delete directory {directory}") except Exception: self.logger.warning(f"Exception occurs when deleting directory {directory}", exc_info=True) - + def try_to_write_atomic(self, path: str, content: str) -> bool: if self.exists(path): path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) if file_info.type == pafs.FileType.Directory: return False - + temp_path = path + str(uuid.uuid4()) + ".tmp" success = False try: @@ -339,39 +339,39 @@ def try_to_write_atomic(self, path: str, content: str) -> bool: if not success: self.delete_quietly(temp_path) return success - + def copy_file(self, source_path: str, target_path: str, overwrite: bool = False): if not overwrite and self.exists(target_path): raise FileExistsError(f"Target file {target_path} already exists and overwrite=False") - + source_str = self.to_filesystem_path(source_path) target_str = self.to_filesystem_path(target_path) target_parent = Path(target_str).parent - + if str(target_parent) and not self.exists(str(target_parent)): self.mkdirs(str(target_parent)) - + self.filesystem.copy_file(source_str, target_str) - + def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'zstd', zstd_level: int = 1, **kwargs): try: import pyarrow.parquet as pq - + with self.new_output_stream(path) as output_stream: if compression.lower() == 'zstd': kwargs['compression_level'] = zstd_level pq.write_table(data, output_stream, compression=compression, **kwargs) - + except Exception as e: self.delete_quietly(path) raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e - + def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', zstd_level: int = 1, **kwargs): try: """Write ORC file using PyArrow ORC writer. - + Note: PyArrow's ORC writer doesn't support compression_level parameter. ORC files will use zstd compression with default level (which is 3, see https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c) @@ -379,7 +379,7 @@ def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', """ import sys import pyarrow.orc as orc - + with self.new_output_stream(path) as output_stream: # Check Python version - if 3.6, don't use compression parameter if sys.version_info[:2] == (3, 6): @@ -391,11 +391,11 @@ def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', compression=compression, **kwargs ) - + except Exception as e: self.delete_quietly(path) raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e - + def write_avro( self, path: str, data: pyarrow.Table, avro_schema: Optional[Dict[str, Any]] = None, @@ -404,9 +404,9 @@ def write_avro( if avro_schema is None: from pypaimon.schema.data_types import PyarrowFieldParser avro_schema = PyarrowFieldParser.to_avro_schema(data.schema) - + records_dict = data.to_pydict() - + def record_generator(): num_rows = len(list(records_dict.values())[0]) for i in range(num_rows): @@ -417,9 +417,9 @@ def record_generator(): value = value.replace(tzinfo=timezone.utc) record[col] = value yield record - + records = record_generator() - + codec_map = { 'null': 'null', 'deflate': 'deflate', @@ -430,25 +430,25 @@ def record_generator(): 'zstd': 'zstandard', # zstd is commonly used in Paimon } compression_lower = compression.lower() - + codec = codec_map.get(compression_lower) if codec is None: raise ValueError( f"Unsupported compression '{compression}' for Avro format. " f"Supported compressions: {', '.join(sorted(codec_map.keys()))}." ) - + with self.new_output_stream(path) as output_stream: if codec == 'zstandard': kwargs['codec_compression_level'] = zstd_level fastavro.writer(output_stream, avro_schema, records, codec=codec, **kwargs) - + def write_lance(self, path: str, data: pyarrow.Table, **kwargs): try: import lance from pypaimon.read.reader.lance_utils import to_lance_specified file_path_for_lance, storage_options = to_lance_specified(self, path) - + writer = lance.file.LanceFileWriter( file_path_for_lance, data.schema, storage_options=storage_options, **kwargs) try: @@ -460,7 +460,7 @@ def write_lance(self, path: str, data: pyarrow.Table, **kwargs): except Exception as e: self.delete_quietly(path) raise RuntimeError(f"Failed to write Lance file {path}: {e}") from e - + def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs): try: if data.num_columns != 1: @@ -500,26 +500,26 @@ def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, * row = GenericRow(row_values, fields, RowKind.INSERT) writer.add_element(row) writer.close() - + except Exception as e: self.delete_quietly(path) raise RuntimeError(f"Failed to write blob file {path}: {e}") from e - + def to_filesystem_path(self, path: str) -> str: from pyarrow.fs import S3FileSystem import re - + parsed = urlparse(path) normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else '' - + if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc: return str(path) - + if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'): drive_letter = parsed.netloc.rstrip(':') path_part = normalized_path.lstrip('/') return f"{drive_letter}:/{path_part}" if path_part else f"{drive_letter}:" - + if isinstance(self.filesystem, S3FileSystem): if parsed.scheme: if parsed.netloc: @@ -534,10 +534,10 @@ def to_filesystem_path(self, path: str) -> str: return result if result else '.' else: return str(path) - + if parsed.scheme: if not normalized_path: return '.' return normalized_path - + return str(path) From ed574f4919896348aee8e17ae732cbdab30664d9 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 21:04:00 +0800 Subject: [PATCH 09/12] fix --- .../pypaimon/filesystem/pyarrow_file_io.py | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index 8497b160c1e3..afcbc10249c1 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -188,7 +188,7 @@ def new_output_stream(self, path: str): parent_dir = '/'.join(path_str.split('/')[:-1]) else: parent_dir = '' - + if parent_dir and not self.exists(parent_dir): self.mkdirs(parent_dir) else: @@ -212,10 +212,10 @@ def _get_file_info(self, path_str: str): def get_file_status(self, path: str): path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - + if file_info.type == pafs.FileType.NotFound: raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist") - + return file_info def list_status(self, path: str): @@ -234,10 +234,10 @@ def exists(self, path: str) -> bool: def delete(self, path: str, recursive: bool = False) -> bool: path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - + if file_info.type == pafs.FileType.NotFound: return False - + if file_info.type == pafs.FileType.Directory: if not recursive: selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True) @@ -256,7 +256,7 @@ def delete(self, path: str, recursive: bool = False) -> bool: def mkdirs(self, path: str) -> bool: path_str = self.to_filesystem_path(path) file_info = self._get_file_info(path_str) - + if file_info.type == pafs.FileType.NotFound: self.filesystem.create_dir(path_str, recursive=True) return True @@ -264,7 +264,7 @@ def mkdirs(self, path: str) -> bool: return True elif file_info.type == pafs.FileType.File: raise FileExistsError(f"Path exists but is not a directory: {path}") - + self.filesystem.create_dir(path_str, recursive=True) return True @@ -273,13 +273,13 @@ def rename(self, src: str, dst: str) -> bool: dst_parent = Path(dst_str).parent if str(dst_parent) and not self.exists(str(dst_parent)): self.mkdirs(str(dst_parent)) - + src_str = self.to_filesystem_path(src) - + try: if hasattr(self.filesystem, 'rename'): return self.filesystem.rename(src_str, dst_str) - + dst_file_info = self._get_file_info(dst_str) if dst_file_info.type != pafs.FileType.NotFound: if dst_file_info.type == pafs.FileType.File: @@ -291,7 +291,7 @@ def rename(self, src: str, dst: str) -> bool: final_dst_info = self._get_file_info(dst_str) if final_dst_info.type != pafs.FileType.NotFound: return False - + self.filesystem.move(src_str, dst_str) return True except FileNotFoundError: @@ -329,7 +329,7 @@ def try_to_write_atomic(self, path: str, content: str) -> bool: file_info = self._get_file_info(path_str) if file_info.type == pafs.FileType.Directory: return False - + temp_path = path + str(uuid.uuid4()) + ".tmp" success = False try: @@ -347,7 +347,7 @@ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False) source_str = self.to_filesystem_path(source_path) target_str = self.to_filesystem_path(target_path) target_parent = Path(target_str).parent - + if str(target_parent) and not self.exists(str(target_parent)): self.mkdirs(str(target_parent)) @@ -371,7 +371,7 @@ def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', zstd_level: int = 1, **kwargs): try: """Write ORC file using PyArrow ORC writer. - + Note: PyArrow's ORC writer doesn't support compression_level parameter. ORC files will use zstd compression with default level (which is 3, see https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c) @@ -430,7 +430,7 @@ def record_generator(): 'zstd': 'zstandard', # zstd is commonly used in Paimon } compression_lower = compression.lower() - + codec = codec_map.get(compression_lower) if codec is None: raise ValueError( From cecea2a33a16746745ffb1b3550bbc1592c18b17 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 21:09:15 +0800 Subject: [PATCH 10/12] fix code format --- paimon-python/pypaimon/filesystem/pyarrow_file_io.py | 1 - paimon-python/pypaimon/tests/file_io_test.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index afcbc10249c1..daf8ad2092cd 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -43,7 +43,6 @@ def _pyarrow_lt_7(): return parse(pyarrow.__version__) < parse("7.0.0") - class PyArrowFileIO(FileIO): def __init__(self, path: str, catalog_options: Options): self.properties = catalog_options diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py index dd6d9a41cf81..ba91f94a18e6 100644 --- a/paimon-python/pypaimon/tests/file_io_test.py +++ b/paimon-python/pypaimon/tests/file_io_test.py @@ -97,7 +97,7 @@ def record_get_file_info(paths): mock_fs.create_dir.assert_called_once() path_str = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") if lt7: - expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else "" + expected_parent = '/'.join(path_str.split('/')[:-1]) if '/' in path_str else '' else: expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else str(Path(path_str).parent) self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent) From c7b29ff0416dcbceda4b9413b2fb542cdd8a2494 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 21:58:08 +0800 Subject: [PATCH 11/12] fix code format --- paimon-python/pypaimon/filesystem/pyarrow_file_io.py | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index daf8ad2092cd..afcbc10249c1 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -43,6 +43,7 @@ def _pyarrow_lt_7(): return parse(pyarrow.__version__) < parse("7.0.0") + class PyArrowFileIO(FileIO): def __init__(self, path: str, catalog_options: Options): self.properties = catalog_options From 532040373ba38636d8b4cec289acb4213dd8e071 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 14 Feb 2026 22:21:23 +0800 Subject: [PATCH 12/12] optimize error message check --- paimon-python/pypaimon/filesystem/pyarrow_file_io.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index afcbc10249c1..c02c5f62a8cb 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -17,6 +17,7 @@ ################################################################################ import logging import os +import re import subprocess import uuid from datetime import datetime, timezone @@ -205,7 +206,8 @@ def _get_file_info(self, path_str: str): except OSError as e: # this is for compatible with pyarrow < 7 msg = str(e).lower() - if "does not exist" in msg or "not exist" in msg or "nosuchkey" in msg or "133" in msg: + if ("does not exist" in msg or "not exist" in msg or "nosuchkey" in msg + or re.search(r'\b133\b', msg) or "notfound" in msg): return pafs.FileInfo(path_str, pafs.FileType.NotFound) raise