diff --git a/conf/default/distributed.conf.default b/conf/default/distributed.conf.default index 9dcfc340d10..f35d5feacf4 100644 --- a/conf/default/distributed.conf.default +++ b/conf/default/distributed.conf.default @@ -32,16 +32,8 @@ fstab_socket = /tmp/cape-fstab # Google Cloud Platform [GCP] +# Note: Further GCP configuration (project_id, zones, autodiscovery, etc.) +# MUST be done in conf/gcp.conf. This section only controls the enabled state. enabled = no -# Comma separated list of zones -zones = "" -project_id = "" -# rest usage instead of GCP python client -# https://cloud.google.com/docs/authentication/rest -# gcloud auth print-access-token -token = "" -# Seconds between try to discoverd new instances -autodiscovery = 600 -# Instances should start with following name pattern -instance_name = cape-server + diff --git a/conf/default/gcp.conf.default b/conf/default/gcp.conf.default index c1d0b13130d..62831761435 100644 --- a/conf/default/gcp.conf.default +++ b/conf/default/gcp.conf.default @@ -1,66 +1,48 @@ [gcp] -# Specify the Google Cloud Zone (for example, europe-north2-a). This is case-sensitive -zone = - +# Global Environment Settings # Specify the project identifier -project = - -# pubsub -subscription_id = - -# Running in a GCP environment. If true, the Compute Engine credentials will be used -running_in_gcp = true - -# Specify the path to the service account key file. If not specified, the default service account will be used -service_account_path = - -# Specify a comma-separated list of available machines to be used. -# Each machine will be represented by the instance-name (for example, cape-server-windows). -# For each specified instance-name you have to define a dedicated section containing the details -# on the respective machine. (E.g. cape-server-windows,cape-server-linux) -# For better performance, it is recommended to leave this empty and set autoscale = yes. -machines = - -[cape-server-linux] -# Specify the label name. 
-# Label would be the instance-name of the current machine as specified in your GCP account. -label = cape-server-linux - -# Specify the operating system platform used by current machine -# [windows/darwin/linux]. -platform = linux - -# Set the machine architecture -# x64 or x86 -arch = x64 - -# Specify the IP address of the current virtual machine. Make sure that the -# IP address is valid and that the host machine is able to reach it. If not, -# the analysis will fail. -# ip = - -# (Optional) Specify the name of the network interface that should be used -# when dumping network traffic from this machine with tcpdump. If specified, -# overrides the default interface specified above. -# Example (eth0 is the interface name): -# interface = - -# (Optional) Specify the IP of the Result Server, as your virtual machine sees it. -# The Result Server will always bind to the address and port specified in cuckoo.conf, -# however you could set up your virtual network to use NAT/PAT, so you can specify here -# the IP address for the Result Server as your machine sees it. If you don't specify an -# address here, the machine will use the default value from cuckoo.conf. -# NOTE: if you set this option you have to set result server IP to 0.0.0.0 in cuckoo.conf. -# Example: -# resultserver_ip = - -# (Optional) Specify the port for the Result Server, as your virtual machine sees it. -# The Result Server will always bind to the address and port specified in cuckoo.conf, -# however you could set up your virtual network to use NAT/PAT, so you can specify here -# the port for the Result Server as your machine sees it. If you don't specify a port -# here, the machine will use the default value from cuckoo.conf. -# resultserver_port = - -# (Optional) Set your own tags. These are comma separated and help to identify -# specific VMs. You can run samples on VMs with tag you require. -# tags = +project = +# Specify the Google Cloud Zone (for example, europe-north2-a). 
This is case-sensitive +zone = +# Authentication method: vm (instance credentials), json (key file), or token (manual token) +auth_by = vm +# Path to the service account key file (required if auth_by = json) +# service_account_path = data/gcp-credentials.json +# Bearer token for REST API usage (optional) +# token = + +[samples_pubsub] +# GCP Pub/Sub Sample Processing Service +enabled = no +# Pub/Sub subscription name +subscription_id = +# GCS bucket for sample downloads +samples_bucket = sandbox-samples-unique +# Concurrent message limit +max_messages = 5 +# Lease duration in seconds +lease_duration = 1800 + +[distributed] +# Worker Node Autodiscovery +enabled = no +# Seconds between attempts to discover new instances +autodiscovery_interval = 600 +# Instance names should start with the following prefix +instance_name_pattern = cape-server +# Comma separated list of zones to scan (defaults to global zone if empty) +zones = + +[reporting] +# Analysis Results Upload to GCS +enabled = no +# The name of your GCS bucket where reports will be uploaded +results_bucket = +# Upload mode: zip (single archive per task) or file (individual files) +mode = zip +# Delete local report after successful upload to GCS +delete_after_upload = no +# Comma-separated list of DIRECTORY names to exclude (e.g., logs, shots) +exclude_dirs = logs, shots +# Comma-separated list of exact FILENAMES to exclude +exclude_files = diff --git a/conf/default/reporting.conf.default b/conf/default/reporting.conf.default index 2bd95ba78da..5f5bb47ec0a 100644 --- a/conf/default/reporting.conf.default +++ b/conf/default/reporting.conf.default @@ -226,27 +226,7 @@ enabled = no # Google Cloud Storage [gcs] +# Note: Further GCS configuration (bucket name, credentials, mode, etc.) +# MUST be done in conf/gcp.conf. This section only controls the enabled state. enabled = no -# The name of your Google Cloud Storage bucket where files will be uploaded. 
-bucket_name = your-gcs-bucket-name -# Comma-separated list of DIRECTORY names to exclude from the upload. -# Good examples are 'shots' (contains all screenshots) or 'memory' (for full memory dumps). -exclude_dirs = logs, shots - -# Comma-separated list of exact FILENAMES to exclude from the upload. -# Good examples are large report formats you don't need in GCS. -exclude_files = - -# Mode: zip - will submit all files and folders as unique zip archive. Useful to not spam pubsub notification on file creation. -# Mode: file - will submit one by one. -mode = zip - -# Can be vm or json -auth_by = vm -# only if auth_by = json. The absolute path to your Google Cloud service account JSON key file. -# This file is required for authentication. -credentials_path = data/gcp-credentials.json - -# Delete local report after successful upload to GCS -delete_after_upload = no diff --git a/lib/cuckoo/common/demux.py b/lib/cuckoo/common/demux.py index 21db28e968f..f890055b17d 100644 --- a/lib/cuckoo/common/demux.py +++ b/lib/cuckoo/common/demux.py @@ -168,9 +168,35 @@ def is_valid_package(package: str) -> bool: return any(ptype in package for ptype in VALID_PACKAGES) +# list of junk extensions to skip +JUNK_EXTENSIONS = { + b".yar", + b".yara", + b".md", + b".txt", + b".yml", + b".yaml", + b".gitignore", + b".gitattributes", + b".gitmodules", +} + +JUNK_NAMES = {b"license", b"copying", b"makefile", b"authors", b"readme"} + + # ToDo fix return type def _sf_children(child: sfFile): # -> bytes: path_to_extract = "" + filename_lower = child.filename.lower() + + # Skip junk files + if any(filename_lower.endswith(ext) for ext in JUNK_EXTENSIONS): + return (b"", child.platform, child.magic, child.filesize) + if any(name in filename_lower for name in JUNK_NAMES): + return (b"", child.platform, child.magic, child.filesize) + if b".github/" in filename_lower or b".git/" in filename_lower: + return (b"", child.platform, child.magic, child.filesize) + _, ext = os.path.splitext(child.filename) 
ext = ext.lower() if ( @@ -191,7 +217,7 @@ def _sf_children(child: sfFile): # -> bytes: _ = path_write_file(path_to_extract, child.contents) except Exception as e: log.exception(e) - return (path_to_extract.encode(), child.platform, child.magic, child.filesize) + return (path_to_extract.encode(), child.platform, child.magic or "", child.filesize) # ToDo fix typing need to add str as error msg @@ -211,7 +237,7 @@ def demux_sflock(filename: bytes, options: str, check_shellcode: bool = True): if unpacked.package in whitelist_extensions: file = File(filename) - magic_type = file.get_type() + magic_type = file.get_type() or "" platform = file.get_platform() file_size = file.get_size() return [[filename, platform, magic_type, file_size]], "" @@ -246,6 +272,15 @@ def demux_sample(filename: bytes, package: str, options: str, use_sflock: bool = If file is a ZIP, extract its included files and return their file paths If file is an email, extracts its attachments and return their file paths (later we'll also extract URLs) """ + # Skip junk files + filename_bytes = filename if isinstance(filename, bytes) else filename.encode() + filename_lower_bytes = filename_bytes.lower() + if any(filename_lower_bytes.endswith(ext) for ext in JUNK_EXTENSIONS) or any( + name in filename_lower_bytes for name in JUNK_NAMES + ): + filename_str = filename.decode(errors="ignore") if isinstance(filename, bytes) else filename + return [], [{"junk_filter": f"File {filename_str} skipped by junk filter"}] + # sflock requires filename to be bytes object for Py3 # TODO: Remove after checking all uses of demux_sample use bytes ~TheMythologist if isinstance(filename, str) and use_sflock: @@ -281,7 +316,7 @@ def demux_sample(filename: bytes, package: str, options: str, use_sflock: bool = filename = tmp_path # don't try to extract from office docs - magic = File(filename).get_type() + magic = File(filename).get_type() or "" # if file is an Office doc and password is supplied, try to decrypt the doc if 
"Microsoft" in magic: pass diff --git a/lib/cuckoo/common/gcp.py b/lib/cuckoo/common/gcp.py index 64a1e22ccf2..f20ccf9f602 100644 --- a/lib/cuckoo/common/gcp.py +++ b/lib/cuckoo/common/gcp.py @@ -6,15 +6,17 @@ from lib.cuckoo.common.config import Config from lib.cuckoo.common.path_utils import path_exists +from lib.cuckoo.common.constants import CUCKOO_ROOT try: from google.api_core.exceptions import Forbidden from google.cloud import compute_v1 from google.cloud import storage + from google.oauth2 import service_account HAVE_GCP = True except ImportError: - # pip install --upgrade google-cloud-compute + # pip install --upgrade google-cloud-compute google-cloud-storage HAVE_GCP = False try: @@ -23,20 +25,140 @@ except ImportError: HAVE_REQUESTS = False +import zipfile +import tempfile + log = logging.getLogger(__name__) + +# Initialize standard config gcp_cfg = Config("gcp") -reporting_conf = Config("reporting") + + +class GCSUploader: + """Helper class to upload files to GCS.""" + + @staticmethod + def parse_custom_string(custom_str): + if not custom_str: + return {} + + if custom_str.endswith("..."): + custom_str = custom_str[:-3] + parts = custom_str.split(",") + data = {} + for part in parts: + if ":" in part: + key, value = part.split(":", 1) + data[key] = value + return data + + def __init__(self, bucket_name=None, auth_by=None, credentials_path=None, exclude_dirs=None, exclude_files=None, mode=None): + if not HAVE_GCP: + raise ImportError("google-cloud-storage library is missing") + + if not bucket_name: + bucket_name = gcp_cfg.reporting.get("results_bucket") if hasattr(gcp_cfg, "reporting") else None + auth_by = gcp_cfg.gcp.get("auth_by", "vm") + credentials_path = gcp_cfg.gcp.get("service_account_path") + mode = gcp_cfg.reporting.get("mode", "zip") if hasattr(gcp_cfg, "reporting") else "zip" + exclude_dirs_str = gcp_cfg.reporting.get("exclude_dirs", "") if hasattr(gcp_cfg, "reporting") else "" + exclude_files_str = gcp_cfg.reporting.get("exclude_files", 
"") if hasattr(gcp_cfg, "reporting") else "" + + # Parse exclusion sets + self.exclude_dirs = {item.strip() for item in (exclude_dirs_str or "").split(",") if item.strip()} + self.exclude_files = {item.strip() for item in (exclude_files_str or "").split(",") if item.strip()} + else: + self.exclude_dirs = exclude_dirs if exclude_dirs else set() + self.exclude_files = exclude_files if exclude_files else set() + + self.mode = mode or "zip" + + if not bucket_name: + raise ValueError("GCS bucket_name is not configured.") + + if auth_by == "vm": + self.storage_client = storage.Client() + else: + if credentials_path: + if not os.path.isabs(credentials_path): + credentials_path = os.path.join(CUCKOO_ROOT, credentials_path) + if not credentials_path or not os.path.exists(credentials_path): + raise ValueError(f"Invalid credentials path: {credentials_path}") + credentials = service_account.Credentials.from_service_account_file(credentials_path) + self.storage_client = storage.Client(credentials=credentials) + + self.bucket = self.storage_client.bucket(bucket_name) + + def _iter_files_to_upload(self, source_directory): + """Generator that yields files to be uploaded, skipping excluded ones.""" + for root, dirs, files in os.walk(source_directory): + # Exclude specified directories + dirs[:] = [d for d in dirs if d not in self.exclude_dirs] + for filename in files: + # Exclude specified files + if filename in self.exclude_files: + continue + + local_path = os.path.join(root, filename) + if not os.path.exists(local_path): + continue + relative_path = os.path.relpath(local_path, source_directory) + yield local_path, relative_path + + def upload(self, source_directory, analysis_id, tlp=None, metadata=None): + if self.mode == "zip": + self.upload_zip_archive(analysis_id, source_directory, tlp=tlp, metadata=metadata) + else: + self.upload_files_individually(analysis_id, source_directory, tlp=tlp, metadata=metadata) + + def upload_zip_archive(self, analysis_id, source_directory, 
tlp=None, metadata=None): + log.debug("Compressing and uploading files for analysis ID %s to GCS", analysis_id) + blob_name = f"{analysis_id}_tlp_{tlp}.zip" if tlp else f"{analysis_id}.zip" + + with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_zip_file: + tmp_zip_file_name = tmp_zip_file.name + with zipfile.ZipFile(tmp_zip_file, "w", zipfile.ZIP_DEFLATED) as archive: + for local_path, relative_path in self._iter_files_to_upload(source_directory): + archive.write(local_path, os.path.join(str(analysis_id), relative_path)) + try: + log.debug("Uploading '%s' to '%s'", tmp_zip_file_name, blob_name) + blob = self.bucket.blob(blob_name) + if metadata: + blob.metadata = metadata + blob.upload_from_filename(tmp_zip_file_name) + finally: + os.unlink(tmp_zip_file_name) + log.info("Successfully uploaded archive for analysis %s to GCS.", analysis_id) + + def upload_files_individually(self, analysis_id, source_directory, tlp=None, metadata=None): + log.debug("Uploading files for analysis ID %s to GCS", analysis_id) + folder_name = f"{analysis_id}_tlp_{tlp}" if tlp else str(analysis_id) + + for local_path, relative_path in self._iter_files_to_upload(source_directory): + blob_name = f"{folder_name}/{relative_path}" + # log.debug("Uploading '%s' to '%s'", local_path, blob_name) + blob = self.bucket.blob(blob_name) + if metadata: + blob.metadata = metadata + blob.upload_from_filename(local_path) + + log.info("Successfully uploaded files for analysis %s to GCS.", analysis_id) + + def check_exists(self, analysis_id): + """Check if any blobs exist for the given analysis ID.""" + prefix = str(analysis_id) + blobs = list(self.storage_client.list_blobs(self.bucket, prefix=prefix, max_results=1)) + return len(blobs) > 0 + # GCS Configuration -GCS_ENABLED = reporting_conf.gcs.get("enabled", False) if hasattr(reporting_conf, "gcs") else False -GCS_DELETE_AFTER_UPLOAD = reporting_conf.gcs.get("delete_after_upload", False) if hasattr(reporting_conf, "gcs") else False 
+GCS_ENABLED = gcp_cfg.reporting.get("enabled", False) if hasattr(gcp_cfg, "reporting") else False +GCS_DELETE_AFTER_UPLOAD = gcp_cfg.reporting.get("delete_after_upload", False) if hasattr(gcp_cfg, "reporting") else False + gcs_uploader = None -GCSUploader = None if GCS_ENABLED: - from modules.reporting.gcs import GCSUploader try: - # Initialize without args to load from reporting.conf gcs_uploader = GCSUploader() except Exception as e: log.error("Failed to initialize GCS Uploader: %s", e) @@ -71,14 +193,16 @@ def download_from_gcs(gcs_uri, destination_path, logger=None, client=None): own_client = False if not storage_client: project_id = gcp_cfg.gcp.get("project") - if project_id and "" in project_id: - project_id = None - + auth_by = gcp_cfg.gcp.get("auth_by", "vm") service_account_path = gcp_cfg.gcp.get("service_account_path") - if service_account_path and os.path.exists(service_account_path): - storage_client = storage.Client.from_service_account_json(service_account_path) - else: + if auth_by == "json" and service_account_path: + if not os.path.isabs(service_account_path): + service_account_path = os.path.join(CUCKOO_ROOT, service_account_path) + if os.path.exists(service_account_path): + storage_client = storage.Client.from_service_account_json(service_account_path) + + if not storage_client: storage_client = storage.Client(project=project_id) own_client = True @@ -109,31 +233,37 @@ def check_node_up(host: str) -> bool: class GCP(object): def __init__(self) -> None: - self.dist_cfg = Config("distributed") - self.project_id = self.dist_cfg.GCP.project_id - if self.project_id and "" not in self.project_id: + self.project_id = gcp_cfg.gcp.get("project") + if self.project_id: if not os.environ.get("GOOGLE_CLOUD_PROJECT"): os.environ["GOOGLE_CLOUD_PROJECT"] = self.project_id if not os.environ.get("GCLOUD_PROJECT"): os.environ["GCLOUD_PROJECT"] = self.project_id - self.zones = [zone.strip() for zone in self.dist_cfg.GCP.zones.split(",")] + zones_str = 
gcp_cfg.distributed.get("zones") if hasattr(gcp_cfg, "distributed") else "" + if not zones_str: + zones_str = gcp_cfg.gcp.get("zone") or "" + self.zones = [zone.strip() for zone in zones_str.split(",") if zone.strip()] self.GCP_BASE_URL = "https://compute.googleapis.com/compute/v1/" + self.token = gcp_cfg.gcp.get("token") self.headers = { "X-Goog-User-Project": self.project_id, - "Authorization": f"Bearer {self.dist_cfg.GCP.token}", + "Authorization": f"Bearer {self.token}", } def list_instances(self) -> dict: """Auto discovery of new servers""" servers = {} - if self.dist_cfg.GCP.token: + instance_name_pattern = "cape-server" + if hasattr(gcp_cfg, "distributed"): + instance_name_pattern = gcp_cfg.distributed.get("instance_name_pattern", "cape-server") + if self.token: for zone in self.zones: try: r = requests.get(f"{self.GCP_BASE_URL}projects/{self.project_id}/zones/{zone}/instances", headers=self.headers) for instance in r.json().get("items", []): - if not instance["name"].startswith(self.dist_cfg.GCP.instance_name): + if not instance["name"].startswith(instance_name_pattern): continue ips = [ # Need to replace to internal IP not natIP @@ -154,7 +284,7 @@ def list_instances(self) -> dict: for zone in self.zones: instance_list = instance_client.list(project=self.project_id, zone=zone) for instance in instance_list.items: - if not instance.name.startswith(self.dist_cfg.GCP.instance_name): + if not instance.name.startswith(instance_name_pattern): continue # Public IP # ips = [access.nat_i_p for net_iface in instance.network_interfaces for access in net_iface.access_configs] @@ -168,10 +298,13 @@ def list_instances(self) -> dict: return servers def autodiscovery(self): + autodiscovery_interval = 600 + if hasattr(gcp_cfg, "distributed"): + autodiscovery_interval = int(gcp_cfg.distributed.get("autodiscovery_interval", 600)) while True: servers = self.list_instances() if not servers: - time.sleep(600) + time.sleep(autodiscovery_interval) for name, ips in 
servers.items(): for ip in ips: @@ -193,7 +326,7 @@ def autodiscovery(self): except Exception as e: log.exception(e) - time.sleep(int(self.dist_cfg.GCP.autodiscovery)) + time.sleep(autodiscovery_interval) def gcs_replay(task_range): @@ -202,7 +335,6 @@ def gcs_replay(task_range): return from lib.cuckoo.core.database import Database - from lib.cuckoo.common.constants import CUCKOO_ROOT main_db = Database() @@ -316,3 +448,98 @@ def gcs_sync(time_range): log.info("Found %d missing tasks in GCS: %s", len(missing_ids), sorted(missing_ids)) # Trigger replay for missing IDs gcs_replay(",".join(map(str, sorted(missing_ids)))) + + +def gcs_refetch_banned(time_range, samples_bucket=None): + if not HAVE_GCP: + log.error("Google Cloud Storage dependencies not installed.") + return + + from lib.cuckoo.common.cleaners_utils import convert_into_time + from lib.cuckoo.core.database import Database + from lib.cuckoo.core.data.task import TASK_BANNED, Task + from utils.submit import submit_file + from sqlalchemy import select + import tempfile + + db = Database() + try: + past_time = convert_into_time(time_range) + except ValueError as e: + log.error("Invalid time range: %s", e) + return + + if not samples_bucket: + samples_bucket = gcp_cfg.samples_pubsub.get("samples_bucket") if hasattr(gcp_cfg, "samples_pubsub") else None + if not samples_bucket: + # Fallback to the one we saw in logs if not configured + samples_bucket = "sandbox-samples-unique" + log.warning("samples_bucket not configured in gcp.conf, using default: %s", samples_bucket) + + log.info("Refetching banned tasks added after %s from bucket %s", past_time, samples_bucket) + + with db.session.begin(): + stmt = select(Task).where(Task.status == TASK_BANNED).where(Task.added_on >= past_time) + tasks = db.session.scalars(stmt).all() + task_data = [ + { + "id": t.id, + "sample_id": t.sample_id, + "options": t.options, + "custom": t.custom, + "category": t.category, + "target": t.target, + } + for t in tasks + ] + + if not 
task_data: + log.info("No banned tasks found in the given time range.") + return + + log.info("Found %d banned tasks to refetch.", len(task_data)) + + for task in task_data: + if not task["sample_id"]: + log.warning("Task %d has no sample associated, skipping", task["id"]) + continue + + with db.session.begin(): + sample = db.view_sample(task["sample_id"]) + if not sample: + log.warning("Sample for task %d not found in DB", task["id"]) + continue + sha256 = sample.sha256 + + gcs_uri = f"gs://{samples_bucket}/{sha256}" + + # Use CAPE's temp path if available + tmp_dir = Config().cuckoo.get("tmppath", "/tmp") + fd, tmp_path = tempfile.mkstemp(dir=tmp_dir) + os.close(fd) + + task_ids = [] + try: + if download_from_gcs(gcs_uri, tmp_path): + log.info("Successfully downloaded %s, resubmitting...", sha256) + task_ids, extra_details = submit_file( + db=db, + file_path=tmp_path, + options=task["options"], + custom=task["custom"], + category=task["category"], + filename=os.path.basename(task["target"]), + ) + if task_ids: + log.info("Task %d refetched as new task(s): %s", task["id"], task_ids) + else: + log.error("Failed to resubmit %s: %s", sha256, extra_details.get("errors")) + else: + log.error("Failed to download %s from %s", sha256, gcs_uri) + finally: + # Only delete if submission failed. If it succeeded, CAPE needs the file. 
+ if not task_ids and os.path.exists(tmp_path): + try: + os.unlink(tmp_path) + except Exception as e: + log.warning("Failed to delete temp file %s: %s", tmp_path, e) diff --git a/lib/cuckoo/common/quarantine.py b/lib/cuckoo/common/quarantine.py index 0d8eaa93444..843590e5485 100644 --- a/lib/cuckoo/common/quarantine.py +++ b/lib/cuckoo/common/quarantine.py @@ -13,6 +13,7 @@ from Cryptodome.Cipher import ARC4 from lib.cuckoo.common.utils import store_temp_file +from lib.cuckoo.common.path_utils import path_exists try: import olefile @@ -695,14 +696,22 @@ def xorff_unquarantine(f): def unquarantine(f): f = f.decode() if isinstance(f, bytes) else f + if not path_exists(f): + return f + base = os.path.basename(f) realbase, ext = os.path.splitext(base) if not HAVE_OLEFILE: log.info("Missed olefile dependency: pip3 install olefile") - if ext.lower() == ".bup" or (HAVE_OLEFILE and olefile.isOleFile(f)): - with contextlib.suppress(Exception): - return mcafee_unquarantine(f) + + try: + if ext.lower() == ".bup" or (HAVE_OLEFILE and olefile.isOleFile(f)): + with contextlib.suppress(Exception): + return mcafee_unquarantine(f) + except (FileNotFoundError, PermissionError, IsADirectoryError): + pass + if ext.lower() in func_map: try: return func_map[ext.lower()](f) diff --git a/modules/reporting/gcs.py b/modules/reporting/gcs.py index fa6613b7fbf..32ff9c83781 100644 --- a/modules/reporting/gcs.py +++ b/modules/reporting/gcs.py @@ -1,155 +1,15 @@ import os import logging -import tempfile -import zipfile from lib.cuckoo.common.constants import CUCKOO_ROOT from lib.cuckoo.common.abstracts import Report -from lib.cuckoo.common.config import Config from lib.cuckoo.common.exceptions import CuckooReportError +# Import centralized GCP logic +from lib.cuckoo.common.gcp import GCSUploader, HAVE_GCP, gcp_cfg + # Set up a logger for this module log = logging.getLogger(__name__) -try: - # Import the Google Cloud Storage client library - from google.cloud import storage - from google.oauth2 
import service_account - - HAVE_GCS = True -except ImportError: - HAVE_GCS = False - - -class GCSUploader: - """Helper class to upload files to GCS.""" - - @staticmethod - def parse_custom_string(custom_str): - if not custom_str: - return {} - - if custom_str.endswith("..."): - custom_str = custom_str[:-3] - parts = custom_str.split(",") - data = {} - for part in parts: - if ":" in part: - key, value = part.split(":", 1) - data[key] = value - return data - - def __init__(self, bucket_name=None, auth_by=None, credentials_path=None, exclude_dirs=None, exclude_files=None, mode=None): - if not HAVE_GCS: - raise ImportError("google-cloud-storage library is missing") - - # Load from reporting.conf if parameters are missing - if not bucket_name: - cfg = Config("reporting") - if not cfg.gcs.enabled: - # If we are initializing purely for manual usage but config is disabled, we might want to allow it if params are passed. - # But if params are missing AND config is disabled/missing, we can't proceed. 
- pass - - bucket_name = cfg.gcs.bucket_name - auth_by = cfg.gcs.auth_by - credentials_path_str = cfg.gcs.credentials_path - - if credentials_path_str: - credentials_path = os.path.join(CUCKOO_ROOT, credentials_path_str) - - exclude_dirs_str = cfg.gcs.get("exclude_dirs", "") - exclude_files_str = cfg.gcs.get("exclude_files", "") - mode = cfg.gcs.get("mode", "zip") - - # Parse exclusion sets - self.exclude_dirs = {item.strip() for item in exclude_dirs_str.split(",") if item.strip()} - self.exclude_files = {item.strip() for item in exclude_files_str.split(",") if item.strip()} - else: - self.exclude_dirs = exclude_dirs if exclude_dirs else set() - self.exclude_files = exclude_files if exclude_files else set() - - self.mode = mode - - if not bucket_name: - raise ValueError("GCS bucket_name is not configured.") - - if auth_by == "vm": - self.storage_client = storage.Client() - else: - if not credentials_path or not os.path.exists(credentials_path): - raise ValueError(f"Invalid credentials path: {credentials_path}") - credentials = service_account.Credentials.from_service_account_file(credentials_path) - self.storage_client = storage.Client(credentials=credentials) - - self.bucket = self.storage_client.bucket(bucket_name) - # We check bucket existence lazily or now? - # dist.py might not want to crash on init if network is flaky, but validation is good. - # Let's keep validation. - # Note: bucket.exists() requires permissions. 
- # if not self.bucket.exists(): - # raise ValueError(f"GCS Bucket '{bucket_name}' does not exist or is inaccessible.") - - def _iter_files_to_upload(self, source_directory): - """Generator that yields files to be uploaded, skipping excluded ones.""" - for root, dirs, files in os.walk(source_directory): - # Exclude specified directories - dirs[:] = [d for d in dirs if d not in self.exclude_dirs] - for filename in files: - # Exclude specified files - if filename in self.exclude_files: - continue - - local_path = os.path.join(root, filename) - if not os.path.exists(local_path): - continue - relative_path = os.path.relpath(local_path, source_directory) - yield local_path, relative_path - - def upload(self, source_directory, analysis_id, tlp=None, metadata=None): - if self.mode == "zip": - self.upload_zip_archive(analysis_id, source_directory, tlp=tlp, metadata=metadata) - else: - self.upload_files_individually(analysis_id, source_directory, tlp=tlp, metadata=metadata) - - def upload_zip_archive(self, analysis_id, source_directory, tlp=None, metadata=None): - log.debug("Compressing and uploading files for analysis ID %s to GCS", analysis_id) - blob_name = f"{analysis_id}_tlp_{tlp}.zip" if tlp else f"{analysis_id}.zip" - - with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_zip_file: - tmp_zip_file_name = tmp_zip_file.name - with zipfile.ZipFile(tmp_zip_file, "w", zipfile.ZIP_DEFLATED) as archive: - for local_path, relative_path in self._iter_files_to_upload(source_directory): - archive.write(local_path, os.path.join(str(analysis_id), relative_path)) - try: - log.debug("Uploading '%s' to '%s'", tmp_zip_file_name, blob_name) - blob = self.bucket.blob(blob_name) - if metadata: - blob.metadata = metadata - blob.upload_from_filename(tmp_zip_file_name) - finally: - os.unlink(tmp_zip_file_name) - log.info("Successfully uploaded archive for analysis %s to GCS.", analysis_id) - - def upload_files_individually(self, analysis_id, source_directory, tlp=None, 
metadata=None): - log.debug("Uploading files for analysis ID %s to GCS", analysis_id) - folder_name = f"{analysis_id}_tlp_{tlp}" if tlp else str(analysis_id) - - for local_path, relative_path in self._iter_files_to_upload(source_directory): - blob_name = f"{folder_name}/{relative_path}" - # log.debug("Uploading '%s' to '%s'", local_path, blob_name) - blob = self.bucket.blob(blob_name) - if metadata: - blob.metadata = metadata - blob.upload_from_filename(local_path) - - log.info("Successfully uploaded files for analysis %s to GCS.", analysis_id) - - def check_exists(self, analysis_id): - """Check if any blobs exist for the given analysis ID.""" - prefix = str(analysis_id) - blobs = list(self.storage_client.list_blobs(self.bucket, prefix=prefix, max_results=1)) - return len(blobs) > 0 - class GCS(Report): """ @@ -167,10 +27,10 @@ def run(self, results): results (dict): The analysis results dictionary. """ # Ensure the required library is installed - if not HAVE_GCS: + if not HAVE_GCP: log.error( "Failed to run GCS reporting module: the 'google-cloud-storage' " - "library is not installed. Please run 'poetry run pip install google-cloud-storage'." + "library is not installed. Please run 'poetry install --extras gcp'." ) return @@ -181,25 +41,31 @@ def run(self, results): analysis_id = results.get("info", {}).get("id") custom = results.get("info", {}).get("custom") - # We can now just use the Uploader. - # But for backward compatibility with overrides in self.options (e.g. per-module config overrides in Cuckoo), - # we should pass options explicitly if they differ from default config. - # However, typically reporting.conf is the source. 
- - # Parse exclusion lists from self.options to respect local module config - exclude_dirs_str = self.options.get("exclude_dirs", "") - exclude_files_str = self.options.get("exclude_files", "") - exclude_dirs = {item.strip() for item in exclude_dirs_str.split(",") if item.strip()} - exclude_files = {item.strip() for item in exclude_files_str.split(",") if item.strip()} - - # We manually construct to respect self.options - bucket_name = self.options.get("bucket_name") - auth_by = self.options.get("auth_by") - credentials_path_str = self.options.get("credentials_path") + # Prioritize reporting.conf (self.options), then gcp.conf (gcp_cfg) + exclude_dirs_str = self.options.get("exclude_dirs") + exclude_files_str = self.options.get("exclude_files") + + if exclude_dirs_str is None: + exclude_dirs_str = gcp_cfg.reporting.get("exclude_dirs", "") if hasattr(gcp_cfg, "reporting") else "" + + if exclude_files_str is None: + exclude_files_str = gcp_cfg.reporting.get("exclude_files", "") if hasattr(gcp_cfg, "reporting") else "" + + exclude_dirs = {item.strip() for item in (exclude_dirs_str or "").split(",") if item.strip()} + exclude_files = {item.strip() for item in (exclude_files_str or "").split(",") if item.strip()} + + bucket_name = self.options.get("bucket_name") or (gcp_cfg.reporting.get("results_bucket") if hasattr(gcp_cfg, "reporting") else None) + auth_by = self.options.get("auth_by") or gcp_cfg.gcp.get("auth_by", "vm") + credentials_path_str = self.options.get("credentials_path") or gcp_cfg.gcp.get("service_account_path") + credentials_path = None if credentials_path_str: - credentials_path = os.path.join(CUCKOO_ROOT, credentials_path_str) - mode = self.options.get("mode", "file") + if not os.path.isabs(credentials_path_str): + credentials_path = os.path.join(CUCKOO_ROOT, credentials_path_str) + else: + credentials_path = credentials_path_str + + mode = self.options.get("mode") or (gcp_cfg.reporting.get("mode", "zip") if hasattr(gcp_cfg, "reporting") else "zip") 
try: uploader = GCSUploader(bucket_name, auth_by, credentials_path, exclude_dirs, exclude_files, mode) diff --git a/utils/dist.py b/utils/dist.py index 9350f9f0295..acf63bf8a44 100644 --- a/utils/dist.py +++ b/utils/dist.py @@ -57,22 +57,31 @@ from lib.cuckoo.core.data.task import Task as MD_Task from dev_utils.mongodb import mongo_update_one +cfg = Config() +reporting_conf = Config("reporting") +web_conf = Config("web") dist_conf = Config("distributed") +gcp_conf = Config("gcp") main_server_name = dist_conf.distributed.get("main_server_name", "master") HAVE_GCP = False -if dist_conf.GCP.enabled: - from lib.cuckoo.common.gcp import GCP, HAVE_GCP, GCS_ENABLED, gcs_replay, gcs_sync, gcs_upload_report, gcs_uploader +if gcp_conf.samples_pubsub.enabled: + from lib.cuckoo.common.gcp import ( + GCP, + GCS_ENABLED, + HAVE_GCP, + gcs_refetch_banned, + gcs_replay, + gcs_sync, + gcs_upload_report, + gcs_uploader, + ) cloud = GCP() # we need original db to reserve ID in db, # to store later report, from master or worker -cfg = Config() -reporting_conf = Config("reporting") -web_conf = Config("web") - zip_pwd = web_conf.zipped_download.zip_pwd if not isinstance(zip_pwd, bytes): zip_pwd = zip_pwd.encode() @@ -1928,6 +1937,16 @@ def init_logging(debug=False): action="store", help="Sync GCS with DB for a given time range (e.g., 12h, 1d, 2d)", ) + p.add_argument( + "--gcs-refetch-banned", + action="store", + help="Refetch banned tasks from GCS for a given time range (e.g., 12h, 1d, 2d)", + ) + p.add_argument( + "--samples-bucket", + action="store", + help="Specify GCS bucket for samples (used with --gcs-refetch-banned)", + ) args = p.parse_args() log = init_logging(args.debug) @@ -1953,6 +1972,10 @@ def init_logging(debug=False): gcs_sync(args.gcs_sync) sys.exit() + if args.gcs_refetch_banned: + gcs_refetch_banned(args.gcs_refetch_banned, samples_bucket=args.samples_bucket) + sys.exit() + delete_enabled = args.enable_clean failed_clean_enabled = args.enable_failed_clean if 
args.node: diff --git a/utils/gcp_pubsub_service.py b/utils/gcp_pubsub_service.py index 1604fd6bc34..0e503bbc23c 100644 --- a/utils/gcp_pubsub_service.py +++ b/utils/gcp_pubsub_service.py @@ -37,12 +37,24 @@ def process(self, msg, kwargs): class GCPPubSubService: def __init__(self): - self.gcp_cfg = Config("gcp") - self.project_id = self.gcp_cfg.gcp.get("project") - self.subscription_id = os.getenv("GCP_SUBSCRIPTION_ID") or self.gcp_cfg.gcp.get("subscription_id") + import threading + from lib.cuckoo.common.gcp import gcp_cfg + self.processing_ids = set() + self.ids_lock = threading.Lock() + self.project_id = gcp_cfg.gcp.get("project") + self.subscription_id = os.getenv("GCP_SUBSCRIPTION_ID") or (gcp_cfg.samples_pubsub.get("subscription_id") if hasattr(gcp_cfg, "samples_pubsub") else None) + self.samples_bucket = (gcp_cfg.samples_pubsub.get("samples_bucket", "sandbox-samples-unique") if hasattr(gcp_cfg, "samples_pubsub") else "sandbox-samples-unique") + + if not self.project_id: + # Fallback to env var if project is missing from config + self.project_id = os.getenv("GOOGLE_CLOUD_PROJECT") or os.getenv("GCLOUD_PROJECT") + + if not self.project_id: + log.error("GCP project ID not configured. Please set it in conf/gcp.conf") + sys.exit(1) - if not self.project_id or "" in self.project_id: - log.error("GCP project ID not set. Please update gcp.conf or set GCP_PROJECT_ID env var") + if not self.subscription_id: + log.error("GCP subscription ID not configured. Please set it in conf/gcp.conf") sys.exit(1) # Ensure project ID is available for all GCP client libraries @@ -50,42 +62,42 @@ def __init__(self): os.environ["GOOGLE_CLOUD_PROJECT"] = self.project_id if not os.environ.get("GCLOUD_PROJECT"): os.environ["GCLOUD_PROJECT"] = self.project_id - if not self.subscription_id or "" in self.subscription_id: - log.error("GCP subscription ID not set. 
Please update gcp.conf or set GCP_SUBSCRIPTION_ID env var") - sys.exit(1) try: from google.cloud import pubsub_v1 + from google.cloud import storage self.pubsub_v1 = pubsub_v1 - except ImportError: - log.error("google-cloud-pubsub not installed. Run `pip install google-cloud-pubsub`") - sys.exit(1) - service_account_path = self.gcp_cfg.gcp.get("service_account_path") - if service_account_path and os.path.exists(service_account_path): - self.subscriber = self.pubsub_v1.SubscriberClient.from_service_account_json(service_account_path) - else: - self.subscriber = self.pubsub_v1.SubscriberClient() - - self.subscription_path = self.subscriber.subscription_path(self.project_id, self.subscription_id) + auth_by = gcp_cfg.gcp.get("auth_by", "vm") + service_account_path = gcp_cfg.gcp.get("service_account_path") - # Initialize storage client for reuse to avoid SSLEOFError and repeated auth - try: - from google.cloud import storage - if service_account_path and os.path.exists(service_account_path): - self.storage_client = storage.Client.from_service_account_json(service_account_path) + if auth_by == "json" and service_account_path: + if not os.path.isabs(service_account_path): + from lib.cuckoo.common.constants import CUCKOO_ROOT + service_account_path = os.path.join(CUCKOO_ROOT, service_account_path) + if os.path.exists(service_account_path): + self.subscriber = pubsub_v1.SubscriberClient.from_service_account_json(service_account_path) + self.storage_client = storage.Client.from_service_account_json(service_account_path) + else: + log.error("GCP service account file not found: %s", service_account_path) + sys.exit(1) else: + self.subscriber = pubsub_v1.SubscriberClient() self.storage_client = storage.Client(project=self.project_id) + + self.subscription_path = self.subscriber.subscription_path(self.project_id, self.subscription_id) except ImportError: - log.error("google-cloud-storage not installed. 
Run `pip install google-cloud-storage`") + log.error("GCP Pub/Sub dependencies not installed. Please run `poetry install --extras gcp` or `pip install google-cloud-pubsub google-cloud-storage`") sys.exit(1) except Exception as e: - log.error("Failed to initialize GCP storage client: %s", e) + log.error("Failed to initialize GCP Pub/Sub client: %s", e) sys.exit(1) init_database() self.db = Database() + self._init_clients() + self.cuckoo_cfg = Config() self.tmp_path = os.path.join(self.cuckoo_cfg.cuckoo.get("tmppath", "/tmp"), "cape-external") if not path_exists(self.tmp_path): @@ -95,21 +107,66 @@ def __init__(self): log.error("Failed to create temporary directory %s: %s", self.tmp_path, e) sys.exit(1) + def _init_clients(self): + """(Re)initialize Google Cloud clients.""" + from lib.cuckoo.common.gcp import gcp_cfg + from google.cloud import pubsub_v1 + from google.cloud import storage + + auth_by = gcp_cfg.gcp.get("auth_by", "vm") + service_account_path = gcp_cfg.gcp.get("service_account_path") + + if auth_by == "json" and service_account_path: + if not os.path.isabs(service_account_path): + from lib.cuckoo.common.constants import CUCKOO_ROOT + service_account_path = os.path.join(CUCKOO_ROOT, service_account_path) + if os.path.exists(service_account_path): + self.subscriber = pubsub_v1.SubscriberClient.from_service_account_json(service_account_path) + self.storage_client = storage.Client.from_service_account_json(service_account_path) + else: + log.error("GCP service account file not found: %s", service_account_path) + # Fallback to default credentials + self.subscriber = pubsub_v1.SubscriberClient() + self.storage_client = storage.Client(project=self.project_id) + else: + self.subscriber = pubsub_v1.SubscriberClient() + self.storage_client = storage.Client(project=self.project_id) + + self.subscription_path = self.subscriber.subscription_path(self.project_id, self.subscription_id) + def process_message(self, message): - correlation_id = message.message_id + 
msg_id = message.message_id + with self.ids_lock: + if msg_id in self.processing_ids: + log.warning("[%s] Already processing this message, ignoring redelivery", msg_id) + return + self.processing_ids.add(msg_id) + + correlation_id = msg_id + import time + start_time = time.time() try: payload = json.loads(message.data.decode("utf-8")) - correlation_id = payload.get("uuid") or payload.get("transaction_id") or message.message_id + correlation_id = payload.get("uuid") or payload.get("transaction_id") or msg_id # Create a localized logger with correlation_id mlog = GCPServiceLogger(log, {"correlation_id": correlation_id}) sample_hash = payload.get("sample_hash") gcs_uri = payload.get("gcs_uri") + + if not sample_hash and gcs_uri: + # Extract hash from URI if missing + sample_hash = os.path.basename(gcs_uri) + if not sample_hash or not gcs_uri: - mlog.error("Missing sample_hash or gcs_uri in payload") - message.nack() - return + # If we only have hash, construct URI using samples_bucket + if sample_hash and self.samples_bucket: + gcs_uri = f"gs://{self.samples_bucket}/{sample_hash}" + else: + mlog.error("Missing sample_hash or gcs_uri in payload") + message.nack() + return sandbox_options = payload.get("sandbox_options", "") parent_id = payload.get("parent_id", "") transaction_id = payload.get("transaction_id", "") @@ -146,15 +203,30 @@ def process_message(self, message): # Check if sample exists locally sample_hash = os.path.basename(sample_hash) local_path = os.path.join(CUCKOO_ROOT, "storage", "binaries", sample_hash) - is_temp = False if not path_exists(local_path): mlog.info("Sample %s not found locally, fetching from GCS: %s", sample_hash, gcs_uri) fd, temp_path = tempfile.mkstemp(prefix=sample_name, dir=self.tmp_path) os.close(fd) - if download_from_gcs(gcs_uri, temp_path, logger=mlog, client=self.storage_client): + dl_start = time.time() + + # Retry download once with fresh client if TransportError/SSL issues occur + success = False + for attempt in range(2): + 
try: + if download_from_gcs(gcs_uri, temp_path, logger=mlog, client=self.storage_client): + success = True + break + except Exception as e: + if attempt == 0: + mlog.warning("Transient error during download, recreating client and retrying: %s", e) + self._init_clients() + else: + mlog.error("Persistent error during download: %s", e) + + if success: + mlog.info("Download finished in %.2f seconds", time.time() - dl_start) local_path = temp_path - is_temp = True else: mlog.error("Failed to download sample from GCS: %s", gcs_uri) message.nack() @@ -162,7 +234,8 @@ def process_message(self, message): # Submit to CAPE try: - task_ids = submit_file( + submit_start = time.time() + task_ids, extra_details = submit_file( db=self.db, file_path=local_path, options=sandbox_options, @@ -171,32 +244,72 @@ def process_message(self, message): filename=sample_name, ) if task_ids: - mlog.info("Successfully submitted task(s) %s for sample %s", task_ids, sample_hash) + mlog.info("Successfully submitted task(s) %s for sample %s in %.2f seconds", task_ids, sample_hash, time.time() - submit_start) message.ack() + elif extra_details.get("errors"): + # Check if it was a duplicate, empty file, or junk + error_str = str(extra_details["errors"]) + if "Duplicate" in error_str or "junk_filter" in error_str or "Empty file" in error_str: + mlog.info("Sample %s skipped: %s", sample_hash, error_str) + message.ack() + else: + mlog.error("Failed to add task to database for sample %s: %s", sample_hash, extra_details["errors"]) + message.nack() else: - mlog.error("Failed to add task to database for sample %s", sample_hash) - message.nack() + # No tasks but no errors means it was filtered (junk) + mlog.info("Sample %s processed but no tasks created (filtered).", sample_hash) + message.ack() except Exception as e: - mlog.error("Failed to add task to database: %s", e) + import traceback + mlog.error("Failed to add task to database: %s\n%s", e, traceback.format_exc()) message.nack() finally: - if is_temp 
and path_exists(local_path): - try: - os.unlink(local_path) - except Exception as e: - mlog.warning("Failed to delete temp file %s: %s", local_path, e) + # We do NOT delete local_path or cuckoo-sflock here. + # CAPE's AnalysisManager will copy them to storage/binaries when analysis starts. + # Background cleaner handles /tmp disk space. + self.db.session.remove() + # Force GC to close any dangling FDs from sflock or File objects + import gc + gc.collect() except Exception as e: log.error("[%s] Error processing message: %s", correlation_id, e) message.nack() self.db.session.remove() + finally: + with self.ids_lock: + self.processing_ids.discard(msg_id) + log.info("[%s] Total processing time: %.2f seconds", correlation_id, time.time() - start_time) def start(self): log.info("Starting GCP Pub/Sub subscriber on %s", self.subscription_path) + from lib.cuckoo.common.gcp import gcp_cfg + max_messages = 5 + lease_duration = 1800 + if hasattr(gcp_cfg, "samples_pubsub"): + max_messages = int(gcp_cfg.samples_pubsub.get("max_messages", 5)) + lease_duration = int(gcp_cfg.samples_pubsub.get("lease_duration", 1800)) + + # Increase lease duration for big files and limit concurrency + # Support both old and new parameter names for max compatibility + kwargs = { + "max_messages": max_messages, + "max_duration_per_lease_extension": lease_duration, + } + # Some versions use max_lease_duration + if hasattr(self.pubsub_v1.types.FlowControl, "max_lease_duration"): + kwargs["max_lease_duration"] = lease_duration + + flow_control = self.pubsub_v1.types.FlowControl(**kwargs) + while True: - streaming_pull_future = self.subscriber.subscribe(self.subscription_path, callback=self.process_message) + streaming_pull_future = self.subscriber.subscribe( + self.subscription_path, + callback=self.process_message, + flow_control=flow_control + ) try: # result() keeps the main thread alive while the subscriber runs in background diff --git a/utils/submit.py b/utils/submit.py index 
af4597872bf..61a05fcf4c7 100644 --- a/utils/submit.py +++ b/utils/submit.py @@ -9,7 +9,6 @@ import os import random import sys -import shutil try: import requests @@ -47,13 +46,13 @@ def submit_file( clock=None, unique=False, quiet=False, - category = None, - filename = None, + category=None, + filename=None, ): if not File(file_path).get_size(): if not quiet: print((bold(yellow("Empty") + ": sample {0} (skipping file)".format(file_path)))) - return [] + return [], {"errors": ["Empty file"]} if unique: with db.session.begin(): @@ -62,14 +61,21 @@ def submit_file( msg = ": Sample {0} (skipping file)".format(file_path) if not quiet: print((bold(yellow("Duplicate")) + msg)) - return [] + return [], {"errors": ["Duplicate"]} + + # Ensure logging is available + import logging + l = logging.getLogger(__name__) tmp_path = "" try: + # Create a temp file with the correct name for demuxing (if needed) + # Some demuxers rely on the filename/extension with open(file_path, "rb") as f: if not filename: filename = os.path.basename(file_path) tmp_path = store_temp_file(f.read(), sanitize_filename(filename)) + with db.session.begin(): # ToDo expose extra_details["errors"] task_ids, extra_details = db.demux_sample_and_add_to_db( @@ -88,26 +94,22 @@ def submit_file( route=route, category=category, ) - return task_ids + return task_ids, extra_details except CuckooDemuxError as e: - print((bold(red("Error")) + ": {0}".format(e))) - return [] + l.error("Demux error: %s", e) + return [], {"errors": [str(e)]} + except Exception as e: + import traceback + l.error("Unexpected error in submit_file: %s\n%s", e, traceback.format_exc()) + return [], {"errors": [str(e)]} finally: - if tmp_path and path_exists(tmp_path): - if os.path.isfile(tmp_path): - parent_dir = os.path.dirname(tmp_path) - parent_name = os.path.basename(parent_dir) - is_upload_dir = False - if isinstance(parent_name, bytes): - is_upload_dir = parent_name.startswith(b"upload_") - else: - is_upload_dir = 
parent_name.startswith("upload_") - if is_upload_dir: - shutil.rmtree(parent_dir) - else: - os.unlink(tmp_path) - else: + # If submission failed, clean up the temp file. + # If it succeeded, CAPE's AnalysisManager will handle it. + if not task_ids and tmp_path and path_exists(tmp_path): + try: os.unlink(tmp_path) + except Exception as e: + l.warning("Failed to delete temp file %s: %s", tmp_path, e) def main(): @@ -415,7 +417,7 @@ def main(): task_ids = json["data"].get("task_ids") else: - task_ids = submit_file( + task_ids, extra_details = submit_file( db=db, file_path=file_path, package=args.package,