Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions core/utils_report_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

CURRENCY_SYMBOLS = {
"USD": "$",
"GBP": "£",
"EUR": "€",
}


def sort_cost_data(cost_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return sorted(cost_data, key=lambda x: datetime.strptime(x["month"], "%Y-%m-%d"))


def summarize_costs(
cost_data: List[Dict[str, Any]], *, last_n: Optional[int] = None
) -> Tuple[List[str], List[float], float, str, str]:
sorted_costs = sort_cost_data(cost_data)
if last_n is not None:
sorted_costs = sorted_costs[-last_n:]

months = [
datetime.strptime(item["month"], "%Y-%m-%d").strftime("%b")
for item in sorted_costs
]
values = [item["cost"] for item in sorted_costs]
total_cost = round(sum(values), 2)

if sorted_costs:
currency_code = sorted_costs[0].get("currency", "USD")
else:
currency_code = "USD"
currency_symbol = CURRENCY_SYMBOLS.get(currency_code, currency_code)

return months, values, total_cost, currency_code, currency_symbol


def summarize_risks(
risk_data: List[Dict[str, Any]],
risk_definitions: List[Dict[str, Any]],
*,
resource_name_map: Optional[Dict[str, str]] = None,
resource_id_map: Optional[Dict[str, int]] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, int]]:
risk_def_map = {rd["id"]: rd for rd in risk_definitions}
severity_counts = {"high": 0, "medium": 0, "low": 0}

grouped_risks = defaultdict(
lambda: {
"impacted_resource_types": set(),
"impacted_resources_count": 0,
"has_overall_risk": False,
}
)

for entry in risk_data:
risk_code = entry["risk"]
resource_type = entry["resource_type"]

if resource_type is None or resource_type == "null":
grouped_risks[risk_code]["has_overall_risk"] = True
continue

resource_type = str(resource_type)
grouped_risks[risk_code]["impacted_resource_types"].add(resource_type)
grouped_risks[risk_code]["impacted_resources_count"] += 1

summarized_risks = []
for risk_code, risk_info in grouped_risks.items():
risk_definition = risk_def_map.get(risk_code)
if not risk_definition:
continue

severity = risk_definition["severity"]
if severity in severity_counts:
severity_counts[severity] += 1

resource_types = sorted(risk_info["impacted_resource_types"])
resource_names = None
if resource_name_map is not None:
resource_names = [
resource_name_map.get(resource_type, "Unknown Resource")
for resource_type in resource_types
]

resource_ids = None
if resource_id_map is not None:
resource_ids = [
resource_id_map[resource_type]
for resource_type in resource_types
if resource_type in resource_id_map
]

impacted_resources_count = (
None
if risk_info["has_overall_risk"]
else risk_info["impacted_resources_count"]
)

summarized_risks.append(
{
"id": risk_code,
"name": risk_definition["name"],
"description": risk_definition["description"],
"severity": severity,
"impacted_resource_types": resource_types,
"impacted_resources": resource_names,
"impacted_resource_ids": resource_ids,
"impacted_resources_count": impacted_resources_count,
}
)

return summarized_risks, severity_counts


def summarize_alternative_technologies(
resource_inventory: List[Dict[str, Any]],
alternatives: List[Dict[str, Any]],
alternative_technologies: List[Dict[str, Any]],
exit_strategy: int,
) -> Dict[str, List[Dict[str, Any]]]:
active_technologies = {
tech["id"]: tech
for tech in alternative_technologies
if tech.get("status") == "t"
}

grouped_alt_tech: Dict[str, List[Dict[str, Any]]] = {
str(resource["resource_type"]): [] for resource in resource_inventory
}

for alt in alternatives:
if str(alt["strategy_type"]) != str(exit_strategy):
continue

resource_type = str(alt["resource_type"])
tech = active_technologies.get(alt["alternative_technology"])
if not tech or resource_type not in grouped_alt_tech:
continue

grouped_alt_tech[resource_type].append(
{
"product_name": tech["product_name"],
"product_description": tech["product_description"],
"product_url": tech["product_url"],
"open_source": tech["open_source"] == "t",
"support_plan": tech["support_plan"] == "t",
"status": tech["status"] == "t",
}
)

return grouped_alt_tech


def enrich_resource_inventory(
resource_inventory: List[Dict[str, Any]],
resource_type_mapping: Dict[str, Dict[str, Any]],
*,
report_path: Optional[str] = None,
) -> List[Dict[str, Any]]:
enriched_resources = []
for idx, resource in enumerate(resource_inventory):
resource_type = str(resource["resource_type"])
resource_info = resource_type_mapping.get(resource_type, {})
icon = resource_info.get("icon", "/icons/default.png")

entry = {
"id": idx + 1,
"resource_type": resource_type,
"code": resource_info.get("code", "N/A"),
"resource_name": resource_info.get("name", "Unknown Resource"),
"icon": icon,
"location": resource.get("location", "Unknown"),
"count": resource.get("count", 0),
}

if report_path is not None:
entry["icon_url"] = f"{report_path}/assets{icon}"

enriched_resources.append(entry)

