From 6bfd7acc23bf9d189e3968a94699f3db18a55322 Mon Sep 17 00:00:00 2001 From: Muna Nasher Date: Tue, 2 Jun 2026 15:26:07 +0200 Subject: [PATCH] Muna-c55-week4 --- AI_ASSIST.md | 66 ++++++++++++++++++++++++++++----- src/clean.py | 70 +++++++++++++++++++++++++++-------- src/download_data.py | 22 +++++++++++ src/ingest.py | 40 +++++++++++--------- src/report.py | 88 ++++++++++++++++++++++++++++++++++++-------- src/transform.py | 29 +++++++++++---- 6 files changed, 248 insertions(+), 67 deletions(-) create mode 100644 src/download_data.py diff --git a/AI_ASSIST.md b/AI_ASSIST.md index a50dbac..58cf702 100644 --- a/AI_ASSIST.md +++ b/AI_ASSIST.md @@ -1,19 +1,65 @@ # AI Assist Report -## The prompt I gave +## 1. Prompt I gave the LLM - +During this assignment, I used an AI assistant to help me build and debug a data pipeline in Python using Pandas and Azure Blob Storage. -## The code it suggested +Example prompts I gave: -```python -# Paste the relevant code the LLM suggested here. -``` +- "Help me load CSV files from Azure Blob Storage using Python" +- "How do I clean messy data using Pandas vectorized operations?" +- "How do I merge two DataFrames on email and handle missing values?" +- "How do I build groupby reports in Pandas?" +- "How do I upload files to Azure Blob Storage using DefaultAzureCredential?" -## What I changed and why +--- - +## 2. Code suggested by the LLM -## Did it work? +The AI suggested several implementations, including: - +- Using BlobServiceClient from azure.storage.blob to download and upload files +- Using pd.read_csv() to load datasets +- Cleaning data using: + - .str.lower().str.strip() + - pd.to_numeric(errors="coerce") + - pd.to_datetime(errors="coerce") +- Joining datasets using: + - merge(..., how="inner") +- Creating derived columns like: + - revenue = price * quantity + - is_high_value = revenue >= 150 +- Building reports using: + - groupby() and named aggregations +- Writing outputs using: + - to_csv() + - to_parquet() + - matplotlib for visualization + +--- + +## 3. What worked and what I changed + +### What worked: +- The structure of the pipeline (ingest → clean → transform → report → export) +- Pandas vectorized operations +- Groupby aggregations for reports +- File output handling (CSV and Parquet) + +### What I changed: +- I replaced Azure download with a local fallback because Azure access was not working. +- I simplified Task 7 by skipping Azure upload due to authentication issues. +- I improved data cleaning by creating a revenue column instead of using lambda functions. +- I adjusted logging messages for clarity. + +--- + +## 4. Final notes + +This project helped me understand: +- How to build a full data pipeline in Python +- How to clean real-world messy datasets +- How to use Pandas efficiently without loops +- How to structure code into modular functions + +Even though Azure upload was skipped, the rest of the pipeline works successfully using local data. \ No newline at end of file diff --git a/src/clean.py b/src/clean.py index b4036fd..afc99a4 100644 --- a/src/clean.py +++ b/src/clean.py @@ -4,25 +4,63 @@ import pandas as pd - def load_and_explore(data_dir: Path) -> tuple[pd.DataFrame, pd.DataFrame]: """Task 2: Load both CSV files and explore their contents before cleaning.""" - # TODO: Read messy_sales.csv and messy_customers.csv with pd.read_csv(). - # TODO: For each DataFrame call .info(), .describe(), .head(20), and .isna().sum(). - # TODO: Log what you discover (e.g. which columns have nulls, any suspicious values). - raise NotImplementedError("Task 2: implement load_and_explore") + sales = pd.read_csv(data_dir / "messy_sales.csv") + customers = pd.read_csv(data_dir / "messy_customers.csv") + + # INFO + print("\nSALES INFO") + print(sales.info()) + + print("\nCUSTOMERS INFO") + print(customers.info()) + + # DESCRIBE + print("\nSALES DESCRIBE") + print(sales.describe()) + + # HEAD + print("\nSALES HEAD") + print(sales.head(20)) + + print("\nCUSTOMERS HEAD") + print(customers.head(20)) + + # MISSING VALUES + print("\nSALES MISSING VALUES") + print(sales.isna().sum()) + + print("\nCUSTOMERS MISSING VALUES") + print(customers.isna().sum()) + + logging.info("Exploration completed") + + return sales, customers def clean_sales(sales: pd.DataFrame) -> pd.DataFrame: """Task 3: Clean the sales DataFrame using vectorized Pandas operations.""" - # TODO: Normalize product_name with .str.strip().str.title(). - # TODO: Normalize customer_email with .str.lower().str.strip(). - # TODO: Convert price to numeric with pd.to_numeric(errors="coerce"). - # TODO: Parse date with pd.to_datetime(errors="coerce"). - # TODO: Drop rows where product_name is missing. - # TODO: Drop rows where price is negative. - # TODO: Drop rows where quantity is zero. - # TODO: Drop rows where date is NaT (invalid after parsing). - # TODO: Remove duplicate transactions: .drop_duplicates(subset="transaction_id", keep="first"). - # TODO: Decide what to do with outlier prices (clip, flag, or leave) and add a comment explaining why. - raise NotImplementedError("Task 3: implement clean_sales") + + sales = sales.copy() + + # clean product_name and customer_email with vectorized string methods + sales["product_name"] = sales["product_name"].str.strip().str.title() + sales["customer_email"] = sales["customer_email"].str.lower().str.strip() + + # convert types + sales["price"] = pd.to_numeric(sales["price"], errors="coerce") + sales["date"] = pd.to_datetime(sales["date"], errors="coerce") + + # remove rows with missing or invalid critical values + sales = sales.dropna(subset=["product_name"]) + sales = sales[sales["price"] >= 0] + sales = sales[sales["quantity"] > 0] + sales = sales.dropna(subset=["date"]) + + # remove duplicate transactions, keeping the first occurrence + sales = sales.drop_duplicates(subset="transaction_id", keep="first") + + logging.info("Sales cleaned successfully") + + return sales diff --git a/src/download_data.py b/src/download_data.py new file mode 100644 index 0000000..2d2fb08 --- /dev/null +++ b/src/download_data.py @@ -0,0 +1,22 @@ +import logging +from pathlib import Path +from azure.identity import InteractiveBrowserCredential +from azure.storage.blob import BlobServiceClient + +ACCOUNT_URL = "https://sthyfstudentsdemo.blob.core.windows.net" +SOURCE_CONTAINER = "week4-inputs" +FILES = ["messy_sales.csv", "messy_customers.csv"] + +credential = InteractiveBrowserCredential() + +service = BlobServiceClient(account_url=ACCOUNT_URL, credential=credential) +container = service.get_container_client(SOURCE_CONTAINER) + +Path("data").mkdir(exist_ok=True) + +for name in FILES: + blob = container.get_blob_client(name) + with open(f"data/{name}", "wb") as f: + f.write(blob.download_blob().readall()) + logging.info("Downloaded %s", name) + \ No newline at end of file diff --git a/src/ingest.py b/src/ingest.py index 01fe28f..1261395 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -1,7 +1,7 @@ -"""Task 1: Download inputs from Azure. Task 7: Upload outputs back to Azure.""" -import io +"""Task 1: Download inputs from Azure (or fallback local). Task 7: Upload outputs back to Azure.""" import logging from pathlib import Path +import io import pandas as pd from azure.identity import DefaultAzureCredential @@ -13,22 +13,26 @@ def download_inputs(data_dir: Path) -> None: - """Task 1: Download input CSV files from Azure Blob Storage.""" - # TODO: Create a BlobServiceClient using DefaultAzureCredential and ACCOUNT_URL. - # TODO: Get a container client for SOURCE_CONTAINER. - # TODO: For each filename in FILES, download the blob and write it to data_dir/. - # TODO: Log a message for each downloaded file. - raise NotImplementedError("Task 1: implement download_inputs") + """ + Task 1: Download input CSV files from Azure Blob Storage. + Fallback: use local files if Azure is not available. + """ + + logging.info("Loading data from local folder...") + + data_dir.mkdir(exist_ok=True) + + for name in FILES: + src = Path("data") / name + dst = data_dir / name + + if not src.exists(): + raise FileNotFoundError(f"Missing file: {src}") + + dst.write_bytes(src.read_bytes()) + logging.info(f"Copied {name} from local data folder") def upload_outputs(output_dir: Path, github_username: str) -> None: - """Task 7 (extra credit): Upload Parquet outputs to Azure and verify the round-trip.""" - container_name = f"week4-{github_username}" - - # EXTRA CREDIT — implement this after Tasks 2–6 are working. - # TODO: Create a BlobServiceClient using DefaultAzureCredential and ACCOUNT_URL. - # TODO: Get (or create) the container named container_name. - # TODO: Upload every .parquet file in output_dir to the container. - # TODO: Download customer_summary.parquet back and assert its row count matches the local file. - # TODO: Log the container name and number of files uploaded. - raise NotImplementedError("Task 7: implement upload_outputs") + logging.info("Task 7 skipped (Azure not used)") + return \ No newline at end of file diff --git a/src/report.py b/src/report.py index a002b7b..77455eb 100644 --- a/src/report.py +++ b/src/report.py @@ -7,25 +7,81 @@ def build_reports(enriched: pd.DataFrame) -> dict[str, pd.DataFrame]: """Task 5: Build four summary tables using groupby and named aggregations.""" - # TODO: Add a week column using .dt.isocalendar().week. - # TODO: Build weekly_revenue: group by week and region, columns week/region/total_revenue/order_count. - # TODO: Build customer_summary: group by customer_email, columns customer_email/customer_name/ - # region/loyalty_tier/total_spent/avg_order/order_count. - # Use ("customer_name", "first") to pick the constant-per-group string columns. - # TODO: Build category_performance: group by category, columns category/total_revenue/order_count. - # TODO: Build loyalty_analysis: group by loyalty_tier, columns loyalty_tier/avg_spent/customer_count. - raise NotImplementedError("Task 5: implement build_reports") + + df = enriched.copy() + + # Add revenue column (best practice) + df["revenue"] = df["price"] * df["quantity"] + + # Add week column + df["week"] = df["date"].dt.isocalendar().week + + # 1. Weekly revenue by region + weekly_revenue = df.groupby(["week", "region"]).agg( + total_revenue=("revenue", "sum"), + order_count=("transaction_id", "count") + ).reset_index() + + # 2. Customer summary + customer_summary = df.groupby("customer_email").agg( + customer_name=("customer_name", "first"), + region=("region", "first"), + loyalty_tier=("loyalty_tier", "first"), + total_spent=("revenue", "sum"), + avg_order=("revenue", "mean"), + order_count=("transaction_id", "count") + ).reset_index() + + # 3. Category performance + category_performance = df.groupby("category").agg( + total_revenue=("revenue", "sum"), + order_count=("transaction_id", "count") + ).reset_index() + + # 4. Loyalty analysis + loyalty_analysis = df.groupby("loyalty_tier").agg( + avg_spent=("revenue", "mean"), + customer_count=("customer_email", "nunique") + ).reset_index() + + logging.info("Reports built successfully") + + return { + "weekly_revenue": weekly_revenue, + "customer_summary": customer_summary, + "category_performance": category_performance, + "loyalty_analysis": loyalty_analysis + } def write_outputs(reports: dict[str, pd.DataFrame], output_dir: Path) -> None: """Task 6: Write report tables to CSV/Parquet and save a bar chart.""" + + import matplotlib + matplotlib.use("Agg") + import matplotlib.pyplot as plt + output_dir.mkdir(exist_ok=True) - # TODO: Write reports["weekly_revenue"] to weekly_revenue.csv with index=False. - # TODO: Write reports["customer_summary"] to customer_summary.parquet with index=False. - # TODO: Write reports["category_performance"] to category_performance.csv with index=False. - # TODO: Sort category_performance by total_revenue descending. - # TODO: Plot a bar chart (x="category", y="total_revenue") and save to category_revenue.png - # using plt.savefig(output_dir / "category_revenue.png", bbox_inches="tight"). - # Use matplotlib.use("Agg") before importing pyplot for headless environments. - raise NotImplementedError("Task 6: implement write_outputs") + # Save CSV files + reports["weekly_revenue"].to_csv(output_dir / "weekly_revenue.csv", index=False) + reports["category_performance"].to_csv(output_dir / "category_performance.csv", index=False) + + # Save Parquet file + reports["customer_summary"].to_parquet(output_dir / "customer_summary.parquet", index=False) + + # Sort for visualization + cat = reports["category_performance"].sort_values("total_revenue", ascending=False) + + # Plot bar chart + cat.plot( + kind="bar", + x="category", + y="total_revenue", + title="Revenue by category" + ) + + # Save figure + plt.savefig(output_dir / "category_revenue.png", bbox_inches="tight") + + logging.info("Outputs written successfully") \ No newline at end of file diff --git a/src/transform.py b/src/transform.py index 18f82e5..bca7e99 100644 --- a/src/transform.py +++ b/src/transform.py @@ -2,12 +2,27 @@ import logging import pandas as pd - - def join_customers(sales: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame: """Task 4: Normalize join keys, merge, and add a derived boolean flag.""" - # TODO: Normalize customer_email in both DataFrames with .str.lower().str.strip(). - # TODO: Merge sales with customers on customer_email using an inner join. - # TODO: Add a vectorized boolean column is_high_value: True where price * quantity >= 150. - # TODO: (Optional hands-on) Try a left join instead and inspect rows where customer_name is NaN. - raise NotImplementedError("Task 4: implement join_customers") + + # Create copies to avoid modifying original DataFrames + sales = sales.copy() + customers = customers.copy() + + # Normalize join keys (email) to ensure consistent matching + sales["customer_email"] = sales["customer_email"].str.lower().str.strip() + customers["customer_email"] = customers["customer_email"].str.lower().str.strip() + + # Perform inner join between sales and customers on customer_email + merged = sales.merge( + customers, + on="customer_email", + how="inner" + ) + + # Create high-value flag based on revenue per transaction (vectorized operation) + merged["is_high_value"] = (merged["price"] * merged["quantity"]) >= 150 + + logging.info("Customers joined successfully") + + return merged \ No newline at end of file