Skip to content

Automation #3987

@Vivek-Yadav7

Description

@Vivek-Yadav7

app.py

import io
import os
import tempfile
import zipfile
from datetime import datetime

import dask.dataframe as dd
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st

# ---------------------------
# Configuration
# ---------------------------

# Directory used by the helpers below for temporary uploads and for the
# cleaned Parquet output; created up front so later writes can assume it exists.
OUTPUT_DIR = "processed_data"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Set the browser tab title and use the wide page layout.
# NOTE(review): Streamlit expects set_page_config to run before other st.*
# calls — keep this ahead of the UI section; confirm against the Streamlit docs.
st.set_page_config(page_title="Auto EDL + Dashboards", layout="wide")

# ---------------------------
# Utility / ETL helpers
# ---------------------------

def _spool_upload_to_disk(uploaded_file, suffix):
    """Write the uploaded bytes to a uniquely named file in OUTPUT_DIR and return its path."""
    uploaded_file.seek(0)
    # mkstemp guarantees a unique name even when two uploads land in the same second.
    fd, tmp_path = tempfile.mkstemp(prefix="tmp_", suffix=suffix, dir=OUTPUT_DIR)
    with os.fdopen(fd, "wb") as out:
        out.write(uploaded_file.read())
    return tmp_path


def read_file_to_dask(uploaded_file, sample_rows=10000):
    """
    Load an uploaded file into a Dask DataFrame.

    Parameters:
        uploaded_file: file-like object with ``name``, ``seek`` and ``read``
            (e.g. the object returned by ``st.file_uploader``).
        sample_rows: number of leading CSV rows read with pandas to infer dtypes.

    Returns:
        A Dask DataFrame. CSV and Parquet uploads are first spooled to a file
        under OUTPUT_DIR because Dask reads from paths, not from open file
        objects; those files are left on disk since Dask reads them lazily.

    Raises:
        ValueError: if the file extension is not csv, xls, xlsx or parquet.
    """
    name = uploaded_file.name.lower()
    uploaded_file.seek(0)

    if name.endswith(".csv"):
        # Infer dtypes from a small pandas sample (cheap compared to a full scan).
        sample_df = pd.read_csv(uploaded_file, nrows=sample_rows)
        dtypes = {}
        for col, dt in sample_df.dtypes.items():
            if pd.api.types.is_integer_dtype(dt):
                # Rows beyond the sample may contain missing values, which a
                # NumPy integer column cannot hold; read as float instead.
                dtypes[col] = "float64"
            elif pd.api.types.is_bool_dtype(dt):
                # Same reasoning: a bool column cannot hold missing values.
                dtypes[col] = "object"
            else:
                dtypes[col] = dt
        # dd.read_csv needs a path, so persist the upload before reading it.
        csv_path = _spool_upload_to_disk(uploaded_file, ".csv")
        return dd.read_csv(csv_path, assume_missing=True, dtype=dtypes, blocksize="64MB")

    if name.endswith((".xls", ".xlsx")):
        # Dask has no read_excel; load with pandas and partition afterwards.
        pdf = pd.read_excel(uploaded_file)
        return dd.from_pandas(pdf, npartitions=8)

    if name.endswith(".parquet"):
        parquet_path = _spool_upload_to_disk(uploaded_file, ".parquet")
        return dd.read_parquet(parquet_path)

    raise ValueError("Unsupported file type. Upload CSV, Excel or Parquet.")

def _looks_like_dates(series, sample_size=1000, min_ratio=0.5):
    """Return True when most non-null values at the head of ``series`` parse as dates."""
    try:
        sample = series.head(sample_size).dropna()
        if sample.empty:
            return False
        parsed = pd.to_datetime(sample, errors="coerce")
    except Exception:
        # Detection is best-effort: any failure simply means "not a date column".
        return False
    return parsed.notna().mean() >= min_ratio


