From aca696e28ea8a8169cfc63b8c547b0d7e34872fe Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Wed, 10 Jun 2026 10:43:45 +0530 Subject: [PATCH 1/9] repository: add obj_offset/obj_size range-load params to Store.load calls retry_size min() guards against corrupted meta_size, no-op for healthy objects. --- src/borg/repository.py | 16 ++++++++++++---- src/borg/testsuite/repository_test.py | 14 +++++++++++++- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index c8ae6c1a40..f8b0ffee58 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -603,20 +603,23 @@ def list(self, limit=None, marker=None): # note: do not collect the marker id return result - def get(self, id, read_data=True, raise_missing=True): + def get(self, id, read_data=True, raise_missing=True, obj_offset=0, obj_size=None): self._lock_refresh() pack_id = id # N=1: pack_id == chunk_id id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: if read_data: - return self.store.load(key) + return self.store.load(key, offset=obj_offset, size=obj_size) else: # RepoObj layout supports separately encrypted metadata and data. # We return enough bytes so the client can decrypt the metadata. hdr_size = RepoObj.obj_header.size extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips - obj = self.store.load(key, size=hdr_size + extra_size) + load_size = hdr_size + extra_size + if obj_size is not None: + load_size = min(load_size, obj_size) + obj = self.store.load(key, offset=obj_offset, size=load_size) hdr = obj[0:hdr_size] if len(hdr) != hdr_size: raise IntegrityError(f"Object too small [id {id_hex}]: expected {hdr_size}, got {len(hdr)} bytes") @@ -624,7 +627,12 @@ def get(self, id, read_data=True, raise_missing=True): if meta_size > extra_size: # we did not get enough, need to load more, but not all. # this should be rare, as chunk metadata is rather small usually. - obj = self.store.load(key, size=hdr_size + meta_size) + retry_size = hdr_size + meta_size + if obj_size is not None: + # normally a no-op (meta_size <= obj_size - hdr_size for a healthy object); + # guards against a corrupted meta_size producing an oversize read. + retry_size = min(retry_size, obj_size) + obj = self.store.load(key, offset=obj_offset, size=retry_size) meta = obj[hdr_size : hdr_size + meta_size] if len(meta) != meta_size: raise IntegrityError(f"Object too small [id {id_hex}]: expected {meta_size}, got {len(meta)} bytes") diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 86fe55a23f..04de1f37d7 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -3,7 +3,7 @@ from hashlib import sha256 import pytest -from ..helpers import IntegrityError, Location +from ..helpers import IntegrityError, Location, bin_to_hex from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -198,6 +198,18 @@ def test_pack_writer_n2_flush(): assert results[1] == (id2, expected_pack_id, len(data1), len(data2)) +def test_get_with_range(tmp_path): + # get() passes obj_offset/obj_size through to store.load() for range reads. + chunk1 = fchunk(b"FIRST") + chunk2 = fchunk(b"SECOND") + pack = chunk1 + chunk2 + pack_id = H(42) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.store_store("packs/" + bin_to_hex(pack_id), pack) + assert repository.get(pack_id, obj_offset=0, obj_size=len(chunk1)) == chunk1 + assert repository.get(pack_id, obj_offset=len(chunk1), obj_size=len(chunk2)) == chunk2 + + def test_pack_writer_final_partial_pack_uses_sha256(): # When max_count > 1, a final flush with only 1 piece must still use SHA256, # not the N=1 pack_id == chunk_id hack. From 3a9f2e882f45805bd88445b50a72373ae0e9dc9e Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Wed, 10 Jun 2026 10:44:51 +0530 Subject: [PATCH 2/9] repository: add set_chunk_index() for ChunkIndex-based pack routing, refs #8572 Replace _pack_info (session-scoped dict) with a borrowed ChunkIndex reference. Cache passes its index via set_chunk_index(); get() routes correctly for all sessions. --- src/borg/cache.py | 2 ++ src/borg/repository.py | 22 +++++++++--- src/borg/testsuite/repository_test.py | 48 +++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 5 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index 992fda6670..8399ab2870 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -679,6 +679,7 @@ def __init__(self): def chunks(self): if self._chunks is None: self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True) + self.repository.set_chunk_index(self._chunks) return self._chunks def seen_chunk(self, id, size=None): @@ -869,6 +870,7 @@ def check_cache_compatibility(self): def wipe_cache(self): logger.warning("Discarding incompatible cache and forcing a cache rebuild") self._chunks = ChunkIndex() + self.repository.set_chunk_index(self._chunks) self.cache_config.manifest_id = "" self.cache_config._config.set("cache", "manifest", "") diff --git a/src/borg/repository.py b/src/borg/repository.py index f8b0ffee58..403234efa6 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -285,6 +285,7 @@ def __init__( self.lock_wait = lock_wait self.exclusive = exclusive self._pack_writer = None + self._chunks = None # borrowed ChunkIndex reference, set by set_chunk_index() def __repr__(self): return f"<{self.__class__.__name__} {self._location}>" @@ -407,15 +408,19 @@ def open(self, *, exclusive, lock_wait=None, lock=True): if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() self._pack_writer = PackWriter(self.store, max_count=1) + self._chunks = None self.opened = True - def flush(self): - """Flush any buffered pack writer chunks. Returns pack_results (or None). + def set_chunk_index(self, chunks): + """Set the ChunkIndex get() uses to resolve pack locations. - Callers that maintain a ChunkIndex must call this and pass the result to - chunks.update_pack_info() before closing, so index entries for the last - batch of chunks get real pack location values instead of UNKNOWN_*. + The caller retains ownership; Repository holds a borrowed reference. + Pass None to clear. """ + self._chunks = chunks + + def flush(self): + """Flush any buffered pack writer chunks. Returns pack_results (or None).""" if self._pack_writer is not None: return self._pack_writer.flush() return None @@ -429,6 +434,7 @@ def close(self): if self.store_opened: self.store.close() self.store_opened = False + self._chunks = None self.opened = False def info(self): @@ -606,6 +612,10 @@ def list(self, limit=None, marker=None): def get(self, id, read_data=True, raise_missing=True, obj_offset=0, obj_size=None): self._lock_refresh() pack_id = id # N=1: pack_id == chunk_id + if self._chunks is not None: + entry = self._chunks.get(id) + if entry is not None and entry.pack_id != UNKNOWN_BYTES32: # UNKNOWN: buffered, not yet flushed + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -644,6 +654,8 @@ def get(self, id, read_data=True, raise_missing=True, obj_offset=0, obj_size=Non return None def get_many(self, ids, read_data=True, raise_missing=True): + # N>1: set_chunk_index() must be called before any get() so locations from prior sessions + # are available; without it get() falls back to N=1 (pack_id == chunk_id) for every id. for id_ in ids: yield self.get(id_, read_data=read_data, raise_missing=raise_missing) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 04de1f37d7..5852f3fd8c 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -4,6 +4,7 @@ import pytest from ..helpers import IntegrityError, Location, bin_to_hex +from ..hashindex import ChunkIndex from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -210,6 +211,53 @@ def test_get_with_range(tmp_path): assert repository.get(pack_id, obj_offset=len(chunk1), obj_size=len(chunk2)) == chunk2 +def test_get_read_data_false_with_range(tmp_path): + # read_data=False with obj_size limits the load to the object boundary. + hdr_size = RepoObj.obj_header.size + chunk1 = fchunk(b"FIRST") + chunk2 = fchunk(b"SECOND") + pack = chunk1 + chunk2 + pack_id = H(43) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.store_store("packs/" + bin_to_hex(pack_id), pack) + result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk1)) + assert result == chunk1[:hdr_size] # empty meta, so header only + result2 = repository.get(pack_id, read_data=False, obj_offset=len(chunk1), obj_size=len(chunk2)) + assert result2 == chunk2[:hdr_size] + + +def test_get_read_data_false_large_meta(tmp_path): + # When meta_size > extra_size (975 bytes), get() retries with a larger load. + hdr_size = RepoObj.obj_header.size + big_meta = b"M" * 1000 # 1000 > 975, forces the retry load + chunk = fchunk(b"DATA", meta=big_meta) + pack_id = H(44) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.store_store("packs/" + bin_to_hex(pack_id), chunk) + result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk)) + assert result == chunk[: hdr_size + len(big_meta)] + + +def test_get_uses_chunk_index_location(tmp_path): + # get() routes to the correct pack and offset when a ChunkIndex is set via set_chunk_index(). + chunk1 = fchunk(b"FIRST") + chunk2 = fchunk(b"SECOND") + pack = chunk1 + chunk2 + pack_id = H(55) + id1, id2 = H(56), H(57) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + # Inject the pack directly; bypasses PackWriter to test routing independently. + repository.store_store("packs/" + bin_to_hex(pack_id), pack) + chunks = ChunkIndex() + chunks.add(id1, len(chunk1)) + chunks.update_pack_info([(id1, pack_id, 0, len(chunk1))]) + chunks.add(id2, len(chunk2)) + chunks.update_pack_info([(id2, pack_id, len(chunk1), len(chunk2))]) + repository.set_chunk_index(chunks) + assert repository.get(id1) == chunk1 + assert repository.get(id2) == chunk2 + + def test_pack_writer_final_partial_pack_uses_sha256(): # When max_count > 1, a final flush with only 1 piece must still use SHA256, # not the N=1 pack_id == chunk_id hack. From 8ab36b33a25f13a816020848eeb9f16a278b8a42 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Wed, 10 Jun 2026 22:12:01 +0530 Subject: [PATCH 3/9] repository: resolve pack location from ChunkIndex in get(), flush handles update_pack_info Remove obj_offset/obj_size params from get(); always initialize _chunks to an empty ChunkIndex so callers never need to guard for None. --- src/borg/cache.py | 5 +--- src/borg/repository.py | 30 +++++++++++------------ src/borg/testsuite/repository_test.py | 34 +++++++++++++-------------- 3 files changed, 31 insertions(+), 38 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index 8399ab2870..1a95d0408b 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -832,11 +832,8 @@ def open(self): def close(self): self.security_manager.save(self.manifest, self.key) pi = ProgressIndicatorMessage(msgid="cache.close") - # Flush any chunks still buffered in the pack writer and update the index - # so the last batch gets real pack location values instead of UNKNOWN_*. if self._chunks is not None: - pack_results = self.repository.flush() - self._chunks.update_pack_info(pack_results) + self.repository.flush() if self._files is not None: pi.output("Saving files cache") integrity_data = self._write_files_cache(self._files) diff --git a/src/borg/repository.py b/src/borg/repository.py index 403234efa6..1a7690ad6a 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -408,22 +408,23 @@ def open(self, *, exclusive, lock_wait=None, lock=True): if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() self._pack_writer = PackWriter(self.store, max_count=1) - self._chunks = None + self._chunks = ChunkIndex() self.opened = True def set_chunk_index(self, chunks): """Set the ChunkIndex get() uses to resolve pack locations. The caller retains ownership; Repository holds a borrowed reference. - Pass None to clear. + Pass None to reset to an empty index. """ - self._chunks = chunks + self._chunks = chunks if chunks is not None else ChunkIndex() def flush(self): - """Flush any buffered pack writer chunks. Returns pack_results (or None).""" + """Flush any buffered pack writer chunks.""" if self._pack_writer is not None: - return self._pack_writer.flush() - return None + pack_results = self._pack_writer.flush() + if pack_results: + self._chunks.update_pack_info(pack_results) def close(self): if self._pack_writer is not None: @@ -609,13 +610,13 @@ def list(self, limit=None, marker=None): # note: do not collect the marker id return result - def get(self, id, read_data=True, raise_missing=True, obj_offset=0, obj_size=None): + def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() - pack_id = id # N=1: pack_id == chunk_id - if self._chunks is not None: - entry = self._chunks.get(id) - if entry is not None and entry.pack_id != UNKNOWN_BYTES32: # UNKNOWN: buffered, not yet flushed - pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size + pack_id = id # N=1 fallback: pack_id == chunk_id + obj_offset, obj_size = 0, None + entry = self._chunks.get(id) + if entry is not None and entry.pack_id != UNKNOWN_BYTES32: # UNKNOWN: buffered, not yet flushed + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -654,8 +655,6 @@ def get(self, id, read_data=True, raise_missing=True, obj_offset=0, obj_size=Non return None def get_many(self, ids, read_data=True, raise_missing=True): - # N>1: set_chunk_index() must be called before any get() so locations from prior sessions - # are available; without it get() falls back to N=1 (pack_id == chunk_id) for every id. for id_ in ids: yield self.get(id_, read_data=read_data, raise_missing=raise_missing) @@ -667,8 +666,7 @@ def put(self, id, data, wait=True): Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for every chunk written to disk this call. At max_count=1 this is always - one entry. Callers should pass these to ChunkIndex.update_pack_info() - so the index holds real location values rather than UNKNOWN_INT32 placeholders. + one entry. """ self._lock_refresh() data_size = len(data) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 5852f3fd8c..04ba258580 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -199,31 +199,24 @@ def test_pack_writer_n2_flush(): assert results[1] == (id2, expected_pack_id, len(data1), len(data2)) -def test_get_with_range(tmp_path): - # get() passes obj_offset/obj_size through to store.load() for range reads. - chunk1 = fchunk(b"FIRST") - chunk2 = fchunk(b"SECOND") - pack = chunk1 + chunk2 - pack_id = H(42) - with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: - repository.store_store("packs/" + bin_to_hex(pack_id), pack) - assert repository.get(pack_id, obj_offset=0, obj_size=len(chunk1)) == chunk1 - assert repository.get(pack_id, obj_offset=len(chunk1), obj_size=len(chunk2)) == chunk2 - - def test_get_read_data_false_with_range(tmp_path): - # read_data=False with obj_size limits the load to the object boundary. + # read_data=False with ChunkIndex entries limits the load to each object's boundary. hdr_size = RepoObj.obj_header.size chunk1 = fchunk(b"FIRST") chunk2 = fchunk(b"SECOND") pack = chunk1 + chunk2 pack_id = H(43) + id1, id2 = H(47), H(48) with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: repository.store_store("packs/" + bin_to_hex(pack_id), pack) - result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk1)) - assert result == chunk1[:hdr_size] # empty meta, so header only - result2 = repository.get(pack_id, read_data=False, obj_offset=len(chunk1), obj_size=len(chunk2)) - assert result2 == chunk2[:hdr_size] + chunks = ChunkIndex() + chunks.add(id1, len(chunk1)) + chunks.update_pack_info([(id1, pack_id, 0, len(chunk1))]) + chunks.add(id2, len(chunk2)) + chunks.update_pack_info([(id2, pack_id, len(chunk1), len(chunk2))]) + repository.set_chunk_index(chunks) + assert repository.get(id1, read_data=False) == chunk1[:hdr_size] + assert repository.get(id2, read_data=False) == chunk2[:hdr_size] def test_get_read_data_false_large_meta(tmp_path): @@ -232,9 +225,14 @@ def test_get_read_data_false_large_meta(tmp_path): big_meta = b"M" * 1000 # 1000 > 975, forces the retry load chunk = fchunk(b"DATA", meta=big_meta) pack_id = H(44) + chunk_id = H(49) with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: repository.store_store("packs/" + bin_to_hex(pack_id), chunk) - result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk)) + chunks = ChunkIndex() + chunks.add(chunk_id, len(chunk)) + chunks.update_pack_info([(chunk_id, pack_id, 0, len(chunk))]) + repository.set_chunk_index(chunks) + result = repository.get(chunk_id, read_data=False) assert result == chunk[: hdr_size + len(big_meta)] From 039b561fdf0ea2eebe739bdc4cf1d2085b5a4fa9 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Wed, 10 Jun 2026 23:01:29 +0530 Subject: [PATCH 4/9] repository: remove N=1 fallback from get(), update _chunks eagerly in put() get() raises ObjectNotFound when entry is missing or UNKNOWN_BYTES32; put() marks the id in _chunks immediately so the index is live after each write. --- src/borg/repository.py | 19 +++++++++++-------- src/borg/testsuite/repository_test.py | 20 +++++++++++++++++++- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index 1a7690ad6a..2fb2a5aadd 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -415,9 +415,8 @@ def set_chunk_index(self, chunks): """Set the ChunkIndex get() uses to resolve pack locations. The caller retains ownership; Repository holds a borrowed reference. - Pass None to reset to an empty index. """ - self._chunks = chunks if chunks is not None else ChunkIndex() + self._chunks = chunks def flush(self): """Flush any buffered pack writer chunks.""" @@ -435,7 +434,6 @@ def close(self): if self.store_opened: self.store.close() self.store_opened = False - self._chunks = None self.opened = False def info(self): @@ -612,11 +610,12 @@ def list(self, limit=None, marker=None): def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() - pack_id = id # N=1 fallback: pack_id == chunk_id - obj_offset, obj_size = 0, None entry = self._chunks.get(id) - if entry is not None and entry.pack_id != UNKNOWN_BYTES32: # UNKNOWN: buffered, not yet flushed - pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size + if entry is None or entry.pack_id == UNKNOWN_BYTES32: + if raise_missing: + raise self.ObjectNotFound(id, str(self._location)) + return None + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -672,7 +671,11 @@ def put(self, id, data, wait=True): data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - return self._pack_writer.add(id, data) + pack_results = self._pack_writer.add(id, data) + self._chunks.add(id, 0) # mark seen; uncompressed size filled in by cache layer + if pack_results: + self._chunks.update_pack_info(pack_results) + return pack_results def delete(self, id, wait=True): """delete a repo object diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 04ba258580..bba7faea43 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -77,15 +77,21 @@ def pdchunk(chunk): def test_basic_operations(repo_fixtures, request): + chunks = ChunkIndex() with get_repository_from_fixture(repo_fixtures, request) as repository: for x in range(100): - repository.put(H(x), fchunk(b"SOMEDATA")) + pack_results = repository.put(H(x), fchunk(b"SOMEDATA")) + if pack_results: + for chunk_id, *_ in pack_results: + chunks.add(chunk_id, 0) + chunks.update_pack_info(pack_results) key50 = H(50) assert pdchunk(repository.get(key50)) == b"SOMEDATA" repository.delete(key50) with pytest.raises(Repository.ObjectNotFound): repository.get(key50) with reopen(repository) as repository: + repository.set_chunk_index(chunks) with pytest.raises(Repository.ObjectNotFound): repository.get(key50) for x in range(100): @@ -256,6 +262,18 @@ def test_get_uses_chunk_index_location(tmp_path): assert repository.get(id2) == chunk2 +def test_put_marks_id_in_chunk_index(tmp_path): + # put() immediately updates _chunks: add() marks the id as seen, then update_pack_info + # fills in the real pack location for the current session. + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + id1 = H(1) + repository.put(id1, fchunk(b"ZEROS")) + entry = repository._chunks.get(id1) + assert entry is not None + assert entry.pack_id == id1 # N=1: pack_id == chunk_id, set by update_pack_info in put() + assert entry.size == 0 # uncompressed size filled in by cache layer + + def test_pack_writer_final_partial_pack_uses_sha256(): # When max_count > 1, a final flush with only 1 piece must still use SHA256, # not the N=1 pack_id == chunk_id hack. From 62406c3108969d3eae7b75ee2c325806c1de91ce Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Thu, 11 Jun 2026 10:10:11 +0530 Subject: [PATCH 5/9] repository: PackWriter now manages ChunkIndex updates internally On add(), marks chunk with UNKNOWN_BYTES32; on flush(), replaces with real pack_id. put(), flush(), and set_chunk_index() simplified accordingly. --- src/borg/repository.py | 51 ++++++++++++++++++--------- src/borg/testsuite/repository_test.py | 8 ++--- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index 2fb2a5aadd..39017847dd 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -103,8 +103,10 @@ class PackWriter: """Buffers chunks into a pack file and writes to the store when full. Collects (chunk_id, cdata) pairs in a list and flushes once max_count is - reached. On flush it returns the location info for every chunk so the - caller can update the ChunkIndex with real values. + reached. If a ChunkIndex is provided via the chunks parameter, PackWriter + maintains it directly: each add() marks the chunk as pending + (pack_id=UNKNOWN_BYTES32) with the correct offset and size; flush() then + updates all pending entries with the real pack_id once the pack is on disk. At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack, so pack_id == chunk_id and the naming scheme is unchanged from before. @@ -112,14 +114,21 @@ class PackWriter: this class's interface. """ - def __init__(self, store, *, max_count=1): + def __init__(self, store, *, max_count=1, chunks=None): self.store = store self.max_count = max_count + self.chunks = chunks # ChunkIndex reference; may be None (e.g. in unit tests) self._pieces = [] # list of (chunk_id, cdata) + self._current_offset = 0 # byte offset of the next chunk within the current pack def add(self, chunk_id, cdata): """Buffer a chunk. Returns flush results if the pack is now full, else None.""" + if self.chunks is not None: + # Mark chunk as pending: real offset/size are known now; pack_id not yet. + self.chunks.add(chunk_id, 0) # size filled in by cache layer + self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, self._current_offset, len(cdata))]) self._pieces.append((chunk_id, cdata)) + self._current_offset += len(cdata) if len(self._pieces) >= self.max_count: return self.flush() return None @@ -127,9 +136,9 @@ def add(self, chunk_id, cdata): def flush(self): """Write the current pack to the store. - Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples — + Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples -- one entry per chunk that was written. Returns None if there was nothing - to flush. + to flush. Also updates the ChunkIndex (if set) with the real pack_id. """ if not self._pieces: return None @@ -161,6 +170,9 @@ def flush(self): self.store.store(key, pack_data) finally: self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk + self._current_offset = 0 + if self.chunks is not None: + self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id return results @@ -407,8 +419,8 @@ def open(self, *, exclusive, lock_wait=None, lock=True): # important: lock *after* making sure that there actually is an existing, supported repository. if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() - self._pack_writer = PackWriter(self.store, max_count=1) self._chunks = ChunkIndex() + self._pack_writer = PackWriter(self.store, max_count=1, chunks=self._chunks) self.opened = True def set_chunk_index(self, chunks): @@ -417,13 +429,12 @@ def set_chunk_index(self, chunks): The caller retains ownership; Repository holds a borrowed reference. """ self._chunks = chunks + self._pack_writer.chunks = chunks # keep PackWriter in sync def flush(self): """Flush any buffered pack writer chunks.""" if self._pack_writer is not None: - pack_results = self._pack_writer.flush() - if pack_results: - self._chunks.update_pack_info(pack_results) + self._pack_writer.flush() # PackWriter updates _chunks internally def close(self): if self._pack_writer is not None: @@ -611,11 +622,21 @@ def list(self, limit=None, marker=None): def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() entry = self._chunks.get(id) - if entry is None or entry.pack_id == UNKNOWN_BYTES32: + if entry is not None and entry.pack_id == UNKNOWN_BYTES32: + # chunk is buffered in PackWriter, not yet written to a pack file if raise_missing: raise self.ObjectNotFound(id, str(self._location)) return None - pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size + if entry is not None: + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size + else: + # N=1 fallback for cross-session reads without a populated index. + # obj_size=None tells store.load() to fetch the full file; safe for N=1 + # because each pack holds exactly one chunk. Once a Repository.chunks + # lazy-build property lands, this branch will be unreachable. + pack_id = id # N=1: pack_id == chunk_id + obj_offset = 0 + obj_size = None id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -627,6 +648,8 @@ def get(self, id, read_data=True, raise_missing=True): hdr_size = RepoObj.obj_header.size extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips load_size = hdr_size + extra_size + # obj_size is None only via the N=1 fallback above; clamp when available + # so a corrupted or malicious obj_size cannot cause an oversized read. if obj_size is not None: load_size = min(load_size, obj_size) obj = self.store.load(key, offset=obj_offset, size=load_size) @@ -671,11 +694,7 @@ def put(self, id, data, wait=True): data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - pack_results = self._pack_writer.add(id, data) - self._chunks.add(id, 0) # mark seen; uncompressed size filled in by cache layer - if pack_results: - self._chunks.update_pack_info(pack_results) - return pack_results + return self._pack_writer.add(id, data) # PackWriter updates _chunks internally def delete(self, id, wait=True): """delete a repo object diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index bba7faea43..a0be42ece5 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -77,19 +77,15 @@ def pdchunk(chunk): def test_basic_operations(repo_fixtures, request): - chunks = ChunkIndex() with get_repository_from_fixture(repo_fixtures, request) as repository: for x in range(100): - pack_results = repository.put(H(x), fchunk(b"SOMEDATA")) - if pack_results: - for chunk_id, *_ in pack_results: - chunks.add(chunk_id, 0) - chunks.update_pack_info(pack_results) + repository.put(H(x), fchunk(b"SOMEDATA")) # put() updates _chunks via PackWriter key50 = H(50) assert pdchunk(repository.get(key50)) == b"SOMEDATA" repository.delete(key50) with pytest.raises(Repository.ObjectNotFound): repository.get(key50) + chunks = repository._chunks # capture index before close with reopen(repository) as repository: repository.set_chunk_index(chunks) with pytest.raises(Repository.ObjectNotFound): From 6fbc8cb5ddb131449f8d93618c3bcf8d40d3d345 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Thu, 11 Jun 2026 11:17:11 +0530 Subject: [PATCH 6/9] repository: add BORG_TESTONLY_SHA256_PACK_ID to force sha256 pack_ids at N=1 Adds tox env and informational CI job (continue-on-error) to track progress toward full sha256 pack_id adoption, refs #8572 --- .github/workflows/ci.yml | 51 ++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 5 ++++ src/borg/repository.py | 6 +++-- 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5479551182..95288304b0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -703,3 +703,54 @@ jobs: report_type: coverage env_vars: OS,python files: coverage.xml + + sha256_pack_id_tests: + name: sha256 pack-id (informational) + needs: [lint] + runs-on: ubuntu-24.04 + timeout-minutes: 90 + continue-on-error: true + + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 0 + fetch-tags: true + + - name: Set up Python 3.12 + uses: actions/setup-python@v6 + with: + python-version: "3.12" + + - name: Cache pip + uses: actions/cache@v5 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-${{ runner.arch }}-pip-sha256-pack-id-${{ hashFiles('requirements.d/development.lock.txt') }} + restore-keys: | + ${{ runner.os }}-${{ runner.arch }}-pip- + + - name: Cache tox environments + uses: actions/cache@v5 + with: + path: .tox + key: ${{ runner.os }}-${{ runner.arch }}-tox-sha256-pack-id-${{ hashFiles('requirements.d/development.lock.txt', 'pyproject.toml') }} + restore-keys: | + ${{ runner.os }}-${{ runner.arch }}-tox-sha256-pack-id- + + - name: Install Linux packages + run: | + sudo apt-get update + sudo apt-get install -y pkg-config build-essential + sudo apt-get install -y libssl-dev libacl1-dev liblz4-dev + + - name: Install Python requirements + run: | + python -m pip install --upgrade pip setuptools wheel + pip install -r requirements.d/development.lock.txt + + - name: Install borgbackup + run: pip install -ve ".[cockpit,s3,sftp,rclone]" + + - name: Run tests with sha256 pack-ids + run: tox -e sha256-pack-id diff --git a/pyproject.toml b/pyproject.toml index 683a463726..c9e0ec8cc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -257,6 +257,11 @@ extras = ["pyfuse3", "sftp", "s3", "rclone"] set_env = {BORG_FUSE_IMPL = "mfusepy"} extras = ["mfusepy", "sftp", "s3", "rclone"] +# Informational env: forces sha256 pack_ids even with max_count=1 to expose +# code that still assumes pack_id == chunk_id. Run: tox -e sha256-pack-id +[tool.tox.env.sha256-pack-id] +set_env = {BORG_TESTONLY_SHA256_PACK_ID = "1"} + [tool.tox.env.ruff] skip_install = true deps = ["ruff"] diff --git a/src/borg/repository.py b/src/borg/repository.py index 39017847dd..da855387db 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -152,10 +152,12 @@ def flush(self): # (backward-compatible file naming: packs/{chunk_id_hex}). # N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the # file is content-addressed and borgstore can verify/cache it. - if self.max_count == 1: + # BORG_TESTONLY_SHA256_PACK_ID: always use sha256 even at N=1, exposing code + # that still assumes pack_id == chunk_id. + if self.max_count == 1 and os.environ.get("BORG_TESTONLY_SHA256_PACK_ID") != "1": pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id else: - pack_id = sha256(pack_data).digest() # N>1: content-addressed + pack_id = sha256(pack_data).digest() # Record (chunk_id, pack_id, obj_offset, obj_size) for every piece. results = [] From 4266cfb11f9d1a28edb0796db5da1599efa84dc5 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Thu, 11 Jun 2026 12:51:57 +0530 Subject: [PATCH 7/9] repository: add set_chunk_index() and lazy chunks property; route get() through ChunkIndex PackWriter now always owns a ChunkIndex; the N=1 fallback in get() is removed. --- src/borg/repository.py | 72 ++++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index da855387db..e31333cf49 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -117,16 +117,15 @@ class PackWriter: def __init__(self, store, *, max_count=1, chunks=None): self.store = store self.max_count = max_count - self.chunks = chunks # ChunkIndex reference; may be None (e.g. in unit tests) + self.chunks = chunks if chunks is not None else ChunkIndex() self._pieces = [] # list of (chunk_id, cdata) self._current_offset = 0 # byte offset of the next chunk within the current pack def add(self, chunk_id, cdata): """Buffer a chunk. Returns flush results if the pack is now full, else None.""" - if self.chunks is not None: - # Mark chunk as pending: real offset/size are known now; pack_id not yet. - self.chunks.add(chunk_id, 0) # size filled in by cache layer - self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, self._current_offset, len(cdata))]) + # Mark chunk as pending: real offset/size are known now; pack_id not yet. + self.chunks.add(chunk_id, 0) # size filled in by cache layer + self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, self._current_offset, len(cdata))]) self._pieces.append((chunk_id, cdata)) self._current_offset += len(cdata) if len(self._pieces) >= self.max_count: @@ -138,7 +137,7 @@ def flush(self): Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples -- one entry per chunk that was written. Returns None if there was nothing - to flush. Also updates the ChunkIndex (if set) with the real pack_id. + to flush. Always updates the ChunkIndex with the real pack_id. """ if not self._pieces: return None @@ -173,8 +172,7 @@ def flush(self): finally: self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk self._current_offset = 0 - if self.chunks is not None: - self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id + self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id return results @@ -299,7 +297,8 @@ def __init__( self.lock_wait = lock_wait self.exclusive = exclusive self._pack_writer = None - self._chunks = None # borrowed ChunkIndex reference, set by set_chunk_index() + self._chunks = None # ChunkIndex; set by open(), replaced by set_chunk_index() + self._chunks_initialized = False def __repr__(self): return f"<{self.__class__.__name__} {self._location}>" @@ -422,6 +421,7 @@ def open(self, *, exclusive, lock_wait=None, lock=True): if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() self._chunks = ChunkIndex() + self._chunks_initialized = False self._pack_writer = PackWriter(self.store, max_count=1, chunks=self._chunks) self.opened = True @@ -432,6 +432,27 @@ def set_chunk_index(self, chunks): """ self._chunks = chunks self._pack_writer.chunks = chunks # keep PackWriter in sync + self._chunks_initialized = True + + @property + def chunks(self): + """ChunkIndex mapping every known chunk id to its pack location. + + Built lazily on first access if set_chunk_index() has not been called. + Current-session put() entries (which carry the precise pack_id, offset, + and size set by PackWriter) are overlaid on top of the store-built + entries so they always win. + """ + if not self._chunks_initialized: + from .cache import build_chunkindex_from_repo + + built = build_chunkindex_from_repo(self) + for k, v in self._chunks.iteritems(): + built[k] = v + self._chunks = built + self._pack_writer.chunks = built + self._chunks_initialized = True + return self._chunks def flush(self): """Flush any buffered pack writer chunks.""" @@ -623,22 +644,17 @@ def list(self, limit=None, marker=None): def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() - entry = self._chunks.get(id) - if entry is not None and entry.pack_id == UNKNOWN_BYTES32: - # chunk is buffered in PackWriter, not yet written to a pack file + entry = self.chunks.get(id) + if entry is None: if raise_missing: raise self.ObjectNotFound(id, str(self._location)) return None - if entry is not None: - pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size - else: - # N=1 fallback for cross-session reads without a populated index. - # obj_size=None tells store.load() to fetch the full file; safe for N=1 - # because each pack holds exactly one chunk. Once a Repository.chunks - # lazy-build property lands, this branch will be unreachable. - pack_id = id # N=1: pack_id == chunk_id - obj_offset = 0 - obj_size = None + if entry.pack_id == UNKNOWN_BYTES32: + # chunk is buffered in PackWriter, not yet flushed to a pack file + if raise_missing: + raise self.ObjectNotFound(id, str(self._location)) + return None + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -650,10 +666,8 @@ def get(self, id, read_data=True, raise_missing=True): hdr_size = RepoObj.obj_header.size extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips load_size = hdr_size + extra_size - # obj_size is None only via the N=1 fallback above; clamp when available - # so a corrupted or malicious obj_size cannot cause an oversized read. - if obj_size is not None: - load_size = min(load_size, obj_size) + # clamp so a corrupted or malicious obj_size cannot cause an oversized read. + load_size = min(load_size, obj_size) obj = self.store.load(key, offset=obj_offset, size=load_size) hdr = obj[0:hdr_size] if len(hdr) != hdr_size: @@ -663,10 +677,8 @@ def get(self, id, read_data=True, raise_missing=True): # we did not get enough, need to load more, but not all. # this should be rare, as chunk metadata is rather small usually. retry_size = hdr_size + meta_size - if obj_size is not None: - # normally a no-op (meta_size <= obj_size - hdr_size for a healthy object); - # guards against a corrupted meta_size producing an oversize read. - retry_size = min(retry_size, obj_size) + # normally a no-op; guards against a corrupted meta_size producing an oversize read. + retry_size = min(retry_size, obj_size) obj = self.store.load(key, offset=obj_offset, size=retry_size) meta = obj[hdr_size : hdr_size + meta_size] if len(meta) != meta_size: From 022c25ca9985a71a7d0fd712484fbff1176949ff Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Thu, 11 Jun 2026 13:13:33 +0530 Subject: [PATCH 8/9] repository: simplify chunks property; trigger lazy build before put() Remove overlay loop: put() accesses self.chunks first so PackWriter.chunks is updated before any write. --- src/borg/repository.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index e31333cf49..4bacfa1235 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -439,18 +439,12 @@ def chunks(self): """ChunkIndex mapping every known chunk id to its pack location. Built lazily on first access if set_chunk_index() has not been called. - Current-session put() entries (which carry the precise pack_id, offset, - and size set by PackWriter) are overlaid on top of the store-built - entries so they always win. """ if not self._chunks_initialized: from .cache import build_chunkindex_from_repo - built = build_chunkindex_from_repo(self) - for k, v in self._chunks.iteritems(): - built[k] = v - self._chunks = built - self._pack_writer.chunks = built + self._chunks = build_chunkindex_from_repo(self) + self._pack_writer.chunks = self._chunks self._chunks_initialized = True return self._chunks @@ -708,7 +702,8 @@ def put(self, id, data, wait=True): data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - return self._pack_writer.add(id, data) # PackWriter updates _chunks internally + _ = self.chunks # ensure lazy build ran and PackWriter.chunks is current + return self._pack_writer.add(id, data) def delete(self, id, wait=True): """delete a repo object From 48372786d80c282d419dc363cd3e543315dc96c4 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Thu, 11 Jun 2026 19:19:34 +0530 Subject: [PATCH 9/9] ci/tests: exempt BORG_TESTONLY_SHA256_PACK_ID from clean_env; add concurrency to sha256 job env var was wiped before every test; sha256 job now gets its own concurrency group so it is not cancelled mid-run --- .github/workflows/ci.yml | 3 +++ src/borg/conftest.py | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 95288304b0..9a8d5a08fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -710,6 +710,9 @@ jobs: runs-on: ubuntu-24.04 timeout-minutes: 90 continue-on-error: true + concurrency: + group: sha256-pack-id-${{ github.head_ref || github.ref }} + cancel-in-progress: false steps: - uses: actions/checkout@v6 diff --git a/src/borg/conftest.py b/src/borg/conftest.py index 557cb66f76..e8542e08ea 100644 --- a/src/borg/conftest.py +++ b/src/borg/conftest.py @@ -21,7 +21,11 @@ @pytest.fixture(autouse=True) def clean_env(tmpdir_factory, monkeypatch): # also avoid to use anything from the outside environment: - keys = [key for key in os.environ if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL",)] + keys = [ + key + for key in os.environ + if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL", "BORG_TESTONLY_SHA256_PACK_ID") + ] for key in keys: monkeypatch.delenv(key, raising=False) # avoid that we access / modify the user's normal .config / .cache directory: