Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/cuckoo/common/demux.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,13 @@ def demux_sample(filename: bytes, package: str, options: str, use_sflock: bool =
error_list.append({os.path.basename(filename).decode(): error_msg})
new_retlist.append((filename, platform))
else:
for filename, platform, magic_type, file_size in retlist:
for entry in retlist:
if not isinstance(entry, (list, tuple)) or len(entry) < 2:
log.warning("Skipping invalid entry in retlist: %s", entry)
continue
filename, platform = entry[0], entry[1]
magic_type = entry[2] if len(entry) > 2 else ""
file_size = entry[3] if len(entry) > 3 else 0
# verify not Windows binaries here:
if platform == "linux" and not linux_enabled and "Python" not in magic_type:
error_list.append({os.path.basename(filename).decode(): "Linux processing is disabled"})
Expand Down
180 changes: 169 additions & 11 deletions lib/cuckoo/common/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import os
import logging
import time
import shutil

from lib.cuckoo.common.config import Config
from lib.cuckoo.common.path_utils import path_exists

try:
from google.api_core.exceptions import Forbidden
Expand All @@ -23,9 +25,25 @@

log = logging.getLogger(__name__)
gcp_cfg = Config("gcp")
reporting_conf = Config("reporting")

# GCS Configuration
GCS_ENABLED = reporting_conf.gcs.get("enabled", False) if hasattr(reporting_conf, "gcs") else False
GCS_DELETE_AFTER_UPLOAD = reporting_conf.gcs.get("delete_after_upload", False) if hasattr(reporting_conf, "gcs") else False

def download_from_gcs(gcs_uri, destination_path, logger=None):
gcs_uploader = None
GCSUploader = None
if GCS_ENABLED:
from modules.reporting.gcs import GCSUploader
try:
# Initialize without args to load from reporting.conf
gcs_uploader = GCSUploader()
except Exception as e:
log.error("Failed to initialize GCS Uploader: %s", e)
GCS_ENABLED = False


def download_from_gcs(gcs_uri, destination_path, logger=None, client=None):
"""
Downloads a file from GCS.
gcs_uri: gs://bucket_name/object_name
Expand All @@ -49,19 +67,31 @@ def download_from_gcs(gcs_uri, destination_path, logger=None):

bucket_name, blob_name = path_parts

service_account_path = gcp_cfg.gcp.get("service_account_path")
storage_client = client
own_client = False
if not storage_client:
project_id = gcp_cfg.gcp.get("project")
if project_id and "<project_id>" in project_id:
project_id = None

if service_account_path and os.path.exists(service_account_path):
storage_client = storage.Client.from_service_account_json(service_account_path)
else:
storage_client = storage.Client()
service_account_path = gcp_cfg.gcp.get("service_account_path")

if service_account_path and os.path.exists(service_account_path):
storage_client = storage.Client.from_service_account_json(service_account_path)
else:
storage_client = storage.Client(project=project_id)
own_client = True

bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
try:
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)

logger.info("Downloading %s to %s", gcs_uri, destination_path)
blob.download_to_filename(destination_path)
return True
logger.info("Downloading %s to %s", gcs_uri, destination_path)
blob.download_to_filename(destination_path)
return True
finally:
if own_client and hasattr(storage_client, "close"):
storage_client.close()
except Exception as e:
logger.error("Failed to download from GCS %s: %s", gcs_uri, e)
return False
Expand All @@ -81,6 +111,12 @@ class GCP(object):
def __init__(self) -> None:
self.dist_cfg = Config("distributed")
self.project_id = self.dist_cfg.GCP.project_id
if self.project_id and "<project_id>" not in self.project_id:
if not os.environ.get("GOOGLE_CLOUD_PROJECT"):
os.environ["GOOGLE_CLOUD_PROJECT"] = self.project_id
if not os.environ.get("GCLOUD_PROJECT"):
os.environ["GCLOUD_PROJECT"] = self.project_id

self.zones = [zone.strip() for zone in self.dist_cfg.GCP.zones.split(",")]
self.GCP_BASE_URL = "https://compute.googleapis.com/compute/v1/"

Expand Down Expand Up @@ -158,3 +194,125 @@ def autodiscovery(self):
log.exception(e)

time.sleep(int(self.dist_cfg.GCP.autodiscovery))


def gcs_replay(task_range):
if not GCS_ENABLED:
log.error("GCS is not enabled in reporting.conf")
return

from lib.cuckoo.core.database import Database
from lib.cuckoo.common.constants import CUCKOO_ROOT

main_db = Database()

task_ids = []
try:
if "-" in task_range:
start, end = map(int, task_range.split("-"))
task_ids = list(range(start, end + 1))
elif "," in task_range:
task_ids = [int(x) for x in task_range.split(",")]
else:
task_ids = [int(task_range)]
except ValueError:
log.error("Invalid task range format. Use 'start-end', 'id1,id2', or 'id'.")
return

for task_id in task_ids:
report_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task_id))
if not path_exists(report_path):
continue

try:
with main_db.session.begin():
task = main_db.view_task(task_id)
if not task:
log.error("Task %d not found in database", task_id)
continue

tlp = task.tlp
metadata = GCSUploader.parse_custom_string(task.custom)

samples = main_db.find_sample(task_id=task_id)
if samples:
metadata["sha256"] = samples[0].sample.sha256
metadata["md5"] = samples[0].sample.md5
metadata["sha1"] = samples[0].sample.sha1

metadata["task_id"] = task_id

gcs_upload_report(report_path, task_id, tlp, metadata=metadata)

except Exception as e:
log.error("Failed to replay GCS upload for task %d: %s", task_id, e)


def gcs_upload_report(report_path, analysis_id, tlp=None, metadata=None):
if not GCS_ENABLED:
return

try:
log.info("[GCS] Task %d ==> GCS", analysis_id)
gcs_uploader.upload(report_path, analysis_id, tlp=tlp, metadata=metadata)

if GCS_DELETE_AFTER_UPLOAD:
try:
shutil.rmtree(report_path)
log.info("Deleted local report for task %d after GCS upload", analysis_id)
except Exception as e:
log.error("Failed to delete local report %s: %s", report_path, e)

except Exception as e:
log.error("Failed to upload report to GCS for task %d: %s", analysis_id, e)


def gcs_sync(time_range):
if not GCS_ENABLED:
log.error("GCS is not enabled in reporting.conf")
return

from lib.cuckoo.common.cleaners_utils import convert_into_time
from lib.cuckoo.core.database import Database
from lib.cuckoo.core.data.task import TASK_REPORTED
from concurrent.futures import ThreadPoolExecutor, as_completed

main_db = Database()

try:
past_time = convert_into_time(time_range)
except ValueError as e:
log.error("Invalid time range: %s", e)
return

log.info("Fetching tasks from DB completed after %s", past_time)
with main_db.session.begin():
# Only check reported tasks as they are the ones supposed to be in GCS
tasks = main_db.list_tasks(completed_after=past_time, status=TASK_REPORTED)
db_ids = [t.id for t in tasks]

if not db_ids:
log.info("No reported tasks found in DB for the specified time range")
return

log.info("Found %d tasks in DB. Checking existence in GCS...", len(db_ids))

missing_ids = []
max_workers = 20 # Use a reasonable number of threads for GCS API calls
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_id = {executor.submit(gcs_uploader.check_exists, tid): tid for tid in db_ids}
for future in as_completed(future_to_id):
tid = future_to_id[future]
try:
if not future.result():
missing_ids.append(tid)
except Exception as e:
log.error("Error checking GCS existence for task %d: %s", tid, e)

if not missing_ids:
log.info("All tasks are already in GCS.")
return

log.info("Found %d missing tasks in GCS: %s", len(missing_ids), sorted(missing_ids))
# Trigger replay for missing IDs
gcs_replay(",".join(map(str, sorted(missing_ids))))
6 changes: 6 additions & 0 deletions modules/reporting/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ def upload_files_individually(self, analysis_id, source_directory, tlp=None, met

log.info("Successfully uploaded files for analysis %s to GCS.", analysis_id)

def check_exists(self, analysis_id):
"""Check if any blobs exist for the given analysis ID."""
prefix = str(analysis_id)
blobs = list(self.storage_client.list_blobs(self.bucket, prefix=prefix, max_results=1))
return len(blobs) > 0


class GCS(Report):
"""
Expand Down
56 changes: 23 additions & 33 deletions utils/dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@

HAVE_GCP = False
if dist_conf.GCP.enabled:
from lib.cuckoo.common.gcp import GCP, HAVE_GCP
from lib.cuckoo.common.gcp import GCP, HAVE_GCP, GCS_ENABLED, gcs_replay, gcs_sync, gcs_upload_report, gcs_uploader

cloud = GCP()

Expand Down Expand Up @@ -104,19 +104,6 @@ def filter(self, record):
NFS_FETCH = dist_conf.distributed.get("nfs")
RESTAPI_FETCH = dist_conf.distributed.get("restapi")

# GCS Configuration
GCS_ENABLED = reporting_conf.gcs.get("enabled", False) if hasattr(reporting_conf, "gcs") else False
GCS_DELETE_AFTER_UPLOAD = reporting_conf.gcs.get("delete_after_upload", False) if hasattr(reporting_conf, "gcs") else False

if GCS_ENABLED:
from modules.reporting.gcs import GCSUploader
try:
# Initialize without args to load from reporting.conf
gcs_uploader = GCSUploader()
except Exception as e:
print("Failed to initialize GCS Uploader: %s", e)
GCS_ENABLED = False

INTERVAL = 10

# controller of dead nodes
Expand Down Expand Up @@ -1028,25 +1015,10 @@ def fetch_latest_reports_nfs(self):
log.exception("Failed to save iocs for parent sample: %s", str(e))

if GCS_ENABLED:
try:
# We assume report_path is the analysis folder root.
# TLP is not readily available in 't' object without loading report.json or task options.
# We can try to get TLP from task options if available, or just pass None.
tlp = t.tlp
metadata = GCSUploader.parse_custom_string(t.custom)
metadata.update(hashes)
metadata["task_id"] = t.main_task_id
gcs_uploader.upload(report_path, t.main_task_id, tlp=tlp, metadata=metadata)

if GCS_DELETE_AFTER_UPLOAD:
try:
shutil.rmtree(report_path)
log.info("Deleted local report for task %d after GCS upload", t.main_task_id)
except Exception as e:
log.error("Failed to delete local report %s: %s", report_path, e)

except Exception as e:
log.error("Failed to upload report to GCS for task %d: %s", t.main_task_id, e)
metadata = gcs_uploader.parse_custom_string(t.custom)
metadata.update(hashes)
metadata["task_id"] = t.main_task_id
gcs_upload_report(report_path, t.main_task_id, tlp=t.tlp, metadata=metadata)

t.retrieved = True
t.finished = True
Expand Down Expand Up @@ -1946,6 +1918,16 @@ def init_logging(debug=False):
action="store_true",
help="Disable retrieval threads (use when running Go Fast-Fetcher)",
)
p.add_argument(
"--gcs-replay",
action="store",
help="Replay GCS upload for a range of tasks (e.g., 1-100 or 1,2,3)",
)
p.add_argument(
"--gcs-sync",
action="store",
help="Sync GCS with DB for a given time range (e.g., 12h, 1d, 2d)",
)

args = p.parse_args()
log = init_logging(args.debug)
Expand All @@ -1963,6 +1945,14 @@ def init_logging(debug=False):
main_db.set_status(args.force_reported, TASK_REPORTED)
sys.exit()

if args.gcs_replay:
gcs_replay(args.gcs_replay)
sys.exit()

if args.gcs_sync:
gcs_sync(args.gcs_sync)
sys.exit()

delete_enabled = args.enable_clean
failed_clean_enabled = args.enable_failed_clean
if args.node:
Expand Down
Loading
Loading