From bb7315bee3440c385495dea31184fb7e9a18d8db Mon Sep 17 00:00:00 2001 From: doomedraven Date: Sun, 24 May 2026 15:48:14 +0000 Subject: [PATCH 1/2] fixes (#3042) --- lib/cuckoo/common/demux.py | 8 +- lib/cuckoo/common/gcp.py | 180 +++++++++++++++++++++++++++++++++--- modules/reporting/gcs.py | 6 ++ utils/dist.py | 56 +++++------ utils/gcp_pubsub_service.py | 67 +++++++++++--- utils/submit.py | 12 +++ web/audit/views.py | 1 - 7 files changed, 272 insertions(+), 58 deletions(-) diff --git a/lib/cuckoo/common/demux.py b/lib/cuckoo/common/demux.py index 19fb110c129..21db28e968f 100644 --- a/lib/cuckoo/common/demux.py +++ b/lib/cuckoo/common/demux.py @@ -343,7 +343,13 @@ def demux_sample(filename: bytes, package: str, options: str, use_sflock: bool = error_list.append({os.path.basename(filename).decode(): error_msg}) new_retlist.append((filename, platform)) else: - for filename, platform, magic_type, file_size in retlist: + for entry in retlist: + if not isinstance(entry, (list, tuple)) or len(entry) < 2: + log.warning("Skipping invalid entry in retlist: %s", entry) + continue + filename, platform = entry[0], entry[1] + magic_type = entry[2] if len(entry) > 2 else "" + file_size = entry[3] if len(entry) > 3 else 0 # verify not Windows binaries here: if platform == "linux" and not linux_enabled and "Python" not in magic_type: error_list.append({os.path.basename(filename).decode(): "Linux processing is disabled"}) diff --git a/lib/cuckoo/common/gcp.py b/lib/cuckoo/common/gcp.py index 31d03a11c87..64a1e22ccf2 100644 --- a/lib/cuckoo/common/gcp.py +++ b/lib/cuckoo/common/gcp.py @@ -2,8 +2,10 @@ import os import logging import time +import shutil from lib.cuckoo.common.config import Config +from lib.cuckoo.common.path_utils import path_exists try: from google.api_core.exceptions import Forbidden @@ -23,9 +25,25 @@ log = logging.getLogger(__name__) gcp_cfg = Config("gcp") +reporting_conf = Config("reporting") +# GCS Configuration +GCS_ENABLED = reporting_conf.gcs.get("enabled", False) if hasattr(reporting_conf, "gcs") else False +GCS_DELETE_AFTER_UPLOAD = reporting_conf.gcs.get("delete_after_upload", False) if hasattr(reporting_conf, "gcs") else False -def download_from_gcs(gcs_uri, destination_path, logger=None): +gcs_uploader = None +GCSUploader = None +if GCS_ENABLED: + from modules.reporting.gcs import GCSUploader + try: + # Initialize without args to load from reporting.conf + gcs_uploader = GCSUploader() + except Exception as e: + log.error("Failed to initialize GCS Uploader: %s", e) + GCS_ENABLED = False + + +def download_from_gcs(gcs_uri, destination_path, logger=None, client=None): """ Downloads a file from GCS. gcs_uri: gs://bucket_name/object_name @@ -49,19 +67,31 @@ def download_from_gcs(gcs_uri, destination_path, logger=None): bucket_name, blob_name = path_parts - service_account_path = gcp_cfg.gcp.get("service_account_path") + storage_client = client + own_client = False + if not storage_client: + project_id = gcp_cfg.gcp.get("project") + if project_id and "" in project_id: + project_id = None - if service_account_path and os.path.exists(service_account_path): - storage_client = storage.Client.from_service_account_json(service_account_path) - else: - storage_client = storage.Client() + service_account_path = gcp_cfg.gcp.get("service_account_path") + + if service_account_path and os.path.exists(service_account_path): + storage_client = storage.Client.from_service_account_json(service_account_path) + else: + storage_client = storage.Client(project=project_id) + own_client = True - bucket = storage_client.bucket(bucket_name) - blob = bucket.blob(blob_name) + try: + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(blob_name) - logger.info("Downloading %s to %s", gcs_uri, destination_path) - blob.download_to_filename(destination_path) - return True + logger.info("Downloading %s to %s", gcs_uri, destination_path) + blob.download_to_filename(destination_path) + return True + finally: + if own_client and hasattr(storage_client, "close"): + storage_client.close() except Exception as e: logger.error("Failed to download from GCS %s: %s", gcs_uri, e) return False @@ -81,6 +111,12 @@ class GCP(object): def __init__(self) -> None: self.dist_cfg = Config("distributed") self.project_id = self.dist_cfg.GCP.project_id + if self.project_id and "" not in self.project_id: + if not os.environ.get("GOOGLE_CLOUD_PROJECT"): + os.environ["GOOGLE_CLOUD_PROJECT"] = self.project_id + if not os.environ.get("GCLOUD_PROJECT"): + os.environ["GCLOUD_PROJECT"] = self.project_id + self.zones = [zone.strip() for zone in self.dist_cfg.GCP.zones.split(",")] self.GCP_BASE_URL = "https://compute.googleapis.com/compute/v1/" @@ -158,3 +194,125 @@ def autodiscovery(self): log.exception(e) time.sleep(int(self.dist_cfg.GCP.autodiscovery)) + + +def gcs_replay(task_range): + if not GCS_ENABLED: + log.error("GCS is not enabled in reporting.conf") + return + + from lib.cuckoo.core.database import Database + from lib.cuckoo.common.constants import CUCKOO_ROOT + + main_db = Database() + + task_ids = [] + try: + if "-" in task_range: + start, end = map(int, task_range.split("-")) + task_ids = list(range(start, end + 1)) + elif "," in task_range: + task_ids = [int(x) for x in task_range.split(",")] + else: + task_ids = [int(task_range)] + except ValueError: + log.error("Invalid task range format. Use 'start-end', 'id1,id2', or 'id'.") + return + + for task_id in task_ids: + report_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task_id)) + if not path_exists(report_path): + continue + + try: + with main_db.session.begin(): + task = main_db.view_task(task_id) + if not task: + log.error("Task %d not found in database", task_id) + continue + + tlp = task.tlp + metadata = GCSUploader.parse_custom_string(task.custom) + + samples = main_db.find_sample(task_id=task_id) + if samples: + metadata["sha256"] = samples[0].sample.sha256 + metadata["md5"] = samples[0].sample.md5 + metadata["sha1"] = samples[0].sample.sha1 + + metadata["task_id"] = task_id + + gcs_upload_report(report_path, task_id, tlp, metadata=metadata) + + except Exception as e: + log.error("Failed to replay GCS upload for task %d: %s", task_id, e) + + +def gcs_upload_report(report_path, analysis_id, tlp=None, metadata=None): + if not GCS_ENABLED: + return + + try: + log.info("[GCS] Task %d ==> GCS", analysis_id) + gcs_uploader.upload(report_path, analysis_id, tlp=tlp, metadata=metadata) + + if GCS_DELETE_AFTER_UPLOAD: + try: + shutil.rmtree(report_path) + log.info("Deleted local report for task %d after GCS upload", analysis_id) + except Exception as e: + log.error("Failed to delete local report %s: %s", report_path, e) + + except Exception as e: + log.error("Failed to upload report to GCS for task %d: %s", analysis_id, e) + + +def gcs_sync(time_range): + if not GCS_ENABLED: + log.error("GCS is not enabled in reporting.conf") + return + + from lib.cuckoo.common.cleaners_utils import convert_into_time + from lib.cuckoo.core.database import Database + from lib.cuckoo.core.data.task import TASK_REPORTED + from concurrent.futures import ThreadPoolExecutor, as_completed + + main_db = Database() + + try: + past_time = convert_into_time(time_range) + except ValueError as e: + log.error("Invalid time range: %s", e) + return + + log.info("Fetching tasks from DB completed after %s", past_time) + with main_db.session.begin(): + # Only check reported tasks as they are the ones supposed to be in GCS + tasks = main_db.list_tasks(completed_after=past_time, status=TASK_REPORTED) + db_ids = [t.id for t in tasks] + + if not db_ids: + log.info("No reported tasks found in DB for the specified time range") + return + + log.info("Found %d tasks in DB. Checking existence in GCS...", len(db_ids)) + + missing_ids = [] + max_workers = 20 # Use a reasonable number of threads for GCS API calls + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_id = {executor.submit(gcs_uploader.check_exists, tid): tid for tid in db_ids} + for future in as_completed(future_to_id): + tid = future_to_id[future] + try: + if not future.result(): + missing_ids.append(tid) + except Exception as e: + log.error("Error checking GCS existence for task %d: %s", tid, e) + + if not missing_ids: + log.info("All tasks are already in GCS.") + return + + log.info("Found %d missing tasks in GCS: %s", len(missing_ids), sorted(missing_ids)) + # Trigger replay for missing IDs + gcs_replay(",".join(map(str, sorted(missing_ids)))) diff --git a/modules/reporting/gcs.py b/modules/reporting/gcs.py index 301213fe4a3..fa6613b7fbf 100644 --- a/modules/reporting/gcs.py +++ b/modules/reporting/gcs.py @@ -144,6 +144,12 @@ def upload_files_individually(self, analysis_id, source_directory, tlp=None, met log.info("Successfully uploaded files for analysis %s to GCS.", analysis_id) + def check_exists(self, analysis_id): + """Check if any blobs exist for the given analysis ID.""" + prefix = str(analysis_id) + blobs = list(self.storage_client.list_blobs(self.bucket, prefix=prefix, max_results=1)) + return len(blobs) > 0 + class GCS(Report): """ diff --git a/utils/dist.py b/utils/dist.py index e71e38bab96..9350f9f0295 100644 --- a/utils/dist.py +++ b/utils/dist.py @@ -62,7 +62,7 @@ HAVE_GCP = False if dist_conf.GCP.enabled: - from lib.cuckoo.common.gcp import GCP, HAVE_GCP + from lib.cuckoo.common.gcp import GCP, HAVE_GCP, GCS_ENABLED, gcs_replay, gcs_sync, gcs_upload_report, gcs_uploader cloud = GCP() @@ -104,19 +104,6 @@ def filter(self, record): NFS_FETCH = dist_conf.distributed.get("nfs") RESTAPI_FETCH = dist_conf.distributed.get("restapi") -# GCS Configuration -GCS_ENABLED = reporting_conf.gcs.get("enabled", False) if hasattr(reporting_conf, "gcs") else False -GCS_DELETE_AFTER_UPLOAD = reporting_conf.gcs.get("delete_after_upload", False) if hasattr(reporting_conf, "gcs") else False - -if GCS_ENABLED: - from modules.reporting.gcs import GCSUploader - try: - # Initialize without args to load from reporting.conf - gcs_uploader = GCSUploader() - except Exception as e: - print("Failed to initialize GCS Uploader: %s", e) - GCS_ENABLED = False - INTERVAL = 10 # controller of dead nodes @@ -1028,25 +1015,10 @@ def fetch_latest_reports_nfs(self): log.exception("Failed to save iocs for parent sample: %s", str(e)) if GCS_ENABLED: - try: - # We assume report_path is the analysis folder root. - # TLP is not readily available in 't' object without loading report.json or task options. - # We can try to get TLP from task options if available, or just pass None. - tlp = t.tlp - metadata = GCSUploader.parse_custom_string(t.custom) - metadata.update(hashes) - metadata["task_id"] = t.main_task_id - gcs_uploader.upload(report_path, t.main_task_id, tlp=tlp, metadata=metadata) - - if GCS_DELETE_AFTER_UPLOAD: - try: - shutil.rmtree(report_path) - log.info("Deleted local report for task %d after GCS upload", t.main_task_id) - except Exception as e: - log.error("Failed to delete local report %s: %s", report_path, e) - - except Exception as e: - log.error("Failed to upload report to GCS for task %d: %s", t.main_task_id, e) + metadata = gcs_uploader.parse_custom_string(t.custom) + metadata.update(hashes) + metadata["task_id"] = t.main_task_id + gcs_upload_report(report_path, t.main_task_id, tlp=t.tlp, metadata=metadata) t.retrieved = True t.finished = True @@ -1946,6 +1918,16 @@ def init_logging(debug=False): action="store_true", help="Disable retrieval threads (use when running Go Fast-Fetcher)", ) + p.add_argument( + "--gcs-replay", + action="store", + help="Replay GCS upload for a range of tasks (e.g., 1-100 or 1,2,3)", + ) + p.add_argument( + "--gcs-sync", + action="store", + help="Sync GCS with DB for a given time range (e.g., 12h, 1d, 2d)", + ) args = p.parse_args() log = init_logging(args.debug) @@ -1963,6 +1945,14 @@ def init_logging(debug=False): main_db.set_status(args.force_reported, TASK_REPORTED) sys.exit() + if args.gcs_replay: + gcs_replay(args.gcs_replay) + sys.exit() + + if args.gcs_sync: + gcs_sync(args.gcs_sync) + sys.exit() + delete_enabled = args.enable_clean failed_clean_enabled = args.enable_failed_clean if args.node: diff --git a/utils/gcp_pubsub_service.py b/utils/gcp_pubsub_service.py index 51938210408..1604fd6bc34 100644 --- a/utils/gcp_pubsub_service.py +++ b/utils/gcp_pubsub_service.py @@ -8,7 +8,6 @@ import os import sys import tempfile -import threading import warnings # Mute Google Cloud's Python version support warning for Python 3.10 @@ -45,6 +44,12 @@ def __init__(self): if not self.project_id or "" in self.project_id: log.error("GCP project ID not set. Please update gcp.conf or set GCP_PROJECT_ID env var") sys.exit(1) + + # Ensure project ID is available for all GCP client libraries + if not os.environ.get("GOOGLE_CLOUD_PROJECT"): + os.environ["GOOGLE_CLOUD_PROJECT"] = self.project_id + if not os.environ.get("GCLOUD_PROJECT"): + os.environ["GCLOUD_PROJECT"] = self.project_id if not self.subscription_id or "" in self.subscription_id: log.error("GCP subscription ID not set. Please update gcp.conf or set GCP_SUBSCRIPTION_ID env var") sys.exit(1) @@ -64,9 +69,32 @@ def __init__(self): self.subscription_path = self.subscriber.subscription_path(self.project_id, self.subscription_id) + # Initialize storage client for reuse to avoid SSLEOFError and repeated auth + try: + from google.cloud import storage + if service_account_path and os.path.exists(service_account_path): + self.storage_client = storage.Client.from_service_account_json(service_account_path) + else: + self.storage_client = storage.Client(project=self.project_id) + except ImportError: + log.error("google-cloud-storage not installed. Run `pip install google-cloud-storage`") + sys.exit(1) + except Exception as e: + log.error("Failed to initialize GCP storage client: %s", e) + sys.exit(1) + init_database() self.db = Database() + self.cuckoo_cfg = Config() + self.tmp_path = os.path.join(self.cuckoo_cfg.cuckoo.get("tmppath", "/tmp"), "cape-external") + if not path_exists(self.tmp_path): + try: + os.makedirs(self.tmp_path) + except Exception as e: + log.error("Failed to create temporary directory %s: %s", self.tmp_path, e) + sys.exit(1) + def process_message(self, message): correlation_id = message.message_id try: @@ -122,9 +150,9 @@ def process_message(self, message): if not path_exists(local_path): mlog.info("Sample %s not found locally, fetching from GCS: %s", sample_hash, gcs_uri) - fd, temp_path = tempfile.mkstemp(prefix=sample_name) + fd, temp_path = tempfile.mkstemp(prefix=sample_name, dir=self.tmp_path) os.close(fd) - if download_from_gcs(gcs_uri, temp_path, logger=mlog): + if download_from_gcs(gcs_uri, temp_path, logger=mlog, client=self.storage_client): local_path = temp_path is_temp = True else: @@ -157,24 +185,36 @@ def process_message(self, message): os.unlink(local_path) except Exception as e: mlog.warning("Failed to delete temp file %s: %s", local_path, e) + self.db.session.remove() except Exception as e: log.error("[%s] Error processing message: %s", correlation_id, e) message.nack() + self.db.session.remove() def start(self): log.info("Starting GCP Pub/Sub subscriber on %s", self.subscription_path) - streaming_pull_future = self.subscriber.subscribe(self.subscription_path, callback=self.process_message) - # Use a threading event to handle graceful shutdown - self.shutdown_event = threading.Event() + while True: + streaming_pull_future = self.subscriber.subscribe(self.subscription_path, callback=self.process_message) - try: - # result() keeps the main thread alive while the subscriber runs in background - streaming_pull_future.result() - except Exception as e: - log.error("Subscriber exited with error: %s", e) - streaming_pull_future.cancel() + try: + # result() keeps the main thread alive while the subscriber runs in background + streaming_pull_future.result() + except Exception as e: + log.error("Subscriber exited with error: %s. Restarting in 10 seconds...", e) + try: + streaming_pull_future.cancel() + except Exception: + pass + import time + time.sleep(10) + except KeyboardInterrupt: + try: + streaming_pull_future.cancel() + except Exception: + pass + break def main(): service = GCPPubSubService() @@ -186,3 +226,6 @@ def main(): except KeyboardInterrupt: log.info("Shutting down...") sys.exit(0) + except Exception as e: + log.error("Fatal error in main: %s", e) + sys.exit(1) diff --git a/utils/submit.py b/utils/submit.py index 5388ffb217c..e94bc940741 100644 --- a/utils/submit.py +++ b/utils/submit.py @@ -63,6 +63,7 @@ def submit_file( print((bold(yellow("Duplicate")) + msg)) return [] + tmp_path = "" try: with open(file_path, "rb") as f: if not filename: @@ -90,6 +91,17 @@ def submit_file( except CuckooDemuxError as e: print((bold(red("Error")) + ": {0}".format(e))) return [] + finally: + if tmp_path and path_exists(tmp_path): + if os.path.isfile(tmp_path): + parent_dir = os.path.dirname(tmp_path) + if os.path.basename(parent_dir).startswith("upload_"): + import shutil + shutil.rmtree(parent_dir) + else: + os.unlink(tmp_path) + else: + os.unlink(tmp_path) def main(): diff --git a/web/audit/views.py b/web/audit/views.py index 1f828bbf8e4..1c8badc96c1 100644 --- a/web/audit/views.py +++ b/web/audit/views.py @@ -482,7 +482,6 @@ def update_task_config(request, availabletest_id): # 2. Save the minified version to the DB (or keep pretty if preferred) test.task_config = parsed_data - db_session.commit() messages.success(request, f"Configuration for Test {test.name} (#{test.id}) updated successfully.") return JsonResponse({"success": True, "config_pretty": _format_json_config(parsed_data)}) From 82dc2b458e8cd2f2604c05a489862b2cc873d85f Mon Sep 17 00:00:00 2001 From: doomedraven Date: Sun, 24 May 2026 17:15:43 +0000 Subject: [PATCH 2/2] Improve upload directory check in submit.py Refactor directory check for upload directories to handle bytes. --- utils/submit.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/utils/submit.py b/utils/submit.py index e94bc940741..af4597872bf 100644 --- a/utils/submit.py +++ b/utils/submit.py @@ -9,6 +9,7 @@ import os import random import sys +import shutil try: import requests @@ -95,8 +96,13 @@ def submit_file( if tmp_path and path_exists(tmp_path): if os.path.isfile(tmp_path): parent_dir = os.path.dirname(tmp_path) - if os.path.basename(parent_dir).startswith("upload_"): - import shutil + parent_name = os.path.basename(parent_dir) + is_upload_dir = False + if isinstance(parent_name, bytes): + is_upload_dir = parent_name.startswith(b"upload_") + else: + is_upload_dir = parent_name.startswith("upload_") + if is_upload_dir: shutil.rmtree(parent_dir) else: os.unlink(tmp_path)