diff --git a/.gitignore b/.gitignore
index bdebada..8e44a0a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,5 @@ terraform.tfstate
 terraform.tfstate.backup
 terraform.tfvars
 /node_modules
+.venv
+.mypy_cache
diff --git a/cleanup/aws-gc.py b/cleanup/aws-gc.py
new file mode 100755
index 0000000..009e6ac
--- /dev/null
+++ b/cleanup/aws-gc.py
@@ -0,0 +1,866 @@
+#!/usr/bin/env python3
+
+import argparse
+from collections import defaultdict
+from dataclasses import dataclass
+import datetime
+from itertools import groupby
+import json
+import logging
+import os
+import platform
+import secrets
+import signal
+import sys
+import time
+from concurrent.futures import ThreadPoolExecutor
+
+
+__version__ = "0.1.0"
+
+GREEN = "\033[0;32m"
+RED = "\033[0;31m"
+YELLOW = "\033[0;33m"
+PURPLE = "\033[0;35m"
+BLUE = "\033[0;34m"
+
+RESET = "\033[0m"
+BOLD = "\033[1m"
+
+try:
+    import botocore
+    import botocore.exceptions
+    from botocore.paginate import PageIterator
+    import botocore.config
+    import boto3
+    from tqdm import tqdm
+    from tqdm.contrib.logging import logging_redirect_tqdm
+except ImportError as err:
+    print(
+        f"{RED}Import error: {err}{RESET}\n"
+        "Install the dependencies with this command:\n"
+        f"{GREEN}pip3 install --upgrade boto3 botocore tqdm{RESET}"
+    )
+    sys.exit(1)
+
+###################
+### CLI parsing ###
+###################
+
+
+global_args = argparse.ArgumentParser(add_help=False)
+global_args.add_argument(
+    "--debug",
+    action="store_true",
+    help="Enable verbose debug logging",
+)
+global_args.add_argument(
+    "--version",
+    action="version",
+    version=f"Inventory Collector v{__version__}",
+)
+global_args.add_argument(
+    "--no-progress-bars",
+    action="store_true",
+    help="Disable the progress bars",
+)
+global_args.add_argument(
+    "--concurrency",
+    type=int,
+    default=20,
+    help="Maximum number of concurrent API calls to perform within the scope of AWS region and resource type",
+)
+
+cli = argparse.ArgumentParser(
+    parents=[global_args],
+    description="Filter and delete AWS resources",
+    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+)
+
+subcommands = cli.add_subparsers(help="subcommands", dest="subcommand", required=True)
+plan_command = subcommands.add_parser(
+    "plan",
+    help="List resources that match the given criteria and save them to a plan file. "
+    "It can then be fed to the delete command to perform the actual deletion of the resources.",
+)
+plan_command.add_argument(
+    "--id",
+    type=str,
+    help="Include resources with IDs that contain any of the given substrings (case-insensitive). Syntax: value1,value2,...",
+)
+plan_command.add_argument(
+    "--tag",
+    type=str,
+    help="Include resources that have any of the given tags (case-insensitive); Syntax: key[=value],...",
+)
+plan_command.add_argument(
+    "--exclude-id",
+    type=str,
+    help="Exclude resources with IDs that contain any of the given substrings (case-insensitive). Syntax: value1,value2,... "
+    "Takes precedence over --id",
+)
+plan_command.add_argument(
+    "--exclude-tag",
+    type=str,
+    help="Exclude resources that have any of the given tags (case-insensitive); Syntax: key[=value],... "
+    "Takes precedence over --tag",
+)
+plan_command.add_argument(
+    "--type",
+    type=str,
+    help="Include resources with types that contain any of the given substrings (case-insensitive). Syntax: value1,value2,...",
+)
+plan_command.add_argument(
+    "--exclude-type",
+    type=str,
+    help="Exclude resources with types that contain any of the given substrings (case-insensitive). Syntax: value1,value2,... "
+    "Takes precedence over --type",
+)
+plan_command.add_argument(
+    "--regions",
+    type=str,
+    help="Comma-separated list of AWS regions to clean up. "
+    "Use 'current' to target only the current region. (default: all active regions)",
+)
+default_plan_file = "aws-gc-plan.json"
+
+plan_command.add_argument(
+    "--output",
+    type=str,
+    help="Path to a file or `-` for stdout where JSON plan will be written",
+    default=default_plan_file,
+)
+plan_command.set_defaults(func=lambda args: PlanCommand(args).run())
+
+delete_command = subcommands.add_parser(
+    "delete",
+    help="Delete resources from a plan file generated by the `plan` command",
+)
+delete_command.add_argument(
+    "plan",
+    help="Path to a file or `-` for stdin with JSON plan generated by the `plan` command",
+    nargs="?", default=default_plan_file,
+)
+delete_command.set_defaults(func=lambda args: DeleteCommand(args).run())
+
+args = cli.parse_args()
+
+boto_config = botocore.config.Config(
+    retries={
+        "total_max_attempts": int(os.getenv("AWS_MAX_ATTEMPTS", "20")),
+        # Even though this is an experimental feature, it is just too awesome.
+        # It increases the performance at least by 2 times.
+        "mode": "adaptive",
+    },
+    # times two to account for retries
+    max_pool_connections=args.concurrency * 2,
+)
+
+
+class StringPattern:
+    def __init__(self, pattern: str):
+        self.values = pattern.strip().lower().split(",")
+
+    def matches(self, suspect: str):
+        suspect = suspect.lower()
+        return any(value in suspect for value in self.values)
+
+
+class DictPattern:
+    def __init__(self, pattern: str):
+        self.items = [
+            DictPattern.parse_kv(item) for item in pattern.strip().lower().split(",")
+        ]
+
+    @staticmethod
+    def parse_kv(item: str):
+        parts = item.split("=", 1)
+        return parts[0], None if len(parts) == 1 else parts[1]
+
+    def matches(self, scrutinee: dict[str, str]):
+        return any(
+            (scrutinee_value := scrutinee.get(key)) is not None
+            and (value is None or scrutinee_value == value)
+            for key, value in self.items
+        )
+
+
+#############################
+### Logging configuration ###
+#############################
+
+
+class CustomFormatter(logging.Formatter):
+    LEVELS = {
+        logging.DEBUG: BLUE,
+        logging.INFO: GREEN,
+        logging.WARNING: YELLOW,
+        logging.ERROR: RED,
+        logging.CRITICAL: RED,
+    }
+
+    def __init__(self):
+        padding = (
+            max(len(level) for level in CustomFormatter.LEVELS.values())
+            + len(RESET)
+            + len(GREEN)
+        )
+        super().__init__(
+            f"{PURPLE}%(asctime)s.%(msecs)03d UTC{RESET} {GREEN}%(levelname)-{padding}s{RESET} "
+            f"{BOLD}%(name)s{RESET}: %(message)s",
+            datefmt="%Y-%m-%d %H:%M:%S",
+        )
+
+    def format(self, record):
+        level_color = CustomFormatter.LEVELS.get(record.levelno)
+
+        if level_color is not None:
+            record.levelname = f"{level_color}{record.levelname}{RESET}"
+
+        return logging.Formatter.format(self, record)
+
+
+logger = logging.getLogger("aws-gc")
+logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
+
+handler = logging.StreamHandler(sys.stderr)
+
+handler.setFormatter(CustomFormatter())
+logger.addHandler(handler)
+
+# There are some noisy logs from this module. We provide better logging instead
+logging.getLogger("botocore.credentials").setLevel(logging.ERROR)
+
+
+######################
+### Business logic ###
+######################
+
+# (not supported by Cloud Control API)
+# aws_cloudwatch_event_target
+# aws_ebs_snapshot
+
+# (too low level, but supported by Cloud Control API)
+# aws_eks_access_entry
+# aws_eks_node_group
+# aws_lambda_function_event_invoke_config
+# aws_lambda_permission
+# aws_s3_bucket_policy
+# aws_kms_alias
+
+# (too low level, not supported for listing by Cloud Control API)
+# aws_iam_role_policy
+# aws_sqs_queue_policy
+
+# (too low level, not supported by Cloud Control API)
+# aws_dynamodb_table_item
+# aws_ecs_cluster_capacity_providers
+# aws_eks_access_policy_association
+# aws_iam_access_key
+# aws_iam_role_policy_attachment
+# aws_iam_user_policy_attachment
+# aws_s3_bucket_lifecycle_configuration
+# aws_s3_bucket_logging
+# aws_s3_bucket_object_lock_configuration
+# aws_s3_bucket_server_side_encryption_configuration
+# aws_s3_bucket_versioning
+# aws_security_group_rule
+
+# (these were in ancient versions of Elastio, support is no longer needed)
+# aws_appautoscaling_policy
+# aws_appautoscaling_target
+
+# (covered)
+# aws_autoscaling_group
+# aws_batch_compute_environment
+# aws_batch_job_definition
+# aws_batch_job_queue
+# aws_cloudformation_stack
+# aws_cloudwatch_event_bus
+# aws_cloudwatch_event_rule
+# aws_cloudwatch_log_group
+# aws_cloudwatch_metric_alarm
+# aws_dynamodb_table
+# aws_ebs_volume
+# aws_ecs_capacity_provider
+# aws_ecs_cluster
+# aws_ecs_service
+# aws_ecs_task_definition
+# aws_eks_cluster
+# aws_iam_instance_profile
+# aws_iam_openid_connect_provider
+# aws_iam_policy
+# aws_iam_role
+# aws_iam_user
+# aws_instance
+# aws_kms_key
+# aws_lambda_event_source_mapping
+# aws_lambda_function
+# aws_launch_template
+# aws_s3_bucket
+# aws_scheduler_schedule
+# aws_scheduler_schedule_group
+# aws_security_group
+# aws_sfn_state_machine
+# aws_sns_topic
+# aws_sns_topic_subscription
+# aws_sqs_queue
+# aws_ssm_parameter
+
+
+@dataclass
+class ResourceType:
+    name: str
+    has_random_id: bool = False
+    supports_tags: bool = True
+
+    @staticmethod
+    def from_name(name: str):
+        type = RESOURCE_TYPES_BY_NAME.get(name)
+        if type is None:
+            raise Exception(f"Unknown resource type: {name}")
+        return type
+
+
+RESOURCE_TYPES = [
+    ResourceType("AWS::AutoScaling::AutoScalingGroup"),
+    ResourceType("AWS::Batch::ComputeEnvironment"),
+    ResourceType("AWS::Batch::JobDefinition"),
+    ResourceType("AWS::Batch::JobQueue"),
+    ResourceType("AWS::CloudFormation::Stack"),
+    ResourceType("AWS::Events::EventBus"),
+    ResourceType("AWS::Events::Rule"),
+    ResourceType("AWS::Logs::LogGroup"),
+    ResourceType("AWS::CloudWatch::Alarm"),
+    ResourceType("AWS::DynamoDB::Table"),
+    ResourceType("AWS::EC2::Volume"),
+    ResourceType("AWS::ECS::CapacityProvider"),
+    ResourceType("AWS::ECS::Cluster"),
+    ResourceType("AWS::ECS::Service"),
+    ResourceType("AWS::ECS::TaskDefinition"),
+    ResourceType("AWS::EKS::Cluster"),
+    ResourceType("AWS::IAM::InstanceProfile"),
+    ResourceType("AWS::IAM::OIDCProvider"),
+    ResourceType("AWS::IAM::ManagedPolicy"),
+    ResourceType("AWS::IAM::Role"),
+    ResourceType("AWS::IAM::User"),
+    ResourceType("AWS::EC2::Instance", has_random_id=True),
+    ResourceType("AWS::KMS::Key", has_random_id=True),
+    ResourceType("AWS::Lambda::EventSourceMapping", has_random_id=True),
+    ResourceType("AWS::Lambda::Function"),
+    ResourceType("AWS::EC2::LaunchTemplate"),
+    ResourceType("AWS::S3::Bucket"),
+    ResourceType("AWS::Scheduler::Schedule"),
+    ResourceType("AWS::Scheduler::ScheduleGroup"),
+    ResourceType("AWS::EC2::SecurityGroup", has_random_id=True),
+    ResourceType("AWS::StepFunctions::StateMachine"),
+    ResourceType("AWS::SNS::Topic"),
+    ResourceType("AWS::SNS::Subscription"),
+    ResourceType("AWS::SQS::Queue"),
+    ResourceType("AWS::SSM::Parameter"),
+]
+RESOURCE_TYPES_BY_NAME = {
+    resource_type.name: resource_type for resource_type in RESOURCE_TYPES
+}
+
+
+class PlanCommand:
+    def __init__(self, args):
+        self._tag = args.tag and DictPattern(args.tag)
+        self._id = args.id and StringPattern(args.id)
+        self._exclude_id = args.exclude_id and StringPattern(args.exclude_id)
+        self._exclude_tag = args.exclude_tag and DictPattern(args.exclude_tag)
+        self._type = args.type and StringPattern(args.type)
+        self._exclude_type = args.exclude_type and StringPattern(args.exclude_type)
+        self._output = args.output
+
+        ec2 = boto3.client("ec2", config=boto_config)
+
+        def active_regions():
+            return [
+                region["RegionName"]
+                for region in ec2.describe_regions()["Regions"]
+                if region.get("OptInStatus")
+                in (None, "opt-in-not-required", "opted-in")
+            ]
+
+        regions: list[str] = (
+            args.regions.split(",") if args.regions else active_regions()
+        )
+        self._regions = sorted(
+            (ec2.meta.region_name if region == "current" else region)
+            for region in set(regions)
+        )
+        self._list_resources_counter = ProgressCounter(
+            "cloudcontrol:ListResources API calls"
+        )
+        self._get_resource_counter = ProgressCounter(
+            "cloudcontrol:GetResource API calls"
+        )
+        self._resources_counter = ProgressCounter("resources found")
+
+    def run(self):
+        plan = self.plan().to_json()
+
+        if self._output == "-":
+            print(plan)
+            return
+
+        with open(self._output, "w") as file:
+            file.write(plan)
+
+    def plan(self):
+        logger.info(f"Listing resources in {len(self._regions)} regions")
+
+        effective_resource_types = [
+            type
+            for type in RESOURCE_TYPES
+            if (
+                (
+                    self._exclude_type is None
+                    or not self._exclude_type.matches(type.name)
+                )
+                and (self._type is None or self._type.matches(type.name))
+            )
+        ]
+
+        list_resources_params = [
+            RegionAndTypeCtx(
+                log_padding=max(len(region) for region in self._regions)
+                + max(
+                    len(resource_type.name)
+                    for resource_type in effective_resource_types
+                )
+                + 4,
+                region=region,
+                resource_type=resource_type,
+            )
+            for region in self._regions
+            for resource_type in effective_resource_types
+        ]
+
+        progress = progress_bar(
+            total=len(list_resources_params),
+            desc="(listing resources)",
+            unit="region-type",
+        )
+
+        batches = ThreadPoolExecutor().map(
+            lambda ctx: self.list_resources(progress, ctx),
+            list_resources_params,
+        )
+
+        resources = [resource for batch in batches for resource in batch]
+
+        logger.info(f"Generated a plan that contains {len(resources)} resources")
+
+        return Plan(resources)
+
+    def list_resources(self, progress: tqdm, ctx: "RegionAndTypeCtx"):
+        logger.info(f"{ctx.log_prefix()} {GREEN}cloudcontrol:ListResources{RESET}")
+
+        paginator = ctx.cloud_control.get_paginator("list_resources")
+
+        resource_ids = [
+            resource["Identifier"]
+            for page in ObservablePageIterator(
+                self._list_resources_counter,
+                paginator.paginate(TypeName=ctx.resource_type.name),
+            )
+            for resource in page.get("ResourceDescriptions", [])
+            if (
+                ctx.resource_type.has_random_id
+                or (
+                    (
+                        self._exclude_id is None
+                        or not self._exclude_id.matches(resource["Identifier"])
+                    )
+                    and (self._id is None or self._id.matches(resource["Identifier"]))
+                )
+            )
+        ]
+
+        if self._tag is not None or self._exclude_tag is not None:
+            resources = self.enrich_resources(ctx, resource_ids)
+        else:
+            resources = [ctx.resource(id) for id in resource_ids]
+            self._resources_counter.incr(len(resources))
+
+        logger.info(f"{ctx.log_prefix()} {BOLD}discovery completed{RESET}")
+        progress.update(1)
+
+        return resources
+
+    # Fetch tags info for the given resources
+    def enrich_resources(self, ctx: "RegionAndTypeCtx", ids: list[str]):
+        resources: list[Resource] = list()
+
+        iter = ctx.thread_pool.map(lambda id: self.get_resource(ctx, id), ids)
+
+        for resource in iter:
+            if resource is None or resource.tags is None:
+                continue
+
+            if self._exclude_tag is not None and self._exclude_tag.matches(
+                resource.tags
+            ):
+                continue
+
+            if self._tag is None or self._tag.matches(resource.tags):
+                resources.append(resource)
+                self._resources_counter.incr()
+
+        return resources
+
+    def get_resource(self, ctx: "RegionAndTypeCtx", id: str):
+        logger.info(f"{ctx.log_prefix()} {BLUE}cloudcontrol:GetResource {id}{RESET}")
+
+        try:
+            resource = ctx.cloud_control.get_resource(
+                TypeName=ctx.resource_type.name, Identifier=id
+            )
+        except botocore.exceptions.ClientError as err:
+            ctx.log_api_error(f"cloudcontrol:GetResource({id})", err)
+            return None
+
+        self._get_resource_counter.incr()
+
+        properties = json.loads(resource["ResourceDescription"].get("Properties", "{}"))
+
+        raw_tags = properties.get("Tags")
+
+        if isinstance(raw_tags, list):
+            tags = {tag["Key"]: tag.get("Value", "") for tag in raw_tags}
+        else:
+            tags = raw_tags
+
+        res = ctx.resource(
+            id,
+            tags=tags,
+        )
+
+        return res
+
+
+class DeleteCommand:
+    def __init__(self, args):
+        if args.plan == "-":
+            plan = sys.stdin.read()
+        else:
+            with open(args.plan, "r") as file:
+                plan = file.read()
+
+        self.plan = Plan.from_json(plan)
+
+        self._deleted_resources_counter = progress_bar(
+            total=len(self.plan.resources),
+            desc="(deleting resources)",
+            unit="resource",
+        )
+
+        self._log_padding = (
+            max((len(resource.region) for resource in self.plan.resources), default=0)
+            + max((len(resource.type.name) for resource in self.plan.resources), default=0)
+            + 4
+        )
+
+    def run(self):
+        logger.info(f"Deleting {len(self.plan.resources)} resources")
+
+        ordered_resource_types = [
+            # CFN stacks are intentionally deleted only after all other
+            # resources, because they may contain resources deletion of which
+            # must wait before other resources are deleted. E.g. if CFN stack
+            # contains IAM roles, we can't destroy it before batch compute
+            # environments, because they may go into INVALID state and it won't
+            # be possible to delete them without restoring the IAM roles they
+            # need.
+            "AWS::CloudFormation::Stack",
+            "AWS::IAM::Role",
+        ]
+
+        ordered_resources: list["Resource"] = []
+        unordered_resources: list["Resource"] = []
+
+        for resource in self.plan.resources:
+            if resource.type.name in ordered_resource_types:
+                ordered_resources.append(resource)
+            else:
+                unordered_resources.append(resource)
+
+        # First delete unordered resources in parallel
+        iter = ThreadPoolExecutor().map(
+            lambda params: self.delete_resources(*params),
+            self.resources_by_region_and_type(unordered_resources),
+        )
+
+        # This empty loop is required to exhaust the iterator and propagate
+        # any exceptions that might happen in the threads, as they are raised
+        # only in the __next__() method of the iterator
+        for _ in iter:
+            pass
+
+        # Delete resources that require ordering sequentially at the end
+        for params in self.resources_by_region_and_type(ordered_resources):
+            self.delete_resources(*params)
+
+    def resources_by_region_and_type(
+        self,
+        resources: list["Resource"],
+    ) -> list[tuple["RegionAndTypeCtx", list["Resource"]]]:
+        grouped = defaultdict(list)
+        for resource in resources:
+            grouped[resource.region, resource.type.name].append(resource)
+        return [
+            (
+                RegionAndTypeCtx(
+                    self._log_padding, region, ResourceType.from_name(name)
+                ),
+                resources,
+            )
+            for (region, name), resources in grouped.items()
+        ]
+
+    # Expects a homogeneous list of resources within the same region/type dimensions
+    def delete_resources(
+        self,
+        ctx: "RegionAndTypeCtx",
+        resources: list["Resource"],
+    ):
+        iter = ctx.thread_pool.map(
+            lambda resource: self.delete_resource(ctx, resource), resources
+        )
+
+        # This empty loop is required to exhaust the iterator and propagate
+        # any exceptions that might happen in the threads, as they are raised
+        # only in the __next__() method of the iterator
+        for _ in iter:
+            pass
+
+    def delete_resource(self, ctx: "RegionAndTypeCtx", resource: "Resource"):
+        while True:
+            if ctx.resource_type.name == "AWS::S3::Bucket":
+                self.empty_s3_bucket(ctx, resource.id)
+
+            logger.info(
+                f"{ctx.log_prefix()} {PURPLE}cloudcontrol:DeleteResource {resource.id}{RESET}"
+            )
+
+            client_token = secrets.token_hex(32)
+            delete_response = ctx.cloud_control.delete_resource(
+                TypeName=ctx.resource_type.name,
+                Identifier=resource.id,
+                ClientToken=client_token,
+            )
+
+            logger.debug(
+                f"{ctx.log_prefix()} cloudcontrol:DeleteResource({resource.id}) response: {delete_response}"
+            )
+
+            progress_event = delete_response["ProgressEvent"]
+
+            while progress_event is not None:
+                logger.debug(
+                    f"{ctx.log_prefix()} cloudcontrol:GetResourceRequestStatus response: {progress_event}"
+                )
+
+                status = progress_event["OperationStatus"]
+                code = progress_event.get("ErrorCode")
+
+                if status == "SUCCESS" or code == "NotFound":
+                    self._deleted_resources_counter.update(1)
+                    return
+
+                if status == "FAILED" or status == "CANCEL_IN_PROGRESS":
+                    status_message = progress_event.get(
+                        "StatusMessage", "Unknown error"
+                    )
+                    logger.warning(
+                        f"{ctx.log_prefix()} Failed to delete {resource.id} ({code}). Deletion "
+                        f"request status: {RED}{status}. {status_message}{RESET}. Retrying..."
+                    )
+                    break
+
+                retry_after = progress_event.get("RetryAfter")
+
+                if retry_after is None:
+                    wait_secs = 3.0
+                else:
+                    diff = retry_after - datetime.datetime.now(datetime.timezone.utc)
+                    wait_secs = max(0, diff.total_seconds() + 1)
+
+                logger.info(
+                    f"{ctx.log_prefix()} Status of deletion is {BLUE}{status}{RESET} "
+                    f"for {resource.id}. Waiting for {wait_secs} seconds..."
+                )
+
+                time.sleep(wait_secs)
+
+                status_response = ctx.cloud_control.get_resource_request_status(
+                    RequestToken=progress_event["RequestToken"]
+                )
+                progress_event = status_response["ProgressEvent"]
+
+    def empty_s3_bucket(self, ctx: "RegionAndTypeCtx", bucket_name: str):
+        logger.info(f"{ctx.log_prefix()} {PURPLE}Emptying {bucket_name}{RESET}")
+        try:
+            bucket = ctx.s3_resource.Bucket(bucket_name)
+            bucket_versioning = ctx.s3_resource.BucketVersioning(bucket_name)
+            if bucket_versioning.status == "Enabled":
+                bucket.object_versions.delete()
+            else:
+                bucket.objects.all().delete()
+        except botocore.exceptions.ClientError as err:
+            ctx.log_api_error(f"Emptying s3 bucket {bucket_name}", err)
+
+
+class ProgressCounter:
+    def __init__(self, label: str):
+        self._progress = tqdm(
+            disable=args.no_progress_bars,
+            bar_format=f"{GREEN}{BOLD}{{n_fmt}}{RESET} {{desc}}{RESET}",
+            desc=f"{label}",
+        )
+
+    def incr(self, by=1):
+        self._progress.update(by)
+
+
+class ObservablePageIterator[T]:
+    def __init__(self, counter: ProgressCounter, iter: "PageIterator[T]"):
+        self._api_calls_counter = counter
+        self._iter = iter.__iter__()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self) -> T | None:
+        result = next(self._iter)
+        if result is not None:
+            self._api_calls_counter.incr()
+
+        return result
+
+
+@dataclass
+class Resource:
+    type: ResourceType
+    region: str
+    id: str
+    tags: dict[str, str] | None
+
+
+@dataclass
+class Plan:
+    resources: list[Resource]
+
+    def to_json(self):
+        return json.dumps(
+            {
+                "resources": [
+                    {
+                        "region": resource.region,
+                        "type": resource.type.name,
+                        "id": resource.id,
+                        "tags": resource.tags,
+                    }
+                    for resource in self.resources
+                ]
+            },
+            indent=2,
+        )
+
+    @staticmethod
+    def from_json(input: str):
+        data = json.loads(input)
+
+        resources = []
+
+        for resource in data.get("resources", []):
+            resources.append(
+                Resource(
+                    type=ResourceType.from_name(resource["type"]),
+                    region=resource["region"],
+                    id=resource["id"],
+                    tags=resource.get("tags"),
+                )
+            )
+
+        return Plan(resources=resources)
+
+
+class RegionAndTypeCtx:
+    def __init__(
+        self,
+        log_padding: int,
+        region: str,
+        resource_type: ResourceType,
+    ):
+        self._log_padding = log_padding
+        self.resource_type = resource_type
+        self.region = region
+        self.thread_pool = ThreadPoolExecutor(max_workers=args.concurrency)
+        self.cloud_control = boto3.client(
+            "cloudcontrol", region_name=region, config=boto_config
+        )
+        self.eventbridge = boto3.client(
+            "events", region_name=region, config=boto_config
+        )
+        self.s3_resource = boto3.resource("s3", region_name=region, config=boto_config)
+
+    def resource(self, id: str, tags: dict[str, str] | None = None) -> Resource:
+        return Resource(type=self.resource_type, region=self.region, id=id, tags=tags)
+
+    def log_prefix(self):
+        return f"[{self.region}, {self.resource_type.name}]".ljust(self._log_padding)
+
+    def log_api_error(self, api_name: str, err: Exception):
+        logger.warning(
+            f"{self.log_prefix()} {YELLOW}{BOLD}{api_name} failed{RESET}: {YELLOW}{err}{RESET}"
+        )
+
+
+def progress_bar(unit: str, total: int, desc: str) -> tqdm:
+    return tqdm(
+        total=total,
+        unit=unit,
+        disable=args.no_progress_bars,
+        leave=True,
+        desc=desc,
+        bar_format=(
+            f"{GREEN}{BOLD}{{n_fmt}} / {{total_fmt}} {{unit}}s{RESET} "
+            f"{BLUE}[elapsed: {{elapsed}}, ETA: {{remaining}}]{RESET} "
+            f"{PURPLE}[{{rate_fmt}}]{RESET} "
+            f"{GREEN}{BOLD}{{desc}}: {{percentage:3.0f}}%{RESET} {{bar}}"
+        ),
+    )
+
+
+def main():
+    logger.info(
+        f"aws-gc v{__version__}, botocore v{botocore.__version__}, Python v{platform.python_version()}"
+    )
+    try:
+        with logging_redirect_tqdm([logger]):
+            args.func(args)
+    except Exception as err:
+        if args.debug:
+            logger.exception(err)
+        else:
+            logger.error(err)
+        return 1
+    return 0
+
+
+def on_cancel(sig_number: int, _frame):
+    logger.error(
+        f"Cancellation signal received: {signal.strsignal(sig_number)} ({sig_number}). "
+        "Exiting..."
+    )
+    os._exit(1)
+
+
+if __name__ == "__main__":
+    signal.signal(signal.SIGINT, on_cancel)
+    signal.signal(signal.SIGTERM, on_cancel)
+    sys.exit(main())
diff --git a/cleanup/aws-gc.sh b/cleanup/aws-gc.sh
new file mode 100755
index 0000000..3745670
--- /dev/null
+++ b/cleanup/aws-gc.sh
@@ -0,0 +1,12 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+script_dir=$(dirname "${BASH_SOURCE[0]}")
+
+python_script="$script_dir/aws-gc.py"
+
+echo Checking Python code with mypy...
+mypy "$python_script" --config-file "$script_dir/mypy.ini"
+
+"$python_script" "$@"
diff --git a/cleanup/mypy.ini b/cleanup/mypy.ini
new file mode 100644
index 0000000..ae814c1
--- /dev/null
+++ b/cleanup/mypy.ini
@@ -0,0 +1,18 @@
+[mypy]
+pretty = True
+
+disallow_incomplete_defs = False
+disallow_untyped_defs = False
+disallow_untyped_calls = False
+disallow_any_generics = False
+
+check_untyped_defs = True
+disallow_subclassing_any = True
+disallow_untyped_decorators = True
+extra_checks = True
+no_implicit_reexport = True
+strict_equality = True
+warn_redundant_casts = True
+warn_return_any = True
+warn_unused_configs = True
+warn_unused_ignores = True
diff --git a/cleanup/requirements.txt b/cleanup/requirements.txt
new file mode 100644
index 0000000..81a6607
--- /dev/null
+++ b/cleanup/requirements.txt
@@ -0,0 +1,9 @@
+botocore-stubs==1.42.41
+botocore==1.42.56
+boto3-stubs[cloudcontrol,ec2]==1.42.56
+boto3==1.42.56
+
+tqdm==4.67.3
+types-tqdm==4.67.3.20260205
+
+mypy==1.19.1