From 7cc4748ee290f148ba69a80d1ddb6aafe025782e Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Fri, 6 Feb 2026 15:19:13 -0700 Subject: [PATCH 1/8] ETL improved logging --- src/hydroserverpy/api/models/etl/task.py | 41 ++++++++++++++++--- src/hydroserverpy/etl/extractors/base.py | 16 +++++--- .../etl/extractors/http_extractor.py | 10 +++-- .../etl/extractors/local_file_extractor.py | 13 ++++-- .../etl/transformers/json_transformer.py | 1 + 5 files changed, 63 insertions(+), 18 deletions(-) diff --git a/src/hydroserverpy/api/models/etl/task.py b/src/hydroserverpy/api/models/etl/task.py index e0c4f7a..1a603ff 100644 --- a/src/hydroserverpy/api/models/etl/task.py +++ b/src/hydroserverpy/api/models/etl/task.py @@ -183,6 +183,7 @@ def run_local(self): task_run = self.create_task_run(status="RUNNING", started_at=datetime.now(timezone.utc)) + runtime_source_uri: Optional[str] = None try: logging.info("Starting extract") @@ -199,9 +200,13 @@ def run_local(self): ] data = extractor_cls.extract(self, loader_cls) + runtime_source_uri = getattr(extractor_cls, "runtime_source_uri", None) if self.is_empty(data): self._update_status( - loader_cls, True, "No data returned from the extractor" + task_run, + True, + "No data returned from the extractor", + runtime_source_uri=runtime_source_uri, ) return @@ -209,15 +214,22 @@ def run_local(self): data = transformer_cls.transform(data, task_mappings) if self.is_empty(data): self._update_status( - loader_cls, True, "No data returned from the transformer" + task_run, + True, + "No data returned from the transformer", + runtime_source_uri=runtime_source_uri, ) return logging.info("Starting load") loader_cls.load(data, self) - self._update_status(task_run, True, "OK") + self._update_status( + task_run, True, "OK", runtime_source_uri=runtime_source_uri + ) except Exception as e: - self._update_status(task_run, False, str(e)) + self._update_status( + task_run, False, str(e), runtime_source_uri=runtime_source_uri + ) @staticmethod def is_empty(data): @@ -229,11 +241,28 @@ def is_empty(data): return False - def _update_status(self, task_run: TaskRun, success: bool, msg: str): + def _update_status( + self, + task_run: TaskRun, + success: bool, + msg: str, + runtime_source_uri: Optional[str] = None, + ): + result = {"message": msg} + if runtime_source_uri: + result.update( + { + "runtimeSourceUri": runtime_source_uri, + "runtime_source_uri": runtime_source_uri, + "runtimeUrl": runtime_source_uri, + "runtime_url": runtime_source_uri, + } + ) + self.update_task_run( task_run.id, status="SUCCESS" if success else "FAILURE", - result={"message": msg} + result=result ) self.next_run_at = self._next_run() self.save() diff --git a/src/hydroserverpy/etl/extractors/base.py b/src/hydroserverpy/etl/extractors/base.py index 5eeeebb..045541d 100644 --- a/src/hydroserverpy/etl/extractors/base.py +++ b/src/hydroserverpy/etl/extractors/base.py @@ -9,21 +9,22 @@ class Extractor: def __init__(self, extractor_config: ExtractorConfig): self.cfg = extractor_config + self.runtime_source_uri = None def resolve_placeholder_variables(self, task: Task, loader): - logging.info(f"Creating runtime variables...") + logging.info("Resolving extractor runtime variables...") filled = {} for placeholder in self.cfg.placeholder_variables: name = placeholder.name if placeholder.type == "runTime": - logging.info(f"Resolving runtime var: {name}") + logging.info("Resolving runtime var: %s", name) if placeholder.run_time_value == "latestObservationTimestamp": value = loader.earliest_begin_date(task) elif 
placeholder.run_time_value == "jobExecutionTime": value = pd.Timestamp.now(tz="UTC") elif placeholder.type == "perTask": - logging.info(f"Resolving task var: {name}") + logging.info("Resolving task var: %s", name) if name not in task.extractor_variables: raise KeyError(f"Missing per-task variable '{name}'") value = task.extractor_variables[name] @@ -36,8 +37,13 @@ def resolve_placeholder_variables(self, task: Task, loader): filled[name] = value if not filled: - return self.cfg.source_uri - return self.format_uri(filled) + uri = self.cfg.source_uri + else: + uri = self.format_uri(filled) + + self.runtime_source_uri = uri + logging.info("Resolved runtime source URI: %s", uri) + return uri def format_uri(self, placeholder_variables): try: diff --git a/src/hydroserverpy/etl/extractors/http_extractor.py b/src/hydroserverpy/etl/extractors/http_extractor.py index 8181848..ea5df51 100644 --- a/src/hydroserverpy/etl/extractors/http_extractor.py +++ b/src/hydroserverpy/etl/extractors/http_extractor.py @@ -15,10 +15,14 @@ def extract(self, task: Task, loader=None): Downloads the file from the HTTP/HTTPS server and returns a file-like object. """ url = self.resolve_placeholder_variables(task, loader) - logging.info(f"Requesting data from → {url}") + logging.info("Requesting data from → %s", url) - response = requests.get(url) - response.raise_for_status() + try: + response = requests.get(url) + response.raise_for_status() + except Exception as e: + logging.error("HTTP request failed for %s: %s", url, e) + raise data = BytesIO() for chunk in response.iter_content(chunk_size=8192): diff --git a/src/hydroserverpy/etl/extractors/local_file_extractor.py b/src/hydroserverpy/etl/extractors/local_file_extractor.py index 813f855..d20f23f 100644 --- a/src/hydroserverpy/etl/extractors/local_file_extractor.py +++ b/src/hydroserverpy/etl/extractors/local_file_extractor.py @@ -7,14 +7,19 @@ class LocalFileExtractor(Extractor): def __init__(self, extractor_config: ExtractorConfig): super().__init__(extractor_config) - def extract(self, *args, **kwargs): + def extract(self, task=None, loader=None): """ Opens the file and returns a file-like object. 
""" + path = ( + self.resolve_placeholder_variables(task, loader) + if task is not None + else self.cfg.source_uri + ) try: - file_handle = open(self.cfg.source_uri, "r") - logging.info(f"Successfully opened file '{self.cfg.source_uri}'.") + file_handle = open(path, "r") + logging.info("Successfully opened file '%s'.", path) return file_handle except Exception as e: - logging.error(f"Error opening file '{self.cfg.source_uri}': {e}") + logging.error("Error opening file '%s': %s", path, e) return None diff --git a/src/hydroserverpy/etl/transformers/json_transformer.py b/src/hydroserverpy/etl/transformers/json_transformer.py index fa3e89f..c699d7d 100644 --- a/src/hydroserverpy/etl/transformers/json_transformer.py +++ b/src/hydroserverpy/etl/transformers/json_transformer.py @@ -35,6 +35,7 @@ def transform(self, data_file, mappings: List[SourceTargetMapping]): logging.warning("No data points found in the JSON data.") return None + logging.info("Extracted %s JSON data points.", len(data_points)) df = pd.DataFrame(data_points) return self.standardize_dataframe(df, mappings) From 0fb0962738194c404c1ab82ac31acfc6e6bc5027 Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Fri, 6 Feb 2026 15:43:04 -0700 Subject: [PATCH 2/8] More explicit run messages --- src/hydroserverpy/api/models/etl/task.py | 40 ++++++++++- .../etl/loaders/hydroserver_loader.py | 69 ++++++++++++++++--- 2 files changed, 99 insertions(+), 10 deletions(-) diff --git a/src/hydroserverpy/api/models/etl/task.py b/src/hydroserverpy/api/models/etl/task.py index 1a603ff..90ca333 100644 --- a/src/hydroserverpy/api/models/etl/task.py +++ b/src/hydroserverpy/api/models/etl/task.py @@ -222,9 +222,12 @@ def run_local(self): return logging.info("Starting load") - loader_cls.load(data, self) + load_stats = loader_cls.load(data, self) self._update_status( - task_run, True, "OK", runtime_source_uri=runtime_source_uri + task_run, + True, + self._success_message(load_stats), + runtime_source_uri=runtime_source_uri, ) except Exception as e: self._update_status( @@ -267,6 +270,39 @@ def _update_status( self.next_run_at = self._next_run() self.save() + @staticmethod + def _success_message(load_stats: Optional[dict]) -> str: + if not isinstance(load_stats, dict): + return "OK" + + loaded = load_stats.get("observations_loaded") + datastreams_loaded = load_stats.get("datastreams_loaded") + available = load_stats.get("observations_available") + timestamps_total = load_stats.get("timestamps_total") + timestamps_after_cutoff = load_stats.get("timestamps_after_cutoff") + + if loaded is None: + return "OK" + + if loaded == 0: + if timestamps_total and timestamps_after_cutoff == 0: + cutoff = load_stats.get("cutoff") + if cutoff: + return ( + "No new observations to load " + f"(all timestamps were at or before {cutoff})." + ) + return "No new observations to load (all timestamps were at or before the cutoff)." + if available == 0: + return "No new observations to load." + return "No new observations were loaded." + + if datastreams_loaded is not None: + return ( + f"Load completed successfully ({loaded} rows across {datastreams_loaded} datastreams)." + ) + return f"Load completed successfully ({loaded} rows loaded)." 
+ def _next_run(self) -> Optional[str]: now = datetime.now(timezone.utc) if cron := self.crontab: diff --git a/src/hydroserverpy/etl/loaders/hydroserver_loader.py b/src/hydroserverpy/etl/loaders/hydroserver_loader.py index b4e942b..3b19ed6 100644 --- a/src/hydroserverpy/etl/loaders/hydroserver_loader.py +++ b/src/hydroserverpy/etl/loaders/hydroserver_loader.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Dict from .base import Loader import logging @@ -20,30 +20,70 @@ def __init__(self, client: HydroServer, task_id): self._begin_cache: dict[str, pd.Timestamp] = {} self.task_id = task_id - def load(self, data: pd.DataFrame, task: Task) -> None: + def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: """ Load observations from a DataFrame to the HydroServer. :param data: A Pandas DataFrame where each column corresponds to a datastream. """ begin_date = self.earliest_begin_date(task) new_data = data[data["timestamp"] > begin_date] + + cutoff_value = ( + begin_date.isoformat() + if hasattr(begin_date, "isoformat") + else str(begin_date) + ) + stats: Dict[str, Any] = { + "cutoff": cutoff_value, + "timestamps_total": len(data), + "timestamps_after_cutoff": len(new_data), + "timestamps_filtered_by_cutoff": max(len(data) - len(new_data), 0), + "observations_available": 0, + "observations_loaded": 0, + "observations_skipped": 0, + "observations_filtered_by_end_time": 0, + "datastreams_total": 0, + "datastreams_available": 0, + "datastreams_loaded": 0, + "per_datastream": {}, + } + for col in new_data.columns.difference(["timestamp"]): - datastream = self.client.datastreams.get( - uid=str(col) - ) + stats["datastreams_total"] += 1 + datastream = self.client.datastreams.get(uid=str(col)) ds_cutoff = datastream.phenomenon_end_time - df = ( + + base_df = ( new_data[["timestamp", col]] - .loc[lambda d: d["timestamp"] > ds_cutoff if ds_cutoff else True] .rename(columns={col: "value"}) .dropna(subset=["value"]) ) + pre_count = len(base_df) + if ds_cutoff: + base_df = base_df.loc[base_df["timestamp"] > ds_cutoff] + + filtered_by_end = pre_count - len(base_df) + if filtered_by_end: + stats["observations_filtered_by_end_time"] += filtered_by_end + + df = base_df + available = len(df) + stats["observations_available"] += available if df.empty: - logging.warning(f"No new data for {col}, skipping.") + logging.warning( + "No new data for %s after filtering; skipping.", col + ) + stats["per_datastream"][str(col)] = { + "available": 0, + "loaded": 0, + "skipped": 0, + } continue + stats["datastreams_available"] += 1 df = df.rename(columns={"timestamp": "phenomenon_time", "value": "result"}) + loaded = 0 # Chunked upload CHUNK_SIZE = 5000 total = len(df) @@ -61,6 +101,7 @@ def load(self, data: pd.DataFrame, task: Task) -> None: self.client.datastreams.load_observations( uid=str(col), observations=chunk ) + loaded += len(chunk) except Exception as e: status = getattr(e, "status_code", None) or getattr( getattr(e, "response", None), "status_code", None @@ -74,6 +115,18 @@ def load(self, data: pd.DataFrame, task: Task) -> None: ) raise + stats["observations_loaded"] += loaded + stats["observations_skipped"] += max(available - loaded, 0) + if loaded > 0: + stats["datastreams_loaded"] += 1 + stats["per_datastream"][str(col)] = { + "available": available, + "loaded": loaded, + "skipped": max(available - loaded, 0), + } + + return stats + def _fetch_earliest_begin( self, mappings: list[SourceTargetMapping] ) -> pd.Timestamp: From 
26b5dc58784189060a997abae1a41082ffe1ac5d Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Mon, 9 Feb 2026 15:26:16 -0700 Subject: [PATCH 3/8] Add README --- src/hydroserverpy/etl/STATES_README.md | 32 ++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 src/hydroserverpy/etl/STATES_README.md diff --git a/src/hydroserverpy/etl/STATES_README.md b/src/hydroserverpy/etl/STATES_README.md new file mode 100644 index 0000000..e8623ed --- /dev/null +++ b/src/hydroserverpy/etl/STATES_README.md @@ -0,0 +1,32 @@ +## Possible error states: + +Config file validation +Tell the user exactly which configuration variables are missing or invalid +Could not connect to the source system. +The source system did not respond before the timeout. +Authentication with the source system failed; credentials may be invalid or expired. +The requested payload was not found on the source system. +The source system returned no data. + +The source returned a format different from what this job expects. +The payload’s expected fields were not found. +For CSV: + +- The header row contained unexpected values and could not be processed. +- One or more data rows contained unexpected values and could not be processed. + +For JSON: + +- The timestamp or value key couldn’t be found with the specified JMESPath query + +This job references a resource that no longer exists. +The file structure does not match the configuration. + +HydroServer rejected some or all of the data. +The target datastream could not be found. +An internal system error occurred while processing the job. +The job stopped before completion. + +## Possible warning states: + +## Possible success states: From 2ae818f40ada8bcf3c778123e258ad4a16d1d830 Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Tue, 10 Feb 2026 09:22:21 -0700 Subject: [PATCH 4/8] Add better logging --- src/hydroserverpy/api/models/etl/task.py | 32 ++--- src/hydroserverpy/etl/STATES_README.md | 77 ++++++++---- src/hydroserverpy/etl/etl_configuration.py | 23 +++- src/hydroserverpy/etl/extractors/base.py | 47 +++++-- .../etl/extractors/ftp_extractor.py | 30 ++++- .../etl/extractors/http_extractor.py | 46 ++++++- .../etl/extractors/local_file_extractor.py | 11 +- .../etl/loaders/hydroserver_loader.py | 115 ++++++++++-------- src/hydroserverpy/etl/logging_utils.py | 64 ++++++++++ src/hydroserverpy/etl/timestamp_parser.py | 35 +++++- src/hydroserverpy/etl/transformers/base.py | 60 +++++++-- .../etl/transformers/csv_transformer.py | 67 ++++++++-- .../etl/transformers/json_transformer.py | 30 ++++- 13 files changed, 497 insertions(+), 140 deletions(-) create mode 100644 src/hydroserverpy/etl/logging_utils.py diff --git a/src/hydroserverpy/api/models/etl/task.py b/src/hydroserverpy/api/models/etl/task.py index 90ca333..145e6ca 100644 --- a/src/hydroserverpy/api/models/etl/task.py +++ b/src/hydroserverpy/api/models/etl/task.py @@ -8,6 +8,7 @@ from datetime import datetime, timedelta, timezone from pydantic import Field, AliasPath, AliasChoices, TypeAdapter from hydroserverpy.etl.factories import extractor_factory, transformer_factory, loader_factory +from hydroserverpy.etl.loaders.hydroserver_loader import LoadSummary from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, LoaderConfig, SourceTargetMapping, MappingPath from ..base import HydroServerBaseModel from .orchestration_system import OrchestrationSystem @@ -222,11 +223,11 @@ def run_local(self): return logging.info("Starting load") - load_stats = loader_cls.load(data, self) + load_summary = 
loader_cls.load(data, self)
         self._update_status(
             task_run,
             True,
-            self._success_message(load_stats),
+            self._success_message(load_summary),
             runtime_source_uri=runtime_source_uri,
         )
     except Exception as e:
@@ -271,35 +272,26 @@ def _update_status(
         self.save()
 
     @staticmethod
-    def _success_message(load_stats: Optional[dict]) -> str:
-        if not isinstance(load_stats, dict):
-            return "OK"
-
-        loaded = load_stats.get("observations_loaded")
-        datastreams_loaded = load_stats.get("datastreams_loaded")
-        available = load_stats.get("observations_available")
-        timestamps_total = load_stats.get("timestamps_total")
-        timestamps_after_cutoff = load_stats.get("timestamps_after_cutoff")
-
-        if loaded is None:
+    def _success_message(load: Optional[LoadSummary]) -> str:
+        if not load:
             return "OK"
 
+        loaded = load.observations_loaded
         if loaded == 0:
-            if timestamps_total and timestamps_after_cutoff == 0:
-                cutoff = load_stats.get("cutoff")
-                if cutoff:
+            if load.timestamps_total and load.timestamps_after_cutoff == 0:
+                if load.cutoff:
                     return (
                         "No new observations to load "
-                        f"(all timestamps were at or before {cutoff})."
+                        f"(all timestamps were at or before {load.cutoff})."
                     )
                 return "No new observations to load (all timestamps were at or before the cutoff)."
-            if available == 0:
+            if load.observations_available == 0:
                 return "No new observations to load."
             return "No new observations were loaded."
 
-        if datastreams_loaded is not None:
+        if load.datastreams_loaded:
             return (
-                f"Load completed successfully ({loaded} rows across {datastreams_loaded} datastreams)."
+                f"Load completed successfully ({loaded} rows across {load.datastreams_loaded} datastreams)."
             )
         return f"Load completed successfully ({loaded} rows loaded)."
diff --git a/src/hydroserverpy/etl/STATES_README.md b/src/hydroserverpy/etl/STATES_README.md
index e8623ed..5060041 100644
--- a/src/hydroserverpy/etl/STATES_README.md
+++ b/src/hydroserverpy/etl/STATES_README.md
@@ -1,32 +1,65 @@
-## Possible error states:
-
-Config file validation
-Tell the user exactly which configuration variables are missing or invalid
-Could not connect to the source system.
-The source system did not respond before the timeout.
-Authentication with the source system failed; credentials may be invalid or expired.
-The requested payload was not found on the source system.
-The source system returned no data.
-
-The source returned a format different from what this job expects.
-The payload’s expected fields were not found.
-For CSV:
+## Possible Needs Attention states:
+
+These are the most important end-user messages the ETL system can return for a task run
+that needs user action.
+
+### Configuration / Setup
+
+- Invalid extractor configuration. Tell the user exactly which field is missing or invalid.
+- Invalid transformer configuration. Tell the user exactly which field is missing or invalid.
+- A required configuration value is missing.
+- A required configuration value is null where a value is expected.
+- Missing required per-task extractor variable "<name>".
+- Extractor source URI contains a placeholder "<name>", but it was not provided.
+- Task configuration is missing required daylight savings offset (when using daylightSavings mode).
+
+### Data Source (Connectivity / Authentication)
+
+- Could not connect to the source system.
+- The source system did not respond before the timeout.
+- Authentication with the source system failed; credentials may be invalid or expired.
+- The requested payload was not found on the source system.
+- The source system returned no data.
+### Source Data Did Not Match The Task
+
+- The source returned a format different from what this job expects.
+- The payload's expected fields were not found.
+- One or more timestamps could not be read with the current settings.
+- This job references a resource that no longer exists.
+- The file structure does not match the configuration.
+
+For CSV:
 
 - The header row contained unexpected values and could not be processed.
 - One or more data rows contained unexpected values and could not be processed.
+- Timestamp column "<name>" was not found in the extracted data.
+- A mapping source index is out of range for the extracted data.
+- A mapping source column was not found in the extracted data.
 
 For JSON:
 
+- The timestamp or value key could not be found with the specified query.
+- Transformer did not receive any extracted data to parse.
+
+### Targets / HydroServer
+
+- HydroServer rejected some or all of the data.
+- The target data series (datastream) could not be found.
+  - This may happen if the datastream was deleted or the mapping points to the wrong target.
 
-- The timestamp or value key couldn’t be found with the specified JMESPath query
+### Unexpected System Error
 
-This job references a resource that no longer exists.
-The file structure does not match the configuration.
+- An internal system error occurred while processing the job.
+- The job stopped before completion.
 
-HydroServer rejected some or all of the data.
-The target datastream could not be found.
-An internal system error occurred while processing the job.
-The job stopped before completion.
+## Possible OK states:
 
-## Possible warning states:
+These are the most important end-user messages the ETL system can return for a successful run.
 
-## Possible success states:
+- Load completed successfully.
+- Load completed successfully (<N> rows loaded).
+- Load completed successfully (<N> rows across <M> datastreams).
+- No new observations to load.
+- No new observations were loaded.
+- No new observations to load (all timestamps were at or before <cutoff>).
+- No data returned from the extractor. Nothing to load.
+- Transform produced no rows. Nothing to load.
diff --git a/src/hydroserverpy/etl/etl_configuration.py b/src/hydroserverpy/etl/etl_configuration.py
index 34ca9e7..8634668 100644
--- a/src/hydroserverpy/etl/etl_configuration.py
+++ b/src/hydroserverpy/etl/etl_configuration.py
@@ -2,6 +2,7 @@
 from typing import Annotated, Dict, List, Literal, Optional, Union
 from pydantic import BaseModel, Field, field_validator
 from enum import Enum
+from zoneinfo import ZoneInfo
 
 WorkflowType = Literal["ETL", "Aggregation", "Virtual", "SDL"]
 CSVDelimiterType = Literal[",", "|", "\t", ";", " "]
@@ -76,12 +77,28 @@ class Timestamp(BaseModel):
 
     class Config:
         populate_by_name = True
+        validate_default = True
 
-    @field_validator("timezone")
+    @field_validator("timezone", mode="after")
     def check_timezone(cls, timezone_value, info):
         mode = info.data.get("timezone_mode")
-        if mode == TimezoneMode.fixedOffset and timezone_value is None:
-            raise ValueError("`timezone` must be set when timezoneMode is fixedOffset")
+        if mode == TimezoneMode.fixedOffset:
+            if timezone_value is None:
+                raise ValueError(
+                    "`timezone` must be set when timezoneMode is fixedOffset (e.g. '-0700')"
+                )
+        if mode == TimezoneMode.daylightSavings:
+            if timezone_value is None or str(timezone_value).strip() == "":
+                raise ValueError(
+                    "Task configuration is missing required daylight savings offset (when using daylightSavings mode)."
+ ) + # Validate it's a real IANA tz name early to avoid cryptic ZoneInfo errors later. + try: + ZoneInfo(str(timezone_value)) + except Exception: + raise ValueError( + f"Invalid timezone {timezone_value!r}. Use an IANA timezone like 'America/Denver'." + ) return timezone_value diff --git a/src/hydroserverpy/etl/extractors/base.py b/src/hydroserverpy/etl/extractors/base.py index 045541d..7668869 100644 --- a/src/hydroserverpy/etl/extractors/base.py +++ b/src/hydroserverpy/etl/extractors/base.py @@ -4,6 +4,10 @@ from datetime import datetime from ..etl_configuration import ExtractorConfig, Task from ..timestamp_parser import TimestampParser +from ..logging_utils import redact_url, summarize_list + + +logger = logging.getLogger(__name__) class Extractor: @@ -12,21 +16,32 @@ def __init__(self, extractor_config: ExtractorConfig): self.runtime_source_uri = None def resolve_placeholder_variables(self, task: Task, loader): - logging.info("Resolving extractor runtime variables...") + placeholders = list(self.cfg.placeholder_variables or []) filled = {} - for placeholder in self.cfg.placeholder_variables: + runtime_names: set[str] = set() + task_names: set[str] = set() + for placeholder in placeholders: name = placeholder.name if placeholder.type == "runTime": - logging.info("Resolving runtime var: %s", name) + logger.debug("Resolving runtime var: %s", name) + runtime_names.add(name) if placeholder.run_time_value == "latestObservationTimestamp": value = loader.earliest_begin_date(task) elif placeholder.run_time_value == "jobExecutionTime": value = pd.Timestamp.now(tz="UTC") elif placeholder.type == "perTask": - logging.info("Resolving task var: %s", name) + logger.debug("Resolving task var: %s", name) + task_names.add(name) if name not in task.extractor_variables: - raise KeyError(f"Missing per-task variable '{name}'") + logger.error( + "Missing per-task extractor variable '%s'. Provided extractorVariables keys=%s", + name, + summarize_list(sorted((task.extractor_variables or {}).keys())), + ) + raise ValueError( + f"Missing required per-task extractor variable '{name}'." + ) value = task.extractor_variables[name] else: continue @@ -36,13 +51,24 @@ def resolve_placeholder_variables(self, task: Task, loader): value = parser.utc_to_string(value) filled[name] = value + + if runtime_names: + names = ", ".join(sorted(runtime_names)) + logger.debug( + "Runtime variables resolved (%s): %s", len(runtime_names), names + ) + if task_names: + names = ", ".join(sorted(task_names)) + logger.debug("Task variables resolved (%s): %s", len(task_names), names) + if not filled: uri = self.cfg.source_uri else: uri = self.format_uri(filled) self.runtime_source_uri = uri - logging.info("Resolved runtime source URI: %s", uri) + # Keep a stable log prefix for downstream parsing, but redact secrets. + logger.info("Resolved runtime source URI: %s", redact_url(uri)) return uri def format_uri(self, placeholder_variables): @@ -50,7 +76,14 @@ def format_uri(self, placeholder_variables): uri = self.cfg.source_uri.format(**placeholder_variables) except KeyError as e: missing_key = e.args[0] - raise KeyError(f"Missing placeholder variable: {missing_key}") + logger.error( + "Failed to format sourceUri: missing placeholder '%s'. Provided placeholders=%s", + missing_key, + summarize_list(sorted(placeholder_variables.keys())), + ) + raise ValueError( + f"Extractor source URI contains a placeholder '{missing_key}', but it was not provided." 
+ ) return uri @abstractmethod diff --git a/src/hydroserverpy/etl/extractors/ftp_extractor.py b/src/hydroserverpy/etl/extractors/ftp_extractor.py index b715308..15f08b3 100644 --- a/src/hydroserverpy/etl/extractors/ftp_extractor.py +++ b/src/hydroserverpy/etl/extractors/ftp_extractor.py @@ -1,5 +1,5 @@ import logging -from ftplib import FTP +from ftplib import FTP, error_perm from io import BytesIO from typing import Dict @@ -7,6 +7,9 @@ from ..types import TimeRange +logger = logging.getLogger(__name__) + + class FTPExtractor(Extractor): def __init__( self, @@ -33,18 +36,33 @@ def extract(self): try: ftp.connect(self.host, self.port) ftp.login(user=self.username, passwd=self.password) - logging.info(f"Connected to FTP server: {self.host}:{self.port}") + logger.debug("Connected to FTP server %s:%s", self.host, self.port) data = BytesIO() ftp.retrbinary(f"RETR {self.filepath}", data.write) - logging.info( - f"Successfully downloaded file '{self.filepath}' from FTP server." + logger.debug( + "Successfully downloaded file %r from FTP server.", + self.filepath, ) data.seek(0) + if data.getbuffer().nbytes == 0: + raise ValueError("The source system returned no data.") return data + except error_perm as e: + msg = str(e) + # Common FTP status codes: + # 530 = not logged in / auth failure + # 550 = file unavailable + if msg.startswith("530"): + raise ValueError( + "Authentication with the source system failed; credentials may be invalid or expired." + ) from e + if msg.startswith("550"): + raise ValueError("The requested payload was not found on the source system.") from e + raise ValueError("The source system returned an error.") from e except Exception as e: - logging.error(f"Error retrieving file from FTP server: {e}") - return None + logger.error("Error retrieving file from FTP server: %s", e, exc_info=True) + raise ValueError("Could not connect to the source system.") from e finally: if ftp: ftp.quit() diff --git a/src/hydroserverpy/etl/extractors/http_extractor.py b/src/hydroserverpy/etl/extractors/http_extractor.py index ea5df51..442a0bf 100644 --- a/src/hydroserverpy/etl/extractors/http_extractor.py +++ b/src/hydroserverpy/etl/extractors/http_extractor.py @@ -4,6 +4,10 @@ from ..etl_configuration import Task from .base import Extractor, ExtractorConfig +from ..logging_utils import redact_url + + +logger = logging.getLogger(__name__) class HTTPExtractor(Extractor): @@ -15,18 +19,50 @@ def extract(self, task: Task, loader=None): Downloads the file from the HTTP/HTTPS server and returns a file-like object. """ url = self.resolve_placeholder_variables(task, loader) - logging.info("Requesting data from → %s", url) + logger.info("Requesting data from → %s", redact_url(url)) try: response = requests.get(url) - response.raise_for_status() - except Exception as e: - logging.error("HTTP request failed for %s: %s", url, e) - raise + except requests.exceptions.Timeout as e: + raise ValueError("The source system did not respond before the timeout.") from e + except requests.exceptions.ConnectionError as e: + raise ValueError("Could not connect to the source system.") from e + except requests.exceptions.RequestException as e: + # Generic network/client error. + raise ValueError("Could not connect to the source system.") from e + + status = getattr(response, "status_code", None) + if status in (401, 403): + raise ValueError( + "Authentication with the source system failed; credentials may be invalid or expired." 
+ ) + if status == 404: + raise ValueError("The requested payload was not found on the source system.") + if status is not None and status >= 400: + logger.error( + "HTTP request failed (status=%s) for %s", + status, + redact_url(url), + ) + raise ValueError("The source system returned an error.") data = BytesIO() + total_bytes = 0 for chunk in response.iter_content(chunk_size=8192): if chunk: + total_bytes += len(chunk) data.write(chunk) data.seek(0) + + if total_bytes == 0: + raise ValueError("The source system returned no data.") + + # Keep payload-level details at DEBUG; hydroserver-api-services already logs + # a concise "Extractor returned payload" line for the end user. + logger.debug( + "Extractor returned payload (status=%s, content_type=%r, bytes=%s).", + getattr(response, "status_code", None), + response.headers.get("Content-Type") if hasattr(response, "headers") else None, + total_bytes, + ) return data diff --git a/src/hydroserverpy/etl/extractors/local_file_extractor.py b/src/hydroserverpy/etl/extractors/local_file_extractor.py index d20f23f..0402c31 100644 --- a/src/hydroserverpy/etl/extractors/local_file_extractor.py +++ b/src/hydroserverpy/etl/extractors/local_file_extractor.py @@ -3,6 +3,9 @@ from ..etl_configuration import ExtractorConfig +logger = logging.getLogger(__name__) + + class LocalFileExtractor(Extractor): def __init__(self, extractor_config: ExtractorConfig): super().__init__(extractor_config) @@ -18,8 +21,10 @@ def extract(self, task=None, loader=None): ) try: file_handle = open(path, "r") - logging.info("Successfully opened file '%s'.", path) + logger.debug("Successfully opened local file %r.", path) return file_handle + except FileNotFoundError as e: + raise ValueError("This job references a resource that no longer exists.") from e except Exception as e: - logging.error("Error opening file '%s': %s", path, e) - return None + logger.error("Error opening local file %r: %s", path, e, exc_info=True) + raise ValueError("Could not open the source file.") from e diff --git a/src/hydroserverpy/etl/loaders/hydroserver_loader.py b/src/hydroserverpy/etl/loaders/hydroserver_loader.py index 3b19ed6..7821836 100644 --- a/src/hydroserverpy/etl/loaders/hydroserver_loader.py +++ b/src/hydroserverpy/etl/loaders/hydroserver_loader.py @@ -5,11 +5,25 @@ import logging import pandas as pd from ..etl_configuration import Task, SourceTargetMapping +from dataclasses import dataclass if TYPE_CHECKING: from hydroserverpy.api.client import HydroServer +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class LoadSummary: + cutoff: str + timestamps_total: int + timestamps_after_cutoff: int + observations_available: int + observations_loaded: int + datastreams_loaded: int + + class HydroServerLoader(Loader): """ A class that extends the HydroServer client with ETL-specific functionalities. @@ -20,11 +34,12 @@ def __init__(self, client: HydroServer, task_id): self._begin_cache: dict[str, pd.Timestamp] = {} self.task_id = task_id - def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: + def load(self, data: pd.DataFrame, task: Task) -> LoadSummary: """ Load observations from a DataFrame to the HydroServer. :param data: A Pandas DataFrame where each column corresponds to a datastream. 
""" + logger.info("Saving data to HydroServer...") begin_date = self.earliest_begin_date(task) new_data = data[data["timestamp"] > begin_date] @@ -33,25 +48,24 @@ def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: if hasattr(begin_date, "isoformat") else str(begin_date) ) - stats: Dict[str, Any] = { - "cutoff": cutoff_value, - "timestamps_total": len(data), - "timestamps_after_cutoff": len(new_data), - "timestamps_filtered_by_cutoff": max(len(data) - len(new_data), 0), - "observations_available": 0, - "observations_loaded": 0, - "observations_skipped": 0, - "observations_filtered_by_end_time": 0, - "datastreams_total": 0, - "datastreams_available": 0, - "datastreams_loaded": 0, - "per_datastream": {}, - } + timestamps_total = len(data) + timestamps_after_cutoff = len(new_data) + observations_available = 0 + observations_loaded = 0 + datastreams_loaded = 0 for col in new_data.columns.difference(["timestamp"]): - stats["datastreams_total"] += 1 - datastream = self.client.datastreams.get(uid=str(col)) - ds_cutoff = datastream.phenomenon_end_time + try: + datastream = self.client.datastreams.get(uid=str(col)) + except Exception as e: + status = getattr(e, "status_code", None) or getattr( + getattr(e, "response", None), "status_code", None + ) + if status == 404: + raise ValueError("The target datastream could not be found.") from e + raise ValueError("HydroServer rejected some or all of the data.") from e + + ds_cutoff = getattr(datastream, "phenomenon_end_time", None) base_df = ( new_data[["timestamp", col]] @@ -62,25 +76,12 @@ def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: if ds_cutoff: base_df = base_df.loc[base_df["timestamp"] > ds_cutoff] - filtered_by_end = pre_count - len(base_df) - if filtered_by_end: - stats["observations_filtered_by_end_time"] += filtered_by_end - df = base_df available = len(df) - stats["observations_available"] += available + observations_available += available if df.empty: - logging.warning( - "No new data for %s after filtering; skipping.", col - ) - stats["per_datastream"][str(col)] = { - "available": 0, - "loaded": 0, - "skipped": 0, - } continue - stats["datastreams_available"] += 1 df = df.rename(columns={"timestamp": "phenomenon_time", "value": "result"}) loaded = 0 @@ -90,7 +91,7 @@ def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: for start in range(0, total, CHUNK_SIZE): end = min(start + CHUNK_SIZE, total) chunk = df.iloc[start:end] - logging.info( + logger.debug( "Uploading %s rows (%s-%s) to datastream %s", len(chunk), start, @@ -106,8 +107,16 @@ def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: status = getattr(e, "status_code", None) or getattr( getattr(e, "response", None), "status_code", None ) + if status == 404: + raise ValueError( + "The target datastream could not be found." + ) from e + if status == 400: + raise ValueError( + "HydroServer rejected some or all of the data." 
+ ) from e if status == 409 or "409" in str(e) or "Conflict" in str(e): - logging.info( + logger.info( "409 Conflict for datastream %s on rows %s-%s; skipping remainder for this stream.", col, start, @@ -115,32 +124,42 @@ def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: ) raise - stats["observations_loaded"] += loaded - stats["observations_skipped"] += max(available - loaded, 0) if loaded > 0: - stats["datastreams_loaded"] += 1 - stats["per_datastream"][str(col)] = { - "available": available, - "loaded": loaded, - "skipped": max(available - loaded, 0), - } - - return stats + datastreams_loaded += 1 + observations_loaded += loaded + + return LoadSummary( + cutoff=cutoff_value, + timestamps_total=timestamps_total, + timestamps_after_cutoff=timestamps_after_cutoff, + observations_available=observations_available, + observations_loaded=observations_loaded, + datastreams_loaded=datastreams_loaded, + ) def _fetch_earliest_begin( self, mappings: list[SourceTargetMapping] ) -> pd.Timestamp: - logging.info("Querying HydroServer for earliest begin date for task...") + logger.info( + "Checking HydroServer for the most recent data already stored (so we only load new observations)..." + ) timestamps = [] for m in mappings: for p in m["paths"] if isinstance(m, dict) else m.paths: - datastream_id = p["targetIdentifier"] if isinstance(p, dict) else p.target_identifier + datastream_id = ( + p["targetIdentifier"] + if isinstance(p, dict) + else p.target_identifier + ) datastream = self.client.datastreams.get(datastream_id) raw = datastream.phenomenon_end_time or "1970-01-01" ts = pd.to_datetime(raw, utc=True) timestamps.append(ts) - logging.info(f"Found earliest begin date: {min(timestamps)}") - return min(timestamps) + earliest = ( + min(timestamps) if timestamps else pd.Timestamp("1970-01-01", tz="UTC") + ) + logger.debug("Found earliest begin date: %s", earliest) + return earliest def earliest_begin_date(self, task: Task) -> pd.Timestamp: """ diff --git a/src/hydroserverpy/etl/logging_utils.py b/src/hydroserverpy/etl/logging_utils.py new file mode 100644 index 0000000..eced064 --- /dev/null +++ b/src/hydroserverpy/etl/logging_utils.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import re +from typing import Any, Iterable, Mapping, Optional +from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit + + +_SENSITIVE_KEY_RE = re.compile( + r"(?:" + r"pass(word)?|passwd|secret|token|api[_-]?key|apikey|auth|bearer|signature|sig|" + r"access[_-]?token|refresh[_-]?token|client[_-]?secret" + r")", + re.IGNORECASE, +) + + +def is_sensitive_key(key: Optional[str]) -> bool: + if not key: + return False + return bool(_SENSITIVE_KEY_RE.search(key)) + + +def redact_value(key: str, value: Any) -> Any: + if is_sensitive_key(key): + return "REDACTED" + if isinstance(value, str) and len(value) > 256: + return value[:256] + "...(truncated)" + return value + + +def redact_mapping(values: Mapping[str, Any]) -> dict[str, Any]: + return {k: redact_value(k, v) for k, v in values.items()} + + +def redact_url(url: str) -> str: + """ + Redact common secret-bearing URL parts (userinfo and sensitive query params). 
+ """ + try: + parts = urlsplit(url) + except Exception: + return url + + netloc = parts.netloc + if "@" in netloc: + netloc = "@" + netloc.split("@", 1)[1] + + query_pairs = [] + for k, v in parse_qsl(parts.query, keep_blank_values=True): + query_pairs.append((k, "REDACTED" if is_sensitive_key(k) else v)) + redacted_query = urlencode(query_pairs, doseq=True) + + return urlunsplit( + (parts.scheme, netloc, parts.path, redacted_query, parts.fragment) + ) + + +def summarize_list(values: Iterable[Any], *, max_items: int = 20) -> str: + items = list(values) + if len(items) <= max_items: + return repr(items) + return ( + repr(items[:max_items])[:-1] + f", ... (+{len(items) - max_items} more)]" + ) diff --git a/src/hydroserverpy/etl/timestamp_parser.py b/src/hydroserverpy/etl/timestamp_parser.py index c2301b9..d46e46e 100644 --- a/src/hydroserverpy/etl/timestamp_parser.py +++ b/src/hydroserverpy/etl/timestamp_parser.py @@ -12,6 +12,8 @@ TimezoneMode = Literal["utc", "daylightSavings", "fixedOffset", "embeddedOffset"] ALLOWED_TIMEZONE_MODES = {m.lower() for m in get_args(TimezoneMode)} +logger = logging.getLogger(__name__) + class Timestamp(BaseModel): format: TimestampFormat @@ -42,6 +44,10 @@ def __init__(self, raw: Union[Timestamp, dict]): @cached_property def tz(self): if self.tz_mode == "fixedoffset": + if not self.timestamp.timezone: + raise ValueError( + "`timezone` must be set when timezoneMode is fixedOffset (e.g. '-0700')" + ) offset = self.timestamp.timezone.strip() if len(offset) != 5 or offset[0] not in "+-": raise ValueError(f"Invalid timezone: {offset}") @@ -49,7 +55,16 @@ def tz(self): hrs, mins = int(offset[1:3]), int(offset[3:5]) return timezone(timedelta(minutes=sign * (hrs * 60 + mins))) if self.tz_mode == "daylightsavings": - return ZoneInfo(self.timestamp.timezone) + if not self.timestamp.timezone: + raise ValueError( + "Task configuration is missing required daylight savings offset (when using daylightSavings mode)." + ) + try: + return ZoneInfo(self.timestamp.timezone) + except Exception as e: + raise ValueError( + f"Invalid timezone {self.timestamp.timezone!r}. Use an IANA timezone like 'America/Denver'." + ) from e if self.tz_mode == "utc": return timezone.utc @@ -81,9 +96,14 @@ def parse_series(self, raw_series: pd.Series) -> pd.Series: if parsed.isna().any(): bad_rows = s[parsed.isna()].head(2).tolist() - logging.warning( - f"{parsed.isna().sum()} timestamps failed to parse. " - f"Sample bad values: {bad_rows}" + logger.warning( + "%s timestamps failed to parse (format=%r, timezoneMode=%r, timezone=%r, customFormat=%r). Sample bad values: %s", + parsed.isna().sum(), + self.timestamp.format, + self.timestamp.timezone_mode, + self.timestamp.timezone, + self.timestamp.custom_format, + bad_rows, ) return parsed @@ -106,7 +126,12 @@ def utc_to_string(self, dt: Union[datetime, pd.Timestamp]) -> str: return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") if tz_format == "custom": - logging.info(f"custom timestamp: ... 
{self.timestamp}") + logger.debug( + "Formatting runtime timestamp using custom format (customFormat=%r, timezoneMode=%r, timezone=%r).", + self.timestamp.custom_format, + self.timestamp.timezone_mode, + self.timestamp.timezone, + ) return dt.astimezone(self.tz).strftime(self.timestamp.custom_format) raise ValueError(f"Unknown timestamp.format: {self.timestamp.format!r}") diff --git a/src/hydroserverpy/etl/transformers/base.py b/src/hydroserverpy/etl/transformers/base.py index 42a754a..89bc824 100644 --- a/src/hydroserverpy/etl/transformers/base.py +++ b/src/hydroserverpy/etl/transformers/base.py @@ -8,6 +8,7 @@ from ..timestamp_parser import TimestampParser from ..etl_configuration import MappingPath, TransformerConfig, SourceTargetMapping +from ..logging_utils import summarize_list ALLOWED_AST = ( ast.Expression, @@ -51,6 +52,9 @@ def _compile_arithmetic_expr(expr: str): return _compile_arithmetic_expr_canon(_canonicalize_expr(expr)) +logger = logging.getLogger(__name__) + + class Transformer(ABC): def __init__(self, transformer_config: TransformerConfig): self.cfg = transformer_config @@ -68,18 +72,39 @@ def needs_datastreams(self) -> bool: def standardize_dataframe( self, df: pd.DataFrame, mappings: List[SourceTargetMapping] ): + logger.debug( + "Standardizing extracted dataframe (rows=%s, columns=%s).", + len(df), + len(df.columns), + ) + logger.debug( + "Extracted dataframe columns (sample): %s", + summarize_list(list(df.columns), max_items=30), + ) if not df.empty: - logging.info(f"Read task into dataframe: {df.iloc[0].to_dict()}") - else: - logging.info("Read task into dataframe: [empty dataframe]") + # Avoid dumping full rows; just log a compact preview. + preview = df.iloc[0].to_dict() + for k, v in list(preview.items()): + if isinstance(v, str) and len(v) > 128: + preview[k] = v[:128] + "...(truncated)" + logger.debug("Extracted dataframe first-row preview: %s", preview) # 1) Normalize timestamp column df.rename(columns={self.timestamp.key: "timestamp"}, inplace=True) if "timestamp" not in df.columns: msg = f"Timestamp column '{self.timestamp.key}' not found in data." - logging.error(msg) + logger.error( + "%s Available columns=%s", + msg, + summarize_list(list(df.columns), max_items=30), + ) raise ValueError(msg) - logging.info(f"Renamed timestamp column to 'timestamp'") + logger.debug( + "Normalized timestamp column '%s' -> 'timestamp' (timezoneMode=%r, format=%r).", + self.timestamp.key, + getattr(self.timestamp, "timezone_mode", None), + getattr(self.timestamp, "format", None), + ) df["timestamp"] = self.timestamp_parser.parse_series(df["timestamp"]) df = df.drop_duplicates(subset=["timestamp"], keep="last") @@ -89,10 +114,21 @@ def _resolve_source_col(s_id: Union[str, int]) -> str: try: return df.columns[s_id] except IndexError: + logger.error( + "Source index %s is out of range. Extracted columns count=%s, columns(sample)=%s", + s_id, + len(df.columns), + summarize_list(list(df.columns), max_items=30), + ) raise ValueError( f"Source index {s_id} is out of range for extracted data." ) if s_id not in df.columns: + logger.error( + "Source column %r not found. 
Available columns=%s", + s_id, + summarize_list(list(df.columns), max_items=30), + ) raise ValueError(f"Source column '{s_id}' not found in extracted data.") return s_id @@ -107,19 +143,25 @@ def _apply_transformations(series: pd.Series, path: MappingPath) -> pd.Series: try: out = eval(code, {"__builtins__": {}}, {"x": out}) except Exception as ee: - logging.exception( - "Data transformation failed for expression=%r", + logger.exception( + "Data transformation failed for target=%r expression=%r", + path.target_identifier, transformation.expression, ) raise else: msg = f"Unsupported transformation type: {transformation.type}" - logging.error(msg) + logger.error(msg) raise ValueError(msg) return out # source target mappings may be one to many. Therefore, create a new column for each target and apply transformations transformed_df = pd.DataFrame(index=df.index) + logger.debug( + "Applying %s source mapping(s): %s", + len(mappings), + summarize_list([m.source_identifier for m in mappings], max_items=30), + ) for m in mappings: src_col = _resolve_source_col(m.source_identifier) base = df[src_col] @@ -130,6 +172,6 @@ def _apply_transformations(series: pd.Series, path: MappingPath) -> pd.Series: # 6) Keep only timestamp + target columns df = pd.concat([df[["timestamp"]], pd.DataFrame(transformed_df)], axis=1) - logging.info(f"standardized dataframe created: {df.shape}") + logger.debug("Standardized dataframe created: %s", df.shape) return df diff --git a/src/hydroserverpy/etl/transformers/csv_transformer.py b/src/hydroserverpy/etl/transformers/csv_transformer.py index 4d219f0..efe3b32 100644 --- a/src/hydroserverpy/etl/transformers/csv_transformer.py +++ b/src/hydroserverpy/etl/transformers/csv_transformer.py @@ -6,6 +6,9 @@ from ..etl_configuration import TransformerConfig, SourceTargetMapping +logger = logging.getLogger(__name__) + + class CSVTransformer(Transformer): def __init__(self, transformer_config: TransformerConfig): super().__init__(transformer_config) @@ -32,16 +35,48 @@ def transform( Returns: observations_map (dict): Dict mapping datastream IDs to pandas DataFrames. """ + if data_file is None: + raise TypeError( + "CSVTransformer received None; expected file-like, bytes, or str" + ) clean_file = self._strip_comments(data_file) use_index = self.identifier_type == "index" if use_index: # Users will always interact in 1-based, so if the key is a column index, convert to 0-based to work with Pandas - timestamp_pos = int(self.timestamp.key) - 1 - usecols = [timestamp_pos] + [int(m.source_identifier) - 1 for m in mappings] + try: + timestamp_pos = int(self.timestamp.key) - 1 + except Exception as e: + raise ValueError( + "CSV transformer identifierType='index' requires timestamp.key to be a 1-based " + "column index string (example: '1')." + ) from e + try: + source_positions = [int(m.source_identifier) - 1 for m in mappings] + except Exception as e: + raise ValueError( + "CSV transformer identifierType='index' requires each mapping sourceIdentifier to be a " + "1-based column index string (example: '2')." + ) from e + + usecols = [timestamp_pos] + source_positions + # When selecting columns by integer position, `dtype={col_name: ...}` can break because there is + # no stable column name at parse time. Reading everything as string is safe; we coerce later. 
+ dtype = "string" else: usecols = [self.timestamp.key] + [m.source_identifier for m in mappings] + dtype = {self.timestamp.key: "string"} + + logger.info("Reading CSV data...") + logger.debug( + "Parsing CSV (identifierType=%r, delimiter=%r, headerRow=%r, dataStartRow=%r, usecols=%r).", + self.identifier_type, + self.delimiter, + getattr(self.cfg, "header_row", None), + getattr(self.cfg, "data_start_row", None), + usecols, + ) try: # Pandas’ heuristics strip offsets and silently coerce failures to strings. @@ -53,16 +88,34 @@ def transform( header=0, skiprows=self._build_skiprows(), usecols=usecols, - dtype={self.timestamp.key: "string"}, + dtype=dtype, ) - logging.info(f"CSV file read into dataframe: {df.shape}") + logger.debug("CSV file read into dataframe: %s", df.shape) except Exception as e: - logging.error(f"Error reading CSV data: {e}") - return None + # Try to extract a safe preview of the header for "usecols do not match columns" failures. + header_preview = None + try: + pos = clean_file.tell() + clean_file.seek(0) + header_preview = (clean_file.readline() or "").strip() + clean_file.seek(pos) + except Exception: + header_preview = None + + msg = str(e) + user_message = "One or more data rows contained unexpected values and could not be processed." + if "No columns to parse from file" in msg: + user_message = "The source system returned no data." + elif "Usecols do not match columns" in msg or "not in list" in msg: + user_message = "The header row contained unexpected values and could not be processed." + + logger.error("Error reading CSV data: %s", user_message) + raise ValueError(user_message) from e # In index mode, relabel columns back to original 1-based indices so base transformer can use integer labels directly if use_index: - df.columns = [(c + 1) if isinstance(c, int) else c for c in usecols] + # Task config stores keys as strings; keep columns as strings so timestamp.key/sourceIdentifier match. + df.columns = [str(c + 1) if isinstance(c, int) else str(c) for c in usecols] return self.standardize_dataframe(df, mappings) diff --git a/src/hydroserverpy/etl/transformers/json_transformer.py b/src/hydroserverpy/etl/transformers/json_transformer.py index c699d7d..09af63b 100644 --- a/src/hydroserverpy/etl/transformers/json_transformer.py +++ b/src/hydroserverpy/etl/transformers/json_transformer.py @@ -7,6 +7,9 @@ from ..etl_configuration import TransformerConfig, SourceTargetMapping +logger = logging.getLogger(__name__) + + class JSONTransformer(Transformer): def __init__(self, transformer_config: TransformerConfig): super().__init__(transformer_config) @@ -30,12 +33,24 @@ def transform(self, data_file, mappings: List[SourceTargetMapping]): ) json_data = json.load(data_file) + logger.debug("Loaded JSON payload (type=%s).", type(json_data).__name__) + if not isinstance(json_data, (dict, list)): + raise ValueError("The payload's expected fields were not found.") + data_points = self.extract_data_points(json_data) - if not data_points: - logging.warning("No data points found in the JSON data.") - return None + if data_points is None: + # JMESPath returned null: this usually means the expected field/path was not present. + raise ValueError( + "The timestamp or value key could not be found with the specified query." + ) + if len(data_points) == 0: + # Treat an empty result list as "no data" (not a configuration error). + # Build an empty frame with the expected columns so standardization can proceed cleanly. 
+            cols = [self.timestamp.key] + [m.source_identifier for m in mappings]
+            df = pd.DataFrame(columns=cols)
+            return self.standardize_dataframe(df, mappings)
 
-        logging.info("Extracted %s JSON data points.", len(data_points))
+        logger.debug("Extracted %s JSON data point(s).", len(data_points))
         df = pd.DataFrame(data_points)
 
         return self.standardize_dataframe(df, mappings)
@@ -43,7 +58,12 @@ def transform(self, data_file, mappings: List[SourceTargetMapping]):
     def extract_data_points(self, json_data: Any) -> Optional[List[dict]]:
         """Extracts data points from the JSON data using the data_path."""
         data_points = jmespath.search(self.jmespath, json_data)
+        if data_points is None:
+            return None
         if isinstance(data_points, dict):
             data_points = [data_points]
-        return data_points
+        if isinstance(data_points, list):
+            return data_points
+        # Unexpected output type; surface it as a configuration issue.
+        return None

From da335aad11f5e64884131025276dc5b2e6f0705c Mon Sep 17 00:00:00 2001
From: Daniel Slaugh
Date: Tue, 10 Feb 2026 12:12:10 -0700
Subject: [PATCH 5/8] Don't redact runtime URLs

---
 src/hydroserverpy/api/models/etl/task.py |  6 ++---
 src/hydroserverpy/etl/STATES_README.md   |  4 ++--
 src/hydroserverpy/etl/extractors/base.py |  6 ++---
 .../etl/extractors/http_extractor.py     | 19 ++++++++++-----
 src/hydroserverpy/etl/logging_utils.py   | 24 -------------------
 5 files changed, 21 insertions(+), 38 deletions(-)

diff --git a/src/hydroserverpy/api/models/etl/task.py b/src/hydroserverpy/api/models/etl/task.py
index 145e6ca..cdc2a0e 100644
--- a/src/hydroserverpy/api/models/etl/task.py
+++ b/src/hydroserverpy/api/models/etl/task.py
@@ -281,12 +281,12 @@ def _success_message(load: Optional[LoadSummary]) -> str:
             if load.timestamps_total and load.timestamps_after_cutoff == 0:
                 if load.cutoff:
                     return (
-                        "No new observations to load "
+                        "Already up to date - no new observations loaded "
                         f"(all timestamps were at or before {load.cutoff})."
                     )
-                return "No new observations to load (all timestamps were at or before the cutoff)."
+                return "Already up to date - no new observations loaded (all timestamps were at or before the cutoff)."
             if load.observations_available == 0:
-                return "No new observations to load."
+                return "Already up to date - no new observations loaded."
             return "No new observations were loaded."
 
         if load.datastreams_loaded:
diff --git a/src/hydroserverpy/etl/STATES_README.md b/src/hydroserverpy/etl/STATES_README.md
index 5060041..c408ba7 100644
--- a/src/hydroserverpy/etl/STATES_README.md
+++ b/src/hydroserverpy/etl/STATES_README.md
@@ -58,8 +58,8 @@ These are the most important end-user messages the ETL system can return for a s
 - Load completed successfully.
 - Load completed successfully (<N> rows loaded).
 - Load completed successfully (<N> rows across <M> datastreams).
-- No new observations to load.
+- Already up to date - no new observations loaded.
 - No new observations were loaded.
-- No new observations to load (all timestamps were at or before <cutoff>).
+- Already up to date - no new observations loaded (all timestamps were at or before <cutoff>).
 - No data returned from the extractor. Nothing to load.
 - Transform produced no rows. Nothing to load.
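The reworded messages above are produced by `Task._success_message`, which consumes the `LoadSummary` dataclass introduced in patch 4. A minimal sketch of how the two fit together after this patch (import paths follow the files in these diffs; the sample values are invented):

```python
# Illustrative sketch only: renders the "Already up to date" OK-state message
# from a hypothetical LoadSummary. Not part of the patch series itself.
from hydroserverpy.etl.loaders.hydroserver_loader import LoadSummary
from hydroserverpy.api.models.etl.task import Task

summary = LoadSummary(
    cutoff="2026-02-10T00:00:00+00:00",  # invented cutoff timestamp
    timestamps_total=96,                 # rows extracted from the source
    timestamps_after_cutoff=0,           # all rows were at or before the cutoff
    observations_available=0,
    observations_loaded=0,
    datastreams_loaded=0,
)

print(Task._success_message(summary))
# Already up to date - no new observations loaded
# (all timestamps were at or before 2026-02-10T00:00:00+00:00).
```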
diff --git a/src/hydroserverpy/etl/extractors/base.py b/src/hydroserverpy/etl/extractors/base.py index 7668869..82f4bc5 100644 --- a/src/hydroserverpy/etl/extractors/base.py +++ b/src/hydroserverpy/etl/extractors/base.py @@ -4,7 +4,7 @@ from datetime import datetime from ..etl_configuration import ExtractorConfig, Task from ..timestamp_parser import TimestampParser -from ..logging_utils import redact_url, summarize_list +from ..logging_utils import summarize_list logger = logging.getLogger(__name__) @@ -67,8 +67,8 @@ def resolve_placeholder_variables(self, task: Task, loader): uri = self.format_uri(filled) self.runtime_source_uri = uri - # Keep a stable log prefix for downstream parsing, but redact secrets. - logger.info("Resolved runtime source URI: %s", redact_url(uri)) + # Keep a stable log prefix for downstream parsing. + logger.info("Resolved runtime source URI: %s", uri) return uri def format_uri(self, placeholder_variables): diff --git a/src/hydroserverpy/etl/extractors/http_extractor.py b/src/hydroserverpy/etl/extractors/http_extractor.py index 442a0bf..6c9aa75 100644 --- a/src/hydroserverpy/etl/extractors/http_extractor.py +++ b/src/hydroserverpy/etl/extractors/http_extractor.py @@ -4,7 +4,6 @@ from ..etl_configuration import Task from .base import Extractor, ExtractorConfig -from ..logging_utils import redact_url logger = logging.getLogger(__name__) @@ -19,12 +18,14 @@ def extract(self, task: Task, loader=None): Downloads the file from the HTTP/HTTPS server and returns a file-like object. """ url = self.resolve_placeholder_variables(task, loader) - logger.info("Requesting data from → %s", redact_url(url)) + logger.info("Requesting data from source URI") try: response = requests.get(url) except requests.exceptions.Timeout as e: - raise ValueError("The source system did not respond before the timeout.") from e + raise ValueError( + "The source system did not respond before the timeout." + ) from e except requests.exceptions.ConnectionError as e: raise ValueError("Could not connect to the source system.") from e except requests.exceptions.RequestException as e: @@ -37,12 +38,14 @@ def extract(self, task: Task, loader=None): "Authentication with the source system failed; credentials may be invalid or expired." ) if status == 404: - raise ValueError("The requested payload was not found on the source system.") + raise ValueError( + "The requested payload was not found on the source system." 
+ ) if status is not None and status >= 400: logger.error( "HTTP request failed (status=%s) for %s", status, - redact_url(url), + url, ) raise ValueError("The source system returned an error.") @@ -62,7 +65,11 @@ def extract(self, task: Task, loader=None): logger.debug( "Extractor returned payload (status=%s, content_type=%r, bytes=%s).", getattr(response, "status_code", None), - response.headers.get("Content-Type") if hasattr(response, "headers") else None, + ( + response.headers.get("Content-Type") + if hasattr(response, "headers") + else None + ), total_bytes, ) return data diff --git a/src/hydroserverpy/etl/logging_utils.py b/src/hydroserverpy/etl/logging_utils.py index eced064..6cd185c 100644 --- a/src/hydroserverpy/etl/logging_utils.py +++ b/src/hydroserverpy/etl/logging_utils.py @@ -2,7 +2,6 @@ import re from typing import Any, Iterable, Mapping, Optional -from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit _SENSITIVE_KEY_RE = re.compile( @@ -32,29 +31,6 @@ def redact_mapping(values: Mapping[str, Any]) -> dict[str, Any]: return {k: redact_value(k, v) for k, v in values.items()} -def redact_url(url: str) -> str: - """ - Redact common secret-bearing URL parts (userinfo and sensitive query params). - """ - try: - parts = urlsplit(url) - except Exception: - return url - - netloc = parts.netloc - if "@" in netloc: - netloc = "@" + netloc.split("@", 1)[1] - - query_pairs = [] - for k, v in parse_qsl(parts.query, keep_blank_values=True): - query_pairs.append((k, "REDACTED" if is_sensitive_key(k) else v)) - redacted_query = urlencode(query_pairs, doseq=True) - - return urlunsplit( - (parts.scheme, netloc, parts.path, redacted_query, parts.fragment) - ) - - def summarize_list(values: Iterable[Any], *, max_items: int = 20) -> str: items = list(values) if len(items) <= max_items: From 4269fc4dbd43f860cff78f3e7f3ce17561141d3c Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Tue, 10 Feb 2026 13:54:38 -0700 Subject: [PATCH 6/8] Fix JSON serialization errors on hs.tasks.create() --- src/hydroserverpy/api/services/etl/task.py | 27 ++++++++++--------- .../etl/transformers/csv_transformer.py | 2 +- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/hydroserverpy/api/services/etl/task.py b/src/hydroserverpy/api/services/etl/task.py index 69eefc8..0292fa2 100644 --- a/src/hydroserverpy/api/services/etl/task.py +++ b/src/hydroserverpy/api/services/etl/task.py @@ -2,7 +2,6 @@ from typing import Literal, Union, Optional, List, Dict, Any, TYPE_CHECKING from uuid import UUID from datetime import datetime -from pydantic import Field from hydroserverpy.api.models import DataConnection, Task, TaskRun, TaskMapping, OrchestrationSystem from hydroserverpy.api.utils import normalize_uuid from ..base import HydroServerBaseService @@ -79,16 +78,16 @@ def create( workspace: Union["Workspace", UUID, str], data_connection: Union["DataConnection", UUID, str], orchestration_system: Union["OrchestrationSystem", UUID, str], - extractor_variables: dict = Field(default_factory=dict), - transformer_variables: dict = Field(default_factory=dict), - loader_variables: dict = Field(default_factory=dict), + extractor_variables: Optional[dict] = None, + transformer_variables: Optional[dict] = None, + loader_variables: Optional[dict] = None, paused: bool = False, start_time: Optional[datetime] = None, next_run_at: Optional[datetime] = None, crontab: Optional[str] = None, interval: Optional[int] = None, interval_period: Optional[str] = None, - mappings: List[dict] = Field(default_factory=list), + 
mappings: Optional[List[dict]] = None,
     ) -> "Task":
         """Create a new ETL task."""
 
@@ -97,19 +96,23 @@ def create(
             "workspaceId": normalize_uuid(workspace),
             "dataConnectionId": normalize_uuid(data_connection),
             "orchestrationSystemId": normalize_uuid(orchestration_system),
-            "extractorVariables": extractor_variables,
-            "transformerVariables": transformer_variables,
-            "loaderVariables": loader_variables,
-            "schedule": {
+            "extractorVariables": extractor_variables or {},
+            "transformerVariables": transformer_variables or {},
+            "loaderVariables": loader_variables or {},
+            "mappings": mappings or [],
+        }
+
+        # Only include the schedule if the caller provided scheduling information;
+        # the previous Ellipsis placeholder here broke JSON serialization.
+        if interval or crontab:
+            body["schedule"] = {
                 "paused": paused,
                 "startTime": start_time,
                 "nextRunAt": next_run_at,
                 "crontab": crontab,
                 "interval": interval,
                 "intervalPeriod": interval_period,
-            } if interval or crontab else ...,
-            "mappings": mappings if mappings else []
-        }
+            }
 
         return super().create(**body)
 
diff --git a/src/hydroserverpy/etl/transformers/csv_transformer.py b/src/hydroserverpy/etl/transformers/csv_transformer.py
index efe3b32..c1225a1 100644
--- a/src/hydroserverpy/etl/transformers/csv_transformer.py
+++ b/src/hydroserverpy/etl/transformers/csv_transformer.py
@@ -107,7 +107,7 @@ def transform(
             if "No columns to parse from file" in msg:
                 user_message = "The source system returned no data."
             elif "Usecols do not match columns" in msg or "not in list" in msg:
-                user_message = "The header row contained unexpected values and could not be processed."
+                user_message = "One or more configured CSV columns were not found in the header row."
             logger.error("Error reading CSV data: %s", user_message)
             raise ValueError(user_message) from e
 
From 51cc37d9dfe962e061955ce619062466f8a9813b Mon Sep 17 00:00:00 2001
From: Daniel Slaugh
Date: Tue, 10 Feb 2026 15:08:27 -0700
Subject: [PATCH 7/8] Update ETL error messages

---
 src/hydroserverpy/etl/extractors/http_extractor.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/hydroserverpy/etl/extractors/http_extractor.py b/src/hydroserverpy/etl/extractors/http_extractor.py
index 6c9aa75..37ade30 100644
--- a/src/hydroserverpy/etl/extractors/http_extractor.py
+++ b/src/hydroserverpy/etl/extractors/http_extractor.py
@@ -35,11 +35,11 @@ def extract(self, task: Task, loader=None):
         status = getattr(response, "status_code", None)
         if status in (401, 403):
             raise ValueError(
-                "Authentication with the source system failed; credentials may be invalid or expired."
+                "Authentication with the source system failed. The username, password, or token may be incorrect or expired."
             )
         if status == 404:
             raise ValueError(
-                "The requested payload was not found on the source system."
+                "The requested data could not be found on the source system."
             )
         if status is not None and status >= 400:
             logger.error(
@@ -58,7 +58,9 @@ def extract(self, task: Task, loader=None):
         data.seek(0)
 
         if total_bytes == 0:
-            raise ValueError("The source system returned no data.")
+            raise ValueError(
+                "The connection to the source worked but no data were returned."
+            )
 
         # Keep payload-level details at DEBUG; hydroserver-api-services already logs
         # a concise "Extractor returned payload" line for the end user.
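Note on PATCH 6/8 above: the serialization failure is easy to reproduce in isolation. pydantic's Field(default_factory=dict) only has meaning on a model field; used as a plain function default it evaluates once to a FieldInfo object, so callers who omit the argument send a FieldInfo (or the old Ellipsis schedule sentinel) into the request body, and json.dumps rejects both. The sketch below is illustrative only - create_broken/create_fixed are stand-ins, not the real hs.tasks.create() signature:

    import json
    from typing import Optional

    from pydantic import Field

    def create_broken(extractor_variables: dict = Field(default_factory=dict)) -> str:
        # When the caller omits the argument, its value is a FieldInfo, not a dict.
        return json.dumps({"extractorVariables": extractor_variables})

    def create_fixed(
        extractor_variables: Optional[dict] = None,
        crontab: Optional[str] = None,
        interval: Optional[int] = None,
    ) -> str:
        # Substitute real empty containers at call time instead of Field defaults.
        body = {"extractorVariables": extractor_variables or {}}
        # Omit the schedule entirely rather than defaulting it to Ellipsis,
        # which json.dumps also rejects.
        if interval or crontab:
            body["schedule"] = {"crontab": crontab, "interval": interval}
        return json.dumps(body)

    print(create_fixed(crontab="0 * * * *"))
    # {"extractorVariables": {}, "schedule": {"crontab": "0 * * * *", "interval": null}}

    try:
        create_broken()
    except TypeError as err:
        print(err)  # Object of type FieldInfo is not JSON serializable

The fix in the patch follows the same shape: None defaults with "or {}" / "or []" substitution, and the schedule key added only when scheduling information was actually provided.
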
From 3fabde6c126d28406b7033f1fd3364676fa3c547 Mon Sep 17 00:00:00 2001
From: Daniel Slaugh
Date: Tue, 10 Feb 2026 15:55:41 -0700
Subject: [PATCH 8/8] Standardize error wording

---
 src/hydroserverpy/etl/extractors/http_extractor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/hydroserverpy/etl/extractors/http_extractor.py b/src/hydroserverpy/etl/extractors/http_extractor.py
index 37ade30..9c98247 100644
--- a/src/hydroserverpy/etl/extractors/http_extractor.py
+++ b/src/hydroserverpy/etl/extractors/http_extractor.py
@@ -59,7 +59,7 @@ def extract(self, task: Task, loader=None):
 
         if total_bytes == 0:
             raise ValueError(
-                "The connection to the source worked but no data were returned."
+                "The connection to the source worked but no observations were returned."
            )
 
        # Keep payload-level details at DEBUG; hydroserver-api-services already logs
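End-of-series note: after PATCHES 5-8, the HTTP extractor's contract is that every transport or HTTP failure surfaces to the end user as a plain-language ValueError, while the technical detail (status code, unredacted URL) stays in the logs. A condensed sketch of that final behavior, assembled from the diffs above - abridged, since the catch-all RequestException branch and the payload DEBUG logging are omitted, and fetch_source is a stand-in name rather than the extractor's real method:

    import logging
    from io import BytesIO

    import requests

    logger = logging.getLogger(__name__)

    def fetch_source(url: str) -> BytesIO:
        # Translate transport failures into user-facing messages.
        try:
            response = requests.get(url)
        except requests.exceptions.Timeout as e:
            raise ValueError("The source system did not respond before the timeout.") from e
        except requests.exceptions.ConnectionError as e:
            raise ValueError("Could not connect to the source system.") from e

        # Map HTTP error statuses to the wording standardized in PATCHES 7-8.
        status = getattr(response, "status_code", None)
        if status in (401, 403):
            raise ValueError(
                "Authentication with the source system failed. "
                "The username, password, or token may be incorrect or expired."
            )
        if status == 404:
            raise ValueError("The requested data could not be found on the source system.")
        if status is not None and status >= 400:
            logger.error("HTTP request failed (status=%s) for %s", status, url)
            raise ValueError("The source system returned an error.")

        # Stream the body into memory and track its size.
        data, total_bytes = BytesIO(), 0
        for chunk in response.iter_content(chunk_size=8192):
            data.write(chunk)
            total_bytes += len(chunk)
        data.seek(0)

        # A successful request with an empty body is reported in ETL terms.
        if total_bytes == 0:
            raise ValueError(
                "The connection to the source worked but no observations were returned."
            )
        return data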