def basic_cleaning(ddf, config):
    """
    Apply automatic cleaning steps to a Dask DataFrame.

    Steps:
    - normalize column names (stringify, trim, lower-case, spaces -> underscores)
    - drop exact duplicate rows unless ``config["drop_duplicates"]`` is falsy
    - detect date columns by dtype, plus text columns whose name hints at a
      date and whose sampled values actually parse as dates
    - fill missing numeric values with the (Dask-computed) median and missing
      categorical values with the most frequent value
    - add ``__year`` / ``__month`` / ``__day`` from the first date column

    Parameters:
        ddf: Dask DataFrame to clean; it is not modified in place.
        config: dict of options; only ``"drop_duplicates"`` (default True) is read.

    Returns:
        ``(cleaned_ddf, meta)`` where ``meta`` has the keys ``"date_cols"``,
        ``"num_cols"`` and ``"cat_cols"`` listing the normalized column names.
    """
    # rename() returns a new frame, so the caller's DataFrame keeps its columns;
    # str() guards against non-string labels (e.g. integer headers from Excel).
    renamed = {c: str(c).strip().lower().replace(" ", "_") for c in ddf.columns}
    ddf = ddf.rename(columns=renamed)

    # Keep a handle on the frame before de-duplication: sampling it for date
    # detection only touches the first partition of the source data.
    source = ddf

    # drop exact duplicates if requested
    if config.get("drop_duplicates", True):
        ddf = ddf.drop_duplicates()

    # detect date columns by dtype
    dtypes_by_col = dict(ddf.dtypes.items())
    date_cols = [c for c, dt in dtypes_by_col.items() if "datetime" in str(dt).lower()]

    # heuristic by name, restricted to text columns whose values really look
    # like dates; otherwise numeric columns such as "transaction_amount" would
    # be coerced into timestamps and lose their data.
    date_hints = ("date", "txn", "transaction", "posted", "time")
    for c in list(ddf.columns):
        if c in date_cols or not any(k in c for k in date_hints):
            continue
        dt = dtypes_by_col[c]
        if not (pd.api.types.is_object_dtype(dt) or pd.api.types.is_string_dtype(dt)):
            continue
        if not _looks_like_dates(source[c]):
            continue
        try:
            ddf[c] = dd.to_datetime(ddf[c], errors="coerce")
        except Exception:
            # Best effort: leave the column untouched if Dask cannot convert it.
            continue
        date_cols.append(c)

    # numeric vs categorical split
    num_cols = [
        c
        for c, dt in ddf.dtypes.items()
        if pd.api.types.is_integer_dtype(dt) or pd.api.types.is_float_dtype(dt)
    ]
    cat_cols = [c for c in ddf.columns if c not in num_cols and c not in date_cols]

    # Fill numeric with median (computed with dask); fall back to 0 on failure.
    for c in num_cols:
        try:
            med = ddf[c].quantile(0.5).compute()
            ddf[c] = ddf[c].fillna(med)
        except Exception:
            ddf[c] = ddf[c].fillna(0)

    # Fill categorical with the most frequent value; fall back to "missing".
    for c in cat_cols:
        try:
            top = ddf[c].value_counts().nlargest(1).compute()
            if not top.empty:
                mode_val = top.index[0]
                ddf[c] = ddf[c].fillna(mode_val)
            else:
                ddf[c] = ddf[c].fillna("missing")
        except Exception:
            ddf[c] = ddf[c].fillna("missing")

    # Create date features from the first date column, parsing it only once.
    if date_cols:
        dtc = date_cols[0]
        parsed = dd.to_datetime(ddf[dtc], errors="coerce")
        ddf["__year"] = parsed.dt.year
        ddf["__month"] = parsed.dt.month
        ddf["__day"] = parsed.dt.day

    return ddf, {"date_cols": date_cols, "num_cols": num_cols, "cat_cols": cat_cols}

def save_dask_to_parquet(ddf, base_name="cleaned"):
    """
    Write ``ddf`` as Parquet under OUTPUT_DIR and return the output path.

    The path is ``<base_name>_<unix-seconds>.parquet``; the frame is written
    with the pyarrow engine and without its index.
    """
    stamp = int(datetime.now().timestamp())
    target = os.path.join(OUTPUT_DIR, f"{base_name}_{stamp}.parquet")
    ddf.to_parquet(target, engine="pyarrow", write_index=False)
    return target

# ---------------------------
# Streamlit UI
# ---------------------------

# Page header: the app title followed by a Markdown summary of what happens
# to an uploaded dataset (load, clean, store as Parquet, build dashboards).
st.title("📂 Auto EDL + Dashboards — Upload & Visualize (Local)")
st.markdown(
"""
Upload a raw credit-card dataset (CSV / Excel / Parquet).
The app will automatically:
- Extract & load using Dask for large files
- Perform data cleaning & basic feature creation (EDL)
- Store cleaned data as Parquet for fast re-use
- Build interactive dashboards (summary, histograms, correlations)
"""
)

# File uploader

def _zip_directory_bytes(dir_path):
    """Pack every file under ``dir_path`` into an in-memory zip and return its bytes."""
    buffer = io.BytesIO()
    parent = os.path.dirname(dir_path)
    # Parquet parts are already compressed, so the default "stored" mode is enough.
    with zipfile.ZipFile(buffer, "w") as archive:
        for root, _dirs, files in os.walk(dir_path):
            for fname in files:
                full_path = os.path.join(root, fname)
                # Keep the dataset folder name as the top-level entry in the archive.
                archive.write(full_path, arcname=os.path.relpath(full_path, parent))
    return buffer.getvalue()


# Upload widget and options that drive the pipeline below.
uploaded_file = st.file_uploader(
    "Upload CSV / Excel / Parquet (supports large files)",
    type=["csv", "xlsx", "xls", "parquet"],
)
drop_dup = st.checkbox("Drop duplicate rows (recommended)", value=True)
max_rows_preview = st.slider("Max rows to preview (in-browser)", 1000, 10000, 2000, step=500)

if uploaded_file is not None:
    # Stage 1: Read to Dask
    with st.spinner("Reading file into Dask (scales to large files)..."):
        try:
            ddf = read_file_to_dask(uploaded_file)
        except Exception as e:
            st.error(f"Failed to read file: {e}")
            st.stop()
    st.success("File loaded into Dask dataframe.")
    st.write("Dask partitions:", ddf.npartitions)

    # Show schema (dtypes) followed by a short preview of the raw rows.
    st.subheader("Schema (inferred dtypes)")
    # dtypes are rendered as strings so the table serializes cleanly.
    st.dataframe(ddf.dtypes.astype(str).rename("dtype").to_frame())
    # Only ten rows are displayed, so only ten are computed.
    st.dataframe(ddf.head(10))

    # Run cleaning
    if st.button("Run EDL and Generate Dashboards"):
        config = {"drop_duplicates": drop_dup}
        with st.spinner("Running cleaning steps... this may take some seconds for 500k rows..."):
            cleaned_ddf, meta = basic_cleaning(ddf, config)
            # persist cleaned ddf (computes)
            cleaned_ddf = cleaned_ddf.persist()
            # Save to parquet
            out_parquet = save_dask_to_parquet(cleaned_ddf, base_name="credit_cleaned")
        st.success(f"Cleaning done — saved cleaned parquet to `{out_parquet}`")

        # Load a sample (pandas) for quick plotting in-browser
        st.subheader("Quick Summary")
        sample_pd = cleaned_ddf.head(max_rows_preview)
        st.write("Rows (sample shown):", len(sample_pd))
        st.dataframe(sample_pd.head(10))

        # Dashboard layout
        st.subheader("Automated Dashboards")

        # 1) Numeric summary
        st.markdown("### Numeric columns summary")
        if meta["num_cols"]:
            num_summary = sample_pd[meta["num_cols"]].describe().T
            st.dataframe(num_summary)
            # show histograms for the first 4 numeric cols
            for c in meta["num_cols"][:4]:
                fig = px.histogram(sample_pd, x=c, nbins=50, title=f"Distribution of {c}")
                st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("No numeric columns detected.")

        # 2) Categorical top values
        st.markdown("### Categorical columns top values")
        if meta["cat_cols"]:
            for c in meta["cat_cols"][:6]:
                top = sample_pd[c].value_counts().nlargest(10).reset_index()
                top.columns = [c, "count"]
                fig = px.bar(top, x=c, y="count", title=f"Top values for {c}")
                st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("No categorical columns detected.")

        # 3) Correlation heatmap (numeric)
        if len(meta["num_cols"]) > 1:
            st.markdown("### Correlation (numeric)")
            corr = sample_pd[meta["num_cols"]].corr()
            fig = px.imshow(corr, text_auto=True, title="Numeric correlation matrix")
            st.plotly_chart(fig, use_container_width=True)

        # 4) Time series if date detected
        if meta["date_cols"]:
            st.markdown("### Time-series overview")
            dtcol = meta["date_cols"][0]
            try:
                # Unparseable or missing dates are dropped before bucketing.
                stamps = pd.to_datetime(sample_pd[dtcol], errors="coerce").dropna()
                if stamps.empty:
                    raise ValueError("no valid dates in sample")
                # Bucket by calendar month via periods (avoids the deprecated
                # 'M' resample alias) and keep empty months as zero counts.
                months = stamps.dt.to_period("M")
                month_range = pd.period_range(months.min(), months.max(), freq="M")
                counts = months.value_counts().reindex(month_range, fill_value=0)
                agg = pd.DataFrame(
                    {dtcol: month_range.to_timestamp(), "count": counts.to_numpy()}
                )
                fig = px.line(agg, x=dtcol, y="count", title=f"Transactions per month ({dtcol})")
                st.plotly_chart(fig, use_container_width=True)
            except Exception:
                st.info("Couldn't create time-series plot for the detected date column.")

        # 5) Download cleaned data
        st.markdown("### Download cleaned data (Parquet)")
        st.write(f"Saved: `{out_parquet}`")
        if os.path.isdir(out_parquet):
            # Dask writes a Parquet dataset as a directory of part files, so it
            # is zipped for download. NOTE: the archive is built in memory.
            payload = _zip_directory_bytes(out_parquet)
            download_name = os.path.basename(out_parquet) + ".zip"
            download_mime = "application/zip"
        else:
            with open(out_parquet, "rb") as f:
                payload = f.read()
            download_name = os.path.basename(out_parquet)
            download_mime = "application/octet-stream"
        st.download_button(
            "Download cleaned parquet",
            data=payload,
            file_name=download_name,
            mime=download_mime,
        )

        st.success("Dashboards generated. Scroll up to view charts.")

    st.info("Tip: Use Parquet as your upload format next time for fastest processing.")

else:
    st.info("Upload your credit-card dataset to get started (CSV, Excel, Parquet).")

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions