diff --git a/src/hydroserverpy/api/models/etl/task.py b/src/hydroserverpy/api/models/etl/task.py
index e0c4f7a..cdc2a0e 100644
--- a/src/hydroserverpy/api/models/etl/task.py
+++ b/src/hydroserverpy/api/models/etl/task.py
@@ -8,6 +8,7 @@
 from datetime import datetime, timedelta, timezone
 from pydantic import Field, AliasPath, AliasChoices, TypeAdapter
 from hydroserverpy.etl.factories import extractor_factory, transformer_factory, loader_factory
+from hydroserverpy.etl.loaders.hydroserver_loader import LoadSummary
 from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, LoaderConfig, SourceTargetMapping, MappingPath
 from ..base import HydroServerBaseModel
 from .orchestration_system import OrchestrationSystem
@@ -183,6 +184,7 @@ def run_local(self):
         task_run = self.create_task_run(status="RUNNING", started_at=datetime.now(timezone.utc))
 
+        runtime_source_uri: Optional[str] = None
         try:
             logging.info("Starting extract")
@@ -199,9 +201,13 @@ def run_local(self):
             ]
 
             data = extractor_cls.extract(self, loader_cls)
+            runtime_source_uri = getattr(extractor_cls, "runtime_source_uri", None)
             if self.is_empty(data):
                 self._update_status(
-                    loader_cls, True, "No data returned from the extractor"
+                    task_run,
+                    True,
+                    "No data returned from the extractor",
+                    runtime_source_uri=runtime_source_uri,
                 )
                 return
 
@@ -209,15 +215,25 @@ def run_local(self):
             data = transformer_cls.transform(data, task_mappings)
             if self.is_empty(data):
                 self._update_status(
-                    loader_cls, True, "No data returned from the transformer"
+                    task_run,
+                    True,
+                    "No data returned from the transformer",
+                    runtime_source_uri=runtime_source_uri,
                 )
                 return
 
             logging.info("Starting load")
-            loader_cls.load(data, self)
-            self._update_status(task_run, True, "OK")
+            load_summary = loader_cls.load(data, self)
+            self._update_status(
+                task_run,
+                True,
+                self._success_message(load_summary),
+                runtime_source_uri=runtime_source_uri,
+            )
         except Exception as e:
-            self._update_status(task_run, False, str(e))
+            self._update_status(
+                task_run, False, str(e), runtime_source_uri=runtime_source_uri
+            )
 
     @staticmethod
     def is_empty(data):
@@ -229,15 +245,56 @@ def is_empty(data):
         return False
 
-    def _update_status(self, task_run: TaskRun, success: bool, msg: str):
+    def _update_status(
+        self,
+        task_run: TaskRun,
+        success: bool,
+        msg: str,
+        runtime_source_uri: Optional[str] = None,
+    ):
+        result = {"message": msg}
+        if runtime_source_uri:
+            result.update(
+                {
+                    "runtimeSourceUri": runtime_source_uri,
+                    "runtime_source_uri": runtime_source_uri,
+                    "runtimeUrl": runtime_source_uri,
+                    "runtime_url": runtime_source_uri,
+                }
+            )
+
         self.update_task_run(
             task_run.id,
             status="SUCCESS" if success else "FAILURE",
-            result={"message": msg}
+            result=result
         )
         self.next_run_at = self._next_run()
         self.save()
 
+    @staticmethod
+    def _success_message(load: Optional[LoadSummary]) -> str:
+        if not load:
+            return "OK"
+
+        loaded = load.observations_loaded
+        if loaded == 0:
+            if load.timestamps_total and load.timestamps_after_cutoff == 0:
+                if load.cutoff:
+                    return (
+                        "Already up to date - no new observations loaded "
+                        f"(all timestamps were at or before {load.cutoff})."
+                    )
+                return "Already up to date - no new observations loaded (all timestamps were at or before the cutoff)."
+            if load.observations_available == 0:
+                return "Already up to date - no new observations loaded."
+            return "No new observations were loaded."
+
+        if load.datastreams_loaded:
+            return (
+                f"Load completed successfully ({loaded} rows across {load.datastreams_loaded} datastreams)."
+            )
+        return f"Load completed successfully ({loaded} rows loaded)."
+
     def _next_run(self) -> Optional[str]:
         now = datetime.now(timezone.utc)
         if cron := self.crontab:
diff --git a/src/hydroserverpy/api/services/etl/task.py b/src/hydroserverpy/api/services/etl/task.py
index 69eefc8..0292fa2 100644
--- a/src/hydroserverpy/api/services/etl/task.py
+++ b/src/hydroserverpy/api/services/etl/task.py
@@ -2,7 +2,6 @@
 from typing import Literal, Union, Optional, List, Dict, Any, TYPE_CHECKING
 from uuid import UUID
 from datetime import datetime
-from pydantic import Field
 from hydroserverpy.api.models import DataConnection, Task, TaskRun, TaskMapping, OrchestrationSystem
 from hydroserverpy.api.utils import normalize_uuid
 from ..base import HydroServerBaseService
@@ -79,16 +78,16 @@ def create(
         workspace: Union["Workspace", UUID, str],
         data_connection: Union["DataConnection", UUID, str],
         orchestration_system: Union["OrchestrationSystem", UUID, str],
-        extractor_variables: dict = Field(default_factory=dict),
-        transformer_variables: dict = Field(default_factory=dict),
-        loader_variables: dict = Field(default_factory=dict),
+        extractor_variables: Optional[dict] = None,
+        transformer_variables: Optional[dict] = None,
+        loader_variables: Optional[dict] = None,
         paused: bool = False,
         start_time: Optional[datetime] = None,
         next_run_at: Optional[datetime] = None,
         crontab: Optional[str] = None,
         interval: Optional[int] = None,
         interval_period: Optional[str] = None,
-        mappings: List[dict] = Field(default_factory=list),
+        mappings: Optional[List[dict]] = None,
     ) -> "Task":
         """Create a new ETL task."""
 
@@ -97,19 +96,23 @@ def create(
             "workspaceId": normalize_uuid(workspace),
             "dataConnectionId": normalize_uuid(data_connection),
             "orchestrationSystemId": normalize_uuid(orchestration_system),
-            "extractorVariables": extractor_variables,
-            "transformerVariables": transformer_variables,
-            "loaderVariables": loader_variables,
-            "schedule": {
+            "extractorVariables": extractor_variables or {},
+            "transformerVariables": transformer_variables or {},
+            "loaderVariables": loader_variables or {},
+            "mappings": mappings or [],
+        }
+
+        # Only include schedule if the caller provided scheduling information.
+        # Using Ellipsis here breaks JSON serialization.
+        if interval or crontab:
+            body["schedule"] = {
                 "paused": paused,
                 "startTime": start_time,
                 "nextRunAt": next_run_at,
                 "crontab": crontab,
                 "interval": interval,
                 "intervalPeriod": interval_period,
-            } if interval or crontab else ...,
-            "mappings": mappings if mappings else []
-        }
+            }
 
         return super().create(**body)
diff --git a/src/hydroserverpy/etl/STATES_README.md b/src/hydroserverpy/etl/STATES_README.md
new file mode 100644
index 0000000..c408ba7
--- /dev/null
+++ b/src/hydroserverpy/etl/STATES_README.md
@@ -0,0 +1,65 @@
+## Possible Needs Attention states:
+
+These are the most important end-user messages the ETL system can return for a task run
+that needs user action.
+
+### Configuration / Setup
+
+- Invalid extractor configuration. Tell the user exactly which field is missing or invalid.
+- Invalid transformer configuration. Tell the user exactly which field is missing or invalid.
+- A required configuration value is missing.
+- A required configuration value is null where a value is expected.
+- Missing required per-task extractor variable "<name>".
+- Extractor source URI contains a placeholder "<placeholder>", but it was not provided.
+- Task configuration is missing required daylight savings offset (when using daylightSavings mode).
+
+### Data Source (Connectivity / Authentication)
+
+- Could not connect to the source system.
+- The source system did not respond before the timeout.
+- Authentication with the source system failed; credentials may be invalid or expired.
+- The requested payload was not found on the source system.
+- The source system returned no data.
+
+### Source Data Did Not Match The Task
+
+- The source returned a format different from what this job expects.
+- The payload's expected fields were not found.
+- One or more timestamps could not be read with the current settings.
+- This job references a resource that no longer exists.
+- The file structure does not match the configuration.
+
+For CSV:
+- The header row contained unexpected values and could not be processed.
+- One or more data rows contained unexpected values and could not be processed.
+- Timestamp column "<column>" was not found in the extracted data.
+- A mapping source index is out of range for the extracted data.
+- A mapping source column was not found in the extracted data.
+
+For JSON:
+- The timestamp or value key could not be found with the specified query.
+- Transformer did not receive any extracted data to parse.
+
+### Targets / HydroServer
+
+- HydroServer rejected some or all of the data.
+- The target data series (datastream) could not be found.
+  - This may happen if the datastream was deleted or the mapping points to the wrong target.
+
+### Unexpected System Error
+
+- An internal system error occurred while processing the job.
+- The job stopped before completion.
+
+## Possible OK states:
+
+These are the most important end-user messages the ETL system can return for a successful run.
+
+- Load completed successfully.
+- Load completed successfully (<n> rows loaded).
+- Load completed successfully (<n> rows across <m> datastreams).
+- Already up to date - no new observations loaded.
+- No new observations were loaded.
+- Already up to date - no new observations loaded (all timestamps were at or before <cutoff>).
+- No data returned from the extractor. Nothing to load.
+- Transform produced no rows. Nothing to load.
diff --git a/src/hydroserverpy/etl/etl_configuration.py b/src/hydroserverpy/etl/etl_configuration.py
index 34ca9e7..8634668 100644
--- a/src/hydroserverpy/etl/etl_configuration.py
+++ b/src/hydroserverpy/etl/etl_configuration.py
@@ -2,6 +2,7 @@
 from typing import Annotated, Dict, List, Literal, Optional, Union
 from pydantic import BaseModel, Field, field_validator
 from enum import Enum
+from zoneinfo import ZoneInfo
 
 WorkflowType = Literal["ETL", "Aggregation", "Virtual", "SDL"]
 CSVDelimiterType = Literal[",", "|", "\t", ";", " "]
@@ -76,12 +77,28 @@ class Timestamp(BaseModel):
     class Config:
         populate_by_name = True
+        validate_default = True
 
-    @field_validator("timezone")
+    @field_validator("timezone", mode="after")
     def check_timezone(cls, timezone_value, info):
         mode = info.data.get("timezone_mode")
-        if mode == TimezoneMode.fixedOffset and timezone_value is None:
-            raise ValueError("`timezone` must be set when timezoneMode is fixedOffset")
+        if mode == TimezoneMode.fixedOffset:
+            if timezone_value is None:
+                raise ValueError(
+                    "`timezone` must be set when timezoneMode is fixedOffset (e.g. '-0700')"
+                )
+        if mode == TimezoneMode.daylightSavings:
+            if timezone_value is None or str(timezone_value).strip() == "":
+                raise ValueError(
+                    "Task configuration is missing required daylight savings offset (when using daylightSavings mode)."
+                )
+            # Validate it's a real IANA tz name early to avoid cryptic ZoneInfo errors later.
+            try:
+                ZoneInfo(str(timezone_value))
+            except Exception:
+                raise ValueError(
+                    f"Invalid timezone {timezone_value!r}. Use an IANA timezone like 'America/Denver'."
+                )
         return timezone_value
diff --git a/src/hydroserverpy/etl/extractors/base.py b/src/hydroserverpy/etl/extractors/base.py
index 5eeeebb..82f4bc5 100644
--- a/src/hydroserverpy/etl/extractors/base.py
+++ b/src/hydroserverpy/etl/extractors/base.py
@@ -4,28 +4,44 @@
 from datetime import datetime
 from ..etl_configuration import ExtractorConfig, Task
 from ..timestamp_parser import TimestampParser
+from ..logging_utils import summarize_list
+
+
+logger = logging.getLogger(__name__)
 
 
 class Extractor:
     def __init__(self, extractor_config: ExtractorConfig):
         self.cfg = extractor_config
+        self.runtime_source_uri = None
 
     def resolve_placeholder_variables(self, task: Task, loader):
-        logging.info(f"Creating runtime variables...")
+        placeholders = list(self.cfg.placeholder_variables or [])
         filled = {}
-        for placeholder in self.cfg.placeholder_variables:
+        runtime_names: set[str] = set()
+        task_names: set[str] = set()
+        for placeholder in placeholders:
             name = placeholder.name
             if placeholder.type == "runTime":
-                logging.info(f"Resolving runtime var: {name}")
+                logger.debug("Resolving runtime var: %s", name)
+                runtime_names.add(name)
                 if placeholder.run_time_value == "latestObservationTimestamp":
                     value = loader.earliest_begin_date(task)
                 elif placeholder.run_time_value == "jobExecutionTime":
                     value = pd.Timestamp.now(tz="UTC")
             elif placeholder.type == "perTask":
-                logging.info(f"Resolving task var: {name}")
+                logger.debug("Resolving task var: %s", name)
+                task_names.add(name)
                 if name not in task.extractor_variables:
-                    raise KeyError(f"Missing per-task variable '{name}'")
+                    logger.error(
+                        "Missing per-task extractor variable '%s'. Provided extractorVariables keys=%s",
+                        name,
+                        summarize_list(sorted((task.extractor_variables or {}).keys())),
+                    )
+                    raise ValueError(
+                        f"Missing required per-task extractor variable '{name}'."
+                    )
                 value = task.extractor_variables[name]
             else:
                 continue
@@ -35,16 +51,39 @@ def resolve_placeholder_variables(self, task: Task, loader):
                 value = parser.utc_to_string(value)
 
             filled[name] = value
+
+        if runtime_names:
+            names = ", ".join(sorted(runtime_names))
+            logger.debug(
+                "Runtime variables resolved (%s): %s", len(runtime_names), names
+            )
+        if task_names:
+            names = ", ".join(sorted(task_names))
+            logger.debug("Task variables resolved (%s): %s", len(task_names), names)
+
         if not filled:
-            return self.cfg.source_uri
-        return self.format_uri(filled)
+            uri = self.cfg.source_uri
+        else:
+            uri = self.format_uri(filled)
+
+        self.runtime_source_uri = uri
+        # Keep a stable log prefix for downstream parsing.
+        logger.info("Resolved runtime source URI: %s", uri)
+        return uri
 
     def format_uri(self, placeholder_variables):
         try:
            uri = self.cfg.source_uri.format(**placeholder_variables)
         except KeyError as e:
            missing_key = e.args[0]
-            raise KeyError(f"Missing placeholder variable: {missing_key}")
+            logger.error(
+                "Failed to format sourceUri: missing placeholder '%s'. Provided placeholders=%s",
+                missing_key,
+                summarize_list(sorted(placeholder_variables.keys())),
+            )
+            raise ValueError(
+                f"Extractor source URI contains a placeholder '{missing_key}', but it was not provided."
+            )
         return uri
 
     @abstractmethod
diff --git a/src/hydroserverpy/etl/extractors/ftp_extractor.py b/src/hydroserverpy/etl/extractors/ftp_extractor.py
index b715308..15f08b3 100644
--- a/src/hydroserverpy/etl/extractors/ftp_extractor.py
+++ b/src/hydroserverpy/etl/extractors/ftp_extractor.py
@@ -1,5 +1,5 @@
 import logging
-from ftplib import FTP
+from ftplib import FTP, error_perm
 from io import BytesIO
 from typing import Dict
 
@@ -7,6 +7,9 @@
 from ..types import TimeRange
 
 
+logger = logging.getLogger(__name__)
+
+
 class FTPExtractor(Extractor):
     def __init__(
         self,
@@ -33,18 +36,33 @@ def extract(self):
         try:
             ftp.connect(self.host, self.port)
             ftp.login(user=self.username, passwd=self.password)
-            logging.info(f"Connected to FTP server: {self.host}:{self.port}")
+            logger.debug("Connected to FTP server %s:%s", self.host, self.port)
 
             data = BytesIO()
             ftp.retrbinary(f"RETR {self.filepath}", data.write)
-            logging.info(
-                f"Successfully downloaded file '{self.filepath}' from FTP server."
+            logger.debug(
+                "Successfully downloaded file %r from FTP server.",
+                self.filepath,
             )
             data.seek(0)
+            if data.getbuffer().nbytes == 0:
+                raise ValueError("The source system returned no data.")
             return data
+        except error_perm as e:
+            msg = str(e)
+            # Common FTP status codes:
+            # 530 = not logged in / auth failure
+            # 550 = file unavailable
+            if msg.startswith("530"):
+                raise ValueError(
+                    "Authentication with the source system failed; credentials may be invalid or expired."
+                ) from e
+            if msg.startswith("550"):
+                raise ValueError("The requested payload was not found on the source system.") from e
+            raise ValueError("The source system returned an error.") from e
         except Exception as e:
-            logging.error(f"Error retrieving file from FTP server: {e}")
-            return None
+            logger.error("Error retrieving file from FTP server: %s", e, exc_info=True)
+            raise ValueError("Could not connect to the source system.") from e
         finally:
             if ftp:
                 ftp.quit()
diff --git a/src/hydroserverpy/etl/extractors/http_extractor.py b/src/hydroserverpy/etl/extractors/http_extractor.py
index 8181848..9c98247 100644
--- a/src/hydroserverpy/etl/extractors/http_extractor.py
+++ b/src/hydroserverpy/etl/extractors/http_extractor.py
@@ -6,6 +6,9 @@
 from .base import Extractor, ExtractorConfig
 
 
+logger = logging.getLogger(__name__)
+
+
 class HTTPExtractor(Extractor):
     def __init__(self, settings: ExtractorConfig):
         super().__init__(settings)
@@ -15,14 +18,60 @@ def extract(self, task: Task, loader=None):
         """
         Downloads the file from the HTTP/HTTPS server and returns a file-like object.
         """
         url = self.resolve_placeholder_variables(task, loader)
-        logging.info(f"Requesting data from → {url}")
+        logger.info("Requesting data from source URI")
+
+        try:
+            response = requests.get(url)
+        except requests.exceptions.Timeout as e:
+            raise ValueError(
+                "The source system did not respond before the timeout."
+            ) from e
+        except requests.exceptions.ConnectionError as e:
+            raise ValueError("Could not connect to the source system.") from e
+        except requests.exceptions.RequestException as e:
+            # Generic network/client error.
+            raise ValueError("Could not connect to the source system.") from e
 
-        response = requests.get(url)
-        response.raise_for_status()
+        status = getattr(response, "status_code", None)
+        if status in (401, 403):
+            raise ValueError(
+                "Authentication with the source system failed. The username, password, or token may be incorrect or expired."
+            )
+        if status == 404:
+            raise ValueError(
+                "The requested data could not be found on the source system."
+            )
+        if status is not None and status >= 400:
+            logger.error(
+                "HTTP request failed (status=%s) for %s",
+                status,
+                url,
+            )
+            raise ValueError("The source system returned an error.")
 
         data = BytesIO()
+        total_bytes = 0
         for chunk in response.iter_content(chunk_size=8192):
             if chunk:
+                total_bytes += len(chunk)
                 data.write(chunk)
         data.seek(0)
+
+        if total_bytes == 0:
+            raise ValueError(
+                "The connection to the source worked but no observations were returned."
+            )
+
+        # Keep payload-level details at DEBUG; hydroserver-api-services already logs
+        # a concise "Extractor returned payload" line for the end user.
+        logger.debug(
+            "Extractor returned payload (status=%s, content_type=%r, bytes=%s).",
+            getattr(response, "status_code", None),
+            (
+                response.headers.get("Content-Type")
+                if hasattr(response, "headers")
+                else None
+            ),
+            total_bytes,
+        )
 
         return data
diff --git a/src/hydroserverpy/etl/extractors/local_file_extractor.py b/src/hydroserverpy/etl/extractors/local_file_extractor.py
index 813f855..0402c31 100644
--- a/src/hydroserverpy/etl/extractors/local_file_extractor.py
+++ b/src/hydroserverpy/etl/extractors/local_file_extractor.py
@@ -3,18 +3,28 @@
 from ..etl_configuration import ExtractorConfig
 
 
+logger = logging.getLogger(__name__)
+
+
 class LocalFileExtractor(Extractor):
     def __init__(self, extractor_config: ExtractorConfig):
         super().__init__(extractor_config)
 
-    def extract(self, *args, **kwargs):
+    def extract(self, task=None, loader=None):
         """
         Opens the file and returns a file-like object.
         """
+        path = (
+            self.resolve_placeholder_variables(task, loader)
+            if task is not None
+            else self.cfg.source_uri
+        )
         try:
-            file_handle = open(self.cfg.source_uri, "r")
-            logging.info(f"Successfully opened file '{self.cfg.source_uri}'.")
+            file_handle = open(path, "r")
+            logger.debug("Successfully opened local file %r.", path)
             return file_handle
+        except FileNotFoundError as e:
+            raise ValueError("This job references a resource that no longer exists.") from e
         except Exception as e:
-            logging.error(f"Error opening file '{self.cfg.source_uri}': {e}")
-            return None
+            logger.error("Error opening local file %r: %s", path, e, exc_info=True)
+            raise ValueError("Could not open the source file.") from e
diff --git a/src/hydroserverpy/etl/loaders/hydroserver_loader.py b/src/hydroserverpy/etl/loaders/hydroserver_loader.py
index b4e942b..7821836 100644
--- a/src/hydroserverpy/etl/loaders/hydroserver_loader.py
+++ b/src/hydroserverpy/etl/loaders/hydroserver_loader.py
@@ -1,15 +1,29 @@
 from __future__ import annotations
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any, Dict
 from .base import Loader
 import logging
 import pandas as pd
 from ..etl_configuration import Task, SourceTargetMapping
+from dataclasses import dataclass
 
 if TYPE_CHECKING:
     from hydroserverpy.api.client import HydroServer
 
 
+logger = logging.getLogger(__name__)
+
+
+@dataclass(frozen=True)
+class LoadSummary:
+    cutoff: str
+    timestamps_total: int
+    timestamps_after_cutoff: int
+    observations_available: int
+    observations_loaded: int
+    datastreams_loaded: int
+
+
 class HydroServerLoader(Loader):
     """
     A class that extends the HydroServer client with ETL-specific functionalities.
     """
 
@@ -20,37 +34,64 @@ def __init__(self, client: HydroServer, task_id):
         self._begin_cache: dict[str, pd.Timestamp] = {}
         self.task_id = task_id
 
-    def load(self, data: pd.DataFrame, task: Task) -> None:
+    def load(self, data: pd.DataFrame, task: Task) -> LoadSummary:
         """
         Load observations from a DataFrame to the HydroServer.
         :param data: A Pandas DataFrame where each column corresponds to a datastream.
         """
+        logger.info("Saving data to HydroServer...")
         begin_date = self.earliest_begin_date(task)
         new_data = data[data["timestamp"] > begin_date]
+
+        cutoff_value = (
+            begin_date.isoformat()
+            if hasattr(begin_date, "isoformat")
+            else str(begin_date)
+        )
+        timestamps_total = len(data)
+        timestamps_after_cutoff = len(new_data)
+        observations_available = 0
+        observations_loaded = 0
+        datastreams_loaded = 0
+
         for col in new_data.columns.difference(["timestamp"]):
-            datastream = self.client.datastreams.get(
-                uid=str(col)
-            )
-            ds_cutoff = datastream.phenomenon_end_time
-            df = (
+            try:
+                datastream = self.client.datastreams.get(uid=str(col))
+            except Exception as e:
+                status = getattr(e, "status_code", None) or getattr(
+                    getattr(e, "response", None), "status_code", None
+                )
+                if status == 404:
+                    raise ValueError("The target datastream could not be found.") from e
+                raise ValueError("HydroServer rejected some or all of the data.") from e
+
+            ds_cutoff = getattr(datastream, "phenomenon_end_time", None)
+
+            base_df = (
                 new_data[["timestamp", col]]
-                .loc[lambda d: d["timestamp"] > ds_cutoff if ds_cutoff else True]
                 .rename(columns={col: "value"})
                 .dropna(subset=["value"])
             )
+            pre_count = len(base_df)
+            if ds_cutoff:
+                base_df = base_df.loc[base_df["timestamp"] > ds_cutoff]
+
+            df = base_df
+            available = len(df)
+            observations_available += available
             if df.empty:
-                logging.warning(f"No new data for {col}, skipping.")
                 continue
 
             df = df.rename(columns={"timestamp": "phenomenon_time", "value": "result"})
+            loaded = 0
 
             # Chunked upload
             CHUNK_SIZE = 5000
             total = len(df)
             for start in range(0, total, CHUNK_SIZE):
                 end = min(start + CHUNK_SIZE, total)
                 chunk = df.iloc[start:end]
-                logging.info(
+                logger.debug(
                     "Uploading %s rows (%s-%s) to datastream %s",
                     len(chunk),
                     start,
@@ -61,12 +102,21 @@ def load(self, data: pd.DataFrame, task: Task) -> None:
                     self.client.datastreams.load_observations(
                         uid=str(col), observations=chunk
                     )
+                    loaded += len(chunk)
                 except Exception as e:
                     status = getattr(e, "status_code", None) or getattr(
                         getattr(e, "response", None), "status_code", None
                     )
+                    if status == 404:
+                        raise ValueError(
+                            "The target datastream could not be found."
+                        ) from e
+                    if status == 400:
+                        raise ValueError(
+                            "HydroServer rejected some or all of the data."
+                        ) from e
                     if status == 409 or "409" in str(e) or "Conflict" in str(e):
-                        logging.info(
+                        logger.info(
                             "409 Conflict for datastream %s on rows %s-%s; skipping remainder for this stream.",
                             col,
                             start,
@@ -74,20 +124,42 @@ def load(self, data: pd.DataFrame, task: Task) -> None:
                         )
                     raise
 
+            if loaded > 0:
+                datastreams_loaded += 1
+            observations_loaded += loaded
+
+        return LoadSummary(
+            cutoff=cutoff_value,
+            timestamps_total=timestamps_total,
+            timestamps_after_cutoff=timestamps_after_cutoff,
+            observations_available=observations_available,
+            observations_loaded=observations_loaded,
+            datastreams_loaded=datastreams_loaded,
+        )
+
     def _fetch_earliest_begin(
         self, mappings: list[SourceTargetMapping]
     ) -> pd.Timestamp:
-        logging.info("Querying HydroServer for earliest begin date for task...")
+        logger.info(
+            "Checking HydroServer for the most recent data already stored (so we only load new observations)..."
+        )
         timestamps = []
         for m in mappings:
             for p in m["paths"] if isinstance(m, dict) else m.paths:
-                datastream_id = p["targetIdentifier"] if isinstance(p, dict) else p.target_identifier
+                datastream_id = (
+                    p["targetIdentifier"]
+                    if isinstance(p, dict)
+                    else p.target_identifier
+                )
                 datastream = self.client.datastreams.get(datastream_id)
                 raw = datastream.phenomenon_end_time or "1970-01-01"
                 ts = pd.to_datetime(raw, utc=True)
                 timestamps.append(ts)
-        logging.info(f"Found earliest begin date: {min(timestamps)}")
-        return min(timestamps)
+        earliest = (
+            min(timestamps) if timestamps else pd.Timestamp("1970-01-01", tz="UTC")
+        )
+        logger.debug("Found earliest begin date: %s", earliest)
+        return earliest
 
     def earliest_begin_date(self, task: Task) -> pd.Timestamp:
         """
diff --git a/src/hydroserverpy/etl/logging_utils.py b/src/hydroserverpy/etl/logging_utils.py
new file mode 100644
index 0000000..6cd185c
--- /dev/null
+++ b/src/hydroserverpy/etl/logging_utils.py
@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+import re
+from typing import Any, Iterable, Mapping, Optional
+
+
+_SENSITIVE_KEY_RE = re.compile(
+    r"(?:"
+    r"pass(word)?|passwd|secret|token|api[_-]?key|apikey|auth|bearer|signature|sig|"
+    r"access[_-]?token|refresh[_-]?token|client[_-]?secret"
+    r")",
+    re.IGNORECASE,
+)
+
+
+def is_sensitive_key(key: Optional[str]) -> bool:
+    if not key:
+        return False
+    return bool(_SENSITIVE_KEY_RE.search(key))
+
+
+def redact_value(key: str, value: Any) -> Any:
+    if is_sensitive_key(key):
+        return "REDACTED"
+    if isinstance(value, str) and len(value) > 256:
+        return value[:256] + "...(truncated)"
+    return value
+
+
+def redact_mapping(values: Mapping[str, Any]) -> dict[str, Any]:
+    return {k: redact_value(k, v) for k, v in values.items()}
+
+
+def summarize_list(values: Iterable[Any], *, max_items: int = 20) -> str:
+    items = list(values)
+    if len(items) <= max_items:
+        return repr(items)
+    return (
+        repr(items[:max_items])[:-1] + f", ... (+{len(items) - max_items} more)]"
+    )
diff --git a/src/hydroserverpy/etl/timestamp_parser.py b/src/hydroserverpy/etl/timestamp_parser.py
index c2301b9..d46e46e 100644
--- a/src/hydroserverpy/etl/timestamp_parser.py
+++ b/src/hydroserverpy/etl/timestamp_parser.py
@@ -12,6 +12,8 @@
 TimezoneMode = Literal["utc", "daylightSavings", "fixedOffset", "embeddedOffset"]
 ALLOWED_TIMEZONE_MODES = {m.lower() for m in get_args(TimezoneMode)}
 
+logger = logging.getLogger(__name__)
+
 
 class Timestamp(BaseModel):
     format: TimestampFormat
@@ -42,6 +44,10 @@ def __init__(self, raw: Union[Timestamp, dict]):
     @cached_property
     def tz(self):
         if self.tz_mode == "fixedoffset":
+            if not self.timestamp.timezone:
+                raise ValueError(
+                    "`timezone` must be set when timezoneMode is fixedOffset (e.g. '-0700')"
+                )
             offset = self.timestamp.timezone.strip()
             if len(offset) != 5 or offset[0] not in "+-":
                 raise ValueError(f"Invalid timezone: {offset}")
@@ -49,7 +55,16 @@ def tz(self):
             hrs, mins = int(offset[1:3]), int(offset[3:5])
             return timezone(timedelta(minutes=sign * (hrs * 60 + mins)))
         if self.tz_mode == "daylightsavings":
-            return ZoneInfo(self.timestamp.timezone)
+            if not self.timestamp.timezone:
+                raise ValueError(
+                    "Task configuration is missing required daylight savings offset (when using daylightSavings mode)."
+                )
+            try:
+                return ZoneInfo(self.timestamp.timezone)
+            except Exception as e:
+                raise ValueError(
+                    f"Invalid timezone {self.timestamp.timezone!r}. Use an IANA timezone like 'America/Denver'."
+                ) from e
         if self.tz_mode == "utc":
             return timezone.utc
@@ -81,9 +96,14 @@ def parse_series(self, raw_series: pd.Series) -> pd.Series:
         if parsed.isna().any():
             bad_rows = s[parsed.isna()].head(2).tolist()
-            logging.warning(
-                f"{parsed.isna().sum()} timestamps failed to parse. "
-                f"Sample bad values: {bad_rows}"
+            logger.warning(
+                "%s timestamps failed to parse (format=%r, timezoneMode=%r, timezone=%r, customFormat=%r). Sample bad values: %s",
+                parsed.isna().sum(),
+                self.timestamp.format,
+                self.timestamp.timezone_mode,
+                self.timestamp.timezone,
+                self.timestamp.custom_format,
+                bad_rows,
             )
         return parsed
@@ -106,7 +126,12 @@ def utc_to_string(self, dt: Union[datetime, pd.Timestamp]) -> str:
             return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
 
         if tz_format == "custom":
-            logging.info(f"custom timestamp: ... {self.timestamp}")
+            logger.debug(
+                "Formatting runtime timestamp using custom format (customFormat=%r, timezoneMode=%r, timezone=%r).",
+                self.timestamp.custom_format,
+                self.timestamp.timezone_mode,
+                self.timestamp.timezone,
+            )
             return dt.astimezone(self.tz).strftime(self.timestamp.custom_format)
 
         raise ValueError(f"Unknown timestamp.format: {self.timestamp.format!r}")
diff --git a/src/hydroserverpy/etl/transformers/base.py b/src/hydroserverpy/etl/transformers/base.py
index 42a754a..89bc824 100644
--- a/src/hydroserverpy/etl/transformers/base.py
+++ b/src/hydroserverpy/etl/transformers/base.py
@@ -8,6 +8,7 @@
 from ..timestamp_parser import TimestampParser
 from ..etl_configuration import MappingPath, TransformerConfig, SourceTargetMapping
+from ..logging_utils import summarize_list
 
 ALLOWED_AST = (
     ast.Expression,
@@ -51,6 +52,9 @@ def _compile_arithmetic_expr(expr: str):
     return _compile_arithmetic_expr_canon(_canonicalize_expr(expr))
 
 
+logger = logging.getLogger(__name__)
+
+
 class Transformer(ABC):
     def __init__(self, transformer_config: TransformerConfig):
         self.cfg = transformer_config
@@ -68,18 +72,39 @@ def needs_datastreams(self) -> bool:
     def standardize_dataframe(
         self, df: pd.DataFrame, mappings: List[SourceTargetMapping]
     ):
+        logger.debug(
+            "Standardizing extracted dataframe (rows=%s, columns=%s).",
+            len(df),
+            len(df.columns),
+        )
+        logger.debug(
+            "Extracted dataframe columns (sample): %s",
+            summarize_list(list(df.columns), max_items=30),
+        )
         if not df.empty:
-            logging.info(f"Read task into dataframe: {df.iloc[0].to_dict()}")
-        else:
-            logging.info("Read task into dataframe: [empty dataframe]")
+            # Avoid dumping full rows; just log a compact preview.
+            preview = df.iloc[0].to_dict()
+            for k, v in list(preview.items()):
+                if isinstance(v, str) and len(v) > 128:
+                    preview[k] = v[:128] + "...(truncated)"
+            logger.debug("Extracted dataframe first-row preview: %s", preview)
 
         # 1) Normalize timestamp column
         df.rename(columns={self.timestamp.key: "timestamp"}, inplace=True)
         if "timestamp" not in df.columns:
             msg = f"Timestamp column '{self.timestamp.key}' not found in data."
-            logging.error(msg)
+            logger.error(
+                "%s Available columns=%s",
+                msg,
+                summarize_list(list(df.columns), max_items=30),
+            )
             raise ValueError(msg)
-        logging.info(f"Renamed timestamp column to 'timestamp'")
+        logger.debug(
+            "Normalized timestamp column '%s' -> 'timestamp' (timezoneMode=%r, format=%r).",
+            self.timestamp.key,
+            getattr(self.timestamp, "timezone_mode", None),
+            getattr(self.timestamp, "format", None),
+        )
         df["timestamp"] = self.timestamp_parser.parse_series(df["timestamp"])
         df = df.drop_duplicates(subset=["timestamp"], keep="last")
@@ -89,10 +114,21 @@ def _resolve_source_col(s_id: Union[str, int]) -> str:
             try:
                 return df.columns[s_id]
             except IndexError:
+                logger.error(
+                    "Source index %s is out of range. Extracted columns count=%s, columns(sample)=%s",
+                    s_id,
+                    len(df.columns),
+                    summarize_list(list(df.columns), max_items=30),
+                )
                 raise ValueError(
                     f"Source index {s_id} is out of range for extracted data."
                 )
         if s_id not in df.columns:
+            logger.error(
+                "Source column %r not found. Available columns=%s",
+                s_id,
+                summarize_list(list(df.columns), max_items=30),
+            )
             raise ValueError(f"Source column '{s_id}' not found in extracted data.")
         return s_id
@@ -107,19 +143,25 @@ def _apply_transformations(series: pd.Series, path: MappingPath) -> pd.Series:
                     try:
                         out = eval(code, {"__builtins__": {}}, {"x": out})
                     except Exception as ee:
-                        logging.exception(
-                            "Data transformation failed for expression=%r",
+                        logger.exception(
+                            "Data transformation failed for target=%r expression=%r",
+                            path.target_identifier,
                             transformation.expression,
                         )
                         raise
                 else:
                     msg = f"Unsupported transformation type: {transformation.type}"
-                    logging.error(msg)
+                    logger.error(msg)
                     raise ValueError(msg)
             return out
 
         # source target mappings may be one to many. Therefore, create a new column for each target and apply transformations
         transformed_df = pd.DataFrame(index=df.index)
+        logger.debug(
+            "Applying %s source mapping(s): %s",
+            len(mappings),
+            summarize_list([m.source_identifier for m in mappings], max_items=30),
+        )
         for m in mappings:
             src_col = _resolve_source_col(m.source_identifier)
             base = df[src_col]
@@ -130,6 +172,6 @@ def _apply_transformations(series: pd.Series, path: MappingPath) -> pd.Series:
 
         # 6) Keep only timestamp + target columns
         df = pd.concat([df[["timestamp"]], pd.DataFrame(transformed_df)], axis=1)
-        logging.info(f"standardized dataframe created: {df.shape}")
+        logger.debug("Standardized dataframe created: %s", df.shape)
 
         return df
diff --git a/src/hydroserverpy/etl/transformers/csv_transformer.py b/src/hydroserverpy/etl/transformers/csv_transformer.py
index 4d219f0..c1225a1 100644
--- a/src/hydroserverpy/etl/transformers/csv_transformer.py
+++ b/src/hydroserverpy/etl/transformers/csv_transformer.py
@@ -6,6 +6,9 @@
 from ..etl_configuration import TransformerConfig, SourceTargetMapping
 
 
+logger = logging.getLogger(__name__)
+
+
 class CSVTransformer(Transformer):
     def __init__(self, transformer_config: TransformerConfig):
         super().__init__(transformer_config)
@@ -32,16 +35,48 @@ def transform(
         Returns:
             observations_map (dict): Dict mapping datastream IDs to pandas DataFrames.
         """
+        if data_file is None:
+            raise TypeError(
+                "CSVTransformer received None; expected file-like, bytes, or str"
+            )
         clean_file = self._strip_comments(data_file)
 
         use_index = self.identifier_type == "index"
         if use_index:
             # Users will always interact in 1-based, so if the key is a column index, convert to 0-based to work with Pandas
-            timestamp_pos = int(self.timestamp.key) - 1
-            usecols = [timestamp_pos] + [int(m.source_identifier) - 1 for m in mappings]
+            try:
+                timestamp_pos = int(self.timestamp.key) - 1
+            except Exception as e:
+                raise ValueError(
+                    "CSV transformer identifierType='index' requires timestamp.key to be a 1-based "
+                    "column index string (example: '1')."
+                ) from e
+            try:
+                source_positions = [int(m.source_identifier) - 1 for m in mappings]
+            except Exception as e:
+                raise ValueError(
+                    "CSV transformer identifierType='index' requires each mapping sourceIdentifier to be a "
+                    "1-based column index string (example: '2')."
+                ) from e
+
+            usecols = [timestamp_pos] + source_positions
+            # When selecting columns by integer position, `dtype={col_name: ...}` can break because there is
+            # no stable column name at parse time. Reading everything as string is safe; we coerce later.
+            dtype = "string"
         else:
             usecols = [self.timestamp.key] + [m.source_identifier for m in mappings]
+            dtype = {self.timestamp.key: "string"}
+
+        logger.info("Reading CSV data...")
+        logger.debug(
+            "Parsing CSV (identifierType=%r, delimiter=%r, headerRow=%r, dataStartRow=%r, usecols=%r).",
+            self.identifier_type,
+            self.delimiter,
+            getattr(self.cfg, "header_row", None),
+            getattr(self.cfg, "data_start_row", None),
+            usecols,
+        )
 
         try:
             # Pandas’ heuristics strip offsets and silently coerce failures to strings.
@@ -53,16 +88,34 @@ def transform(
                 header=0,
                 skiprows=self._build_skiprows(),
                 usecols=usecols,
-                dtype={self.timestamp.key: "string"},
+                dtype=dtype,
             )
-            logging.info(f"CSV file read into dataframe: {df.shape}")
+            logger.debug("CSV file read into dataframe: %s", df.shape)
         except Exception as e:
-            logging.error(f"Error reading CSV data: {e}")
-            return None
+            # Try to extract a safe preview of the header for "usecols do not match columns" failures.
+            header_preview = None
+            try:
+                pos = clean_file.tell()
+                clean_file.seek(0)
+                header_preview = (clean_file.readline() or "").strip()
+                clean_file.seek(pos)
+            except Exception:
+                header_preview = None
+
+            msg = str(e)
+            user_message = "One or more data rows contained unexpected values and could not be processed."
+            if "No columns to parse from file" in msg:
+                user_message = "The source system returned no data."
+            elif "Usecols do not match columns" in msg or "not in list" in msg:
+                user_message = "One or more configured CSV columns were not found in the header row."
+
+            logger.error("Error reading CSV data: %s", user_message)
+            raise ValueError(user_message) from e
 
         # In index mode, relabel columns back to original 1-based indices so base transformer can use integer labels directly
         if use_index:
-            df.columns = [(c + 1) if isinstance(c, int) else c for c in usecols]
+            # Task config stores keys as strings; keep columns as strings so timestamp.key/sourceIdentifier match.
+            df.columns = [str(c + 1) if isinstance(c, int) else str(c) for c in usecols]
 
         return self.standardize_dataframe(df, mappings)
diff --git a/src/hydroserverpy/etl/transformers/json_transformer.py b/src/hydroserverpy/etl/transformers/json_transformer.py
index fa3e89f..09af63b 100644
--- a/src/hydroserverpy/etl/transformers/json_transformer.py
+++ b/src/hydroserverpy/etl/transformers/json_transformer.py
@@ -7,6 +7,9 @@
 from ..etl_configuration import TransformerConfig, SourceTargetMapping
 
 
+logger = logging.getLogger(__name__)
+
+
 class JSONTransformer(Transformer):
     def __init__(self, transformer_config: TransformerConfig):
         super().__init__(transformer_config)
@@ -30,11 +33,24 @@ def transform(self, data_file, mappings: List[SourceTargetMapping]):
             )
 
         json_data = json.load(data_file)
+        logger.debug("Loaded JSON payload (type=%s).", type(json_data).__name__)
+        if not isinstance(json_data, (dict, list)):
+            raise ValueError("The payload's expected fields were not found.")
+
         data_points = self.extract_data_points(json_data)
-        if not data_points:
-            logging.warning("No data points found in the JSON data.")
-            return None
+        if data_points is None:
+            # JMESPath returned null: this usually means the expected field/path was not present.
+            raise ValueError(
+                "The timestamp or value key could not be found with the specified query."
+            )
+        if len(data_points) == 0:
+            # Treat an empty result list as "no data" (not a configuration error).
+            # Build an empty frame with the expected columns so standardization can proceed cleanly.
+            cols = [self.timestamp.key] + [m.source_identifier for m in mappings]
+            df = pd.DataFrame(columns=cols)
+            return self.standardize_dataframe(df, mappings)
 
+        logger.debug("Extracted %s JSON data point(s).", len(data_points))
         df = pd.DataFrame(data_points)
         return self.standardize_dataframe(df, mappings)
 
@@ -42,7 +58,12 @@ def extract_data_points(self, json_data: Any) -> Optional[List[dict]]:
         """Extracts data points from the JSON data using the data_path."""
         data_points = jmespath.search(self.jmespath, json_data)
+        if data_points is None:
+            return None
         if isinstance(data_points, dict):
             data_points = [data_points]
-        return data_points
+        if isinstance(data_points, list):
+            return data_points
+        # Unexpected output type; surface it as a configuration issue.
+        return None
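A quick sanity-check sketch of the new success-message mapping (not part of the patch; it assumes the patched package is importable, and that `Task` is the model class edited in `api/models/etl/task.py`, which is how the services module imports it):

```python
# Sketch: exercise Task._success_message with hand-built LoadSummary values.
from hydroserverpy.api.models import Task
from hydroserverpy.etl.loaders.hydroserver_loader import LoadSummary

# Fresh observations were written to two datastreams.
loaded = LoadSummary(
    cutoff="2024-01-01T00:00:00+00:00",
    timestamps_total=10,
    timestamps_after_cutoff=10,
    observations_available=10,
    observations_loaded=10,
    datastreams_loaded=2,
)
assert Task._success_message(loaded) == (
    "Load completed successfully (10 rows across 2 datastreams)."
)

# Every extracted timestamp was at or before the task-level cutoff.
up_to_date = LoadSummary(
    cutoff="2024-01-01T00:00:00+00:00",
    timestamps_total=10,
    timestamps_after_cutoff=0,
    observations_available=0,
    observations_loaded=0,
    datastreams_loaded=0,
)
assert Task._success_message(up_to_date) == (
    "Already up to date - no new observations loaded "
    "(all timestamps were at or before 2024-01-01T00:00:00+00:00)."
)

# No summary at all still reports a generic OK.
assert Task._success_message(None) == "OK"
```

The expected strings line up with the "Possible OK states" list in `STATES_README.md`.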