return enriched_resources
136 changes: 27 additions & 109 deletions core/utils_report_html.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# core/utils_report_html.py
import logging
from datetime import datetime
from collections import defaultdict
from typing import List, Dict, Any, Tuple

from core.utils_report_common import (
summarize_alternative_technologies,
summarize_costs,
summarize_risks,
)

# Configure logger
logger = logging.getLogger("core.engine.report_html")
logger.setLevel(logging.INFO)
Expand All @@ -12,35 +16,7 @@
def transform_cost_inventory_for_html(
cost_data: List[Dict[str, Any]],
) -> Tuple[List[str], List[float], float, str, str]:
months = []
cost_values = []
total_cost = 0

# Map currency codes to their respective symbols
currency_symbols = {"USD": "$", "GBP": "£", "EUR": "€"}

# Convert list to dictionary if necessary
if isinstance(cost_data, list):
cost_data = {
item["month"]: {"cost": item["cost"], "currency": item["currency"]}
for item in cost_data
}

# Extract currency from the first entry, assuming all costs use the same currency
first_entry = next(iter(cost_data.values()), None)
currency_code = first_entry.get("currency", "USD") if first_entry else "USD"
currency_symbol = currency_symbols.get(
currency_code, currency_code
) # Default to currency_code if no symbol exists

# Iterate over the cost data, expecting 6 months
for month, details in sorted(cost_data.items()):
months.append(datetime.strptime(month, "%Y-%m-%d").strftime("%b"))
cost_values.append(details["cost"])
total_cost += details["cost"]

total_cost = round(total_cost, 2)
return months, cost_values, total_cost, currency_code, currency_symbol
return summarize_costs(cost_data)


def transform_risk_inventory_for_html(
Expand All @@ -49,59 +25,16 @@ def transform_risk_inventory_for_html(
resource_inventory: Dict[str, Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], Dict[str, int]]:
severity_order = {"high": 1, "medium": 2, "low": 3}
severity_counts = {"high": 0, "medium": 0, "low": 0}
sorted_risks = []

# Map resource IDs to resource names for quick lookup
resource_name_map = {
str(key): value["name"] for key, value in resource_inventory.items()
}

# Group risks by their risk code and impacted resources
risk_map = defaultdict(lambda: {"impacted_resources": set(), "count": 0})
for risk_entry in risk_data:
risk_code = risk_entry["risk"]
resource_type = (
str(risk_entry["resource_type"])
if risk_entry["resource_type"] != "null"
else None
)

if resource_type:
# Handle risks with associated resource types
resource_name = resource_name_map.get(resource_type, "Unknown Resource")
risk_map[risk_code]["impacted_resources"].add(resource_name)
risk_map[risk_code]["count"] += 1
else:
# Handle overall risks with no specific resource type
risk_map[risk_code]["impacted_resources"] = []
risk_map[risk_code]["count"] = None

# Process risk definitions
for risk_code, risk_info in risk_map.items():
risk_definition = next(
(rd for rd in risk_definitions if rd["id"] == risk_code), None
)
if not risk_definition:
continue

severity = risk_definition["severity"]
severity_counts[severity] += 1

sorted_risks.append(
{
"name": risk_definition["name"],
"description": risk_definition["description"],
"impacted_resources": list(risk_info["impacted_resources"]),
"impacted_resources_count": risk_info["count"],
"severity": severity,
}
)

# Sort risks by severity
sorted_risks.sort(key=lambda x: severity_order.get(x["severity"], 4))

return sorted_risks, severity_counts
risks, severity_counts = summarize_risks(
risk_data,
risk_definitions,
resource_name_map=resource_name_map,
)
risks.sort(key=lambda x: severity_order.get(x["severity"], 4))
return risks, severity_counts


def transform_alt_tech_for_html(
Expand All @@ -110,35 +43,20 @@ def transform_alt_tech_for_html(
alternative_technologies: List[Dict[str, Any]],
exit_strategy: int,
) -> List[Dict[str, Any]]:

alt_tech_data = []
grouped_alt_tech = summarize_alternative_technologies(
resource_inventory,
alternatives,
alternative_technologies,
exit_strategy,
)
for resource in resource_inventory:
resource_type = resource.get("resource_type")
relevant_alternatives = [
alt
for alt in alternatives
if str(alt["resource_type"]) == str(resource_type)
and str(alt["strategy_type"]) == str(exit_strategy)
]
for alt in relevant_alternatives:
tech = next(
(
t
for t in alternative_technologies
if t["id"] == alt["alternative_technology"] and t["status"] == "t"
),
None,
resource_type = str(resource.get("resource_type"))
for tech in grouped_alt_tech.get(resource_type, []):
alt_tech_data.append(
{
"resource_type_id": resource.get("resource_type"),
**tech,
}
)
if tech:
alt_tech_data.append(
{
"resource_type_id": resource_type,
"product_name": tech.get("product_name"),
"product_description": tech.get("product_description"),
"product_url": tech.get("product_url"),
"open_source": tech.get("open_source") == "t",
"support_plan": tech.get("support_plan") == "t",
"status": tech.get("status") == "t",
}
)
return alt_tech_data
Loading