diff --git a/core/utils_report_common.py b/core/utils_report_common.py new file mode 100644 index 0000000..61e2dab --- /dev/null +++ b/core/utils_report_common.py @@ -0,0 +1,183 @@ +from collections import defaultdict +from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple + +CURRENCY_SYMBOLS = { + "USD": "$", + "GBP": "£", + "EUR": "€", +} + + +def sort_cost_data(cost_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + return sorted(cost_data, key=lambda x: datetime.strptime(x["month"], "%Y-%m-%d")) + + +def summarize_costs( + cost_data: List[Dict[str, Any]], *, last_n: Optional[int] = None +) -> Tuple[List[str], List[float], float, str, str]: + sorted_costs = sort_cost_data(cost_data) + if last_n is not None: + sorted_costs = sorted_costs[-last_n:] + + months = [ + datetime.strptime(item["month"], "%Y-%m-%d").strftime("%b") + for item in sorted_costs + ] + values = [item["cost"] for item in sorted_costs] + total_cost = round(sum(values), 2) + + if sorted_costs: + currency_code = sorted_costs[0].get("currency", "USD") + else: + currency_code = "USD" + currency_symbol = CURRENCY_SYMBOLS.get(currency_code, currency_code) + + return months, values, total_cost, currency_code, currency_symbol + + +def summarize_risks( + risk_data: List[Dict[str, Any]], + risk_definitions: List[Dict[str, Any]], + *, + resource_name_map: Optional[Dict[str, str]] = None, + resource_id_map: Optional[Dict[str, int]] = None, +) -> Tuple[List[Dict[str, Any]], Dict[str, int]]: + risk_def_map = {rd["id"]: rd for rd in risk_definitions} + severity_counts = {"high": 0, "medium": 0, "low": 0} + + grouped_risks = defaultdict( + lambda: { + "impacted_resource_types": set(), + "impacted_resources_count": 0, + "has_overall_risk": False, + } + ) + + for entry in risk_data: + risk_code = entry["risk"] + resource_type = entry["resource_type"] + + if resource_type is None or resource_type == "null": + grouped_risks[risk_code]["has_overall_risk"] = True + continue + + resource_type = str(resource_type) + grouped_risks[risk_code]["impacted_resource_types"].add(resource_type) + grouped_risks[risk_code]["impacted_resources_count"] += 1 + + summarized_risks = [] + for risk_code, risk_info in grouped_risks.items(): + risk_definition = risk_def_map.get(risk_code) + if not risk_definition: + continue + + severity = risk_definition["severity"] + if severity in severity_counts: + severity_counts[severity] += 1 + + resource_types = sorted(risk_info["impacted_resource_types"]) + resource_names = None + if resource_name_map is not None: + resource_names = [ + resource_name_map.get(resource_type, "Unknown Resource") + for resource_type in resource_types + ] + + resource_ids = None + if resource_id_map is not None: + resource_ids = [ + resource_id_map[resource_type] + for resource_type in resource_types + if resource_type in resource_id_map + ] + + impacted_resources_count = ( + None + if risk_info["has_overall_risk"] + else risk_info["impacted_resources_count"] + ) + + summarized_risks.append( + { + "id": risk_code, + "name": risk_definition["name"], + "description": risk_definition["description"], + "severity": severity, + "impacted_resource_types": resource_types, + "impacted_resources": resource_names, + "impacted_resource_ids": resource_ids, + "impacted_resources_count": impacted_resources_count, + } + ) + + return summarized_risks, severity_counts + + +def summarize_alternative_technologies( + resource_inventory: List[Dict[str, Any]], + alternatives: List[Dict[str, Any]], + alternative_technologies: List[Dict[str, Any]], + exit_strategy: int, +) -> Dict[str, List[Dict[str, Any]]]: + active_technologies = { + tech["id"]: tech + for tech in alternative_technologies + if tech.get("status") == "t" + } + + grouped_alt_tech: Dict[str, List[Dict[str, Any]]] = { + str(resource["resource_type"]): [] for resource in resource_inventory + } + + for alt in alternatives: + if str(alt["strategy_type"]) != str(exit_strategy): + continue + + resource_type = str(alt["resource_type"]) + tech = active_technologies.get(alt["alternative_technology"]) + if not tech or resource_type not in grouped_alt_tech: + continue + + grouped_alt_tech[resource_type].append( + { + "product_name": tech["product_name"], + "product_description": tech["product_description"], + "product_url": tech["product_url"], + "open_source": tech["open_source"] == "t", + "support_plan": tech["support_plan"] == "t", + "status": tech["status"] == "t", + } + ) + + return grouped_alt_tech + + +def enrich_resource_inventory( + resource_inventory: List[Dict[str, Any]], + resource_type_mapping: Dict[str, Dict[str, Any]], + *, + report_path: Optional[str] = None, +) -> List[Dict[str, Any]]: + enriched_resources = [] + for idx, resource in enumerate(resource_inventory): + resource_type = str(resource["resource_type"]) + resource_info = resource_type_mapping.get(resource_type, {}) + icon = resource_info.get("icon", "/icons/default.png") + + entry = { + "id": idx + 1, + "resource_type": resource_type, + "code": resource_info.get("code", "N/A"), + "resource_name": resource_info.get("name", "Unknown Resource"), + "icon": icon, + "location": resource.get("location", "Unknown"), + "count": resource.get("count", 0), + } + + if report_path is not None: + entry["icon_url"] = f"{report_path}/assets{icon}" + + enriched_resources.append(entry) + + return enriched_resources diff --git a/core/utils_report_html.py b/core/utils_report_html.py index 210dba2..5bd8617 100644 --- a/core/utils_report_html.py +++ b/core/utils_report_html.py @@ -1,9 +1,13 @@ # core/utils_report_html.py import logging -from datetime import datetime -from collections import defaultdict from typing import List, Dict, Any, Tuple +from core.utils_report_common import ( + summarize_alternative_technologies, + summarize_costs, + summarize_risks, +) + # Configure logger logger = logging.getLogger("core.engine.report_html") logger.setLevel(logging.INFO) @@ -12,35 +16,7 @@ def transform_cost_inventory_for_html( cost_data: List[Dict[str, Any]], ) -> Tuple[List[str], List[float], float, str, str]: - months = [] - cost_values = [] - total_cost = 0 - - # Map currency codes to their respective symbols - currency_symbols = {"USD": "$", "GBP": "£", "EUR": "€"} - - # Convert list to dictionary if necessary - if isinstance(cost_data, list): - cost_data = { - item["month"]: {"cost": item["cost"], "currency": item["currency"]} - for item in cost_data - } - - # Extract currency from the first entry, assuming all costs use the same currency - first_entry = next(iter(cost_data.values()), None) - currency_code = first_entry.get("currency", "USD") if first_entry else "USD" - currency_symbol = currency_symbols.get( - currency_code, currency_code - ) # Default to currency_code if no symbol exists - - # Iterate over the cost data, expecting 6 months - for month, details in sorted(cost_data.items()): - months.append(datetime.strptime(month, "%Y-%m-%d").strftime("%b")) - cost_values.append(details["cost"]) - total_cost += details["cost"] - - total_cost = round(total_cost, 2) - return months, cost_values, total_cost, currency_code, currency_symbol + return summarize_costs(cost_data) def transform_risk_inventory_for_html( @@ -49,59 +25,16 @@ def transform_risk_inventory_for_html( resource_inventory: Dict[str, Dict[str, Any]], ) -> Tuple[List[Dict[str, Any]], Dict[str, int]]: severity_order = {"high": 1, "medium": 2, "low": 3} - severity_counts = {"high": 0, "medium": 0, "low": 0} - sorted_risks = [] - - # Map resource IDs to resource names for quick lookup resource_name_map = { str(key): value["name"] for key, value in resource_inventory.items() } - - # Group risks by their risk code and impacted resources - risk_map = defaultdict(lambda: {"impacted_resources": set(), "count": 0}) - for risk_entry in risk_data: - risk_code = risk_entry["risk"] - resource_type = ( - str(risk_entry["resource_type"]) - if risk_entry["resource_type"] != "null" - else None - ) - - if resource_type: - # Handle risks with associated resource types - resource_name = resource_name_map.get(resource_type, "Unknown Resource") - risk_map[risk_code]["impacted_resources"].add(resource_name) - risk_map[risk_code]["count"] += 1 - else: - # Handle overall risks with no specific resource type - risk_map[risk_code]["impacted_resources"] = [] - risk_map[risk_code]["count"] = None - - # Process risk definitions - for risk_code, risk_info in risk_map.items(): - risk_definition = next( - (rd for rd in risk_definitions if rd["id"] == risk_code), None - ) - if not risk_definition: - continue - - severity = risk_definition["severity"] - severity_counts[severity] += 1 - - sorted_risks.append( - { - "name": risk_definition["name"], - "description": risk_definition["description"], - "impacted_resources": list(risk_info["impacted_resources"]), - "impacted_resources_count": risk_info["count"], - "severity": severity, - } - ) - - # Sort risks by severity - sorted_risks.sort(key=lambda x: severity_order.get(x["severity"], 4)) - - return sorted_risks, severity_counts + risks, severity_counts = summarize_risks( + risk_data, + risk_definitions, + resource_name_map=resource_name_map, + ) + risks.sort(key=lambda x: severity_order.get(x["severity"], 4)) + return risks, severity_counts def transform_alt_tech_for_html( @@ -110,35 +43,20 @@ def transform_alt_tech_for_html( alternative_technologies: List[Dict[str, Any]], exit_strategy: int, ) -> List[Dict[str, Any]]: - alt_tech_data = [] + grouped_alt_tech = summarize_alternative_technologies( + resource_inventory, + alternatives, + alternative_technologies, + exit_strategy, + ) for resource in resource_inventory: - resource_type = resource.get("resource_type") - relevant_alternatives = [ - alt - for alt in alternatives - if str(alt["resource_type"]) == str(resource_type) - and str(alt["strategy_type"]) == str(exit_strategy) - ] - for alt in relevant_alternatives: - tech = next( - ( - t - for t in alternative_technologies - if t["id"] == alt["alternative_technology"] and t["status"] == "t" - ), - None, + resource_type = str(resource.get("resource_type")) + for tech in grouped_alt_tech.get(resource_type, []): + alt_tech_data.append( + { + "resource_type_id": resource.get("resource_type"), + **tech, + } ) - if tech: - alt_tech_data.append( - { - "resource_type_id": resource_type, - "product_name": tech.get("product_name"), - "product_description": tech.get("product_description"), - "product_url": tech.get("product_url"), - "open_source": tech.get("open_source") == "t", - "support_plan": tech.get("support_plan") == "t", - "status": tech.get("status") == "t", - } - ) return alt_tech_data diff --git a/core/utils_report_json.py b/core/utils_report_json.py index 230befb..8ef847b 100644 --- a/core/utils_report_json.py +++ b/core/utils_report_json.py @@ -1,9 +1,14 @@ # core/utils_report_json.py import logging -from datetime import datetime -from collections import defaultdict from typing import List, Dict, Any +from core.utils_report_common import ( + enrich_resource_inventory, + sort_cost_data, + summarize_alternative_technologies, + summarize_risks, +) + # Configure logger logger = logging.getLogger("core.engine.report_json") logger.setLevel(logging.INFO) @@ -13,32 +18,25 @@ def transform_resource_inventory_for_json( resource_inventory: List[Dict[str, Any]], resource_type_mapping: Dict[str, Dict[str, Any]], ) -> List[Dict[str, Any]]: - resource_inventory_json = [] - for idx, resource in enumerate(resource_inventory): - resource_type = str(resource["resource_type"]) - resource_info = resource_type_mapping.get(resource_type, {}) - resource_name = resource_info.get("name", "Unknown Resource") - resource_code = resource_info.get("code", "N/A") - - resource_inventory_json.append( - { - "id": idx + 1, - "code": resource_code, - "resource_name": resource_name, - "location": resource.get("location", "Unknown"), - "count": resource.get("count", 0), - } - ) - return resource_inventory_json + enriched_resources = enrich_resource_inventory( + resource_inventory, resource_type_mapping + ) + return [ + { + "id": resource["id"], + "code": resource["code"], + "resource_name": resource["resource_name"], + "location": resource["location"], + "count": resource["count"], + } + for resource in enriched_resources + ] def transform_cost_inventory_for_json( cost_data: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: - # Sort by date before transformation - sorted_cost_data = sorted( - cost_data, key=lambda x: datetime.strptime(x["month"], "%Y-%m-%d") - ) + sorted_cost_data = sort_cost_data(cost_data) cost_inventory = [ { @@ -56,54 +54,26 @@ def transform_risk_inventory_for_json( risk_definitions: List[Dict[str, Any]], resource_inventory: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: - # Map resource_type to their corresponding resource IDs resource_id_map = { str(value["resource_type"]): key + 1 for key, value in enumerate(resource_inventory) } - - # Group risks by risk.id - risk_map = defaultdict( - lambda: { - "id": None, - "name": "", - "description": "", - "severity": "", - "impacted_resources": set(), - "impacted_resources_count": 0, - } + risks, _ = summarize_risks( + risk_data, + risk_definitions, + resource_id_map=resource_id_map, ) - - for risk_entry in risk_data: - risk_id = risk_entry["risk"] - risk_definition = next( - (rd for rd in risk_definitions if rd["id"] == risk_id), None - ) - if not risk_definition: - continue - - resource_type = str(risk_entry["resource_type"]) - resource_id = resource_id_map.get(resource_type) - - # Initialize risk entry if not already in the map - if risk_map[risk_id]["id"] is None: - risk_map[risk_id]["id"] = risk_id - risk_map[risk_id]["name"] = risk_definition["name"] - risk_map[risk_id]["description"] = risk_definition["description"] - risk_map[risk_id]["severity"] = risk_definition["severity"] - - # Add impacted resources - if resource_id: - risk_map[risk_id]["impacted_resources"].add(resource_id) - - # Convert impacted_resources set to a list and compute counts - for risk in risk_map.values(): - risk["impacted_resources"] = list(risk["impacted_resources"]) - risk["impacted_resources_count"] = ( - len(risk["impacted_resources"]) if risk["impacted_resources"] else None - ) - - return list(risk_map.values()) + return [ + { + "id": risk["id"], + "name": risk["name"], + "description": risk["description"], + "severity": risk["severity"], + "impacted_resources": risk["impacted_resource_ids"] or [], + "impacted_resources_count": risk["impacted_resources_count"], + } + for risk in risks + ] def transform_alt_tech_for_json( @@ -112,45 +82,27 @@ def transform_alt_tech_for_json( alternative_technologies: List[Dict[str, Any]], exit_strategy: int, ) -> Dict[int, List[Dict[str, Any]]]: - # Map resource_type to resource_id resource_id_map = { str(value["resource_type"]): key + 1 for key, value in enumerate(resource_inventory) } - - # Initialize the grouped alternative technologies + grouped_alt_tech = summarize_alternative_technologies( + resource_inventory, + alternatives, + alternative_technologies, + exit_strategy, + ) grouped_alt_tech_data = { resource_id: [] for resource_id in resource_id_map.values() } - - # Iterate through alternatives to group them by resource_id - for alt in alternatives: - if str(alt["strategy_type"]) != str(exit_strategy): + for resource_type, technologies in grouped_alt_tech.items(): + resource_id = resource_id_map.get(resource_type) + if not resource_id: continue + grouped_alt_tech_data[resource_id] = [ + {"id": idx + 1, **tech} for idx, tech in enumerate(technologies) + ] - tech = next( - ( - t - for t in alternative_technologies - if t["id"] == alt["alternative_technology"] and t["status"] == "t" - ), - None, - ) - if tech: - resource_id = resource_id_map.get(str(alt["resource_type"])) - if resource_id: - grouped_alt_tech_data[resource_id].append( - { - "id": len(grouped_alt_tech_data[resource_id]) + 1, - "product_name": tech["product_name"], - "product_description": tech["product_description"], - "product_url": tech["product_url"], - "open_source": tech["open_source"] == "t", - "support_plan": tech["support_plan"] == "t", - } - ) - - # Return the grouped alternatives return { key: grouped_alt_tech_data[key] for key in sorted(grouped_alt_tech_data.keys()) } diff --git a/core/utils_report_pdf.py b/core/utils_report_pdf.py index 6b681c4..63e11d1 100644 --- a/core/utils_report_pdf.py +++ b/core/utils_report_pdf.py @@ -21,6 +21,13 @@ # Plotly import plotly.graph_objects as go +from core.utils_report_common import ( + enrich_resource_inventory, + summarize_alternative_technologies, + summarize_costs, + summarize_risks, +) + # Configure logger logger = logging.getLogger("core.engine.report_pdf") logger.setLevel(logging.INFO) @@ -29,104 +36,42 @@ def transform_resource_inventory_for_pdf( resource_inventory: list, resource_type_mapping: Dict[str, Any], report_path: str ) -> List[Dict[str, Any]]: - resources = [] - for idx, resource in enumerate(resource_inventory): - # Convert resource_type to string for lookup - resource_type = str(resource["resource_type"]) - # Fetch resource info from the mapping - resource_info = resource_type_mapping.get(resource_type, {}) - - resource_name = resource_info.get("name", "Unknown Resource") - # Construct icon_url from the resource_info, default if not found - icon_path = "/assets" + resource_info.get("icon", "/icons/default.png") - - # Prepend report_storage to form the full path to the icon - icon_url = f"{report_path}{icon_path}" - - resources.append( - { - "id": idx + 1, - "resource_name": resource_name, - "icon_url": icon_url, - "location": resource.get("location", "Unknown"), - "count": resource.get("count", 0), - } - ) - - return resources + enriched_resources = enrich_resource_inventory( + resource_inventory, + resource_type_mapping, + report_path=report_path, + ) + return [ + { + "id": resource["id"], + "resource_name": resource["resource_name"], + "icon_url": resource["icon_url"], + "location": resource["location"], + "count": resource["count"], + } + for resource in enriched_resources + ] def transform_cost_inventory_for_pdf( cost_data: list, ) -> Tuple[List[str], List[float], str]: - # Map currency codes to their respective symbols - currency_symbols = {"USD": "$", "GBP": "£", "EUR": "€"} - - # Sort cost_data by date ascending - sorted_cost_data = sorted( - cost_data, key=lambda x: datetime.strptime(x["month"], "%Y-%m-%d") - ) - - # Take the last 6 months - last_six_cost_data = sorted_cost_data[-6:] - - # Extract months and costs - months = [ - datetime.strptime(item["month"], "%Y-%m-%d").strftime("%b") - for item in last_six_cost_data - ] - costs = [item["cost"] for item in last_six_cost_data] - - # Determine currency symbol - if last_six_cost_data: - currency_code = last_six_cost_data[0].get("currency", "USD") - else: - currency_code = "USD" - currency_symbol = currency_symbols.get(currency_code, currency_code) - + months, costs, _, _, currency_symbol = summarize_costs(cost_data, last_n=6) return months, costs, currency_symbol def transform_risk_inventory_for_pdf( risk_data: list, risk_definitions: list, resource_inventory: list ) -> Tuple[List[Dict[str, Any]], Dict[str, int]]: - # Create a lookup for risk_definitions by their 'id' - risk_def_map = {rd["id"]: rd for rd in risk_definitions} - - # Group risks by their risk code - risk_map = {} - for entry in risk_data: - risk_code = entry["risk"] - if risk_code not in risk_map: - risk_map[risk_code] = {"impacted_resources_count": 0, "entries": []} - # If resource_type is not null, increment impacted_resources_count - if entry["resource_type"] is not None and entry["resource_type"] != "null": - risk_map[risk_code]["impacted_resources_count"] += 1 - risk_map[risk_code]["entries"].append(entry) - - # We'll track severity counts as well - severity_counts = {"high": 0, "medium": 0, "low": 0} - - # Build a final list of risks with name, severity, impacted_resources_count - risks = [] - for risk_code, data in risk_map.items(): - rd = risk_def_map.get(risk_code) - if not rd: - continue # If no definition found, skip - - severity = rd["severity"] # 'high', 'medium', or 'low' - if severity in severity_counts: - severity_counts[severity] += 1 - - risks.append( - { - "name": rd["name"], - "severity": severity, - "impacted_resources_count": data["impacted_resources_count"], - } - ) - - return risks, severity_counts + risks, severity_counts = summarize_risks(risk_data, risk_definitions) + return [ + { + "name": risk["name"], + "severity": risk["severity"], + "impacted_resources_count": risk["impacted_resources_count"] or 0, + } + for risk in risks + ], severity_counts def transform_alt_tech_for_pdf( @@ -137,24 +82,12 @@ def transform_alt_tech_for_pdf( exit_strategy: int, report_path: str, ) -> List[Dict[str, Any]]: - - # Count how many valid alternatives each resource_type has for the given exit_strategy - alt_counts = {} - for alt in alternatives: - if str(alt.get("strategy_type")) == str(exit_strategy): - rtype_str = str(alt.get("resource_type")) - tech = next( - ( - t - for t in alternative_technologies - if t["id"] == alt["alternative_technology"] - and t.get("status") == "t" - ), - None, - ) - if tech: - alt_counts[rtype_str] = alt_counts.get(rtype_str, 0) + 1 - + grouped_alt_tech = summarize_alternative_technologies( + resource_inventory, + alternatives, + alternative_technologies, + exit_strategy, + ) alt_tech = [] for idx, resource in enumerate(resource_inventory): rtype_str = str(resource["resource_type"]) @@ -164,7 +97,7 @@ def transform_alt_tech_for_pdf( icon_path = "/assets" + rtype_info.get("icon", "/icons/default.png") icon_url = f"{report_path}{icon_path}" - count = alt_counts.get(rtype_str, 0) + count = len(grouped_alt_tech.get(rtype_str, [])) alt_tech.append( {