Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 78 additions & 30 deletions kernelci/kbuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,8 +905,7 @@ def _build_with_tuxmake(self):
# the archive has dtbs/ prefix inside, so extract to af_dir
self.addcmd(
f"if [ -f {self._af_dir}/dtbs.tar.xz ]; then "
f"tar -xf {self._af_dir}/dtbs.tar.xz -C {self._af_dir} && "
f"rm {self._af_dir}/dtbs.tar.xz; "
f"tar -xf {self._af_dir}/dtbs.tar.xz -C {self._af_dir}; "
f"fi"
)

Expand Down Expand Up @@ -1395,6 +1394,23 @@ def upload_artifacts(self):
artifact_path = os.path.join(self._af_dir, artifact)
upload_tasks.append((artifact, artifact_path))

def is_dtb_artifact(artifact):
return artifact.startswith("dtbs/") and artifact.endswith(".dtb")

dtb_tasks = [task for task in upload_tasks if is_dtb_artifact(task[0])]
dtbs_archive_task = next(
(task for task in upload_tasks if task[0] == "dtbs.tar.xz"),
None,
)
if dtb_tasks and dtbs_archive_task:
# Upload DTBs via one archive request and skip uploading both the
# extracted DTB files and tuxmake's archive as regular artifacts.
upload_tasks = [
task
for task in upload_tasks
if not is_dtb_artifact(task[0]) and task[0] != "dtbs.tar.xz"
]

# Function to handle a single artifact upload
# args: (artifact, artifact_path)
# returns: (artifact, stored_url, error)
Expand Down Expand Up @@ -1444,38 +1460,70 @@ def process_and_upload_artifact(
return artifact, None, str(e)

# Process uploads in parallel
max_workers = min(10, len(upload_tasks)) # Limit concurrent uploads
successful_uploads = 0
failed_uploads = []

with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all tasks
future_to_task = {
executor.submit(process_and_upload_artifact, task): task
for task in upload_tasks
}

# Process results as they complete
for future in concurrent.futures.as_completed(future_to_task):
artifact, stored_url, error = future.result()

if error:
failed_uploads.append((artifact, error))
continue

successful_uploads += 1
self._full_artifacts[artifact] = stored_url
artifact_key = self.map_artifact_name(artifact)

# Thread-safe update of node_af
with node_af_lock:
if artifact in KERNEL_IMAGE_NAMES[self._arch]:
node_af["kernel"] = stored_url
self._node["data"]["kernel_type"] = artifact.lower()
else:
if dtb_tasks and dtbs_archive_task:
try:
print(
"[_upload_artifacts] Uploading "
f"{len(dtb_tasks)} DTBs as {dtbs_archive_task[0]}"
)
dtb_urls = storage.upload_archive(
dtbs_archive_task[1],
[
(artifact_path, artifact)
for artifact, artifact_path in dtb_tasks
],
root_path,
archive_name=dtbs_archive_task[0],
)
for artifact, _artifact_path in dtb_tasks:
stored_url = dtb_urls.get(artifact)
if not stored_url:
failed_uploads.append(
(artifact, "missing URL after archive upload")
)
continue
successful_uploads += 1
self._full_artifacts[artifact] = stored_url
artifact_key = self.map_artifact_name(artifact)
with node_af_lock:
node_af[artifact_key] = stored_url
except Exception as e:
failed_uploads.append(("dtbs", str(e)))
print(f"[_upload_artifacts] Error uploading DTB archive: {e}")

if upload_tasks:
max_workers = min(10, len(upload_tasks)) # Limit concurrent uploads
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all tasks
future_to_task = {
executor.submit(process_and_upload_artifact, task): task
for task in upload_tasks
}

# Process results as they complete
for future in concurrent.futures.as_completed(future_to_task):
artifact, stored_url, error = future.result()

if error:
failed_uploads.append((artifact, error))
continue

successful_uploads += 1
self._full_artifacts[artifact] = stored_url
artifact_key = self.map_artifact_name(artifact)

# Thread-safe update of node_af
with node_af_lock:
if artifact in KERNEL_IMAGE_NAMES[self._arch]:
node_af["kernel"] = stored_url
self._node["data"]["kernel_type"] = artifact.lower()
else:
node_af[artifact_key] = stored_url

# Report results
print(
Expand Down
39 changes: 39 additions & 0 deletions kernelci/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ def _upload(self, file_paths, dest_path):
destination path and files names.
"""

def _upload_archive(
self, archive_path, file_paths, dest_path, archive_name
):
"""Implementation method to upload an archive for server-side unpacking.

Backends that support archive uploads should return a dictionary keyed
by destination file name. Backends that do not support this method use
the regular multi-file upload fallback.
"""
raise NotImplementedError

def upload_single(self, file_path, dest_path=""):
"""Upload a single file to storage

Expand Down Expand Up @@ -99,6 +110,34 @@ def upload_multiple(self, file_paths, dest_path=""):
for (file_src, file_dst) in file_paths
]

def upload_archive(
self, archive_path, file_paths, dest_path="", archive_name=None
):
"""Upload many files using an archive when supported by the backend.

Upload *archive_path* to storage for server-side extraction. The
*file_paths* argument is a list of (local, remote) file names, matching
.upload_multiple(), and is used to compute public URLs and to fall back
to a regular multi-file upload when the backend has no archive support.
"""
self._connect()
archive_name = archive_name or archive_path
try:
urls = self._upload_archive(
archive_path, file_paths, dest_path, archive_name
)
except NotImplementedError:
urls = self._upload(file_paths, dest_path)

if urls:
return urls
return {
file_dst: urljoin(
self.config.base_url, "/".join([".", dest_path, file_dst])
)
for (file_src, file_dst) in file_paths
}


def get_storage(config, credentials):
"""Get a Storage instance for a given storage configuration
Expand Down
70 changes: 70 additions & 0 deletions kernelci/storage/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,76 @@ def _upload(self, file_paths, dest_path):
if last_exception:
raise last_exception

def _upload_archive(
self, archive_path, file_paths, dest_path, archive_name
):
headers = {
"Authorization": self.credentials,
}
data = {
"path": dest_path,
}

max_retries = 5
retry_delay = 10 # seconds
last_exception = None

for attempt in range(max_retries):
try:
with open(archive_path, "rb") as archive_file:
files = {
"archive": (archive_name, archive_file),
}

url = urljoin(self.config.api_url, "v1/archive")
resp = requests.post(
url,
headers=headers,
data=data,
files=files,
timeout=900,
)
resp.raise_for_status()

try:
body = resp.json()
except ValueError as exc:
raise RuntimeError(
f"Archive upload returned invalid JSON: {resp.text}"
) from exc

if body.get("failed", 0):
failures = ", ".join(body.get("failures", []))
raise RuntimeError(
"Archive upload failed for "
f"{body.get('failed')} files: {failures}"
)

return {
file_dst: urljoin(
self.config.base_url,
"/".join([".", dest_path, file_dst]),
)
for (file_src, file_dst) in file_paths
}

except (
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
) as exc:
last_exception = self._handle_network_error(
exc, attempt, max_retries, retry_delay
)

except requests.exceptions.HTTPError as exc:
last_exception = self._handle_http_error(
exc, attempt, max_retries, retry_delay
)

if last_exception:
raise last_exception


def get_storage(config, credentials):
"""Get a StorageBackend object"""
Expand Down
55 changes: 55 additions & 0 deletions tests/test_kbuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,58 @@ def test_no_existing_metadata(self, tmp_path):
own = json.loads((af_dir / "metadata.json").read_text())
assert own["build"]["compiler"] == "clang-21"
assert "compiler_version" not in own["build"]


class FakeStorage:
def __init__(self):
self.single_uploads = []
self.archive_uploads = []

def upload_single(self, file_path, dest_path=""):
self.single_uploads.append((file_path, dest_path))
return f"https://storage.test/{dest_path}/{file_path[1]}"

def upload_archive(
self, archive_path, file_paths, dest_path="", archive_name=None
):
self.archive_uploads.append(
(archive_path, file_paths, dest_path, archive_name)
)
return {
file_dst: f"https://storage.test/{dest_path}/{file_dst}"
for _file_src, file_dst in file_paths
}


class TestUploadArtifacts:
def test_tuxmake_dtbs_use_archive_upload(self, tmp_path):
kbuild = _kbuild(tmp_path, arch="arm64")
af_dir = tmp_path / "artifacts"
(af_dir / "dtbs" / "nested").mkdir(parents=True)
(af_dir / "dtbs" / "board-a.dtb").write_bytes(b"dtb-a")
(af_dir / "dtbs" / "nested" / "board-b.dtb").write_bytes(b"dtb-b")
(af_dir / "dtbs.tar.xz").write_bytes(b"archive")

storage = FakeStorage()
kbuild._get_storage = lambda: storage
kbuild._apijobname = "kbuild-clang-arm64"
kbuild._node = {"id": "node123", "data": {}}
kbuild._full_artifacts = {}

node_af = kbuild.upload_artifacts()

assert storage.single_uploads == []
assert len(storage.archive_uploads) == 1
archive_path, file_paths, dest_path, archive_name = (
storage.archive_uploads[0]
)
assert archive_path == str(af_dir / "dtbs.tar.xz")
assert dest_path == "kbuild-clang-arm64-node123"
assert archive_name == "dtbs.tar.xz"
assert sorted(file_dst for _file_src, file_dst in file_paths) == [
"dtbs/board-a.dtb",
"dtbs/nested/board-b.dtb",
]
assert "dtbs/board-a.dtb" in kbuild._full_artifacts
assert "dtbs/nested/board-b.dtb" in kbuild._full_artifacts
assert node_af["dtbs/board-a_dtb"].endswith("dtbs/board-a.dtb")
Loading