Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -703,3 +703,57 @@ jobs:
report_type: coverage
env_vars: OS,python
files: coverage.xml

sha256_pack_id_tests:
name: sha256 pack-id (informational)
needs: [lint]
runs-on: ubuntu-24.04
timeout-minutes: 90
continue-on-error: true
concurrency:
group: sha256-pack-id-${{ github.head_ref || github.ref }}
cancel-in-progress: false

steps:
- uses: actions/checkout@v6
with:
fetch-depth: 0
fetch-tags: true

- name: Set up Python 3.12
uses: actions/setup-python@v6
with:
python-version: "3.12"

- name: Cache pip
uses: actions/cache@v5
with:
path: ~/.cache/pip
key: ${{ runner.os }}-${{ runner.arch }}-pip-sha256-pack-id-${{ hashFiles('requirements.d/development.lock.txt') }}
restore-keys: |
${{ runner.os }}-${{ runner.arch }}-pip-

- name: Cache tox environments
uses: actions/cache@v5
with:
path: .tox
key: ${{ runner.os }}-${{ runner.arch }}-tox-sha256-pack-id-${{ hashFiles('requirements.d/development.lock.txt', 'pyproject.toml') }}
restore-keys: |
${{ runner.os }}-${{ runner.arch }}-tox-sha256-pack-id-

- name: Install Linux packages
run: |
sudo apt-get update
sudo apt-get install -y pkg-config build-essential
sudo apt-get install -y libssl-dev libacl1-dev liblz4-dev

- name: Install Python requirements
run: |
python -m pip install --upgrade pip setuptools wheel
pip install -r requirements.d/development.lock.txt

- name: Install borgbackup
run: pip install -ve ".[cockpit,s3,sftp,rclone]"

- name: Run tests with sha256 pack-ids
run: tox -e sha256-pack-id
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ extras = ["pyfuse3", "sftp", "s3", "rclone"]
set_env = {BORG_FUSE_IMPL = "mfusepy"}
extras = ["mfusepy", "sftp", "s3", "rclone"]

# Informational env: forces sha256 pack_ids even with max_count=1 to expose
# code that still assumes pack_id == chunk_id. Run: tox -e sha256-pack-id
[tool.tox.env.sha256-pack-id]
set_env = {BORG_TESTONLY_SHA256_PACK_ID = "1"}
Comment thread
mr-raj12 marked this conversation as resolved.

[tool.tox.env.ruff]
skip_install = true
deps = ["ruff"]
Expand Down
7 changes: 3 additions & 4 deletions src/borg/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ def __init__(self):
def chunks(self):
if self._chunks is None:
self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
self.repository.set_chunk_index(self._chunks)
return self._chunks

def seen_chunk(self, id, size=None):
Expand Down Expand Up @@ -831,11 +832,8 @@ def open(self):
def close(self):
self.security_manager.save(self.manifest, self.key)
pi = ProgressIndicatorMessage(msgid="cache.close")
# Flush any chunks still buffered in the pack writer and update the index
# so the last batch gets real pack location values instead of UNKNOWN_*.
if self._chunks is not None:
pack_results = self.repository.flush()
self._chunks.update_pack_info(pack_results)
self.repository.flush()
if self._files is not None:
pi.output("Saving files cache")
integrity_data = self._write_files_cache(self._files)
Expand Down Expand Up @@ -869,6 +867,7 @@ def check_cache_compatibility(self):
def wipe_cache(self):
logger.warning("Discarding incompatible cache and forcing a cache rebuild")
self._chunks = ChunkIndex()
self.repository.set_chunk_index(self._chunks)
self.cache_config.manifest_id = ""
self.cache_config._config.set("cache", "manifest", "")

Expand Down
6 changes: 5 additions & 1 deletion src/borg/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
@pytest.fixture(autouse=True)
def clean_env(tmpdir_factory, monkeypatch):
# also avoid to use anything from the outside environment:
keys = [key for key in os.environ if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL",)]
keys = [
key
for key in os.environ
if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL", "BORG_TESTONLY_SHA256_PACK_ID")
]
for key in keys:
monkeypatch.delenv(key, raising=False)
# avoid that we access / modify the user's normal .config / .cache directory:
Expand Down
91 changes: 70 additions & 21 deletions src/borg/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,33 +103,41 @@ class PackWriter:
"""Buffers chunks into a pack file and writes to the store when full.

Collects (chunk_id, cdata) pairs in a list and flushes once max_count is
reached. On flush it returns the location info for every chunk so the
caller can update the ChunkIndex with real values.
reached. If a ChunkIndex is provided via the chunks parameter, PackWriter
maintains it directly: each add() marks the chunk as pending
(pack_id=UNKNOWN_BYTES32) with the correct offset and size; flush() then
updates all pending entries with the real pack_id once the pack is on disk.

At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack,
so pack_id == chunk_id and the naming scheme is unchanged from before.
Raising max_count later (N>1 phase) enables real packing without touching
this class's interface.
"""

def __init__(self, store, *, max_count=1):
def __init__(self, store, *, max_count=1, chunks=None):
self.store = store
self.max_count = max_count
self.chunks = chunks if chunks is not None else ChunkIndex()
self._pieces = [] # list of (chunk_id, cdata)
self._current_offset = 0 # byte offset of the next chunk within the current pack

def add(self, chunk_id, cdata):
"""Buffer a chunk. Returns flush results if the pack is now full, else None."""
# Mark chunk as pending: real offset/size are known now; pack_id not yet.
self.chunks.add(chunk_id, 0) # size filled in by cache layer
self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, self._current_offset, len(cdata))])
self._pieces.append((chunk_id, cdata))
self._current_offset += len(cdata)
if len(self._pieces) >= self.max_count:
return self.flush()
return None

def flush(self):
"""Write the current pack to the store.

Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples
Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples --
one entry per chunk that was written. Returns None if there was nothing
to flush.
to flush. Always updates the ChunkIndex with the real pack_id.
"""
if not self._pieces:
return None
Expand All @@ -143,10 +151,12 @@ def flush(self):
# (backward-compatible file naming: packs/{chunk_id_hex}).
# N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the
# file is content-addressed and borgstore can verify/cache it.
if self.max_count == 1:
# BORG_TESTONLY_SHA256_PACK_ID: always use sha256 even at N=1, exposing code
# that still assumes pack_id == chunk_id.
if self.max_count == 1 and os.environ.get("BORG_TESTONLY_SHA256_PACK_ID") != "1":
pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id
else:
pack_id = sha256(pack_data).digest() # N>1: content-addressed
pack_id = sha256(pack_data).digest()

# Record (chunk_id, pack_id, obj_offset, obj_size) for every piece.
results = []
Expand All @@ -161,6 +171,8 @@ def flush(self):
self.store.store(key, pack_data)
finally:
self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk
self._current_offset = 0
self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id
return results
Comment thread
mr-raj12 marked this conversation as resolved.


Expand Down Expand Up @@ -285,6 +297,8 @@ def __init__(
self.lock_wait = lock_wait
self.exclusive = exclusive
self._pack_writer = None
self._chunks = None # ChunkIndex; set by open(), replaced by set_chunk_index()
self._chunks_initialized = False

def __repr__(self):
return f"<{self.__class__.__name__} {self._location}>"
Expand Down Expand Up @@ -406,19 +420,38 @@ def open(self, *, exclusive, lock_wait=None, lock=True):
# important: lock *after* making sure that there actually is an existing, supported repository.
if lock:
self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire()
self._pack_writer = PackWriter(self.store, max_count=1)
self._chunks = ChunkIndex()
Comment thread
mr-raj12 marked this conversation as resolved.
self._chunks_initialized = False
self._pack_writer = PackWriter(self.store, max_count=1, chunks=self._chunks)
self.opened = True

def flush(self):
"""Flush any buffered pack writer chunks. Returns pack_results (or None).
def set_chunk_index(self, chunks):
"""Set the ChunkIndex get() uses to resolve pack locations.

Callers that maintain a ChunkIndex must call this and pass the result to
chunks.update_pack_info() before closing, so index entries for the last
batch of chunks get real pack location values instead of UNKNOWN_*.
The caller retains ownership; Repository holds a borrowed reference.
"""
self._chunks = chunks
self._pack_writer.chunks = chunks # keep PackWriter in sync
self._chunks_initialized = True

@property
def chunks(self):
"""ChunkIndex mapping every known chunk id to its pack location.

Built lazily on first access if set_chunk_index() has not been called.
"""
if not self._chunks_initialized:
from .cache import build_chunkindex_from_repo

self._chunks = build_chunkindex_from_repo(self)
self._pack_writer.chunks = self._chunks
self._chunks_initialized = True
return self._chunks

Comment thread
mr-raj12 marked this conversation as resolved.
def flush(self):
"""Flush any buffered pack writer chunks."""
if self._pack_writer is not None:
return self._pack_writer.flush()
return None
self._pack_writer.flush() # PackWriter updates _chunks internally

def close(self):
if self._pack_writer is not None:
Expand Down Expand Up @@ -605,26 +638,42 @@ def list(self, limit=None, marker=None):

def get(self, id, read_data=True, raise_missing=True):
self._lock_refresh()
pack_id = id # N=1: pack_id == chunk_id
entry = self.chunks.get(id)
if entry is None:
if raise_missing:
raise self.ObjectNotFound(id, str(self._location))
return None
if entry.pack_id == UNKNOWN_BYTES32:
# chunk is buffered in PackWriter, not yet flushed to a pack file
if raise_missing:
raise self.ObjectNotFound(id, str(self._location))
return None
pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size
id_hex = bin_to_hex(id)
key = "packs/" + bin_to_hex(pack_id)
try:
if read_data:
return self.store.load(key)
return self.store.load(key, offset=obj_offset, size=obj_size)
else:
# RepoObj layout supports separately encrypted metadata and data.
# We return enough bytes so the client can decrypt the metadata.
hdr_size = RepoObj.obj_header.size
extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips
obj = self.store.load(key, size=hdr_size + extra_size)
load_size = hdr_size + extra_size
# clamp so a corrupted or malicious obj_size cannot cause an oversized read.
load_size = min(load_size, obj_size)
obj = self.store.load(key, offset=obj_offset, size=load_size)
hdr = obj[0:hdr_size]
if len(hdr) != hdr_size:
raise IntegrityError(f"Object too small [id {id_hex}]: expected {hdr_size}, got {len(hdr)} bytes")
meta_size = RepoObj.ObjHeader(*RepoObj.obj_header.unpack(hdr)).meta_size
if meta_size > extra_size:
# we did not get enough, need to load more, but not all.
# this should be rare, as chunk metadata is rather small usually.
obj = self.store.load(key, size=hdr_size + meta_size)
retry_size = hdr_size + meta_size
# normally a no-op; guards against a corrupted meta_size producing an oversize read.
retry_size = min(retry_size, obj_size)
obj = self.store.load(key, offset=obj_offset, size=retry_size)
meta = obj[hdr_size : hdr_size + meta_size]
if len(meta) != meta_size:
raise IntegrityError(f"Object too small [id {id_hex}]: expected {meta_size}, got {len(meta)} bytes")
Expand All @@ -647,13 +696,13 @@ def put(self, id, data, wait=True):

Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for
every chunk written to disk this call. At max_count=1 this is always
one entry. Callers should pass these to ChunkIndex.update_pack_info()
so the index holds real location values rather than UNKNOWN_INT32 placeholders.
one entry.
"""
self._lock_refresh()
data_size = len(data)
if data_size > MAX_DATA_SIZE:
raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]")
_ = self.chunks # ensure lazy build ran and PackWriter.chunks is current
return self._pack_writer.add(id, data)

def delete(self, id, wait=True):
Expand Down
Loading
Loading