diff --git a/kernelci/kbuild.py b/kernelci/kbuild.py
index ac6a02ae2d..683f168bff 100644
--- a/kernelci/kbuild.py
+++ b/kernelci/kbuild.py
@@ -905,8 +905,7 @@ def _build_with_tuxmake(self):
         # the archive has dtbs/ prefix inside, so extract to af_dir
         self.addcmd(
             f"if [ -f {self._af_dir}/dtbs.tar.xz ]; then "
-            f"tar -xf {self._af_dir}/dtbs.tar.xz -C {self._af_dir} && "
-            f"rm {self._af_dir}/dtbs.tar.xz; "
+            f"tar -xf {self._af_dir}/dtbs.tar.xz -C {self._af_dir}; "
             f"fi"
         )
 
@@ -1395,6 +1394,23 @@ def upload_artifacts(self):
             artifact_path = os.path.join(self._af_dir, artifact)
             upload_tasks.append((artifact, artifact_path))
 
+        def is_dtb_artifact(artifact):
+            return artifact.startswith("dtbs/") and artifact.endswith(".dtb")
+
+        dtb_tasks = [task for task in upload_tasks if is_dtb_artifact(task[0])]
+        dtbs_archive_task = next(
+            (task for task in upload_tasks if task[0] == "dtbs.tar.xz"),
+            None,
+        )
+        if dtb_tasks and dtbs_archive_task:
+            # Upload DTBs via one archive request and skip uploading both the
+            # extracted DTB files and tuxmake's archive as regular artifacts.
+            upload_tasks = [
+                task
+                for task in upload_tasks
+                if not is_dtb_artifact(task[0]) and task[0] != "dtbs.tar.xz"
+            ]
+
         # Function to handle a single artifact upload
         # args: (artifact, artifact_path)
         # returns: (artifact, stored_url, error)
@@ -1444,38 +1460,70 @@ def process_and_upload_artifact(
                 return artifact, None, str(e)
 
         # Process uploads in parallel
-        max_workers = min(10, len(upload_tasks))  # Limit concurrent uploads
         successful_uploads = 0
         failed_uploads = []
 
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=max_workers
-        ) as executor:
-            # Submit all tasks
-            future_to_task = {
-                executor.submit(process_and_upload_artifact, task): task
-                for task in upload_tasks
-            }
-
-            # Process results as they complete
-            for future in concurrent.futures.as_completed(future_to_task):
-                artifact, stored_url, error = future.result()
-
-                if error:
-                    failed_uploads.append((artifact, error))
-                    continue
-
-                successful_uploads += 1
-                self._full_artifacts[artifact] = stored_url
-                artifact_key = self.map_artifact_name(artifact)
-
-                # Thread-safe update of node_af
-                with node_af_lock:
-                    if artifact in KERNEL_IMAGE_NAMES[self._arch]:
-                        node_af["kernel"] = stored_url
-                        self._node["data"]["kernel_type"] = artifact.lower()
-                    else:
+        if dtb_tasks and dtbs_archive_task:
+            try:
+                print(
+                    "[_upload_artifacts] Uploading "
+                    f"{len(dtb_tasks)} DTBs as {dtbs_archive_task[0]}"
+                )
+                dtb_urls = storage.upload_archive(
+                    dtbs_archive_task[1],
+                    [
+                        (artifact_path, artifact)
+                        for artifact, artifact_path in dtb_tasks
+                    ],
+                    root_path,
+                    archive_name=dtbs_archive_task[0],
+                )
+                for artifact, _artifact_path in dtb_tasks:
+                    stored_url = dtb_urls.get(artifact)
+                    if not stored_url:
+                        failed_uploads.append(
+                            (artifact, "missing URL after archive upload")
+                        )
+                        continue
+                    successful_uploads += 1
+                    self._full_artifacts[artifact] = stored_url
+                    artifact_key = self.map_artifact_name(artifact)
+                    with node_af_lock:
                         node_af[artifact_key] = stored_url
+            except Exception as e:
+                failed_uploads.append(("dtbs", str(e)))
+                print(f"[_upload_artifacts] Error uploading DTB archive: {e}")
+
+        if upload_tasks:
+            max_workers = min(10, len(upload_tasks))  # Limit concurrent uploads
+            with concurrent.futures.ThreadPoolExecutor(
+                max_workers=max_workers
+            ) as executor:
+                # Submit all tasks
+                future_to_task = {
+                    executor.submit(process_and_upload_artifact, task): task
+                    for task in upload_tasks
+                }
+
+                # Process results as they complete
+                for future in concurrent.futures.as_completed(future_to_task):
+                    artifact, stored_url, error = future.result()
+
+                    if error:
+                        failed_uploads.append((artifact, error))
+                        continue
+
+                    successful_uploads += 1
+                    self._full_artifacts[artifact] = stored_url
+                    artifact_key = self.map_artifact_name(artifact)
+
+                    # Thread-safe update of node_af
+                    with node_af_lock:
+                        if artifact in KERNEL_IMAGE_NAMES[self._arch]:
+                            node_af["kernel"] = stored_url
+                            self._node["data"]["kernel_type"] = artifact.lower()
+                        else:
+                            node_af[artifact_key] = stored_url
 
         # Report results
         print(
diff --git a/kernelci/storage/__init__.py b/kernelci/storage/__init__.py
index 3e049f90f7..738bb1cd0e 100644
--- a/kernelci/storage/__init__.py
+++ b/kernelci/storage/__init__.py
@@ -51,6 +51,17 @@ def _upload(self, file_paths, dest_path):
         destination path and files names.
         """
 
+    def _upload_archive(
+        self, archive_path, file_paths, dest_path, archive_name
+    ):
+        """Implementation method to upload an archive for server-side unpacking.
+
+        Backends that support archive uploads should return a dictionary keyed
+        by destination file name. Backends that do not support this method use
+        the regular multi-file upload fallback.
+        """
+        raise NotImplementedError
+
     def upload_single(self, file_path, dest_path=""):
         """Upload a single file to storage
 
@@ -99,6 +110,39 @@ def upload_multiple(self, file_paths, dest_path=""):
             for (file_src, file_dst) in file_paths
         ]
 
+    def upload_archive(
+        self, archive_path, file_paths, dest_path="", archive_name=None
+    ):
+        """Upload many files using an archive when supported by the backend.
+
+        Upload *archive_path* to storage for server-side extraction. The
+        *file_paths* argument is a list of (local, remote) file names, matching
+        .upload_multiple(), and is used to compute public URLs and to fall back
+        to a regular multi-file upload when the backend has no archive support.
+
+        Return a dictionary mapping each remote file name to its public URL.
+        """
+        self._connect()
+        archive_name = archive_name or archive_path
+        try:
+            urls = self._upload_archive(
+                archive_path, file_paths, dest_path, archive_name
+            )
+        except NotImplementedError:
+            # Like upload_multiple(), ignore whatever _upload() returns: its
+            # result is not a URL map, so the URLs are computed below instead.
+            self._upload(file_paths, dest_path)
+            urls = None
+
+        if urls:
+            return urls
+        return {
+            file_dst: urljoin(
+                self.config.base_url, "/".join([".", dest_path, file_dst])
+            )
+            for (file_src, file_dst) in file_paths
+        }
+
 
 def get_storage(config, credentials):
     """Get a Storage instance for a given storage configuration
diff --git a/kernelci/storage/backend.py b/kernelci/storage/backend.py
index f3d0d7fd9f..e1cb658728 100644
--- a/kernelci/storage/backend.py
+++ b/kernelci/storage/backend.py
@@ -137,6 +137,89 @@ def _upload(self, file_paths, dest_path):
         if last_exception:
             raise last_exception
 
+    def _upload_archive(
+        self, archive_path, file_paths, dest_path, archive_name
+    ):
+        """Upload an archive to the storage API for server-side unpacking.
+
+        POST *archive_path* under the name *archive_name* to the v1/archive
+        endpoint, with *dest_path* as the destination path.  Up to 5 attempts
+        are made; network and HTTP errors are handed to
+        _handle_network_error() and _handle_http_error() respectively.
+
+        Return a dictionary mapping each remote name in *file_paths* to its
+        URL.  Raise RuntimeError if the response is not valid JSON or if it
+        reports failed files.
+        """
+        headers = {
+            "Authorization": self.credentials,
+        }
+        data = {
+            "path": dest_path,
+        }
+
+        max_retries = 5
+        retry_delay = 10  # seconds
+        last_exception = None
+
+        for attempt in range(max_retries):
+            try:
+                with open(archive_path, "rb") as archive_file:
+                    files = {
+                        "archive": (archive_name, archive_file),
+                    }
+
+                    url = urljoin(self.config.api_url, "v1/archive")
+                    resp = requests.post(
+                        url,
+                        headers=headers,
+                        data=data,
+                        files=files,
+                        timeout=900,
+                    )
+                    resp.raise_for_status()
+
+                try:
+                    body = resp.json()
+                except ValueError as exc:
+                    raise RuntimeError(
+                        f"Archive upload returned invalid JSON: {resp.text}"
+                    ) from exc
+
+                if body.get("failed", 0):
+                    # Stringify entries: the failure records may not be plain
+                    # strings, and a TypeError here would hide the real error.
+                    failures = ", ".join(map(str, body.get("failures") or []))
+                    raise RuntimeError(
+                        "Archive upload failed for "
+                        f"{body.get('failed')} files: {failures}"
+                    )
+
+                return {
+                    file_dst: urljoin(
+                        self.config.base_url,
+                        "/".join([".", dest_path, file_dst]),
+                    )
+                    for (file_src, file_dst) in file_paths
+                }
+
+            except (
+                requests.exceptions.ReadTimeout,
+                requests.exceptions.ConnectionError,
+                requests.exceptions.Timeout,
+            ) as exc:
+                last_exception = self._handle_network_error(
+                    exc, attempt, max_retries, retry_delay
+                )
+
+            except requests.exceptions.HTTPError as exc:
+                last_exception = self._handle_http_error(
+                    exc, attempt, max_retries, retry_delay
+                )
+
+        if last_exception:
+            raise last_exception
+
 
 def get_storage(config, credentials):
     """Get a StorageBackend object"""
diff --git a/tests/test_kbuild.py b/tests/test_kbuild.py
index 0911b4afc6..170f62a26c 100644
--- a/tests/test_kbuild.py
+++ b/tests/test_kbuild.py
@@ -136,3 +136,58 @@ def test_no_existing_metadata(self, tmp_path):
         own = json.loads((af_dir / "metadata.json").read_text())
         assert own["build"]["compiler"] == "clang-21"
         assert "compiler_version" not in own["build"]
+
+
+class FakeStorage:
+    def __init__(self):
+        self.single_uploads = []
+        self.archive_uploads = []
+
+    def upload_single(self, file_path, dest_path=""):
+        self.single_uploads.append((file_path, dest_path))
+        return f"https://storage.test/{dest_path}/{file_path[1]}"
+
+    def upload_archive(
+        self, archive_path, file_paths, dest_path="", archive_name=None
+    ):
+        self.archive_uploads.append(
+            (archive_path, file_paths, dest_path, archive_name)
+        )
+        return {
+            file_dst: f"https://storage.test/{dest_path}/{file_dst}"
+            for _file_src, file_dst in file_paths
+        }
+
+
+class TestUploadArtifacts:
+    def test_tuxmake_dtbs_use_archive_upload(self, tmp_path):
+        kbuild = _kbuild(tmp_path, arch="arm64")
+        af_dir = tmp_path / "artifacts"
+        (af_dir / "dtbs" / "nested").mkdir(parents=True)
+        (af_dir / "dtbs" / "board-a.dtb").write_bytes(b"dtb-a")
+        (af_dir / "dtbs" / "nested" / "board-b.dtb").write_bytes(b"dtb-b")
+        (af_dir / "dtbs.tar.xz").write_bytes(b"archive")
+
+        storage = FakeStorage()
+        kbuild._get_storage = lambda: storage
+        kbuild._apijobname = "kbuild-clang-arm64"
+        kbuild._node = {"id": "node123", "data": {}}
+        kbuild._full_artifacts = {}
+
+        node_af = kbuild.upload_artifacts()
+
+        assert storage.single_uploads == []
+        assert len(storage.archive_uploads) == 1
+        archive_path, file_paths, dest_path, archive_name = (
+            storage.archive_uploads[0]
+        )
+        assert archive_path == str(af_dir / "dtbs.tar.xz")
+        assert dest_path == "kbuild-clang-arm64-node123"
+        assert archive_name == "dtbs.tar.xz"
+        assert sorted(file_dst for _file_src, file_dst in file_paths) == [
+            "dtbs/board-a.dtb",
+            "dtbs/nested/board-b.dtb",
+        ]
+        assert "dtbs/board-a.dtb" in kbuild._full_artifacts
+        assert "dtbs/nested/board-b.dtb" in kbuild._full_artifacts
+        assert node_af["dtbs/board-a_dtb"].endswith("dtbs/board-a.dtb")