Skip to content

Skipping HTML files returned by the server in the US Census PEP sex import (#1942)

Open
niveditasing wants to merge 4 commits into datacommonsorg:master from
niveditasing:Skipping_html
Open

Skipping HTML files returned by the server in the US Census PEP sex import (#1942)
niveditasing wants to merge 4 commits into datacommonsorg:master from
niveditasing:Skipping_html

Conversation

@niveditasing
Copy link
Copy Markdown
Contributor

No description provided.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request includes two distinct sets of changes: a refactoring of the Eurostat life expectancy data preprocessing script to improve efficiency and maintainability, and an update to the US Census PEP download logic to better handle existing files and HTML error pages. I recommend splitting these into separate pull requests to ensure clearer history and more focused reviews.

Comment on lines 15 to +154
import pandas as pd
from six.moves import urllib
import numpy as np
import re
import os
from absl import logging
from absl import app
from absl import logging
from absl import flags
import os

_FLAGS = flags.FLAGS
# Directory containing this module; used to resolve data files shipped
# alongside the script regardless of the current working directory.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
flags.DEFINE_string('mode', '', 'Options: download or process')

# Basename of the raw Eurostat dataset; also used to derive the cleaned
# CSV filename in the legacy preprocess path (PATH[:-4] + '_cleaned.csv').
PATH = 'demo_r_mlifexp.tsv'
flags.DEFINE_string('input_file', 'input_files/input_file.tsv',
                    'Path to input TSV file')
flags.DEFINE_string('output_file', 'demo_r_mlifexp_cleaned.csv',
                    'Path to output CSV file')


def nuts_to_iso(data):
    """Add a 'place' column mapping NUTS geo codes to DataCommons dcids.

    Two-letter NUTS country codes are converted to ISO 3166-1 alpha-3 and
    emitted as 'country/XXX'; all other codes are treated as NUTS regions
    and emitted as 'nuts/<code>'.

    Args:
        data: DataFrame with a 'geo' column of NUTS codes.

    Returns:
        The same DataFrame with a 'place' column added. If the mapping CSV
        is missing, the DataFrame is returned unchanged (with a warning).
    """
    # TODO(jefferyoldham): Consider using util/geo/geo_to_dcid_mappings.go's
    # CountryCodeToDCID subject to NUTS's two additions. If so, remove
    # countries_codes_and_coordinate.csv
    # Bug fix: a dead relative-path assignment ('./countries_codes_and_...')
    # was immediately overwritten; keep only the module-relative path.
    iso_2_to_3_path = os.path.join(_MODULE_DIR,
                                   'countries_codes_and_coordinates.csv')
    if not os.path.exists(iso_2_to_3_path):
        logging.warning(
            f"{iso_2_to_3_path} not found. Skipping ISO conversion for countries."
        )
        return data

    codes = pd.read_csv(iso_2_to_3_path)
    # The file has quoted values like '"AD"'; extract the bare letters.
    codes["Alpha-2 code"] = codes["Alpha-2 code"].str.extract(r'"([a-zA-Z]+)"')
    codes["Alpha-3 code"] = codes["Alpha-3 code"].str.extract(r'"([a-zA-Z]+)"')

    # NUTS code matches ISO 3166-1 alpha-2 with two exceptions:
    # Greece uses EL and the United Kingdom uses UK.
    codes["NUTS"] = codes["Alpha-2 code"]
    codes.loc[codes["NUTS"] == "GR", "NUTS"] = "EL"
    codes.loc[codes["NUTS"] == "GB", "NUTS"] = "UK"
    code_dict = codes.set_index('NUTS')['Alpha-3 code'].to_dict()

    # Bug fix: a merged-in legacy step mapped *every* 'geo' value through
    # code_dict in place and asserted no nulls; for mixed country/region
    # input the NUTS region codes map to NaN and the assert fires, and the
    # rewrite clobbered 'geo' before map_geo could classify it. Removed.
    def map_geo(geo):
        # Two-letter codes are countries; anything longer is a NUTS region.
        if len(geo) == 2:
            iso3 = code_dict.get(geo)
            if iso3:
                return f'country/{iso3}'
        return f'nuts/{geo}'

    data['place'] = data['geo'].apply(map_geo)
    return data


def obtain_value(entry):
    """Extract the numeric value from a raw Eurostat cell.

    Cells look like: '81.6', ': ', '79.9 e', ': e' — a number optionally
    followed by note flags, with ':' meaning "not available".

    Args:
        entry: Raw cell content (a string, or a float NaN for cells that
            were empty before melting).

    Returns:
        The value as a float, or None when the cell is missing or
        unparseable.
    """
    # Bug fix: the original called entry.split() unconditionally, which
    # raises AttributeError when pandas supplies float NaN for empty cells.
    if entry is None or (isinstance(entry, float) and pd.isna(entry)):
        return None
    if isinstance(entry, str):
        entry = entry.split(' ', maxsplit=1)[0]  # Discard note flags.
        if not entry or entry == ':':
            return None
        try:
            return float(entry)
        except ValueError:
            # Unparseable token (e.g. a stray flag letter); treat as missing.
            return None
    # Already numeric.
    return float(entry)


def download_data(download_link, download_path):
    """Downloads the compressed Eurostat TSV and saves it as a plain TSV.

    Args:
        download_link(str): URL of the gzip-compressed TSV data source.
        download_path(str): Local file path where the decompressed TSV
            will be saved.

    Returns:
        None
    """
    # Bug fix: the intermediate gzip file was written to the CWD under a
    # hard-coded name and never removed; derive it from download_path and
    # clean it up.
    gz_path = download_path + '.gz'
    try:
        logging.info(f'Processing file: {download_path}')
        urllib.request.urlretrieve(download_link, gz_path)
        # pandas infers gzip compression from the .gz suffix.
        raw_df = pd.read_table(gz_path)
        raw_df.to_csv(download_path, index=False, sep='\t')
        logging.info(f'Downloaded {download_path} from {download_link}')
    except Exception as e:
        # absl logging.fatal aborts the process; download errors are final.
        logging.fatal(f'Download error for: {download_link}: {e}')
    finally:
        if os.path.exists(gz_path):
            os.remove(gz_path)


def preprocess(file_path):
    """Preprocess the Eurostat life-expectancy TSV for import (legacy path).

    NOTE(review): this definition is shadowed by the later two-argument
    preprocess(input_file, output_file) in this module, so it is dead code
    as the file stands — confirm before deleting it outright.

    Args:
        file_path: Path to the input TSV file.

    Returns:
        None. Writes a wide-format cleaned CSV to
        PATH[:-4] + '_cleaned.csv' in the current working directory.
    """
    try:
        logging.info('File processing start')
        data = pd.read_csv(file_path, delimiter='\t')
        # Newer Eurostat dumps prefix the packed header with 'freq,';
        # rename and strip the two-character 'freq' dimension to match the
        # legacy 'unit,sex,age,geo\time' layout.
        data = data.rename(columns=({
            'freq,unit,sex,age,geo\TIME_PERIOD': 'unit,sex,age,geo\\time'
        }))
        data['unit,sex,age,geo\\time'] = data[
            'unit,sex,age,geo\\time'].str.slice(2)
        identifier = 'unit,sex,age,geo\\time'
        assert data.columns.values[0].endswith(
            '\\time'), "Expected the first column header to end with '\\time'."
        years = list(data.columns.values)
        years.remove(identifier)
        # Wide (one column per year) -> long (one row per series/year).
        data = pd.melt(data,
                       id_vars=identifier,
                       value_vars=years,
                       var_name='year',
                       value_name='life_expectancy')

        # Format string into desired format.
        data['year'] = data['year'].astype(int)  # remove spaces, e.g. "2018 "
        data['life_expectancy'] = data['life_expectancy'].apply(obtain_value)

        # Generate the statvars that each row belongs to.
        data[['unit', 'sex', 'age',
              'geo']] = data[identifier].str.split(',', expand=True)
        assert (data['unit'] == 'YR').all()
        data['sex'] = data['sex'].map({'F': '_Female', 'M': '_Male', 'T': ''})
        assert (~data['sex'].isnull()).all()
        age_except = data['age'].isin(['Y_GE85', 'Y_LT1'])
        data.loc[age_except, 'age'] = data.loc[age_except, 'age'].map({
            'Y_GE85': '85OrMoreYears',
            'Y_LT1': 'Upto1Years'
        })
        data.loc[~age_except,
                 'age'] = data.loc[~age_except,
                                   'age'].str.replace('Y', '') + "Years"
        data = data.drop(columns=[identifier])
        data['StatVar'] = "LifeExpectancy_Person_" + data['age'] + data['sex']
        # Cleanup: removed unused local 'statvars' (data['StatVar'].unique()).
        data = data.drop(columns=['unit', 'sex', 'age'])

        # Convert the nuts codes to dcids.
        # NOTE(review): this relies on nuts_to_iso rewriting 'geo' in place
        # to alpha-3 codes for the country subset — verify against the
        # current nuts_to_iso implementation.
        data_country = data[data['geo'].str.len() <= 2]
        data_nuts = data[~(data['geo'].str.len() <= 2)]
        data_country = nuts_to_iso(
            data_country)  # convert nuts code to ISO 3166-1 alpha-3
        data.loc[data_country.index, 'geo'] = 'country/' + data_country['geo']
        data.loc[data_nuts.index, 'geo'] = 'nuts/' + data_nuts['geo']

        # Separate data of different StatVars from one column into multiple
        # columns. For example:
        #   geo      year StatVar     sv1_geo  sv1_year sv2_geo  sv2_year
        #   nuts/AT1 2018 sv1      => nuts/AT1 2018     nuts/AT2 2018
        #   nuts/AT2 2018 sv2
        data_grouped = data.groupby('StatVar')
        subsets = []
        for _, subset in data_grouped:
            pivot = subset['StatVar'].iloc[0]  # get the statvar name
            subset = subset.rename(
                columns={
                    'geo': pivot + '_geo',
                    'year': pivot + '_year',
                    'life_expectancy': pivot
                })
            subset = subset.drop(columns=['StatVar']).reset_index(drop=True)
            subsets.append(subset)
        data = pd.concat(subsets, axis=1, join='outer')

        # Save the processed data into CSV file.
        data.to_csv(PATH[:-4] + '_cleaned.csv', index=False)
        logging.info('File processing completed')
        return
    except Exception as e:
        # absl logging.fatal aborts the process.
        logging.fatal(f'Processing error {e}')
    # Bug fix: removed an unreachable pasted fragment of a second
    # obtain_value implementation (it referenced an undefined name 'entry').


def preprocess(input_file, output_file):
    """Preprocess the Eurostat TSV into a long-format CSV of observations.

    The output has one row per observation with the columns:
    year, place (DataCommons dcid), SV (statvar dcid), value.

    Args:
        input_file: Path to the raw Eurostat TSV.
        output_file: Path where the cleaned CSV is written.

    Returns:
        None.
    """
    logging.info(f'Processing file: {input_file}')

    data = pd.read_csv(input_file, delimiter='\t')

    # The first column packs several dimensions, e.g.
    # 'freq,unit,sex,age,geo\TIME_PERIOD'; the remaining columns are years.
    identifier = data.columns[0]
    years = [col for col in data.columns if col != identifier]

    # Wide (one column per year) -> long (one row per series/year).
    data = pd.melt(data,
                   id_vars=identifier,
                   value_vars=years,
                   var_name='year',
                   value_name='value')

    # Clean year (headers may carry spaces, e.g. '2018 ') and value.
    data['year'] = data['year'].str.strip().astype(int)
    data['value'] = data['value'].apply(obtain_value)

    # Drop rows with missing observations.
    data = data.dropna(subset=['value'])

    # Split the packed dimensions into their own columns; the dimension
    # names come from the header itself (the part before '\TIME_PERIOD').
    dims = identifier.split('\\')[0].split(',')
    data[dims] = data[identifier].str.split(',', expand=True)

    # Map sex codes to statvar name suffixes.
    data['sex_mapped'] = data['sex'].map({
        'F': '_Female',
        'M': '_Male',
        'T': ''
    })
    # Bug fix: unexpected sex codes previously propagated NaN into the SV
    # column and on into the output; drop them explicitly with a warning.
    unmapped = data['sex_mapped'].isnull()
    if unmapped.any():
        logging.warning(
            f'Dropping {int(unmapped.sum())} rows with unexpected sex codes')
        data = data[~unmapped]

    # Map age codes to statvar name fragments.
    age_map = {
        'Y_GE85': '85OrMoreYears',
        'Y_LT1': 'Upto1Years',
        'Y_GE95': '95OrMoreYears'
    }

    def map_age(age):
        # Special buckets first; otherwise strip the leading 'Y' and append
        # 'Years' (e.g. 'Y35' -> '35Years').
        if age in age_map:
            return age_map[age]
        if age.startswith('Y'):
            return age[1:] + "Years"
        return age + "Years"

    data['age_mapped'] = data['age'].apply(map_age)

    # Build the StatVar dcid for each row.
    data['SV'] = "dcid:LifeExpectancy_Person_" + data['age_mapped'] + data[
        'sex_mapped']

    # Resolve NUTS geo codes to DataCommons places ('place' column).
    data = nuts_to_iso(data)

    # Select the output columns and sort for deterministic output.
    final_df = data[['year', 'place', 'SV', 'value']]
    final_df = final_df.sort_values(['year', 'place', 'SV'])

    final_df.to_csv(output_file, index=False)
    logging.info(f'Processed data saved to {output_file}')


def main(_):
    """Entry point: download and/or process per the --mode flag.

    mode='' runs both steps; 'download' or 'process' runs just one.
    """
    mode = _FLAGS.mode
    _DATA_URL = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/data/demo_r_mlifexp/?format=TSV&compressed=true"

    input_path = os.path.join(_MODULE_DIR, 'input_files')
    os.makedirs(input_path, exist_ok=True)
    input_file = os.path.join(input_path, 'input_file.tsv')

    if mode in ("", "download"):
        download_data(_DATA_URL, input_file)
    if mode in ("", "process"):
        # Bug fix: this branch previously called the effective two-argument
        # preprocess() with a single positional argument, raising TypeError
        # before the correct call below could run.
        # NOTE(review): --input_file defaults to a CWD-relative path while
        # the download step writes under _MODULE_DIR — confirm the intended
        # default matches where download_data saves the file.
        preprocess(_FLAGS.input_file, _FLAGS.output_file)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This pull request appears to combine two unrelated changes: a fix for downloading files in the us_census/pep/us_pep_sex import, and a major refactoring of the eurostat/regional_statistics_by_nuts/life_expectancy import.

While the refactoring of this Eurostat import is a significant improvement, it would be best to submit it as a separate pull request. This would:

  • Allow for a descriptive title and body, making it easier to find and understand in the future.
  • Enable a more focused review of the refactoring itself.

Mixing unrelated changes in a single PR can make the repository history harder to follow and complicates potential rollbacks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant