diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5479551182..9a8d5a08fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -703,3 +703,57 @@ jobs: report_type: coverage env_vars: OS,python files: coverage.xml + + sha256_pack_id_tests: + name: sha256 pack-id (informational) + needs: [lint] + runs-on: ubuntu-24.04 + timeout-minutes: 90 + continue-on-error: true + concurrency: + group: sha256-pack-id-${{ github.head_ref || github.ref }} + cancel-in-progress: false + + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 0 + fetch-tags: true + + - name: Set up Python 3.12 + uses: actions/setup-python@v6 + with: + python-version: "3.12" + + - name: Cache pip + uses: actions/cache@v5 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-${{ runner.arch }}-pip-sha256-pack-id-${{ hashFiles('requirements.d/development.lock.txt') }} + restore-keys: | + ${{ runner.os }}-${{ runner.arch }}-pip- + + - name: Cache tox environments + uses: actions/cache@v5 + with: + path: .tox + key: ${{ runner.os }}-${{ runner.arch }}-tox-sha256-pack-id-${{ hashFiles('requirements.d/development.lock.txt', 'pyproject.toml') }} + restore-keys: | + ${{ runner.os }}-${{ runner.arch }}-tox-sha256-pack-id- + + - name: Install Linux packages + run: | + sudo apt-get update + sudo apt-get install -y pkg-config build-essential + sudo apt-get install -y libssl-dev libacl1-dev liblz4-dev + + - name: Install Python requirements + run: | + python -m pip install --upgrade pip setuptools wheel + pip install -r requirements.d/development.lock.txt + + - name: Install borgbackup + run: pip install -ve ".[cockpit,s3,sftp,rclone]" + + - name: Run tests with sha256 pack-ids + run: tox -e sha256-pack-id diff --git a/pyproject.toml b/pyproject.toml index 683a463726..c9e0ec8cc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -257,6 +257,11 @@ extras = ["pyfuse3", "sftp", "s3", "rclone"] set_env = {BORG_FUSE_IMPL = "mfusepy"} extras = ["mfusepy", "sftp", "s3", "rclone"] +# Informational env: forces sha256 pack_ids even with max_count=1 to expose +# code that still assumes pack_id == chunk_id. Run: tox -e sha256-pack-id +[tool.tox.env.sha256-pack-id] +set_env = {BORG_TESTONLY_SHA256_PACK_ID = "1"} + [tool.tox.env.ruff] skip_install = true deps = ["ruff"] diff --git a/src/borg/cache.py b/src/borg/cache.py index 992fda6670..1a95d0408b 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -679,6 +679,7 @@ def __init__(self): def chunks(self): if self._chunks is None: self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True) + self.repository.set_chunk_index(self._chunks) return self._chunks def seen_chunk(self, id, size=None): @@ -831,11 +832,8 @@ def open(self): def close(self): self.security_manager.save(self.manifest, self.key) pi = ProgressIndicatorMessage(msgid="cache.close") - # Flush any chunks still buffered in the pack writer and update the index - # so the last batch gets real pack location values instead of UNKNOWN_*. if self._chunks is not None: - pack_results = self.repository.flush() - self._chunks.update_pack_info(pack_results) + self.repository.flush() if self._files is not None: pi.output("Saving files cache") integrity_data = self._write_files_cache(self._files) @@ -869,6 +867,7 @@ def check_cache_compatibility(self): def wipe_cache(self): logger.warning("Discarding incompatible cache and forcing a cache rebuild") self._chunks = ChunkIndex() + self.repository.set_chunk_index(self._chunks) self.cache_config.manifest_id = "" self.cache_config._config.set("cache", "manifest", "") diff --git a/src/borg/conftest.py b/src/borg/conftest.py index 557cb66f76..e8542e08ea 100644 --- a/src/borg/conftest.py +++ b/src/borg/conftest.py @@ -21,7 +21,11 @@ @pytest.fixture(autouse=True) def clean_env(tmpdir_factory, monkeypatch): # also avoid to use anything from the outside environment: - keys = [key for key in os.environ if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL",)] + keys = [ + key + for key in os.environ + if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL", "BORG_TESTONLY_SHA256_PACK_ID") + ] for key in keys: monkeypatch.delenv(key, raising=False) # avoid that we access / modify the user's normal .config / .cache directory: diff --git a/src/borg/repository.py b/src/borg/repository.py index c8ae6c1a40..4bacfa1235 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -103,8 +103,10 @@ class PackWriter: """Buffers chunks into a pack file and writes to the store when full. Collects (chunk_id, cdata) pairs in a list and flushes once max_count is - reached. On flush it returns the location info for every chunk so the - caller can update the ChunkIndex with real values. + reached. If a ChunkIndex is provided via the chunks parameter, PackWriter + maintains it directly: each add() marks the chunk as pending + (pack_id=UNKNOWN_BYTES32) with the correct offset and size; flush() then + updates all pending entries with the real pack_id once the pack is on disk. At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack, so pack_id == chunk_id and the naming scheme is unchanged from before. @@ -112,14 +114,20 @@ class PackWriter: this class's interface. """ - def __init__(self, store, *, max_count=1): + def __init__(self, store, *, max_count=1, chunks=None): self.store = store self.max_count = max_count + self.chunks = chunks if chunks is not None else ChunkIndex() self._pieces = [] # list of (chunk_id, cdata) + self._current_offset = 0 # byte offset of the next chunk within the current pack def add(self, chunk_id, cdata): """Buffer a chunk. Returns flush results if the pack is now full, else None.""" + # Mark chunk as pending: real offset/size are known now; pack_id not yet. + self.chunks.add(chunk_id, 0) # size filled in by cache layer + self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, self._current_offset, len(cdata))]) self._pieces.append((chunk_id, cdata)) + self._current_offset += len(cdata) if len(self._pieces) >= self.max_count: return self.flush() return None @@ -127,9 +135,9 @@ def add(self, chunk_id, cdata): def flush(self): """Write the current pack to the store. - Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples — + Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples -- one entry per chunk that was written. Returns None if there was nothing - to flush. + to flush. Always updates the ChunkIndex with the real pack_id. """ if not self._pieces: return None @@ -143,10 +151,12 @@ def flush(self): # (backward-compatible file naming: packs/{chunk_id_hex}). # N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the # file is content-addressed and borgstore can verify/cache it. - if self.max_count == 1: + # BORG_TESTONLY_SHA256_PACK_ID: always use sha256 even at N=1, exposing code + # that still assumes pack_id == chunk_id. + if self.max_count == 1 and os.environ.get("BORG_TESTONLY_SHA256_PACK_ID") != "1": pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id else: - pack_id = sha256(pack_data).digest() # N>1: content-addressed + pack_id = sha256(pack_data).digest() # Record (chunk_id, pack_id, obj_offset, obj_size) for every piece. results = [] @@ -161,6 +171,8 @@ def flush(self): self.store.store(key, pack_data) finally: self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk + self._current_offset = 0 + self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id return results @@ -285,6 +297,8 @@ def __init__( self.lock_wait = lock_wait self.exclusive = exclusive self._pack_writer = None + self._chunks = None # ChunkIndex; set by open(), replaced by set_chunk_index() + self._chunks_initialized = False def __repr__(self): return f"<{self.__class__.__name__} {self._location}>" @@ -406,19 +420,38 @@ def open(self, *, exclusive, lock_wait=None, lock=True): # important: lock *after* making sure that there actually is an existing, supported repository. if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() - self._pack_writer = PackWriter(self.store, max_count=1) + self._chunks = ChunkIndex() + self._chunks_initialized = False + self._pack_writer = PackWriter(self.store, max_count=1, chunks=self._chunks) self.opened = True - def flush(self): - """Flush any buffered pack writer chunks. Returns pack_results (or None). + def set_chunk_index(self, chunks): + """Set the ChunkIndex get() uses to resolve pack locations. - Callers that maintain a ChunkIndex must call this and pass the result to - chunks.update_pack_info() before closing, so index entries for the last - batch of chunks get real pack location values instead of UNKNOWN_*. + The caller retains ownership; Repository holds a borrowed reference. """ + self._chunks = chunks + self._pack_writer.chunks = chunks # keep PackWriter in sync + self._chunks_initialized = True + + @property + def chunks(self): + """ChunkIndex mapping every known chunk id to its pack location. + + Built lazily on first access if set_chunk_index() has not been called. + """ + if not self._chunks_initialized: + from .cache import build_chunkindex_from_repo + + self._chunks = build_chunkindex_from_repo(self) + self._pack_writer.chunks = self._chunks + self._chunks_initialized = True + return self._chunks + + def flush(self): + """Flush any buffered pack writer chunks.""" if self._pack_writer is not None: - return self._pack_writer.flush() - return None + self._pack_writer.flush() # PackWriter updates _chunks internally def close(self): if self._pack_writer is not None: @@ -605,18 +638,31 @@ def list(self, limit=None, marker=None): def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() - pack_id = id # N=1: pack_id == chunk_id + entry = self.chunks.get(id) + if entry is None: + if raise_missing: + raise self.ObjectNotFound(id, str(self._location)) + return None + if entry.pack_id == UNKNOWN_BYTES32: + # chunk is buffered in PackWriter, not yet flushed to a pack file + if raise_missing: + raise self.ObjectNotFound(id, str(self._location)) + return None + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: if read_data: - return self.store.load(key) + return self.store.load(key, offset=obj_offset, size=obj_size) else: # RepoObj layout supports separately encrypted metadata and data. # We return enough bytes so the client can decrypt the metadata. hdr_size = RepoObj.obj_header.size extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips - obj = self.store.load(key, size=hdr_size + extra_size) + load_size = hdr_size + extra_size + # clamp so a corrupted or malicious obj_size cannot cause an oversized read. + load_size = min(load_size, obj_size) + obj = self.store.load(key, offset=obj_offset, size=load_size) hdr = obj[0:hdr_size] if len(hdr) != hdr_size: raise IntegrityError(f"Object too small [id {id_hex}]: expected {hdr_size}, got {len(hdr)} bytes") @@ -624,7 +670,10 @@ def get(self, id, read_data=True, raise_missing=True): if meta_size > extra_size: # we did not get enough, need to load more, but not all. # this should be rare, as chunk metadata is rather small usually. - obj = self.store.load(key, size=hdr_size + meta_size) + retry_size = hdr_size + meta_size + # normally a no-op; guards against a corrupted meta_size producing an oversize read. + retry_size = min(retry_size, obj_size) + obj = self.store.load(key, offset=obj_offset, size=retry_size) meta = obj[hdr_size : hdr_size + meta_size] if len(meta) != meta_size: raise IntegrityError(f"Object too small [id {id_hex}]: expected {meta_size}, got {len(meta)} bytes") @@ -647,13 +696,13 @@ def put(self, id, data, wait=True): Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for every chunk written to disk this call. At max_count=1 this is always - one entry. Callers should pass these to ChunkIndex.update_pack_info() - so the index holds real location values rather than UNKNOWN_INT32 placeholders. + one entry. """ self._lock_refresh() data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") + _ = self.chunks # ensure lazy build ran and PackWriter.chunks is current return self._pack_writer.add(id, data) def delete(self, id, wait=True): diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 86fe55a23f..a0be42ece5 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -3,7 +3,8 @@ from hashlib import sha256 import pytest -from ..helpers import IntegrityError, Location +from ..helpers import IntegrityError, Location, bin_to_hex +from ..hashindex import ChunkIndex from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -78,13 +79,15 @@ def pdchunk(chunk): def test_basic_operations(repo_fixtures, request): with get_repository_from_fixture(repo_fixtures, request) as repository: for x in range(100): - repository.put(H(x), fchunk(b"SOMEDATA")) + repository.put(H(x), fchunk(b"SOMEDATA")) # put() updates _chunks via PackWriter key50 = H(50) assert pdchunk(repository.get(key50)) == b"SOMEDATA" repository.delete(key50) with pytest.raises(Repository.ObjectNotFound): repository.get(key50) + chunks = repository._chunks # capture index before close with reopen(repository) as repository: + repository.set_chunk_index(chunks) with pytest.raises(Repository.ObjectNotFound): repository.get(key50) for x in range(100): @@ -198,6 +201,75 @@ def test_pack_writer_n2_flush(): assert results[1] == (id2, expected_pack_id, len(data1), len(data2)) +def test_get_read_data_false_with_range(tmp_path): + # read_data=False with ChunkIndex entries limits the load to each object's boundary. + hdr_size = RepoObj.obj_header.size + chunk1 = fchunk(b"FIRST") + chunk2 = fchunk(b"SECOND") + pack = chunk1 + chunk2 + pack_id = H(43) + id1, id2 = H(47), H(48) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.store_store("packs/" + bin_to_hex(pack_id), pack) + chunks = ChunkIndex() + chunks.add(id1, len(chunk1)) + chunks.update_pack_info([(id1, pack_id, 0, len(chunk1))]) + chunks.add(id2, len(chunk2)) + chunks.update_pack_info([(id2, pack_id, len(chunk1), len(chunk2))]) + repository.set_chunk_index(chunks) + assert repository.get(id1, read_data=False) == chunk1[:hdr_size] + assert repository.get(id2, read_data=False) == chunk2[:hdr_size] + + +def test_get_read_data_false_large_meta(tmp_path): + # When meta_size > extra_size (975 bytes), get() retries with a larger load. + hdr_size = RepoObj.obj_header.size + big_meta = b"M" * 1000 # 1000 > 975, forces the retry load + chunk = fchunk(b"DATA", meta=big_meta) + pack_id = H(44) + chunk_id = H(49) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.store_store("packs/" + bin_to_hex(pack_id), chunk) + chunks = ChunkIndex() + chunks.add(chunk_id, len(chunk)) + chunks.update_pack_info([(chunk_id, pack_id, 0, len(chunk))]) + repository.set_chunk_index(chunks) + result = repository.get(chunk_id, read_data=False) + assert result == chunk[: hdr_size + len(big_meta)] + + +def test_get_uses_chunk_index_location(tmp_path): + # get() routes to the correct pack and offset when a ChunkIndex is set via set_chunk_index(). + chunk1 = fchunk(b"FIRST") + chunk2 = fchunk(b"SECOND") + pack = chunk1 + chunk2 + pack_id = H(55) + id1, id2 = H(56), H(57) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + # Inject the pack directly; bypasses PackWriter to test routing independently. + repository.store_store("packs/" + bin_to_hex(pack_id), pack) + chunks = ChunkIndex() + chunks.add(id1, len(chunk1)) + chunks.update_pack_info([(id1, pack_id, 0, len(chunk1))]) + chunks.add(id2, len(chunk2)) + chunks.update_pack_info([(id2, pack_id, len(chunk1), len(chunk2))]) + repository.set_chunk_index(chunks) + assert repository.get(id1) == chunk1 + assert repository.get(id2) == chunk2 + + +def test_put_marks_id_in_chunk_index(tmp_path): + # put() immediately updates _chunks: add() marks the id as seen, then update_pack_info + # fills in the real pack location for the current session. + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + id1 = H(1) + repository.put(id1, fchunk(b"ZEROS")) + entry = repository._chunks.get(id1) + assert entry is not None + assert entry.pack_id == id1 # N=1: pack_id == chunk_id, set by update_pack_info in put() + assert entry.size == 0 # uncompressed size filled in by cache layer + + def test_pack_writer_final_partial_pack_uses_sha256(): # When max_count > 1, a final flush with only 1 piece must still use SHA256, # not the N=1 pack_id == chunk_id hack.