From fb08433737c9d5c17c773b081d5170cb8f90df65 Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Fri, 6 Feb 2026 15:18:32 -0700 Subject: [PATCH 01/12] Pass improved logging to frontend --- domains/etl/tasks.py | 363 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 315 insertions(+), 48 deletions(-) diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index 2de705ac..46d52831 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -1,7 +1,10 @@ import logging -import pandas as pd +from contextlib import contextmanager +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone as dt_timezone +from typing import Any, Optional from uuid import UUID -from datetime import timedelta +import pandas as pd from celery import shared_task from pydantic import TypeAdapter from celery.signals import task_prerun, task_success, task_failure, task_postrun @@ -14,54 +17,299 @@ from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, SourceTargetMapping, MappingPath -@shared_task(bind=True, expires=10) +@dataclass +class TaskRunContext: + stage: str = "setup" + runtime_source_uri: Optional[str] = None + stats: dict[str, Any] = field(default_factory=dict) + log_handler: Optional["TaskLogHandler"] = None + + +class TaskLogFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + path = (record.pathname or "").replace("\\", "/") + return "/hydroserverpy/" in path or "/domains/etl/" in path + + +class TaskLogHandler(logging.Handler): + def __init__(self, context: TaskRunContext): + super().__init__(level=logging.INFO) + self.context = context + self.lines: list[str] = [] + self.entries: list[dict[str, Any]] = [] + self._formatter = logging.Formatter() + + def emit(self, record: logging.LogRecord) -> None: + if not self.filter(record): + return + + timestamp = datetime.fromtimestamp( + record.created, tz=dt_timezone.utc + ).isoformat() + message = record.getMessage() + line = f"{timestamp} {record.levelname:<8} {message}" + if record.exc_info: + line = f"{line}\n{self._formatter.formatException(record.exc_info)}" + self.lines.append(line) + + entry: dict[str, Any] = { + "timestamp": timestamp, + "level": record.levelname, + "logger": record.name, + "message": message, + "pathname": record.pathname, + "lineno": record.lineno, + } + if record.exc_info: + entry["exception"] = self._formatter.formatException(record.exc_info) + self.entries.append(entry) + + self._capture_runtime_uri(message) + + def _capture_runtime_uri(self, message: str) -> None: + if self.context.runtime_source_uri: + return + if "Resolved runtime source URI:" in message: + self.context.runtime_source_uri = message.split( + "Resolved runtime source URI:", 1 + )[1].strip() + return + if "Requesting data from" in message: + if "→" in message: + self.context.runtime_source_uri = message.split("→", 1)[1].strip() + return + if "from" in message: + self.context.runtime_source_uri = message.split("from", 1)[1].strip() + + def as_text(self) -> str: + return "\n".join(self.lines).strip() + + +TASK_RUN_CONTEXT: dict[str, TaskRunContext] = {} + + +@contextmanager +def capture_task_logs(context: TaskRunContext): + logger = logging.getLogger() + handler = TaskLogHandler(context) + handler.addFilter(TaskLogFilter()) + context.log_handler = handler + + previous_level = logger.level + if previous_level > logging.INFO: + logger.setLevel(logging.INFO) + + logger.addHandler(handler) + try: + yield handler + finally: + logger.removeHandler(handler) + if 
previous_level > logging.INFO: + logger.setLevel(previous_level) + + +def _is_empty(data: Any) -> bool: + if data is None: + return True + if isinstance(data, pd.DataFrame) and data.empty: + return True + return False + + +def _describe_payload(data: Any) -> dict[str, Any]: + if isinstance(data, pd.DataFrame): + return { + "type": "DataFrame", + "rows": len(data), + "columns": len(data.columns), + } + return {"type": type(data).__name__} + + +def _describe_transformed_data(data: Any) -> dict[str, Any]: + if not isinstance(data, pd.DataFrame): + return {"type": type(data).__name__} + datastreams = [col for col in data.columns if col != "timestamp"] + return { + "type": "DataFrame", + "rows": len(data), + "columns": len(data.columns), + "datastreams": len(datastreams), + } + + +def _success_message(stats: dict[str, Any]) -> str: + transform_stats = stats.get("transform") or stats.get("load") or {} + rows = transform_stats.get("rows") + datastreams = transform_stats.get("datastreams") + if rows is not None and datastreams is not None: + return ( + f"Load completed successfully ({rows} rows across {datastreams} datastreams)." + ) + if rows is not None: + return f"Load completed successfully ({rows} rows processed)." + return "Load completed successfully." + + +def _apply_runtime_uri_aliases(result: dict[str, Any], runtime_source_uri: str) -> None: + result.setdefault("runtime_source_uri", runtime_source_uri) + result.setdefault("runtimeSourceUri", runtime_source_uri) + result.setdefault("runtime_url", runtime_source_uri) + result.setdefault("runtimeUrl", runtime_source_uri) + + +def _apply_log_aliases(result: dict[str, Any]) -> None: + if "log_entries" in result and "logEntries" not in result: + result["logEntries"] = result["log_entries"] + + +def _merge_result_with_context( + result: dict[str, Any], context: Optional[TaskRunContext] +) -> dict[str, Any]: + if "summary" not in result and "message" in result: + result["summary"] = result["message"] + + if context: + if context.runtime_source_uri and not ( + result.get("runtime_source_uri") + or result.get("runtimeSourceUri") + or result.get("runtime_url") + or result.get("runtimeUrl") + ): + _apply_runtime_uri_aliases(result, context.runtime_source_uri) + + if context.log_handler: + if "logs" not in result: + logs_text = context.log_handler.as_text() + if logs_text: + result["logs"] = logs_text + if "log_entries" not in result and context.log_handler.entries: + result["log_entries"] = context.log_handler.entries + + if context.stats and "stats" not in result: + result["stats"] = context.stats + + _apply_log_aliases(result) + return result + + +def _build_task_result( + message: str, + context: Optional[TaskRunContext] = None, + *, + stage: Optional[str] = None, + error: Optional[str] = None, + traceback: Optional[str] = None, +) -> dict[str, Any]: + result: dict[str, Any] = {"message": message, "summary": message} + if stage: + result["stage"] = stage + if error: + result.update( + { + "error": error, + "failure_reason": error, + "failureReason": error, + } + ) + if traceback: + result["traceback"] = traceback + + if context and context.runtime_source_uri: + _apply_runtime_uri_aliases(result, context.runtime_source_uri) + + if context and context.log_handler: + logs_text = context.log_handler.as_text() + if logs_text: + result["logs"] = logs_text + if context.log_handler.entries: + result["log_entries"] = context.log_handler.entries + + if context and context.stats: + result["stats"] = context.stats + + _apply_log_aliases(result) + return 
result + +@shared_task(bind=True, expires=10, name="etl.tasks.run_etl_task") def run_etl_task(self, task_id: str): """ Runs a HydroServer ETL task based on the task configuration provided. """ - task = Task.objects.select_related( - "data_connection" - ).prefetch_related( - "mappings", "mappings__paths" - ).get(pk=UUID(task_id)) - - extractor_cls = extractor_factory(TypeAdapter(ExtractorConfig).validate_python({ - "type": task.data_connection.extractor_type, - **task.data_connection.extractor_settings - })) - transformer_cls = transformer_factory(TypeAdapter(TransformerConfig).validate_python({ - "type": task.data_connection.transformer_type, - **task.data_connection.transformer_settings - })) - loader_cls = HydroServerInternalLoader(task) - - task_mappings = [ - SourceTargetMapping( - source_identifier=task_mapping.source_identifier, - paths=[ - MappingPath( - target_identifier=task_mapping_path.target_identifier, - data_transformations=task_mapping_path.data_transformations, - ) for task_mapping_path in task_mapping.paths.all() + task_run_id = self.request.id + context = TaskRunContext() + TASK_RUN_CONTEXT[task_run_id] = context + + with capture_task_logs(context): + try: + task = Task.objects.select_related( + "data_connection" + ).prefetch_related( + "mappings", "mappings__paths" + ).get(pk=UUID(task_id)) + + extractor_cls = extractor_factory(TypeAdapter(ExtractorConfig).validate_python({ + "type": task.data_connection.extractor_type, + **task.data_connection.extractor_settings + })) + transformer_cls = transformer_factory(TypeAdapter(TransformerConfig).validate_python({ + "type": task.data_connection.transformer_type, + **task.data_connection.transformer_settings + })) + loader_cls = HydroServerInternalLoader(task) + + task_mappings = [ + SourceTargetMapping( + source_identifier=task_mapping.source_identifier, + paths=[ + MappingPath( + target_identifier=task_mapping_path.target_identifier, + data_transformations=task_mapping_path.data_transformations, + ) for task_mapping_path in task_mapping.paths.all() + ] + ) for task_mapping in task.mappings.all() ] - ) for task_mapping in task.mappings.all() - ] - - logging.info("Starting extract") - data = extractor_cls.extract(task, loader_cls) - if data is None or (isinstance(data, pd.DataFrame) and data.empty): - return {"message": f"No data returned from the extractor for task: {str(task.id)}"} - logging.info("Starting transform") - data = transformer_cls.transform(data, task_mappings) - if data is None or (isinstance(data, pd.DataFrame) and data.empty): - return {"message": f"No data returned from the transformer for task: {str(task.id)}"} - - logging.info("Starting load") - loader_cls.load(data, task) - - return {"message": f"Finished processing task: {str(task.id)}"} + context.stage = "extract" + logging.info("Starting extract") + data = extractor_cls.extract(task, loader_cls) + context.runtime_source_uri = ( + getattr(extractor_cls, "runtime_source_uri", None) + or context.runtime_source_uri + ) + context.stats["extract"] = _describe_payload(data) + if _is_empty(data): + return _build_task_result( + "No data returned from the extractor. Nothing to load.", + context, + stage=context.stage, + ) + + context.stage = "transform" + logging.info("Starting transform") + data = transformer_cls.transform(data, task_mappings) + context.stats["transform"] = _describe_transformed_data(data) + if _is_empty(data): + return _build_task_result( + "Transform produced no rows. 
Nothing to load.", + context, + stage=context.stage, + ) + + context.stage = "load" + logging.info("Starting load") + loader_cls.load(data, task) + context.stats["load"] = _describe_transformed_data(data) + + return _build_task_result( + _success_message(context.stats), + context, + stage=context.stage, + ) + except Exception: + logging.exception("ETL task failed during %s", context.stage) + raise @task_prerun.connect @@ -119,11 +367,20 @@ def mark_etl_task_success(sender, result, **extra): if sender != run_etl_task: return + context = TASK_RUN_CONTEXT.pop(sender.request.id, None) + try: task_run = TaskRun.objects.get(id=sender.request.id) except TaskRun.DoesNotExist: return + if not isinstance(result, dict): + result = {"message": str(result)} + + result = _merge_result_with_context(result, context) + if context and context.stage and "stage" not in result: + result["stage"] = context.stage + task_run.status = "SUCCESS" task_run.finished_at = timezone.now() task_run.result = result @@ -140,22 +397,32 @@ def mark_etl_task_failure(sender, task_id, einfo, exception, **extra): if sender != run_etl_task: return + context = TASK_RUN_CONTEXT.pop(task_id, None) + try: task_run = TaskRun.objects.get(id=task_id) except TaskRun.DoesNotExist: return + stage = context.stage if context else None + message = ( + f"Failed during {stage}: {exception}" if stage else f"{exception}" + ) + task_run.status = "FAILURE" task_run.finished_at = timezone.now() - task_run.result = { - "error": str(exception), - "traceback": einfo.traceback, - } + task_run.result = _build_task_result( + message, + context, + stage=stage, + error=str(exception), + traceback=einfo.traceback, + ) task_run.save(update_fields=["status", "finished_at", "result"]) -@shared_task(bind=True, expires=10) +@shared_task(bind=True, expires=10, name="etl.tasks.cleanup_etl_task_runs") def cleanup_etl_task_runs(self, days=14): """ Celery task to run the cleanup_etl_task_runs management command. From b0064f93b411579249826c1027f885174fa16b54 Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Fri, 6 Feb 2026 15:43:01 -0700 Subject: [PATCH 02/12] More explicit run messages --- domains/etl/loader.py | 53 +++++++++++++++++++++++++++++++++++++++---- domains/etl/tasks.py | 36 ++++++++++++++++++++++++++--- 2 files changed, 81 insertions(+), 8 deletions(-) diff --git a/domains/etl/loader.py b/domains/etl/loader.py index 78289608..11e78bc2 100644 --- a/domains/etl/loader.py +++ b/domains/etl/loader.py @@ -1,5 +1,6 @@ from __future__ import annotations from uuid import UUID +from typing import Any, Dict from hydroserverpy.etl.loaders.base import Loader import logging @@ -24,25 +25,54 @@ def __init__(self, task): self._begin_cache: dict[str, pd.Timestamp] = {} self.task = task - def load(self, data: pd.DataFrame, task: Task) -> None: + def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: """ Load observations from a DataFrame to the HydroServer. 
""" - begin_date = self.earliest_begin_date(task) new_data = data[data["timestamp"] > begin_date] + + cutoff_value = ( + begin_date.isoformat() + if hasattr(begin_date, "isoformat") + else str(begin_date) + ) + stats: Dict[str, Any] = { + "cutoff": cutoff_value, + "timestamps_total": len(data), + "timestamps_after_cutoff": len(new_data), + "timestamps_filtered_by_cutoff": max(len(data) - len(new_data), 0), + "observations_available": 0, + "observations_loaded": 0, + "observations_skipped": 0, + "datastreams_total": 0, + "datastreams_available": 0, + "datastreams_loaded": 0, + "per_datastream": {}, + } + for col in new_data.columns.difference(["timestamp"]): + stats["datastreams_total"] += 1 df = ( new_data[["timestamp", col]] .rename(columns={col: "value"}) .dropna(subset=["value"]) ) - if df.empty: - logging.warning(f"No new data for {col}, skipping.") + available = len(df) + stats["observations_available"] += available + if available == 0: + logging.warning("No new data for %s after filtering; skipping.", col) + stats["per_datastream"][str(col)] = { + "available": 0, + "loaded": 0, + "skipped": 0, + } continue + stats["datastreams_available"] += 1 df = df.rename(columns={"timestamp": "phenomenonTime", "value": "result"}) + loaded = 0 # Chunked upload CHUNK_SIZE = 5000 total = len(df) @@ -59,7 +89,7 @@ def load(self, data: pd.DataFrame, task: Task) -> None: chunk_data = ObservationBulkPostBody( fields=["phenomenonTime", "result"], - data=chunk.values.tolist() + data=chunk.values.tolist(), ) try: @@ -69,6 +99,7 @@ def load(self, data: pd.DataFrame, task: Task) -> None: datastream_id=UUID(col), mode="append", ) + loaded += len(chunk) except Exception as e: status = getattr(e, "status_code", None) or getattr( getattr(e, "response", None), "status_code", None @@ -83,6 +114,18 @@ def load(self, data: pd.DataFrame, task: Task) -> None: break raise + stats["observations_loaded"] += loaded + stats["observations_skipped"] += max(available - loaded, 0) + if loaded > 0: + stats["datastreams_loaded"] += 1 + stats["per_datastream"][str(col)] = { + "available": available, + "loaded": loaded, + "skipped": max(available - loaded, 0), + } + + return stats + @staticmethod def _fetch_earliest_begin(task: Task) -> pd.Timestamp: logging.info("Querying HydroServer for earliest begin date for payload...") diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index 46d52831..5d4c45c7 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -139,7 +139,34 @@ def _describe_transformed_data(data: Any) -> dict[str, Any]: def _success_message(stats: dict[str, Any]) -> str: - transform_stats = stats.get("transform") or stats.get("load") or {} + load_stats = stats.get("load") or {} + loaded = load_stats.get("observations_loaded") + datastreams_loaded = load_stats.get("datastreams_loaded") + available = load_stats.get("observations_available") + timestamps_after_cutoff = load_stats.get("timestamps_after_cutoff") + timestamps_total = load_stats.get("timestamps_total") + + if loaded is not None: + if loaded == 0: + if timestamps_total and timestamps_after_cutoff == 0: + cutoff = load_stats.get("cutoff") + if cutoff: + return ( + "No new observations to load " + f"(all timestamps were at or before {cutoff})." + ) + return "No new observations to load (all timestamps were at or before the cutoff)." + if available == 0: + return "No new observations to load." + return "No new observations were loaded." 
+ + if datastreams_loaded is not None: + return ( + f"Load completed successfully ({loaded} rows across {datastreams_loaded} datastreams)." + ) + return f"Load completed successfully ({loaded} rows loaded)." + + transform_stats = stats.get("transform") or {} rows = transform_stats.get("rows") datastreams = transform_stats.get("datastreams") if rows is not None and datastreams is not None: @@ -299,8 +326,11 @@ def run_etl_task(self, task_id: str): context.stage = "load" logging.info("Starting load") - loader_cls.load(data, task) - context.stats["load"] = _describe_transformed_data(data) + load_stats = loader_cls.load(data, task) + if isinstance(load_stats, dict): + context.stats["load"] = load_stats + else: + context.stats["load"] = _describe_transformed_data(data) return _build_task_result( _success_message(context.stats), From b30009bda2c6cdeb7df532266b680757fa2df7ac Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Tue, 10 Feb 2026 09:36:11 -0700 Subject: [PATCH 03/12] Add easier to read run messages --- .gitignore | 2 + domains/etl/etl_errors.py | 339 +++++++++++++++++++++++++++++++++ domains/etl/loader.py | 105 ++++++----- domains/etl/tasks.py | 387 +++++++++++++++++++++++++++++++------- 4 files changed, 720 insertions(+), 113 deletions(-) create mode 100644 domains/etl/etl_errors.py diff --git a/.gitignore b/.gitignore index e56523b9..c78cf983 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ .env /frontend/.env +.venv/ +.venv*/ .pytest_cache __pycache__/ db.sqlite3 diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py new file mode 100644 index 00000000..4cae7a58 --- /dev/null +++ b/domains/etl/etl_errors.py @@ -0,0 +1,339 @@ +from __future__ import annotations + +import re +from dataclasses import dataclass +from typing import Any, Iterable, Optional + +from pydantic import ValidationError + + +def _jsonish(value: Any) -> str: + if value is None: + return "null" + if isinstance(value, str): + if value == "": + return '""' + return repr(value) + return repr(value) + + +_EXTRACTOR_ALIAS_MAP: dict[str, str] = { + "source_uri": "sourceUri", + "placeholder_variables": "placeholderVariables", + "run_time_value": "runTimeValue", +} + +_TRANSFORMER_ALIAS_MAP: dict[str, str] = { + "header_row": "headerRow", + "data_start_row": "dataStartRow", + "identifier_type": "identifierType", + "custom_format": "customFormat", + "timezone_mode": "timezoneMode", + "run_time_value": "runTimeValue", + "jmespath": "JMESPath", + "target_identifier": "targetIdentifier", + "source_identifier": "sourceIdentifier", + "data_transformations": "dataTransformations", + "lookup_table_id": "lookupTableId", +} + + +def _alias(component: str, field: str) -> str: + if component == "extractor": + return _EXTRACTOR_ALIAS_MAP.get(field, field) + if component == "transformer": + return _TRANSFORMER_ALIAS_MAP.get(field, field) + return field + + +def _format_loc(component: str, loc: Iterable[Any]) -> str: + loc_list = list(loc) + if component == "extractor" and loc_list and loc_list[0] in ( + "HTTPExtractor", + "LocalFileExtractor", + ): + loc_list = loc_list[1:] + if component == "transformer" and loc_list and loc_list[0] in ( + "JSONTransformer", + "CSVTransformer", + ): + loc_list = loc_list[1:] + + parts: list[str] = [] + for item in loc_list: + if isinstance(item, int): + if not parts: + parts.append(f"[{item}]") + else: + parts[-1] = f"{parts[-1]}[{item}]" + continue + if isinstance(item, str): + parts.append(_alias(component, item)) + continue + parts.append(str(item)) + if not parts: + return 
component + return ".".join([component] + parts) + + +@dataclass +class EtlUserFacingError(Exception): + """ + Exception intended to be shown to end users (TaskDetails "run message"). + + Keep message short and actionable; put extra information in `details` and `hint`. + """ + + message: str + stage: Optional[str] = None + code: Optional[str] = None + hint: Optional[str] = None + details: Optional[list[dict[str, Any]]] = None + debug_error: Optional[str] = None + + def __post_init__(self) -> None: + # Keep Exception.args populated for Celery/logging/serialization interoperability. + super().__init__(self.message) + + def __str__(self) -> str: # pragma: no cover + return self.message + + def as_failure_dict(self) -> dict[str, Any]: + out: dict[str, Any] = {} + if self.code: + out["code"] = self.code + if self.hint: + out["hint"] = self.hint + if self.details: + out["details"] = self.details + if self.debug_error: + out["debug_error"] = self.debug_error + return out + + +def user_facing_error_from_validation_error( + component: str, + exc: ValidationError, + *, + raw: Optional[dict[str, Any]] = None, + stage: str = "setup", +) -> EtlUserFacingError: + """ + Turn pydantic's ValidationError into a clean message + structured details. + """ + errs = exc.errors(include_url=False) + # pydantic unions often emit errors for all union branches; filter to the selected type when possible. + if raw and component in ("extractor", "transformer"): + raw_type = raw.get("type") + type_to_model = { + "extractor": {"HTTP": "HTTPExtractor", "local": "LocalFileExtractor"}, + "transformer": {"JSON": "JSONTransformer", "CSV": "CSVTransformer"}, + } + selected_model = type_to_model.get(component, {}).get(raw_type) + if selected_model: + errs = [ + e for e in errs if not e.get("loc") or e["loc"][0] == selected_model + ] or errs + details: list[dict[str, Any]] = [] + for e in errs: + loc = e.get("loc") or () + msg = e.get("msg") or "Invalid value" + e_type = e.get("type") + inp = e.get("input", None) + if inp is None and e_type == "string_type": + msg = "Field must be a string, not null" + elif inp is None and e_type in ("int_type", "float_type", "bool_type"): + msg = f"Field must be a {e_type.split('_', 1)[0]}, not null" + details.append( + { + "path": _format_loc(component, loc), + "message": msg, + "type": e_type, + "input": inp, + } + ) + + first = details[0] if details else None + if first: + path = first.get("path") or component + msg = first.get("message") or "Invalid value" + inp = _jsonish(first.get("input")) + suffix = "" + if len(details) > 1: + suffix = f" (+{len(details) - 1} more issue(s))" + fix = "" + if component in ("extractor", "transformer", "loader"): + fix = f" Update the Data Connection {component} settings." + message = f"Invalid {component} configuration at {path}: {msg} (got {inp}).{suffix}{fix}" + else: + message = f"Invalid {component} configuration." + + hint = None + # Provide a consistent, user-facing pointer to where to fix config. + if component in ("extractor", "transformer", "loader"): + hint = ( + f"Check the Data Connection {component} settings and fix the field(s) listed above." + ) + if any(d.get("input") is None for d in details): + hint = ( + hint + + " One of the required fields is null; double-check your JSON config for missing values or placeholders that were not substituted." 
+ ) + + return EtlUserFacingError( + message=message, + stage=stage, + code=f"invalid_{component}_config", + hint=hint, + details=details or None, + debug_error=str(exc), + ) + + +_MISSING_PER_TASK_VAR_RE = re.compile(r"Missing per-task variable '([^']+)'") +_MISSING_PLACEHOLDER_VAR_RE = re.compile(r"Missing placeholder variable: (.+)$") +_TIMESTAMP_COL_NOT_FOUND_RE = re.compile(r"Timestamp column '([^']*)' not found in data\\.") +_SOURCE_COL_NOT_FOUND_RE = re.compile(r"Source (?:column|index) '([^']+)' not found") + + +def user_facing_error_from_exception( + exc: Exception, + *, + stage: Optional[str] = None, +) -> Optional[EtlUserFacingError]: + """ + Map common hydroserverpy + ETL errors to user-actionable messages. + + Return None when the exception should fall back to default formatting. + """ + if isinstance(exc, EtlUserFacingError): + return exc + + if isinstance(exc, ValidationError): + # Caller should pass component via user_facing_error_from_validation_error. + return None + + if isinstance(exc, KeyError): + if exc.args and isinstance(exc.args[0], str): + msg = exc.args[0] + else: + msg = str(exc) + m = _MISSING_PER_TASK_VAR_RE.search(msg) + if m: + name = m.group(1) + return EtlUserFacingError( + message=( + f"Missing required per-task extractor variable '{name}'. " + f"Add it to the task's extractorVariables." + ), + stage=stage, + code="missing_task_variable", + hint=f"Add '{name}' to the task's extractorVariables so the extractor can build the source URL.", + details=[{"variable": name, "scope": "task.extractorVariables"}], + debug_error=msg, + ) + m = _MISSING_PLACEHOLDER_VAR_RE.search(msg) + if m: + name = m.group(1).strip() + return EtlUserFacingError( + message=( + f"Extractor sourceUri contains placeholder '{name}', but it was not provided. " + f"Define it in extractor.placeholderVariables and provide a value in task.extractorVariables if needed." + ), + stage=stage, + code="missing_placeholder_variable", + hint=( + f"Add '{name}' to extractor.placeholderVariables and (if perTask) provide it in task.extractorVariables." + ), + details=[{"variable": name, "scope": "extractor.placeholderVariables"}], + debug_error=msg, + ) + + if isinstance(exc, TypeError) and "JSONTransformer received None" in str(exc): + return EtlUserFacingError( + message=( + "Transformer did not receive any extracted data to parse. " + "Fix the extractor configuration so it returns a valid JSON payload." + ), + stage=stage, + code="missing_extracted_payload", + hint="Verify the extractor sourceUri and that the request returned a valid JSON payload.", + debug_error=str(exc), + ) + + # hydroserverpy TimestampParser (timezoneMode=daylightSavings) calls ZoneInfo(timezone). + # If timezone is missing, ZoneInfo(None) raises a confusing TypeError. Surface a clear fix. + msg_str = str(exc) + if ( + isinstance(exc, TypeError) + and "expected str, bytes or os.PathLike object, not NoneType" in msg_str + and stage in ("transform", "extract", "setup", None) + ): + return EtlUserFacingError( + message="Task configuration is missing required daylight savings offset.", + stage=stage, + code="missing_daylight_savings_offset", + hint=( + "If your timestamp timezoneMode is 'daylightSavings', you must set the " + "corresponding timestamp.timezone to an IANA time zone like 'America/Denver'." 
+ ), + details=[ + { + "message": "timezone is required when timezoneMode is 'daylightSavings'.", + "path": "transformer.timestamp.timezone", + } + ], + debug_error=msg_str, + ) + + if ( + ("NoneType" in msg_str or "nonetype" in msg_str.lower()) + and "string" in msg_str.lower() + and "assign" in msg_str.lower() + ): + return EtlUserFacingError( + message=( + "A required configuration value is null where a string is expected. " + "Fix the ETL configuration JSON to provide a value for the missing field." + ), + stage=stage, + code="null_value_in_config", + hint="Look for missing/empty fields in the Data Connection settings (and any variable substitution that produced null).", + debug_error=msg_str, + ) + + if isinstance(exc, ValueError): + msg = msg_str + m = _TIMESTAMP_COL_NOT_FOUND_RE.search(msg) + if m: + key = m.group(1) + return EtlUserFacingError( + message=( + f"Timestamp column '{key}' was not found in the extracted data. " + "Fix the transformer timestamp.key (or identifierType/index settings) to match the extracted data." + ), + stage=stage, + code="timestamp_column_missing", + hint=( + "Update the transformer timestamp.key (or identifierType/index settings) to match the extracted data." + ), + debug_error=msg, + ) + if "Source index" in msg and "out of range" in msg: + return EtlUserFacingError( + message=msg, + stage=stage, + code="source_index_out_of_range", + hint="Check the task mappings: a sourceIdentifier index is outside the extracted dataset's column range.", + debug_error=msg, + ) + if "Source column" in msg and "not found in extracted data" in msg: + return EtlUserFacingError( + message=msg, + stage=stage, + code="source_column_missing", + hint="Check the task mappings: a sourceIdentifier does not exist in the extracted dataset columns.", + debug_error=msg, + ) + + return None diff --git a/domains/etl/loader.py b/domains/etl/loader.py index 11e78bc2..cee554d1 100644 --- a/domains/etl/loader.py +++ b/domains/etl/loader.py @@ -1,6 +1,7 @@ from __future__ import annotations from uuid import UUID -from typing import Any, Dict +from dataclasses import dataclass +from typing import Any from hydroserverpy.etl.loaders.base import Loader import logging @@ -16,6 +17,16 @@ observation_service = ObservationService() +@dataclass(frozen=True) +class LoadSummary: + cutoff: str + timestamps_total: int + timestamps_after_cutoff: int + observations_available: int + observations_loaded: int + datastreams_loaded: int + + class HydroServerInternalLoader(Loader): """ A class that extends the HydroServer client with ETL-specific functionalities. @@ -25,7 +36,7 @@ def __init__(self, task): self._begin_cache: dict[str, pd.Timestamp] = {} self.task = task - def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: + def load(self, data: pd.DataFrame, task: Task) -> LoadSummary: """ Load observations from a DataFrame to the HydroServer. 
""" @@ -37,54 +48,47 @@ def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: if hasattr(begin_date, "isoformat") else str(begin_date) ) - stats: Dict[str, Any] = { - "cutoff": cutoff_value, - "timestamps_total": len(data), - "timestamps_after_cutoff": len(new_data), - "timestamps_filtered_by_cutoff": max(len(data) - len(new_data), 0), - "observations_available": 0, - "observations_loaded": 0, - "observations_skipped": 0, - "datastreams_total": 0, - "datastreams_available": 0, - "datastreams_loaded": 0, - "per_datastream": {}, - } + timestamps_total = len(data) + timestamps_after_cutoff = len(new_data) + observations_available = 0 + observations_loaded = 0 + datastreams_loaded = 0 for col in new_data.columns.difference(["timestamp"]): - stats["datastreams_total"] += 1 df = ( new_data[["timestamp", col]] .rename(columns={col: "value"}) .dropna(subset=["value"]) ) available = len(df) - stats["observations_available"] += available + observations_available += available if available == 0: logging.warning("No new data for %s after filtering; skipping.", col) - stats["per_datastream"][str(col)] = { - "available": 0, - "loaded": 0, - "skipped": 0, - } continue - stats["datastreams_available"] += 1 df = df.rename(columns={"timestamp": "phenomenonTime", "value": "result"}) loaded = 0 # Chunked upload CHUNK_SIZE = 5000 total = len(df) + chunks = (total + CHUNK_SIZE - 1) // CHUNK_SIZE + logging.info( + "Uploading %s observation(s) to datastream %s (%s chunk(s), chunk_size=%s)", + total, + col, + chunks, + CHUNK_SIZE, + ) for start in range(0, total, CHUNK_SIZE): end = min(start + CHUNK_SIZE, total) chunk = df.iloc[start:end] - logging.info( - "Uploading %s rows (%s-%s) to datastream %s", - len(chunk), + logging.debug( + "Uploading chunk to datastream %s: rows %s-%s (%s rows)", + col, start, end - 1, - col, + len(chunk), ) chunk_data = ObservationBulkPostBody( @@ -114,29 +118,38 @@ def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]: break raise - stats["observations_loaded"] += loaded - stats["observations_skipped"] += max(available - loaded, 0) if loaded > 0: - stats["datastreams_loaded"] += 1 - stats["per_datastream"][str(col)] = { - "available": available, - "loaded": loaded, - "skipped": max(available - loaded, 0), - } - - return stats + datastreams_loaded += 1 + observations_loaded += loaded + + return LoadSummary( + cutoff=cutoff_value, + timestamps_total=timestamps_total, + timestamps_after_cutoff=timestamps_after_cutoff, + observations_available=observations_available, + observations_loaded=observations_loaded, + datastreams_loaded=datastreams_loaded, + ) @staticmethod def _fetch_earliest_begin(task: Task) -> pd.Timestamp: - logging.info("Querying HydroServer for earliest begin date for payload...") - - return Datastream.objects.filter(id__in={ - path.target_identifier - for mapping in task.mappings.all() - for path in mapping.paths.all() - }).aggregate( - earliest_end=Coalesce(Min("phenomenon_end_time"), Value(datetime(1970, 1, 1))) - )["earliest_end"] + logging.info( + "Checking HydroServer for the most recent data already stored (so we only extract new observations)..." 
+ ) + + return Datastream.objects.filter( + id__in={ + path.target_identifier + for mapping in task.mappings.all() + for path in mapping.paths.all() + } + ).aggregate( + earliest_end=Coalesce( + Min("phenomenon_end_time"), Value(datetime(1970, 1, 1)) + ) + )[ + "earliest_end" + ] def earliest_begin_date(self, task: Task) -> pd.Timestamp: """ diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index 5d4c45c7..b4203a73 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -2,17 +2,23 @@ from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone as dt_timezone +import os from typing import Any, Optional from uuid import UUID import pandas as pd from celery import shared_task -from pydantic import TypeAdapter +from pydantic import TypeAdapter, ValidationError from celery.signals import task_prerun, task_success, task_failure, task_postrun from django.utils import timezone from django.db.utils import IntegrityError from django.core.management import call_command from domains.etl.models import Task, TaskRun -from .loader import HydroServerInternalLoader +from .loader import HydroServerInternalLoader, LoadSummary +from .etl_errors import ( + EtlUserFacingError, + user_facing_error_from_exception, + user_facing_error_from_validation_error, +) from hydroserverpy.etl.factories import extractor_factory, transformer_factory from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, SourceTargetMapping, MappingPath @@ -21,8 +27,60 @@ class TaskRunContext: stage: str = "setup" runtime_source_uri: Optional[str] = None - stats: dict[str, Any] = field(default_factory=dict) log_handler: Optional["TaskLogHandler"] = None + task_meta: dict[str, Any] = field(default_factory=dict) + emitted_runtime_vars_log: bool = False + emitted_task_vars_log: bool = False + + +def _enum_value(value: Any) -> Any: + return getattr(value, "value", value) + + +def _safe_lower(value: Any) -> str: + if value is None: + return "" + v = _enum_value(value) + if isinstance(v, str): + return v.lower() + return str(v).lower() + + +def _validate_daylight_savings_timezone(transformer_cfg: Any, *, stage: str) -> None: + """ + hydroserverpy allows timezoneMode=daylightSavings with timezone unset, which later fails + with a non-obvious TypeError from zoneinfo. Catch and explain it. + """ + ts = getattr(transformer_cfg, "timestamp", None) + if not ts: + return + mode = _safe_lower(getattr(ts, "timezone_mode", None)) + if mode != "daylightsavings": + return + tz = getattr(ts, "timezone", None) + if tz is None or (isinstance(tz, str) and not tz.strip()): + raise EtlUserFacingError( + message=( + "Missing required timezone: transformer.timestamp.timezone " + "(required when transformer.timestamp.timezoneMode is 'daylightSavings')." + ), + stage=stage, + code="missing_daylight_savings_offset", + hint=( + "If transformer.timestamp.timezoneMode is 'daylightSavings', you must set " + "transformer.timestamp.timezone to an IANA time zone like 'America/Denver'. 
" + "In exported config JSON this is typically at: " + "dataConnection.transformer.settings.timestamp.timezone" + ), + details=[ + { + "path": "transformer.timestamp.timezone", + "message": "Required when transformer.timestamp.timezoneMode is 'daylightSavings'.", + "input": tz, + } + ], + debug_error="transformer.timestamp.timezone is null/empty while timezoneMode=daylightSavings", + ) class TaskLogFilter(logging.Filter): @@ -38,15 +96,26 @@ def __init__(self, context: TaskRunContext): self.lines: list[str] = [] self.entries: list[dict[str, Any]] = [] self._formatter = logging.Formatter() + # hydroserverpy extractor logs (v1.7.0+) are very verbose: it logs one line + # per variable resolution. We collapse that into at most one runtime-vars + # log and one task-vars log for the whole task run. + self._pending_runtime_vars: set[str] = set() + self._pending_task_vars: set[str] = set() + self._collecting_placeholder_vars: bool = False def emit(self, record: logging.LogRecord) -> None: if not self.filter(record): return - timestamp = datetime.fromtimestamp( - record.created, tz=dt_timezone.utc - ).isoformat() message = record.getMessage() + + # Condense hydroserverpy extractor variable resolution logs into one line + # each for runtime vars and task vars. + if self._capture_placeholder_var_log(message, record): + return + self._flush_placeholder_var_summaries_if_needed(message, record) + + timestamp = datetime.fromtimestamp(record.created, tz=dt_timezone.utc).isoformat() line = f"{timestamp} {record.levelname:<8} {message}" if record.exc_info: line = f"{line}\n{self._formatter.formatException(record.exc_info)}" @@ -66,6 +135,92 @@ def emit(self, record: logging.LogRecord) -> None: self._capture_runtime_uri(message) + def _append_synthetic_entry( + self, *, timestamp: str, level: str, message: str, record: logging.LogRecord + ) -> None: + # Mirror the structure of "real" log capture entries. + line = f"{timestamp} {level:<8} {message}" + self.lines.append(line) + self.entries.append( + { + "timestamp": timestamp, + "level": level, + "logger": record.name, + "message": message, + "pathname": record.pathname, + "lineno": record.lineno, + } + ) + + def _capture_placeholder_var_log(self, message: str, record: logging.LogRecord) -> bool: + """ + Returns True if this record should be suppressed from captured logs. + + hydroserverpy.etl.extractors.base logs: + - "Creating runtime variables..." + - "Resolving runtime var: " + - "Resolving task var: " + - "Resolving extractor placeholder variables ( configured)." + - "Resolving per-task var: " + - "Resolved placeholder '' (...) -> '...'" + """ + + # Suppress extractor placeholder variable resolution chatter entirely. + # This includes both the verbose per-variable lines and our older synthetic summaries. + if message.startswith("Resolving extractor placeholder variables"): + return True + if message.startswith("Resolving per-task var:"): + return True + if message.startswith("Resolved placeholder"): + return True + + # If we see a new "creating" marker while already collecting, treat it as a + # boundary and flush pending summaries first. 
+ if message == "Creating runtime variables...": + self._flush_placeholder_var_summaries(record) + self._collecting_placeholder_vars = True + return True + + runtime_prefix = "Resolving runtime var:" + if message.startswith(runtime_prefix): + name = message.split(runtime_prefix, 1)[1].strip() + if not self.context.emitted_runtime_vars_log and name: + self._pending_runtime_vars.add(name) + self._collecting_placeholder_vars = True + return True + + task_prefix = "Resolving task var:" + if message.startswith(task_prefix): + name = message.split(task_prefix, 1)[1].strip() + if not self.context.emitted_task_vars_log and name: + self._pending_task_vars.add(name) + self._collecting_placeholder_vars = True + return True + + return False + + def _flush_placeholder_var_summaries_if_needed( + self, message: str, record: logging.LogRecord + ) -> None: + if not self._collecting_placeholder_vars: + return + + # As soon as we leave the "variable resolution" block, emit summaries. + if ( + message != "Creating runtime variables..." + and not message.startswith("Resolving runtime var:") + and not message.startswith("Resolving task var:") + ): + self._flush_placeholder_var_summaries(record) + + def _flush_placeholder_var_summaries(self, record: logging.LogRecord) -> None: + # Previously we emitted synthetic summary lines like: + # "Runtime variables (1): start_time" + # Those are now removed to keep run logs focused. + self._pending_runtime_vars.clear() + self._pending_task_vars.clear() + self._collecting_placeholder_vars = False + def _capture_runtime_uri(self, message: str) -> None: if self.context.runtime_source_uri: return @@ -123,7 +278,27 @@ def _describe_payload(data: Any) -> dict[str, Any]: "rows": len(data), "columns": len(data.columns), } - return {"type": type(data).__name__} + info: dict[str, Any] = {"type": type(data).__name__} + if isinstance(data, (bytes, bytearray)): + info["bytes"] = len(data) + return info + # BytesIO and similar + try: + buf = getattr(data, "getbuffer", None) + if callable(buf): + info["bytes"] = len(data.getbuffer()) + return info + except Exception: + pass + # Real file handles + try: + fileno = getattr(data, "fileno", None) + if callable(fileno): + info["bytes"] = os.fstat(data.fileno()).st_size + return info + except Exception: + pass + return info def _describe_transformed_data(data: Any) -> dict[str, Any]: @@ -138,44 +313,28 @@ def _describe_transformed_data(data: Any) -> dict[str, Any]: } -def _success_message(stats: dict[str, Any]) -> str: - load_stats = stats.get("load") or {} - loaded = load_stats.get("observations_loaded") - datastreams_loaded = load_stats.get("datastreams_loaded") - available = load_stats.get("observations_available") - timestamps_after_cutoff = load_stats.get("timestamps_after_cutoff") - timestamps_total = load_stats.get("timestamps_total") - - if loaded is not None: - if loaded == 0: - if timestamps_total and timestamps_after_cutoff == 0: - cutoff = load_stats.get("cutoff") - if cutoff: - return ( - "No new observations to load " - f"(all timestamps were at or before {cutoff})." - ) - return "No new observations to load (all timestamps were at or before the cutoff)." - if available == 0: - return "No new observations to load." - return "No new observations were loaded." - - if datastreams_loaded is not None: - return ( - f"Load completed successfully ({loaded} rows across {datastreams_loaded} datastreams)." - ) - return f"Load completed successfully ({loaded} rows loaded)." 
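+# Illustrative behaviour of the LoadSummary-based helper below (field values assumed):
+#   _success_message(LoadSummary(cutoff="2026-02-10T00:00:00+00:00", timestamps_total=48,
+#                                timestamps_after_cutoff=12, observations_available=24,
+#                                observations_loaded=24, datastreams_loaded=2))
+#   -> "Load completed successfully (24 rows across 2 datastreams)."
+#   _success_message(None)
+#   -> "Load completed successfully."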
+def _success_message(load: Optional[LoadSummary]) -> str: + if not load: + return "Load completed successfully." - transform_stats = stats.get("transform") or {} - rows = transform_stats.get("rows") - datastreams = transform_stats.get("datastreams") - if rows is not None and datastreams is not None: + loaded = load.observations_loaded + if loaded == 0: + if load.timestamps_total and load.timestamps_after_cutoff == 0: + if load.cutoff: + return ( + "No new observations to load " + f"(all timestamps were at or before {load.cutoff})." + ) + return "No new observations to load (all timestamps were at or before the cutoff)." + if load.observations_available == 0: + return "No new observations to load." + return "No new observations were loaded." + + if load.datastreams_loaded: return ( - f"Load completed successfully ({rows} rows across {datastreams} datastreams)." + f"Load completed successfully ({loaded} rows across {load.datastreams_loaded} datastreams)." ) - if rows is not None: - return f"Load completed successfully ({rows} rows processed)." - return "Load completed successfully." + return f"Load completed successfully ({loaded} rows loaded)." def _apply_runtime_uri_aliases(result: dict[str, Any], runtime_source_uri: str) -> None: @@ -213,9 +372,6 @@ def _merge_result_with_context( if "log_entries" not in result and context.log_handler.entries: result["log_entries"] = context.log_handler.entries - if context.stats and "stats" not in result: - result["stats"] = context.stats - _apply_log_aliases(result) return result @@ -227,6 +383,10 @@ def _build_task_result( stage: Optional[str] = None, error: Optional[str] = None, traceback: Optional[str] = None, + code: Optional[str] = None, + hint: Optional[str] = None, + details: Optional[list[dict[str, Any]]] = None, + debug_error: Optional[str] = None, ) -> dict[str, Any]: result: dict[str, Any] = {"message": message, "summary": message} if stage: @@ -241,10 +401,24 @@ def _build_task_result( ) if traceback: result["traceback"] = traceback + if code or hint or details or debug_error: + failure: dict[str, Any] = {} + if code: + failure["code"] = code + if hint: + failure["hint"] = hint + if details: + failure["details"] = details + if debug_error: + failure["debug_error"] = debug_error + result["failure"] = failure if context and context.runtime_source_uri: _apply_runtime_uri_aliases(result, context.runtime_source_uri) + if context and context.task_meta and "task" not in result: + result["task"] = context.task_meta + if context and context.log_handler: logs_text = context.log_handler.as_text() if logs_text: @@ -252,12 +426,28 @@ def _build_task_result( if context.log_handler.entries: result["log_entries"] = context.log_handler.entries - if context and context.stats: - result["stats"] = context.stats - _apply_log_aliases(result) return result + +def _last_logged_error(context: Optional[TaskRunContext]) -> Optional[str]: + if not context or not context.log_handler or not context.log_handler.entries: + return None + for entry in reversed(context.log_handler.entries): + if entry.get("level") == "ERROR": + msg = entry.get("message") + if msg: + return msg + return None + + +def _validate_component_config(component: str, adapter: TypeAdapter, raw: dict[str, Any], *, stage: str): + try: + return adapter.validate_python(raw) + except ValidationError as ve: + raise user_facing_error_from_validation_error(component, ve, raw=raw, stage=stage) from ve + + @shared_task(bind=True, expires=10, name="etl.tasks.run_etl_task") def run_etl_task(self, task_id: str): 
""" @@ -276,14 +466,32 @@ def run_etl_task(self, task_id: str): "mappings", "mappings__paths" ).get(pk=UUID(task_id)) - extractor_cls = extractor_factory(TypeAdapter(ExtractorConfig).validate_python({ + context.task_meta = { + "id": str(task.id), + "name": task.name, + "data_connection_id": str(task.data_connection_id), + "data_connection_name": task.data_connection.name, + } + + context.stage = "setup" + extractor_raw = { "type": task.data_connection.extractor_type, - **task.data_connection.extractor_settings - })) - transformer_cls = transformer_factory(TypeAdapter(TransformerConfig).validate_python({ + **(task.data_connection.extractor_settings or {}), + } + transformer_raw = { "type": task.data_connection.transformer_type, - **task.data_connection.transformer_settings - })) + **(task.data_connection.transformer_settings or {}), + } + + extractor_cfg = _validate_component_config( + "extractor", TypeAdapter(ExtractorConfig), extractor_raw, stage=context.stage + ) + transformer_cfg = _validate_component_config( + "transformer", TypeAdapter(TransformerConfig), transformer_raw, stage=context.stage + ) + + extractor_cls = extractor_factory(extractor_cfg) + transformer_cls = transformer_factory(transformer_cfg) loader_cls = HydroServerInternalLoader(task) task_mappings = [ @@ -305,7 +513,8 @@ def run_etl_task(self, task_id: str): getattr(extractor_cls, "runtime_source_uri", None) or context.runtime_source_uri ) - context.stats["extract"] = _describe_payload(data) + extract_summary = _describe_payload(data) + logging.info("Extractor returned payload: %s", extract_summary) if _is_empty(data): return _build_task_result( "No data returned from the extractor. Nothing to load.", @@ -315,9 +524,25 @@ def run_etl_task(self, task_id: str): context.stage = "transform" logging.info("Starting transform") + _validate_daylight_savings_timezone(transformer_cfg, stage=context.stage) data = transformer_cls.transform(data, task_mappings) - context.stats["transform"] = _describe_transformed_data(data) + transform_summary = _describe_transformed_data(data) + logging.info("Transform result: %s", transform_summary) if _is_empty(data): + # hydroserverpy's CSVTransformer returns None on read errors (but logs ERROR). + # Treat that as a failure to avoid misleading "produced no rows" messaging. + last_err = _last_logged_error(context) + if last_err and last_err.startswith("Error reading CSV data:"): + raise EtlUserFacingError( + message=( + f"{last_err}. Check delimiter/headerRow/dataStartRow/identifierType " + "and confirm the upstream CSV columns match your task mappings." + ), + stage=context.stage, + code="transform_read_error", + hint="Fix the CSV transformer settings (delimiter/headerRow/dataStartRow/identifierType) or the upstream CSV format.", + debug_error=last_err, + ) return _build_task_result( "Transform produced no rows. 
Nothing to load.", context, @@ -326,18 +551,26 @@ def run_etl_task(self, task_id: str): context.stage = "load" logging.info("Starting load") - load_stats = loader_cls.load(data, task) - if isinstance(load_stats, dict): - context.stats["load"] = load_stats - else: - context.stats["load"] = _describe_transformed_data(data) + load_summary = loader_cls.load(data, task) + logging.info( + "Load result: loaded=%s available=%s cutoff=%s", + getattr(load_summary, "observations_loaded", None), + getattr(load_summary, "observations_available", None), + getattr(load_summary, "cutoff", None), + ) return _build_task_result( - _success_message(context.stats), + _success_message(load_summary), context, stage=context.stage, ) - except Exception: + except Exception as e: + mapped = user_facing_error_from_exception(e, stage=getattr(context, "stage", None)) + if mapped: + logging.error("%s", mapped.message) + if mapped is e: + raise + raise mapped from e logging.exception("ETL task failed during %s", context.stage) raise @@ -435,9 +668,25 @@ def mark_etl_task_failure(sender, task_id, einfo, exception, **extra): return stage = context.stage if context else None - message = ( - f"Failed during {stage}: {exception}" if stage else f"{exception}" - ) + mapped = user_facing_error_from_exception(exception, stage=stage) + if mapped: + message = mapped.message + if stage and message: + prefix = f"failed during {stage.lower()}:" + if not message.lower().startswith(prefix): + message = f"Failed during {stage}: {message}" + code = mapped.code + hint = mapped.hint + details = mapped.details + debug_error = mapped.debug_error + error_str = message + else: + message = f"Failed during {stage}: {exception}" if stage else f"{exception}" + code = None + hint = None + details = None + debug_error = None + error_str = str(exception) task_run.status = "FAILURE" task_run.finished_at = timezone.now() @@ -445,8 +694,12 @@ def mark_etl_task_failure(sender, task_id, einfo, exception, **extra): message, context, stage=stage, - error=str(exception), + error=error_str, traceback=einfo.traceback, + code=code, + hint=hint, + details=details, + debug_error=debug_error, ) task_run.save(update_fields=["status", "finished_at", "result"]) From 7af46060520efd440c1e890498a64b3913f496b2 Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Tue, 10 Feb 2026 12:14:51 -0700 Subject: [PATCH 04/12] Simplify user facing errors --- domains/etl/etl_errors.py | 331 ++++++++++++++++---------------------- domains/etl/tasks.py | 296 ++++++++-------------------------- 2 files changed, 207 insertions(+), 420 deletions(-) diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py index 4cae7a58..8e7e0f91 100644 --- a/domains/etl/etl_errors.py +++ b/domains/etl/etl_errors.py @@ -1,20 +1,18 @@ from __future__ import annotations +import json import re -from dataclasses import dataclass from typing import Any, Iterable, Optional from pydantic import ValidationError -def _jsonish(value: Any) -> str: - if value is None: - return "null" - if isinstance(value, str): - if value == "": - return '""' - return repr(value) - return repr(value) +class EtlUserFacingError(Exception): + """ + Exception intended to be shown to end users (TaskDetails "run message"). + + Keep this as a single readable string. Avoid structured payloads. 
+ """ _EXTRACTOR_ALIAS_MAP: dict[str, str] = { @@ -48,6 +46,7 @@ def _alias(component: str, field: str) -> str: def _format_loc(component: str, loc: Iterable[Any]) -> str: loc_list = list(loc) + # Strip pydantic union branch model names from the front. if component == "extractor" and loc_list and loc_list[0] in ( "HTTPExtractor", "LocalFileExtractor", @@ -71,44 +70,20 @@ def _format_loc(component: str, loc: Iterable[Any]) -> str: parts.append(_alias(component, item)) continue parts.append(str(item)) + if not parts: return component return ".".join([component] + parts) -@dataclass -class EtlUserFacingError(Exception): - """ - Exception intended to be shown to end users (TaskDetails "run message"). - - Keep message short and actionable; put extra information in `details` and `hint`. - """ - - message: str - stage: Optional[str] = None - code: Optional[str] = None - hint: Optional[str] = None - details: Optional[list[dict[str, Any]]] = None - debug_error: Optional[str] = None - - def __post_init__(self) -> None: - # Keep Exception.args populated for Celery/logging/serialization interoperability. - super().__init__(self.message) - - def __str__(self) -> str: # pragma: no cover - return self.message - - def as_failure_dict(self) -> dict[str, Any]: - out: dict[str, Any] = {} - if self.code: - out["code"] = self.code - if self.hint: - out["hint"] = self.hint - if self.details: - out["details"] = self.details - if self.debug_error: - out["debug_error"] = self.debug_error - return out +def _jsonish(value: Any) -> str: + if value is None: + return "null" + if isinstance(value, str): + if value == "": + return '""' + return repr(value) + return repr(value) def user_facing_error_from_validation_error( @@ -116,13 +91,13 @@ def user_facing_error_from_validation_error( exc: ValidationError, *, raw: Optional[dict[str, Any]] = None, - stage: str = "setup", ) -> EtlUserFacingError: """ - Turn pydantic's ValidationError into a clean message + structured details. + Convert pydantic's ValidationError into one readable, actionable sentence. """ errs = exc.errors(include_url=False) - # pydantic unions often emit errors for all union branches; filter to the selected type when possible. + + # Unions emit errors for every branch. Filter to the selected type when possible. if raw and component in ("extractor", "transformer"): raw_type = raw.get("type") type_to_model = { @@ -134,156 +109,80 @@ def user_facing_error_from_validation_error( errs = [ e for e in errs if not e.get("loc") or e["loc"][0] == selected_model ] or errs - details: list[dict[str, Any]] = [] - for e in errs: - loc = e.get("loc") or () - msg = e.get("msg") or "Invalid value" - e_type = e.get("type") - inp = e.get("input", None) - if inp is None and e_type == "string_type": - msg = "Field must be a string, not null" - elif inp is None and e_type in ("int_type", "float_type", "bool_type"): - msg = f"Field must be a {e_type.split('_', 1)[0]}, not null" - details.append( - { - "path": _format_loc(component, loc), - "message": msg, - "type": e_type, - "input": inp, - } - ) - first = details[0] if details else None - if first: - path = first.get("path") or component - msg = first.get("message") or "Invalid value" - inp = _jsonish(first.get("input")) - suffix = "" - if len(details) > 1: - suffix = f" (+{len(details) - 1} more issue(s))" - fix = "" - if component in ("extractor", "transformer", "loader"): - fix = f" Update the Data Connection {component} settings." 
- message = f"Invalid {component} configuration at {path}: {msg} (got {inp}).{suffix}{fix}" - else: - message = f"Invalid {component} configuration." - - hint = None - # Provide a consistent, user-facing pointer to where to fix config. - if component in ("extractor", "transformer", "loader"): - hint = ( - f"Check the Data Connection {component} settings and fix the field(s) listed above." - ) - if any(d.get("input") is None for d in details): - hint = ( - hint - + " One of the required fields is null; double-check your JSON config for missing values or placeholders that were not substituted." - ) + if not errs: + return EtlUserFacingError(f"Invalid {component} configuration.") - return EtlUserFacingError( - message=message, - stage=stage, - code=f"invalid_{component}_config", - hint=hint, - details=details or None, - debug_error=str(exc), + first = errs[0] + loc = first.get("loc") or () + msg = first.get("msg") or "Invalid value" + inp = first.get("input", None) + + path = _format_loc(component, loc) + message = ( + f"Invalid {component} configuration at {path}: {msg} (got {_jsonish(inp)}). " + f"Fix: update the Data Connection {component} settings." ) + return EtlUserFacingError(message) _MISSING_PER_TASK_VAR_RE = re.compile(r"Missing per-task variable '([^']+)'") _MISSING_PLACEHOLDER_VAR_RE = re.compile(r"Missing placeholder variable: (.+)$") -_TIMESTAMP_COL_NOT_FOUND_RE = re.compile(r"Timestamp column '([^']*)' not found in data\\.") -_SOURCE_COL_NOT_FOUND_RE = re.compile(r"Source (?:column|index) '([^']+)' not found") +_TIMESTAMP_COL_NOT_FOUND_RE = re.compile(r"Timestamp column '([^']*)' not found in data\.") + +_MISSING_REQUIRED_TASK_VAR_RE = re.compile( + r"Missing required per-task extractor variable '([^']+)'" +) +_MISSING_URI_PLACEHOLDER_RE = re.compile( + r"Extractor source URI contains a placeholder '([^']+)', but it was not provided" +) +_SOURCE_INDEX_OOR_RE = re.compile(r"Source index (\d+) is out of range for extracted data\.") +_SOURCE_COL_NOT_FOUND_RE = re.compile( + r"Source column '([^']+)' not found in extracted data\." +) def user_facing_error_from_exception( exc: Exception, - *, - stage: Optional[str] = None, ) -> Optional[EtlUserFacingError]: """ - Map common hydroserverpy + ETL errors to user-actionable messages. - - Return None when the exception should fall back to default formatting. + Map common ETL/hydroserverpy exceptions to a single readable message. """ if isinstance(exc, EtlUserFacingError): return exc if isinstance(exc, ValidationError): - # Caller should pass component via user_facing_error_from_validation_error. return None if isinstance(exc, KeyError): - if exc.args and isinstance(exc.args[0], str): - msg = exc.args[0] - else: - msg = str(exc) + msg = exc.args[0] if exc.args and isinstance(exc.args[0], str) else str(exc) m = _MISSING_PER_TASK_VAR_RE.search(msg) if m: name = m.group(1) return EtlUserFacingError( - message=( - f"Missing required per-task extractor variable '{name}'. " - f"Add it to the task's extractorVariables." - ), - stage=stage, - code="missing_task_variable", - hint=f"Add '{name}' to the task's extractorVariables so the extractor can build the source URL.", - details=[{"variable": name, "scope": "task.extractorVariables"}], - debug_error=msg, + f"Missing required per-task extractor variable '{name}'. " + f"Fix: add it to task.extractorVariables." 
) m = _MISSING_PLACEHOLDER_VAR_RE.search(msg) if m: name = m.group(1).strip() return EtlUserFacingError( - message=( - f"Extractor sourceUri contains placeholder '{name}', but it was not provided. " - f"Define it in extractor.placeholderVariables and provide a value in task.extractorVariables if needed." - ), - stage=stage, - code="missing_placeholder_variable", - hint=( - f"Add '{name}' to extractor.placeholderVariables and (if perTask) provide it in task.extractorVariables." - ), - details=[{"variable": name, "scope": "extractor.placeholderVariables"}], - debug_error=msg, + f"Extractor sourceUri contains placeholder '{name}', but it was not provided. " + f"Fix: define it in extractor.placeholderVariables and provide a value in task.extractorVariables if needed." ) - if isinstance(exc, TypeError) and "JSONTransformer received None" in str(exc): + msg_str = str(exc) + + if isinstance(exc, TypeError) and "JSONTransformer received None" in msg_str: return EtlUserFacingError( - message=( - "Transformer did not receive any extracted data to parse. " - "Fix the extractor configuration so it returns a valid JSON payload." - ), - stage=stage, - code="missing_extracted_payload", - hint="Verify the extractor sourceUri and that the request returned a valid JSON payload.", - debug_error=str(exc), + "Transformer did not receive any extracted data to parse. " + "Fix: update the extractor configuration so it returns a valid JSON payload." ) - # hydroserverpy TimestampParser (timezoneMode=daylightSavings) calls ZoneInfo(timezone). - # If timezone is missing, ZoneInfo(None) raises a confusing TypeError. Surface a clear fix. - msg_str = str(exc) - if ( - isinstance(exc, TypeError) - and "expected str, bytes or os.PathLike object, not NoneType" in msg_str - and stage in ("transform", "extract", "setup", None) - ): + if isinstance(exc, TypeError) and "CSVTransformer received None" in msg_str: return EtlUserFacingError( - message="Task configuration is missing required daylight savings offset.", - stage=stage, - code="missing_daylight_savings_offset", - hint=( - "If your timestamp timezoneMode is 'daylightSavings', you must set the " - "corresponding timestamp.timezone to an IANA time zone like 'America/Denver'." - ), - details=[ - { - "message": "timezone is required when timezoneMode is 'daylightSavings'.", - "path": "transformer.timestamp.timezone", - } - ], - debug_error=msg_str, + "Transformer did not receive any extracted data to parse. " + "Fix: update the extractor configuration so it returns a valid CSV payload." ) if ( @@ -292,48 +191,98 @@ def user_facing_error_from_exception( and "assign" in msg_str.lower() ): return EtlUserFacingError( - message=( - "A required configuration value is null where a string is expected. " - "Fix the ETL configuration JSON to provide a value for the missing field." - ), - stage=stage, - code="null_value_in_config", - hint="Look for missing/empty fields in the Data Connection settings (and any variable substitution that produced null).", - debug_error=msg_str, + "A required configuration value is null where a string is expected. " + "Fix: provide the missing value in your ETL configuration JSON." 
) + # django-ninja HttpError (avoid importing ninja here to keep module import-safe) + status = getattr(exc, "status_code", None) + if status is not None and exc.__class__.__name__ == "HttpError": + message = getattr(exc, "message", None) or msg_str + if "Datastream does not exist" in message: + return EtlUserFacingError( + "The target data series (datastream) could not be found. " + "Fix: update task mappings so each targetIdentifier is a valid datastream ID." + ) + if status in (401, 403): + return EtlUserFacingError( + "HydroServer rejected the load due to authorization. " + "Fix: confirm the target datastream(s) belong to this workspace and the job has permission to write." + ) + if status >= 400: + return EtlUserFacingError( + "HydroServer rejected some or all of the data. " + "Fix: verify the transformed timestamps/values are valid and the target datastream mappings are correct." + ) + if isinstance(exc, ValueError): - msg = msg_str - m = _TIMESTAMP_COL_NOT_FOUND_RE.search(msg) + # Extractor placeholder/variable resolution + m = _MISSING_REQUIRED_TASK_VAR_RE.search(msg_str) + if m: + name = m.group(1) + return EtlUserFacingError( + f"Missing required per-task extractor variable '{name}'. " + "Fix: add it to task.extractorVariables." + ) + m = _MISSING_URI_PLACEHOLDER_RE.search(msg_str) + if m: + name = m.group(1) + return EtlUserFacingError( + f"Extractor sourceUri contains placeholder '{name}', but it was not provided. " + "Fix: define it in extractor.placeholderVariables and provide a value in task.extractorVariables if needed." + ) + + # CSV read errors from hydroserverpy (already user-friendly, but add the one place to fix) + if msg_str in ( + "The header row contained unexpected values and could not be processed.", + "One or more data rows contained unexpected values and could not be processed.", + ): + return EtlUserFacingError( + f"{msg_str} Fix: check transformer.delimiter/headerRow/dataStartRow/identifierType " + "and confirm the upstream CSV columns match your task mappings." + ) + + # JSON transformer common configuration errors + if msg_str == "The payload's expected fields were not found.": + return EtlUserFacingError( + "The payload's expected fields were not found. " + "Fix: update transformer.JMESPath and transformer.timestamp.key so the extracted JSON produces the expected fields." + ) + if msg_str == "The timestamp or value key could not be found with the specified query.": + return EtlUserFacingError( + "The timestamp or value key could not be found with the specified query. " + "Fix: update transformer.JMESPath and/or transformer.timestamp.key to match the extracted JSON." + ) + + m = _TIMESTAMP_COL_NOT_FOUND_RE.search(msg_str) if m: key = m.group(1) return EtlUserFacingError( - message=( - f"Timestamp column '{key}' was not found in the extracted data. " - "Fix the transformer timestamp.key (or identifierType/index settings) to match the extracted data." - ), - stage=stage, - code="timestamp_column_missing", - hint=( - "Update the transformer timestamp.key (or identifierType/index settings) to match the extracted data." - ), - debug_error=msg, + f"Timestamp column '{key}' was not found in the extracted data. " + "Fix: update transformer.timestamp.key (or identifierType/index settings) to match the extracted data." 
) - if "Source index" in msg and "out of range" in msg: + + m = _SOURCE_INDEX_OOR_RE.search(msg_str) + if m: + idx = m.group(1) return EtlUserFacingError( - message=msg, - stage=stage, - code="source_index_out_of_range", - hint="Check the task mappings: a sourceIdentifier index is outside the extracted dataset's column range.", - debug_error=msg, + f"A mapping source index ({idx}) is out of range for the extracted data. " + "Fix: update task.mappings sourceIdentifier values (or switch identifierType) to match the extracted columns." ) - if "Source column" in msg and "not found in extracted data" in msg: + + m = _SOURCE_COL_NOT_FOUND_RE.search(msg_str) + if m: + col = m.group(1) + return EtlUserFacingError( + f"A mapping source column '{col}' was not found in the extracted data. " + "Fix: update task.mappings sourceIdentifier values to match the extracted columns." + ) + + # JSON decode failures (usually extractor returned HTML/text instead of JSON) + if isinstance(exc, json.JSONDecodeError): return EtlUserFacingError( - message=msg, - stage=stage, - code="source_column_missing", - hint="Check the task mappings: a sourceIdentifier does not exist in the extracted dataset columns.", - debug_error=msg, + "Extractor returned invalid JSON. " + "Fix: verify the extractor sourceUri returns JSON (and adjust transformer.JMESPath if needed)." ) return None diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index b4203a73..9f856885 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -20,7 +20,12 @@ user_facing_error_from_validation_error, ) from hydroserverpy.etl.factories import extractor_factory, transformer_factory -from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, SourceTargetMapping, MappingPath +from hydroserverpy.etl.etl_configuration import ( + ExtractorConfig, + TransformerConfig, + SourceTargetMapping, + MappingPath, +) @dataclass @@ -29,58 +34,6 @@ class TaskRunContext: runtime_source_uri: Optional[str] = None log_handler: Optional["TaskLogHandler"] = None task_meta: dict[str, Any] = field(default_factory=dict) - emitted_runtime_vars_log: bool = False - emitted_task_vars_log: bool = False - - -def _enum_value(value: Any) -> Any: - return getattr(value, "value", value) - - -def _safe_lower(value: Any) -> str: - if value is None: - return "" - v = _enum_value(value) - if isinstance(v, str): - return v.lower() - return str(v).lower() - - -def _validate_daylight_savings_timezone(transformer_cfg: Any, *, stage: str) -> None: - """ - hydroserverpy allows timezoneMode=daylightSavings with timezone unset, which later fails - with a non-obvious TypeError from zoneinfo. Catch and explain it. - """ - ts = getattr(transformer_cfg, "timestamp", None) - if not ts: - return - mode = _safe_lower(getattr(ts, "timezone_mode", None)) - if mode != "daylightsavings": - return - tz = getattr(ts, "timezone", None) - if tz is None or (isinstance(tz, str) and not tz.strip()): - raise EtlUserFacingError( - message=( - "Missing required timezone: transformer.timestamp.timezone " - "(required when transformer.timestamp.timezoneMode is 'daylightSavings')." - ), - stage=stage, - code="missing_daylight_savings_offset", - hint=( - "If transformer.timestamp.timezoneMode is 'daylightSavings', you must set " - "transformer.timestamp.timezone to an IANA time zone like 'America/Denver'. 
" - "In exported config JSON this is typically at: " - "dataConnection.transformer.settings.timestamp.timezone" - ), - details=[ - { - "path": "transformer.timestamp.timezone", - "message": "Required when transformer.timestamp.timezoneMode is 'daylightSavings'.", - "input": tz, - } - ], - debug_error="transformer.timestamp.timezone is null/empty while timezoneMode=daylightSavings", - ) class TaskLogFilter(logging.Filter): @@ -96,12 +49,6 @@ def __init__(self, context: TaskRunContext): self.lines: list[str] = [] self.entries: list[dict[str, Any]] = [] self._formatter = logging.Formatter() - # hydroserverpy extractor logs (v1.7.0+) are very verbose: it logs one line - # per variable resolution. We collapse that into at most one runtime-vars - # log and one task-vars log for the whole task run. - self._pending_runtime_vars: set[str] = set() - self._pending_task_vars: set[str] = set() - self._collecting_placeholder_vars: bool = False def emit(self, record: logging.LogRecord) -> None: if not self.filter(record): @@ -109,13 +56,9 @@ def emit(self, record: logging.LogRecord) -> None: message = record.getMessage() - # Condense hydroserverpy extractor variable resolution logs into one line - # each for runtime vars and task vars. - if self._capture_placeholder_var_log(message, record): - return - self._flush_placeholder_var_summaries_if_needed(message, record) - - timestamp = datetime.fromtimestamp(record.created, tz=dt_timezone.utc).isoformat() + timestamp = datetime.fromtimestamp( + record.created, tz=dt_timezone.utc + ).isoformat() line = f"{timestamp} {record.levelname:<8} {message}" if record.exc_info: line = f"{line}\n{self._formatter.formatException(record.exc_info)}" @@ -135,92 +78,6 @@ def emit(self, record: logging.LogRecord) -> None: self._capture_runtime_uri(message) - def _append_synthetic_entry( - self, *, timestamp: str, level: str, message: str, record: logging.LogRecord - ) -> None: - # Mirror the structure of "real" log capture entries. - line = f"{timestamp} {level:<8} {message}" - self.lines.append(line) - self.entries.append( - { - "timestamp": timestamp, - "level": level, - "logger": record.name, - "message": message, - "pathname": record.pathname, - "lineno": record.lineno, - } - ) - - def _capture_placeholder_var_log(self, message: str, record: logging.LogRecord) -> bool: - """ - Returns True if this record should be suppressed from captured logs. - - hydroserverpy.etl.extractors.base logs: - - "Creating runtime variables..." - - "Resolving runtime var: " - - "Resolving task var: " - - "Resolving extractor placeholder variables ( configured)." - - "Resolving per-task var: " - - "Resolved placeholder '' (...) -> '...'" - """ - - # Suppress extractor placeholder variable resolution chatter entirely. - # This includes both the verbose per-variable lines and our older synthetic summaries. - if message.startswith("Resolving extractor placeholder variables"): - return True - if message.startswith("Resolving per-task var:"): - return True - if message.startswith("Resolved placeholder"): - return True - - # If we see a new "creating" marker while already collecting, treat it as a - # boundary and flush pending summaries first. 
- if message == "Creating runtime variables...": - self._flush_placeholder_var_summaries(record) - self._collecting_placeholder_vars = True - return True - - runtime_prefix = "Resolving runtime var:" - if message.startswith(runtime_prefix): - name = message.split(runtime_prefix, 1)[1].strip() - if not self.context.emitted_runtime_vars_log and name: - self._pending_runtime_vars.add(name) - self._collecting_placeholder_vars = True - return True - - task_prefix = "Resolving task var:" - if message.startswith(task_prefix): - name = message.split(task_prefix, 1)[1].strip() - if not self.context.emitted_task_vars_log and name: - self._pending_task_vars.add(name) - self._collecting_placeholder_vars = True - return True - - return False - - def _flush_placeholder_var_summaries_if_needed( - self, message: str, record: logging.LogRecord - ) -> None: - if not self._collecting_placeholder_vars: - return - - # As soon as we leave the "variable resolution" block, emit summaries. - if ( - message != "Creating runtime variables..." - and not message.startswith("Resolving runtime var:") - and not message.startswith("Resolving task var:") - ): - self._flush_placeholder_var_summaries(record) - - def _flush_placeholder_var_summaries(self, record: logging.LogRecord) -> None: - # Previously we emitted synthetic summary lines like: - # "Runtime variables (1): start_time" - # Those are now removed to keep run logs focused. - self._pending_runtime_vars.clear() - self._pending_task_vars.clear() - self._collecting_placeholder_vars = False - def _capture_runtime_uri(self, message: str) -> None: if self.context.runtime_source_uri: return @@ -322,18 +179,16 @@ def _success_message(load: Optional[LoadSummary]) -> str: if load.timestamps_total and load.timestamps_after_cutoff == 0: if load.cutoff: return ( - "No new observations to load " + "Already up to date - no new observations loaded " f"(all timestamps were at or before {load.cutoff})." ) - return "No new observations to load (all timestamps were at or before the cutoff)." + return "Already up to date - no new observations loaded (all timestamps were at or before the cutoff)." if load.observations_available == 0: - return "No new observations to load." + return "Already up to date - no new observations loaded." return "No new observations were loaded." if load.datastreams_loaded: - return ( - f"Load completed successfully ({loaded} rows across {load.datastreams_loaded} datastreams)." - ) + return f"Load completed successfully ({loaded} rows across {load.datastreams_loaded} datastreams)." return f"Load completed successfully ({loaded} rows loaded)." 
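For reference, a minimal, self-contained sketch of the temporary log-capture pattern these task-run hunks adjust: attach a filtering handler to the root logger for the duration of a run, then read back the captured lines. The names below are illustrative only; the project's actual handler filters on record pathname and records richer structured entries.

import logging
from contextlib import contextmanager
from datetime import datetime, timezone


class CapturingHandler(logging.Handler):
    """Collect formatted log lines in memory for the duration of a run."""

    def __init__(self) -> None:
        super().__init__(level=logging.INFO)
        self.lines: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        stamp = datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat()
        self.lines.append(f"{stamp} {record.levelname:<8} {record.getMessage()}")


@contextmanager
def capture_logs(name_fragment: str):
    root = logging.getLogger()
    handler = CapturingHandler()
    # Keep only records emitted by loggers whose name contains the fragment.
    handler.addFilter(lambda record: name_fragment in record.name)
    previous_level = root.level
    if previous_level > logging.INFO:
        root.setLevel(logging.INFO)
    root.addHandler(handler)
    try:
        yield handler
    finally:
        root.removeHandler(handler)
        root.setLevel(previous_level)


if __name__ == "__main__":
    with capture_logs("etl") as captured:
        logging.getLogger("etl.demo").info("Starting transform")
        logging.getLogger("unrelated").info("This record is filtered out")
    print("\n".join(captured.lines))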
@@ -381,37 +236,13 @@ def _build_task_result( context: Optional[TaskRunContext] = None, *, stage: Optional[str] = None, - error: Optional[str] = None, traceback: Optional[str] = None, - code: Optional[str] = None, - hint: Optional[str] = None, - details: Optional[list[dict[str, Any]]] = None, - debug_error: Optional[str] = None, ) -> dict[str, Any]: result: dict[str, Any] = {"message": message, "summary": message} if stage: result["stage"] = stage - if error: - result.update( - { - "error": error, - "failure_reason": error, - "failureReason": error, - } - ) if traceback: result["traceback"] = traceback - if code or hint or details or debug_error: - failure: dict[str, Any] = {} - if code: - failure["code"] = code - if hint: - failure["hint"] = hint - if details: - failure["details"] = details - if debug_error: - failure["debug_error"] = debug_error - result["failure"] = failure if context and context.runtime_source_uri: _apply_runtime_uri_aliases(result, context.runtime_source_uri) @@ -441,11 +272,13 @@ def _last_logged_error(context: Optional[TaskRunContext]) -> Optional[str]: return None -def _validate_component_config(component: str, adapter: TypeAdapter, raw: dict[str, Any], *, stage: str): +def _validate_component_config( + component: str, adapter: TypeAdapter, raw: dict[str, Any] +): try: return adapter.validate_python(raw) except ValidationError as ve: - raise user_facing_error_from_validation_error(component, ve, raw=raw, stage=stage) from ve + raise user_facing_error_from_validation_error(component, ve, raw=raw) from ve @shared_task(bind=True, expires=10, name="etl.tasks.run_etl_task") @@ -460,11 +293,11 @@ def run_etl_task(self, task_id: str): with capture_task_logs(context): try: - task = Task.objects.select_related( - "data_connection" - ).prefetch_related( - "mappings", "mappings__paths" - ).get(pk=UUID(task_id)) + task = ( + Task.objects.select_related("data_connection") + .prefetch_related("mappings", "mappings__paths") + .get(pk=UUID(task_id)) + ) context.task_meta = { "id": str(task.id), @@ -483,11 +316,21 @@ def run_etl_task(self, task_id: str): **(task.data_connection.transformer_settings or {}), } + timestamp_cfg = transformer_raw.get("timestamp") or {} + if isinstance(timestamp_cfg, dict): + tz_mode = timestamp_cfg.get("timezoneMode") + tz_value = timestamp_cfg.get("timezone") + if tz_mode == "daylightSavings" and not tz_value: + raise EtlUserFacingError( + "Invalid transformer configuration at transformer.timestamp.timezone: " + "timezone is required when timezoneMode is 'daylightSavings'. 
" + ) + extractor_cfg = _validate_component_config( - "extractor", TypeAdapter(ExtractorConfig), extractor_raw, stage=context.stage + "extractor", TypeAdapter(ExtractorConfig), extractor_raw ) transformer_cfg = _validate_component_config( - "transformer", TypeAdapter(TransformerConfig), transformer_raw, stage=context.stage + "transformer", TypeAdapter(TransformerConfig), transformer_raw ) extractor_cls = extractor_factory(extractor_cfg) @@ -501,9 +344,11 @@ def run_etl_task(self, task_id: str): MappingPath( target_identifier=task_mapping_path.target_identifier, data_transformations=task_mapping_path.data_transformations, - ) for task_mapping_path in task_mapping.paths.all() - ] - ) for task_mapping in task.mappings.all() + ) + for task_mapping_path in task_mapping.paths.all() + ], + ) + for task_mapping in task.mappings.all() ] context.stage = "extract" @@ -524,24 +369,26 @@ def run_etl_task(self, task_id: str): context.stage = "transform" logging.info("Starting transform") - _validate_daylight_savings_timezone(transformer_cfg, stage=context.stage) data = transformer_cls.transform(data, task_mappings) transform_summary = _describe_transformed_data(data) logging.info("Transform result: %s", transform_summary) + if isinstance(data, pd.DataFrame) and "timestamp" in data.columns: + bad = data["timestamp"].isna().sum() + if bad: + raise EtlUserFacingError( + f"One or more timestamps could not be read with the current settings " + f"({bad} row(s) failed to parse). " + "Update transformer.timestamp.format/timezoneMode/timezone/customFormat " + "and confirm the extracted timestamp values match." + ) if _is_empty(data): # hydroserverpy's CSVTransformer returns None on read errors (but logs ERROR). # Treat that as a failure to avoid misleading "produced no rows" messaging. last_err = _last_logged_error(context) if last_err and last_err.startswith("Error reading CSV data:"): raise EtlUserFacingError( - message=( - f"{last_err}. Check delimiter/headerRow/dataStartRow/identifierType " - "and confirm the upstream CSV columns match your task mappings." - ), - stage=context.stage, - code="transform_read_error", - hint="Fix the CSV transformer settings (delimiter/headerRow/dataStartRow/identifierType) or the upstream CSV format.", - debug_error=last_err, + f"{last_err}. Check delimiter/headerRow/dataStartRow/identifierType " + "and confirm the upstream CSV columns match your task mappings." ) return _build_task_result( "Transform produced no rows. 
Nothing to load.", @@ -565,9 +412,9 @@ def run_etl_task(self, task_id: str): stage=context.stage, ) except Exception as e: - mapped = user_facing_error_from_exception(e, stage=getattr(context, "stage", None)) + mapped = user_facing_error_from_exception(e) if mapped: - logging.error("%s", mapped.message) + logging.error("%s", str(mapped)) if mapped is e: raise raise mapped from e @@ -601,9 +448,7 @@ def update_next_run(sender, task_id, kwargs, **extra): return try: - task = Task.objects.select_related("periodic_task").get( - pk=kwargs["task_id"] - ) + task = Task.objects.select_related("periodic_task").get(pk=kwargs["task_id"]) except Task.DoesNotExist: return @@ -668,25 +513,23 @@ def mark_etl_task_failure(sender, task_id, einfo, exception, **extra): return stage = context.stage if context else None - mapped = user_facing_error_from_exception(exception, stage=stage) + mapped = user_facing_error_from_exception(exception) if mapped: - message = mapped.message + message = str(mapped) if stage and message: - prefix = f"failed during {stage.lower()}:" - if not message.lower().startswith(prefix): - message = f"Failed during {stage}: {message}" - code = mapped.code - hint = mapped.hint - details = mapped.details - debug_error = mapped.debug_error - error_str = message + if stage.lower() == "setup": + prefix = "setup failed:" + if not message.lower().startswith(prefix): + message = f"Setup failed: {message}" + else: + prefix = f"failed during {stage.lower()}:" + if not message.lower().startswith(prefix): + message = f"Failed during {stage}: {message}" else: - message = f"Failed during {stage}: {exception}" if stage else f"{exception}" - code = None - hint = None - details = None - debug_error = None - error_str = str(exception) + if stage and stage.lower() == "setup": + message = f"Setup failed: {exception}" + else: + message = f"Failed during {stage}: {exception}" if stage else f"{exception}" task_run.status = "FAILURE" task_run.finished_at = timezone.now() @@ -694,12 +537,7 @@ def mark_etl_task_failure(sender, task_id, einfo, exception, **extra): message, context, stage=stage, - error=error_str, traceback=einfo.traceback, - code=code, - hint=hint, - details=details, - debug_error=debug_error, ) task_run.save(update_fields=["status", "finished_at", "result"]) From 4947b49566b31f8c4448b4585d56570c5ed8dce8 Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Tue, 10 Feb 2026 13:55:09 -0700 Subject: [PATCH 05/12] Allow text/plain --- interfaces/sensorthings/schemas/sensor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/interfaces/sensorthings/schemas/sensor.py b/interfaces/sensorthings/schemas/sensor.py index 3f6c5d51..08231633 100644 --- a/interfaces/sensorthings/schemas/sensor.py +++ b/interfaces/sensorthings/schemas/sensor.py @@ -24,6 +24,7 @@ class SensorModel(Schema): "application/pdf", "http://www.opengis.net/doc/IS/SensorML/2.0", "text/html", + "text/plain", "application/json", ] From 843f5cf8c78cc632fcf67ced9c534025e156a90f Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Tue, 10 Feb 2026 13:55:47 -0700 Subject: [PATCH 06/12] Better testing --- .gitignore | 2 ++ domains/etl/etl_errors.py | 1 + 2 files changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index c78cf983..9ebb1f55 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,8 @@ staticfiles/ coverage.txt photos/ csrf_test.py +tmp/ +.env # Elastic Beanstalk Files .elasticbeanstalk/* diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py index 8e7e0f91..24e457c5 100644 --- a/domains/etl/etl_errors.py +++ 
b/domains/etl/etl_errors.py @@ -234,6 +234,7 @@ def user_facing_error_from_exception( # CSV read errors from hydroserverpy (already user-friendly, but add the one place to fix) if msg_str in ( + "One or more configured CSV columns were not found in the header row.", "The header row contained unexpected values and could not be processed.", "One or more data rows contained unexpected values and could not be processed.", ): From e77939f7bf057a6da14d6ca4fcdcf3774f12dd96 Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Tue, 10 Feb 2026 15:08:32 -0700 Subject: [PATCH 07/12] Update ETL error messages --- domains/etl/etl_errors.py | 198 ++++++++++++++++++++++++++++++-------- domains/etl/loader.py | 4 +- domains/etl/tasks.py | 51 +++++----- 3 files changed, 183 insertions(+), 70 deletions(-) diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py index 24e457c5..94c8da4b 100644 --- a/domains/etl/etl_errors.py +++ b/domains/etl/etl_errors.py @@ -47,14 +47,24 @@ def _alias(component: str, field: str) -> str: def _format_loc(component: str, loc: Iterable[Any]) -> str: loc_list = list(loc) # Strip pydantic union branch model names from the front. - if component == "extractor" and loc_list and loc_list[0] in ( - "HTTPExtractor", - "LocalFileExtractor", + if ( + component == "extractor" + and loc_list + and loc_list[0] + in ( + "HTTPExtractor", + "LocalFileExtractor", + ) ): loc_list = loc_list[1:] - if component == "transformer" and loc_list and loc_list[0] in ( - "JSONTransformer", - "CSVTransformer", + if ( + component == "transformer" + and loc_list + and loc_list[0] + in ( + "JSONTransformer", + "CSVTransformer", + ) ): loc_list = loc_list[1:] @@ -119,16 +129,38 @@ def user_facing_error_from_validation_error( inp = first.get("input", None) path = _format_loc(component, loc) + if component == "transformer" and isinstance(raw, dict): + ts = raw.get("timestamp") + if isinstance(ts, dict): + tz_mode = ts.get("timezoneMode") or ts.get("timezone_mode") + tz_val = ts.get("timezone") + if ( + path.endswith("transformer.timestamp.timezone") + and str(tz_mode) == "daylightSavings" + ): + if tz_val is None or str(tz_val).strip() == "": + return EtlUserFacingError( + "Timezone information is required when daylight savings mode is enabled. " + "Select a valid timezone such as America/Denver and try again." + ) + if "Invalid timezone" in str(msg): + return EtlUserFacingError( + "The configured timezone is not recognized. " + "Use a valid IANA timezone such as America/Denver and run the job again." + ) + message = ( f"Invalid {component} configuration at {path}: {msg} (got {_jsonish(inp)}). " - f"Fix: update the Data Connection {component} settings." + f"Update the Data Connection {component} settings." ) return EtlUserFacingError(message) _MISSING_PER_TASK_VAR_RE = re.compile(r"Missing per-task variable '([^']+)'") _MISSING_PLACEHOLDER_VAR_RE = re.compile(r"Missing placeholder variable: (.+)$") -_TIMESTAMP_COL_NOT_FOUND_RE = re.compile(r"Timestamp column '([^']*)' not found in data\.") +_TIMESTAMP_COL_NOT_FOUND_RE = re.compile( + r"Timestamp column '([^']*)' not found in data\." 
+) _MISSING_REQUIRED_TASK_VAR_RE = re.compile( r"Missing required per-task extractor variable '([^']+)'" @@ -136,7 +168,9 @@ def user_facing_error_from_validation_error( _MISSING_URI_PLACEHOLDER_RE = re.compile( r"Extractor source URI contains a placeholder '([^']+)', but it was not provided" ) -_SOURCE_INDEX_OOR_RE = re.compile(r"Source index (\d+) is out of range for extracted data\.") +_SOURCE_INDEX_OOR_RE = re.compile( + r"Source index (\d+) is out of range for extracted data\." +) _SOURCE_COL_NOT_FOUND_RE = re.compile( r"Source column '([^']+)' not found in extracted data\." ) @@ -160,29 +194,29 @@ def user_facing_error_from_exception( if m: name = m.group(1) return EtlUserFacingError( - f"Missing required per-task extractor variable '{name}'. " - f"Fix: add it to task.extractorVariables." + f"A required task variable named '{name}' was not provided. " + "Add a value for it in the task configuration and run the job again." ) m = _MISSING_PLACEHOLDER_VAR_RE.search(msg) if m: name = m.group(1).strip() return EtlUserFacingError( - f"Extractor sourceUri contains placeholder '{name}', but it was not provided. " - f"Fix: define it in extractor.placeholderVariables and provide a value in task.extractorVariables if needed." + f"The extractor URL includes a placeholder '{name}', but no value was supplied. " + "Provide the missing value in the task variables." ) msg_str = str(exc) if isinstance(exc, TypeError) and "JSONTransformer received None" in msg_str: return EtlUserFacingError( - "Transformer did not receive any extracted data to parse. " - "Fix: update the extractor configuration so it returns a valid JSON payload." + "The transformer did not receive any extracted data to parse. " + "Confirm the extractor is returning a valid JSON payload." ) if isinstance(exc, TypeError) and "CSVTransformer received None" in msg_str: return EtlUserFacingError( - "Transformer did not receive any extracted data to parse. " - "Fix: update the extractor configuration so it returns a valid CSV payload." + "The transformer did not receive any extracted data to parse. " + "Confirm the extractor is returning a valid CSV payload." ) if ( @@ -192,7 +226,7 @@ def user_facing_error_from_exception( ): return EtlUserFacingError( "A required configuration value is null where a string is expected. " - "Fix: provide the missing value in your ETL configuration JSON." + "Provide the missing value in your ETL configuration JSON." ) # django-ninja HttpError (avoid importing ninja here to keep module import-safe) @@ -201,18 +235,18 @@ def user_facing_error_from_exception( message = getattr(exc, "message", None) or msg_str if "Datastream does not exist" in message: return EtlUserFacingError( - "The target data series (datastream) could not be found. " - "Fix: update task mappings so each targetIdentifier is a valid datastream ID." + "One or more destination datastream identifiers could not be found. " + "Update the task mappings to use valid datastream IDs." ) if status in (401, 403): return EtlUserFacingError( "HydroServer rejected the load due to authorization. " - "Fix: confirm the target datastream(s) belong to this workspace and the job has permission to write." + "Confirm the target datastream(s) belong to this workspace and the job has permission to write." ) if status >= 400: return EtlUserFacingError( "HydroServer rejected some or all of the data. " - "Fix: verify the transformed timestamps/values are valid and the target datastream mappings are correct." 
+ "Verify the transformed timestamps/values are valid and the target datastream mappings are correct." ) if isinstance(exc, ValueError): @@ -221,15 +255,29 @@ def user_facing_error_from_exception( if m: name = m.group(1) return EtlUserFacingError( - f"Missing required per-task extractor variable '{name}'. " - "Fix: add it to task.extractorVariables." + f"A required task variable named '{name}' was not provided. " + "Add a value for it in the task configuration and run the job again." ) m = _MISSING_URI_PLACEHOLDER_RE.search(msg_str) if m: name = m.group(1) return EtlUserFacingError( - f"Extractor sourceUri contains placeholder '{name}', but it was not provided. " - "Fix: define it in extractor.placeholderVariables and provide a value in task.extractorVariables if needed." + f"The extractor URL includes a placeholder '{name}', but no value was supplied. " + "Provide the missing value in the task variables." + ) + + if "identifierType='index' requires timestamp.key" in msg_str: + return EtlUserFacingError( + "The timestamp column is set incorrectly. Index mode expects a column number such as 1 for the first column. " + "Update the timestamp setting to a valid column index." + ) + + if msg_str.startswith( + "One or more timestamps could not be read with the current settings" + ): + return EtlUserFacingError( + "One or more timestamps could not be read using the current format and timezone settings. " + "Confirm how dates appear in the source file and update the transformer configuration to match." ) # CSV read errors from hydroserverpy (already user-friendly, but add the one place to fix) @@ -239,28 +287,31 @@ def user_facing_error_from_exception( "One or more data rows contained unexpected values and could not be processed.", ): return EtlUserFacingError( - f"{msg_str} Fix: check transformer.delimiter/headerRow/dataStartRow/identifierType " - "and confirm the upstream CSV columns match your task mappings." + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." ) # JSON transformer common configuration errors if msg_str == "The payload's expected fields were not found.": return EtlUserFacingError( - "The payload's expected fields were not found. " - "Fix: update transformer.JMESPath and transformer.timestamp.key so the extracted JSON produces the expected fields." + "The job could not find the expected timestamp or value fields using the current JSON query. " + "Confirm the JMESPath expression matches the structure returned by the source." ) - if msg_str == "The timestamp or value key could not be found with the specified query.": + if ( + msg_str + == "The timestamp or value key could not be found with the specified query." + ): return EtlUserFacingError( - "The timestamp or value key could not be found with the specified query. " - "Fix: update transformer.JMESPath and/or transformer.timestamp.key to match the extracted JSON." + "The job could not find the timestamp or value using the current JSON query. " + "Confirm the JMESPath expression matches the structure returned by the source." ) m = _TIMESTAMP_COL_NOT_FOUND_RE.search(msg_str) if m: - key = m.group(1) return EtlUserFacingError( - f"Timestamp column '{key}' was not found in the extracted data. " - "Fix: update transformer.timestamp.key (or identifierType/index settings) to match the extracted data." + "The column mapped as the timestamp does not exist in the file. 
" + "Confirm the source layout and update the mapping." ) m = _SOURCE_INDEX_OOR_RE.search(msg_str) @@ -268,22 +319,87 @@ def user_facing_error_from_exception( idx = m.group(1) return EtlUserFacingError( f"A mapping source index ({idx}) is out of range for the extracted data. " - "Fix: update task.mappings sourceIdentifier values (or switch identifierType) to match the extracted columns." + "Update task.mappings sourceIdentifier values (or switch identifierType) to match the extracted columns." ) m = _SOURCE_COL_NOT_FOUND_RE.search(msg_str) if m: col = m.group(1) return EtlUserFacingError( - f"A mapping source column '{col}' was not found in the extracted data. " - "Fix: update task.mappings sourceIdentifier values to match the extracted columns." + f"A mapped field named '{col}' was not found in the extracted data. " + "Update the task mapping so the source identifier matches the JSON." ) # JSON decode failures (usually extractor returned HTML/text instead of JSON) if isinstance(exc, json.JSONDecodeError): return EtlUserFacingError( - "Extractor returned invalid JSON. " - "Fix: verify the extractor sourceUri returns JSON (and adjust transformer.JMESPath if needed)." + "The source did not return valid JSON. " + "Verify the URL points to a JSON endpoint." + ) + + if msg_str == "Could not connect to the source system.": + return EtlUserFacingError( + "The orchestration system could't connect to the source system. This is often temporary, so try running the job again in a few minutes. " + "If it continues, the source system may be offline." + ) + + if msg_str == "The requested data could not be found on the source system.": + return EtlUserFacingError( + "The requested data could not be found on the source system. " + "Verify the URL is correct and that the file or endpoint still exists." + ) + + if msg_str.startswith("Authentication with the source system failed."): + return EtlUserFacingError( + "Authentication with the source system failed. The username, password, or token may be incorrect or expired. " + "Update the credentials and try again." + ) + + if msg_str in ( + "The connection to the source worked but no data was returned.", + "The connection to the source worked but no data were returned.", + ): + return EtlUserFacingError( + "The connection to the source worked but no observations were returned. " + "Confirm the source has data for the requested time range and that the endpoint is correct." + ) + + # Backward-compatible mappings for older hydroserverpy strings. + if msg_str == "The requested payload was not found on the source system.": + return EtlUserFacingError( + "The requested data could not be found on the source system. " + "Verify the URL is correct and that the file or endpoint still exists." + ) + + if msg_str == "The source system returned no data.": + return EtlUserFacingError( + "The connection to the source worked but no data were returned. " + "Confirm the source has data for the requested time range and that the endpoint is correct." ) + if ( + msg_str + == "Authentication with the source system failed; credentials may be invalid or expired." + ): + return EtlUserFacingError( + "Authentication with the source system failed. The username, password, or token may be incorrect or expired. " + "Update the credentials and try again." + ) + + if "jmespath.exceptions" in msg_str or "Parse error at column" in msg_str: + return EtlUserFacingError( + "The JSON query used to extract timestamps or values is invalid or returned unexpected data. 
" + "Review and correct the JMESPath expression." + ) + + if msg_str in ( + "The target datastream could not be found.", + "The target data series (datastream) could not be found.", + "The target datastream was not found.", + ): + return EtlUserFacingError( + "One or more destination datastream identifiers could not be found. " + "Update the task mappings to use valid datastream IDs." + ) + return None diff --git a/domains/etl/loader.py b/domains/etl/loader.py index cee554d1..535312c3 100644 --- a/domains/etl/loader.py +++ b/domains/etl/loader.py @@ -63,7 +63,9 @@ def load(self, data: pd.DataFrame, task: Task) -> LoadSummary: available = len(df) observations_available += available if available == 0: - logging.warning("No new data for %s after filtering; skipping.", col) + logging.warning( + "No new observations for %s after filtering; skipping.", col + ) continue df = df.rename(columns={"timestamp": "phenomenonTime", "value": "result"}) diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index 9f856885..97edc040 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -172,24 +172,22 @@ def _describe_transformed_data(data: Any) -> dict[str, Any]: def _success_message(load: Optional[LoadSummary]) -> str: if not load: - return "Load completed successfully." + return "Load complete." loaded = load.observations_loaded if loaded == 0: if load.timestamps_total and load.timestamps_after_cutoff == 0: - if load.cutoff: - return ( - "Already up to date - no new observations loaded " - f"(all timestamps were at or before {load.cutoff})." - ) - return "Already up to date - no new observations loaded (all timestamps were at or before the cutoff)." - if load.observations_available == 0: - return "Already up to date - no new observations loaded." + # We know the source returned timestamps, but every timestamp was filtered out by the cutoff. + return ( + "Already up to date. No new observations were loaded because all timestamps in the source are older " + "than what is already stored." + ) + # Otherwise, we don't have strong evidence for why nothing loaded beyond "no new observations". return "No new observations were loaded." - if load.datastreams_loaded: - return f"Load completed successfully ({loaded} rows across {load.datastreams_loaded} datastreams)." - return f"Load completed successfully ({loaded} rows loaded)." + ds_count = load.datastreams_loaded or 0 + ds_word = "datastream" if ds_count == 1 else "datastreams" + return f"Load complete. {loaded} rows were added to {ds_count} {ds_word}." def _apply_runtime_uri_aliases(result: dict[str, Any], runtime_source_uri: str) -> None: @@ -322,8 +320,8 @@ def run_etl_task(self, task_id: str): tz_value = timestamp_cfg.get("timezone") if tz_mode == "daylightSavings" and not tz_value: raise EtlUserFacingError( - "Invalid transformer configuration at transformer.timestamp.timezone: " - "timezone is required when timezoneMode is 'daylightSavings'. " + "Timezone information is required when daylight savings mode is enabled. " + "Select a valid timezone such as America/Denver and try again." ) extractor_cfg = _validate_component_config( @@ -361,8 +359,15 @@ def run_etl_task(self, task_id: str): extract_summary = _describe_payload(data) logging.info("Extractor returned payload: %s", extract_summary) if _is_empty(data): + if task.data_connection.extractor_type == "HTTP": + return _build_task_result( + "The connection to the source worked but no observations were returned. 
" + "Confirm the source has data for the requested time range and that the endpoint is correct.", + context, + stage=context.stage, + ) return _build_task_result( - "No data returned from the extractor. Nothing to load.", + "The extractor returned no data. Nothing to load.", context, stage=context.stage, ) @@ -376,10 +381,8 @@ def run_etl_task(self, task_id: str): bad = data["timestamp"].isna().sum() if bad: raise EtlUserFacingError( - f"One or more timestamps could not be read with the current settings " - f"({bad} row(s) failed to parse). " - "Update transformer.timestamp.format/timezoneMode/timezone/customFormat " - "and confirm the extracted timestamp values match." + "One or more timestamps could not be read using the current format and timezone settings. " + "Confirm how dates appear in the source file and update the transformer configuration to match." ) if _is_empty(data): # hydroserverpy's CSVTransformer returns None on read errors (but logs ERROR). @@ -515,16 +518,8 @@ def mark_etl_task_failure(sender, task_id, einfo, exception, **extra): stage = context.stage if context else None mapped = user_facing_error_from_exception(exception) if mapped: + # User-facing errors are already stage-aware and readable; don't prepend robotic prefixes. message = str(mapped) - if stage and message: - if stage.lower() == "setup": - prefix = "setup failed:" - if not message.lower().startswith(prefix): - message = f"Setup failed: {message}" - else: - prefix = f"failed during {stage.lower()}:" - if not message.lower().startswith(prefix): - message = f"Failed during {stage}: {message}" else: if stage and stage.lower() == "setup": message = f"Setup failed: {exception}" From 087c374a76e9c59679d94c6f71ad0950b8e51267 Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Tue, 10 Feb 2026 15:31:40 -0700 Subject: [PATCH 08/12] Fix typo --- domains/etl/etl_errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py index 94c8da4b..c3aa8c72 100644 --- a/domains/etl/etl_errors.py +++ b/domains/etl/etl_errors.py @@ -339,7 +339,7 @@ def user_facing_error_from_exception( if msg_str == "Could not connect to the source system.": return EtlUserFacingError( - "The orchestration system could't connect to the source system. This is often temporary, so try running the job again in a few minutes. " + "The orchestration system couldn't connect to the source system. This is often temporary, so try running the job again in a few minutes. " "If it continues, the source system may be offline." ) From 2b17b7770186cec2ba9cde02e5dcfe6bfca70b5d Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Tue, 10 Feb 2026 15:54:18 -0700 Subject: [PATCH 09/12] Update error wording --- domains/etl/etl_errors.py | 9 ++++----- domains/etl/tasks.py | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py index c3aa8c72..d1875bbd 100644 --- a/domains/etl/etl_errors.py +++ b/domains/etl/etl_errors.py @@ -356,12 +356,11 @@ def user_facing_error_from_exception( ) if msg_str in ( - "The connection to the source worked but no data was returned.", - "The connection to the source worked but no data were returned.", + "The connection to the source worked but no observations were returned.", ): return EtlUserFacingError( "The connection to the source worked but no observations were returned. " - "Confirm the source has data for the requested time range and that the endpoint is correct." 
+ "Confirm the source has observations for the requested time range and that the endpoint is correct." ) # Backward-compatible mappings for older hydroserverpy strings. @@ -373,8 +372,8 @@ def user_facing_error_from_exception( if msg_str == "The source system returned no data.": return EtlUserFacingError( - "The connection to the source worked but no data were returned. " - "Confirm the source has data for the requested time range and that the endpoint is correct." + "The connection to the source worked but no observations were returned. " + "Confirm the source has observations for the requested time range and that the endpoint is correct." ) if ( diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index 97edc040..c3a81014 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -362,12 +362,12 @@ def run_etl_task(self, task_id: str): if task.data_connection.extractor_type == "HTTP": return _build_task_result( "The connection to the source worked but no observations were returned. " - "Confirm the source has data for the requested time range and that the endpoint is correct.", + "Confirm the source has observations for the requested time range and that the endpoint is correct.", context, stage=context.stage, ) return _build_task_result( - "The extractor returned no data. Nothing to load.", + "The extractor returned no observations. Nothing to load.", context, stage=context.stage, ) From 5d1a81cc6c7b1b0938010015fc619f65e7ff918c Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Wed, 11 Feb 2026 09:22:42 -0700 Subject: [PATCH 10/12] Update message wording from team review --- domains/etl/etl_errors.py | 117 ++++++++++++++++++++++++++++++++------ domains/etl/tasks.py | 55 ++++++++++++++---- 2 files changed, 145 insertions(+), 27 deletions(-) diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py index d1875bbd..26c36963 100644 --- a/domains/etl/etl_errors.py +++ b/domains/etl/etl_errors.py @@ -1,5 +1,6 @@ from __future__ import annotations +import ast import json import re from typing import Any, Iterable, Optional @@ -174,10 +175,59 @@ def user_facing_error_from_validation_error( _SOURCE_COL_NOT_FOUND_RE = re.compile( r"Source column '([^']+)' not found in extracted data\." 
) +_USECOLS_NOT_FOUND_RE = re.compile( + r"columns expected but not found:\s*(\[[^\]]*\])", + re.IGNORECASE, +) + + +def _iter_exception_chain(exc: Exception) -> Iterable[Exception]: + seen: set[int] = set() + current: Optional[Exception] = exc + while current is not None and id(current) not in seen: + seen.add(id(current)) + yield current + next_exc = current.__cause__ or current.__context__ + current = next_exc if isinstance(next_exc, Exception) else None + + +def _extract_missing_usecols(exc: Exception) -> list[str]: + for err in _iter_exception_chain(exc): + msg = str(err) + match = _USECOLS_NOT_FOUND_RE.search(msg) + if not match: + continue + + raw_list = match.group(1) + try: + parsed = ast.literal_eval(raw_list) + if isinstance(parsed, (list, tuple, set)): + cols = [str(c).strip() for c in parsed if str(c).strip()] + if cols: + return cols + except Exception: + pass + + inner = raw_list.strip()[1:-1] + if inner: + cols = [part.strip().strip("'\"") for part in inner.split(",")] + cols = [c for c in cols if c] + if cols: + return cols + return [] + + +def _format_cols(cols: list[str], max_cols: int = 4) -> str: + shown = [f"'{c}'" for c in cols[:max_cols]] + if len(cols) > max_cols: + shown.append(f"+{len(cols) - max_cols} more") + return ", ".join(shown) def user_facing_error_from_exception( exc: Exception, + *, + transformer_raw: Optional[dict[str, Any]] = None, ) -> Optional[EtlUserFacingError]: """ Map common ETL/hydroserverpy exceptions to a single readable message. @@ -268,7 +318,7 @@ def user_facing_error_from_exception( if "identifierType='index' requires timestamp.key" in msg_str: return EtlUserFacingError( - "The timestamp column is set incorrectly. Index mode expects a column number such as 1 for the first column. " + "The timestamp column is set incorrectly. Index mode expects a 1-based column number (1 for the first column). " "Update the timestamp setting to a valid column index." ) @@ -280,11 +330,43 @@ def user_facing_error_from_exception( "Confirm how dates appear in the source file and update the transformer configuration to match." ) - # CSV read errors from hydroserverpy (already user-friendly, but add the one place to fix) - if msg_str in ( - "One or more configured CSV columns were not found in the header row.", - "The header row contained unexpected values and could not be processed.", - "One or more data rows contained unexpected values and could not be processed.", + if ( + msg_str + == "One or more configured CSV columns were not found in the header row." + ): + missing_cols = _extract_missing_usecols(exc) + if len(missing_cols) > 1: + return EtlUserFacingError( + f"Configured CSV columns were not found in the file header ({_format_cols(missing_cols)}). " + "This often means the delimiter or headerRow setting is incorrect. " + "Verify the delimiter and headerRow settings, then run the job again." + ) + if len(missing_cols) == 1 and isinstance(transformer_raw, dict): + ts_cfg = transformer_raw.get("timestamp") + ts_key = ts_cfg.get("key") if isinstance(ts_cfg, dict) else None + if ts_key is not None and str(missing_cols[0]) == str(ts_key): + col = missing_cols[0] + return EtlUserFacingError( + f"The configured timestamp column '{col}' was not found in the file header. " + "Confirm the timestamp mapping and verify the delimiter/headerRow settings match the source file." + ) + return EtlUserFacingError( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. 
" + "Confirm the file layout and update the column mappings if needed." + ) + if ( + msg_str + == "The header row contained unexpected values and could not be processed." + ): + return EtlUserFacingError( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." + ) + if ( + msg_str + == "One or more data rows contained unexpected values and could not be processed." ): return EtlUserFacingError( "A required column was not found in the file header. " @@ -295,7 +377,7 @@ def user_facing_error_from_exception( # JSON transformer common configuration errors if msg_str == "The payload's expected fields were not found.": return EtlUserFacingError( - "The job could not find the expected timestamp or value fields using the current JSON query. " + "Failed to find the timestamp or value using the current JSON query. " "Confirm the JMESPath expression matches the structure returned by the source." ) if ( @@ -303,15 +385,16 @@ def user_facing_error_from_exception( == "The timestamp or value key could not be found with the specified query." ): return EtlUserFacingError( - "The job could not find the timestamp or value using the current JSON query. " + "Failed to find the timestamp or value using the current JSON query. " "Confirm the JMESPath expression matches the structure returned by the source." ) m = _TIMESTAMP_COL_NOT_FOUND_RE.search(msg_str) if m: + col = m.group(1) return EtlUserFacingError( - "The column mapped as the timestamp does not exist in the file. " - "Confirm the source layout and update the mapping." + f"The configured timestamp column '{col}' was not found in the file header. " + "Confirm the timestamp mapping and verify the delimiter/headerRow settings match the source file." ) m = _SOURCE_INDEX_OOR_RE.search(msg_str) @@ -339,8 +422,8 @@ def user_facing_error_from_exception( if msg_str == "Could not connect to the source system.": return EtlUserFacingError( - "The orchestration system couldn't connect to the source system. This is often temporary, so try running the job again in a few minutes. " - "If it continues, the source system may be offline." + "Failed to connect to the source system. This may be temporary; try again shortly. " + "If it persists, the source system may be offline." ) if msg_str == "The requested data could not be found on the source system.": @@ -359,8 +442,8 @@ def user_facing_error_from_exception( "The connection to the source worked but no observations were returned.", ): return EtlUserFacingError( - "The connection to the source worked but no observations were returned. " - "Confirm the source has observations for the requested time range and that the endpoint is correct." + "No observations were returned from the source system. " + "Confirm the configured source system has observations available for the requested time range." ) # Backward-compatible mappings for older hydroserverpy strings. @@ -372,8 +455,8 @@ def user_facing_error_from_exception( if msg_str == "The source system returned no data.": return EtlUserFacingError( - "The connection to the source worked but no observations were returned. " - "Confirm the source has observations for the requested time range and that the endpoint is correct." + "No observations were returned from the source system. " + "Confirm the configured source system has observations available for the requested time range." 
) if ( @@ -397,7 +480,7 @@ def user_facing_error_from_exception( "The target datastream was not found.", ): return EtlUserFacingError( - "One or more destination datastream identifiers could not be found. " + "One or more destination datastream identifiers could not be found in HydroServer. " "Update the task mappings to use valid datastream IDs." ) diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index c3a81014..8406aeb6 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -177,17 +177,14 @@ def _success_message(load: Optional[LoadSummary]) -> str: loaded = load.observations_loaded if loaded == 0: if load.timestamps_total and load.timestamps_after_cutoff == 0: - # We know the source returned timestamps, but every timestamp was filtered out by the cutoff. - return ( - "Already up to date. No new observations were loaded because all timestamps in the source are older " - "than what is already stored." - ) + return "Already up to date. No new observations were loaded." # Otherwise, we don't have strong evidence for why nothing loaded beyond "no new observations". return "No new observations were loaded." ds_count = load.datastreams_loaded or 0 + preposition = "into" if ds_count == 1 else "across" ds_word = "datastream" if ds_count == 1 else "datastreams" - return f"Load complete. {loaded} rows were added to {ds_count} {ds_word}." + return f"Loaded {loaded} total observations {preposition} {ds_count} {ds_word}." def _apply_runtime_uri_aliases(result: dict[str, Any], runtime_source_uri: str) -> None: @@ -270,6 +267,39 @@ def _last_logged_error(context: Optional[TaskRunContext]) -> Optional[str]: return None +def _mapped_csv_error_from_log(last_err: str) -> Optional[str]: + prefix = "Error reading CSV data:" + if not last_err.startswith(prefix): + return None + + detail = last_err[len(prefix) :].strip() + if detail == "One or more configured CSV columns were not found in the header row.": + return ( + "Configured CSV columns were not found in the file header. " + "This often means the delimiter or headerRow setting is incorrect. " + "Verify the delimiter and headerRow settings, then run the job again." + ) + if ( + detail + == "The header row contained unexpected values and could not be processed." + ): + return ( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." + ) + if ( + detail + == "One or more data rows contained unexpected values and could not be processed." + ): + return ( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." + ) + return None + + def _validate_component_config( component: str, adapter: TypeAdapter, raw: dict[str, Any] ): @@ -361,8 +391,8 @@ def run_etl_task(self, task_id: str): if _is_empty(data): if task.data_connection.extractor_type == "HTTP": return _build_task_result( - "The connection to the source worked but no observations were returned. " - "Confirm the source has observations for the requested time range and that the endpoint is correct.", + "No observations were returned from the source system. 
" + "Confirm the configured source system has observations available for the requested time range.", context, stage=context.stage, ) @@ -389,8 +419,11 @@ def run_etl_task(self, task_id: str): # Treat that as a failure to avoid misleading "produced no rows" messaging. last_err = _last_logged_error(context) if last_err and last_err.startswith("Error reading CSV data:"): + mapped_csv_error = _mapped_csv_error_from_log(last_err) + if mapped_csv_error: + raise EtlUserFacingError(mapped_csv_error) raise EtlUserFacingError( - f"{last_err}. Check delimiter/headerRow/dataStartRow/identifierType " + f"{last_err}. Check delimiter/headerRow/dataStartRow/identifierType settings " "and confirm the upstream CSV columns match your task mappings." ) return _build_task_result( @@ -415,7 +448,9 @@ def run_etl_task(self, task_id: str): stage=context.stage, ) except Exception as e: - mapped = user_facing_error_from_exception(e) + mapped = user_facing_error_from_exception( + e, transformer_raw=locals().get("transformer_raw") + ) if mapped: logging.error("%s", str(mapped)) if mapped is e: From feb382f762de30f734274a5008824183c98addd0 Mon Sep 17 00:00:00 2001 From: Daniel Slaugh Date: Wed, 11 Feb 2026 10:06:57 -0700 Subject: [PATCH 11/12] Allow ETL messages to work for external orchestration systems --- domains/etl/etl_errors.py | 2 +- domains/etl/models/run.py | 10 +++ domains/etl/run_result_normalizer.py | 125 +++++++++++++++++++++++++++ domains/etl/services/run.py | 18 +++- domains/etl/tasks.py | 23 +++-- 5 files changed, 171 insertions(+), 7 deletions(-) create mode 100644 domains/etl/run_result_normalizer.py diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py index 26c36963..c24a4ff8 100644 --- a/domains/etl/etl_errors.py +++ b/domains/etl/etl_errors.py @@ -285,7 +285,7 @@ def user_facing_error_from_exception( message = getattr(exc, "message", None) or msg_str if "Datastream does not exist" in message: return EtlUserFacingError( - "One or more destination datastream identifiers could not be found. " + "One or more destination datastream identifiers could not be found in HydroServer. " "Update the task mappings to use valid datastream IDs." 
             )
         if status in (401, 403):
diff --git a/domains/etl/models/run.py b/domains/etl/models/run.py
index 8928fad2..3136d10d 100644
--- a/domains/etl/models/run.py
+++ b/domains/etl/models/run.py
@@ -1,6 +1,7 @@
 import uuid6
 from django.db import models
 from .task import Task
+from domains.etl.run_result_normalizer import normalize_task_run_result, task_transformer_raw


 class TaskRun(models.Model):
@@ -10,3 +11,12 @@ class TaskRun(models.Model):
     started_at = models.DateTimeField(auto_now_add=True)
     finished_at = models.DateTimeField(null=True, blank=True)
     result = models.JSONField(blank=True, null=True)
+
+    def save(self, *args, **kwargs):
+        transformer_raw = task_transformer_raw(self.task) if self.task_id else None
+        self.result = normalize_task_run_result(
+            status=self.status,
+            result=self.result,
+            transformer_raw=transformer_raw,
+        )
+        super().save(*args, **kwargs)
diff --git a/domains/etl/run_result_normalizer.py b/domains/etl/run_result_normalizer.py
new file mode 100644
index 00000000..f9f525af
--- /dev/null
+++ b/domains/etl/run_result_normalizer.py
@@ -0,0 +1,125 @@
+from __future__ import annotations
+
+import re
+from typing import Any, Optional
+
+from .etl_errors import user_facing_error_from_exception
+
+
+_SUCCESS_LOAD_RE = re.compile(
+    r"^Load complete\.\s*(\d+)\s+rows were added to\s+(\d+)\s+datastreams?\.$"
+)
+_SUCCESS_LOADED_RE = re.compile(
+    r"^Loaded\s+(\d+)\s+total observations\s+(?:into|across)\s+(\d+)\s+datastream(?:s|\(s\))\.$"
+)
+_FAILURE_STAGE_PREFIX_RE = re.compile(
+    r"^(?:Setup failed|Failed during [^:]+):\s*",
+    re.IGNORECASE,
+)
+
+
+def task_transformer_raw(task: Any) -> Optional[dict[str, Any]]:
+    if task is None:
+        return None
+
+    data_connection = getattr(task, "data_connection", None)
+    if data_connection is None:
+        return None
+
+    raw_settings = getattr(data_connection, "transformer_settings", None) or {}
+    if not isinstance(raw_settings, dict):
+        return None
+
+    raw: dict[str, Any] = dict(raw_settings)
+    transformer_type = getattr(data_connection, "transformer_type", None)
+    if transformer_type and "type" not in raw:
+        raw["type"] = transformer_type
+    return raw
+
+
+def _format_loaded_success_message(loaded: int, ds_count: int) -> str:
+    preposition = "into" if ds_count == 1 else "across"
+    ds_word = "datastream" if ds_count == 1 else "datastreams"
+    return f"Loaded {loaded} total observations {preposition} {ds_count} {ds_word}."
+
+
+def _extract_message(result: dict[str, Any]) -> Optional[str]:
+    for key in ("message", "summary", "error", "detail"):
+        val = result.get(key)
+        if isinstance(val, str) and val.strip():
+            return val.strip()
+    return None
+
+
+def _normalize_success_message(message: str) -> str:
+    m = _SUCCESS_LOAD_RE.match(message)
+    if m:
+        return _format_loaded_success_message(int(m.group(1)), int(m.group(2)))
+
+    m = _SUCCESS_LOADED_RE.match(message)
+    if m:
+        return _format_loaded_success_message(int(m.group(1)), int(m.group(2)))
+
+    if (
+        message
+        == "Already up to date. No new observations were loaded because all timestamps in the source are older than what is already stored."
+    ):
+        return "Already up to date. No new observations were loaded."
+
+    return message
+
+
+def _normalize_failure_message(
+    message: str,
+    *,
+    transformer_raw: Optional[dict[str, Any]] = None,
+) -> str:
+    candidate = _FAILURE_STAGE_PREFIX_RE.sub("", message).strip()
+
+    if candidate.startswith("Error reading CSV data:"):
+        candidate = candidate.split("Error reading CSV data:", 1)[1].strip()
+
+    mapped = user_facing_error_from_exception(
+        ValueError(candidate),
+        transformer_raw=transformer_raw,
+    )
+    if mapped:
+        return str(mapped)
+    return candidate or message
+
+
+def normalize_task_run_result(
+    *,
+    status: str,
+    result: Any,
+    transformer_raw: Optional[dict[str, Any]] = None,
+) -> Any:
+    if result is None:
+        return None
+
+    normalized: dict[str, Any]
+    if isinstance(result, dict):
+        normalized = dict(result)
+    else:
+        normalized = {"message": str(result)}
+
+    message = _extract_message(normalized)
+    if not message:
+        return normalized
+
+    if status == "SUCCESS":
+        normalized_message = _normalize_success_message(message)
+    elif status == "FAILURE":
+        normalized_message = _normalize_failure_message(
+            message,
+            transformer_raw=transformer_raw,
+        )
+    else:
+        normalized_message = message
+
+    normalized["message"] = normalized_message
+    summary = normalized.get("summary")
+    if not isinstance(summary, str) or not summary.strip() or summary.strip() == message:
+        normalized["summary"] = normalized_message
+
+    return normalized
diff --git a/domains/etl/services/run.py b/domains/etl/services/run.py
index 2336adfc..688e2a0a 100644
--- a/domains/etl/services/run.py
+++ b/domains/etl/services/run.py
@@ -5,6 +5,10 @@
 from django.contrib.auth import get_user_model
 from domains.iam.models import APIKey
 from domains.etl.models import TaskRun
+from domains.etl.run_result_normalizer import (
+    normalize_task_run_result,
+    task_transformer_raw,
+)
 from interfaces.api.schemas import TaskRunFields, TaskRunPostBody, TaskRunPatchBody, TaskRunOrderByFields
 from interfaces.api.service import ServiceUtils
 from .task import TaskService
@@ -79,9 +83,16 @@ def create(
             principal=principal, uid=task_id, action="edit", expand_related=True
         )

+        task_run_data = data.dict(include=set(TaskRunFields.model_fields.keys()))
+        task_run_data["result"] = normalize_task_run_result(
+            status=task_run_data["status"],
+            result=task_run_data.get("result"),
+            transformer_raw=task_transformer_raw(task),
+        )
+
         task_run = TaskRun.objects.create(
             task=task,
-            **data.dict(include=set(TaskRunFields.model_fields.keys())),
+            **task_run_data,
         )

         return self.get(
@@ -114,6 +125,11 @@ def update(
         for field, value in task_run_data.items():
             setattr(task_run, field, value)

+        task_run.result = normalize_task_run_result(
+            status=task_run.status,
+            result=task_run.result,
+            transformer_raw=task_transformer_raw(task),
+        )
         task_run.save()

         return self.get(
diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py
index 8406aeb6..cddbc466 100644
--- a/domains/etl/tasks.py
+++ b/domains/etl/tasks.py
@@ -19,6 +19,7 @@
     user_facing_error_from_exception,
     user_facing_error_from_validation_error,
 )
+from .run_result_normalizer import normalize_task_run_result, task_transformer_raw
 from hydroserverpy.etl.factories import extractor_factory, transformer_factory
 from hydroserverpy.etl.etl_configuration import (
     ExtractorConfig,
@@ -527,6 +528,13 @@ def mark_etl_task_success(sender, result, **extra):
     if context and context.stage and "stage" not in result:
         result["stage"] = context.stage

+    transformer_raw = task_transformer_raw(task_run.task)
+    result = normalize_task_run_result(
+        status="SUCCESS",
+        result=result,
+        transformer_raw=transformer_raw,
+    )
+
     task_run.status = "SUCCESS"
     task_run.finished_at = timezone.now()
     task_run.result = result
@@ -563,11 +571,16 @@ def mark_etl_task_failure(sender, task_id, einfo, exception, **extra):

     task_run.status = "FAILURE"
     task_run.finished_at = timezone.now()
-    task_run.result = _build_task_result(
-        message,
-        context,
-        stage=stage,
-        traceback=einfo.traceback,
+    transformer_raw = task_transformer_raw(task_run.task)
+    task_run.result = normalize_task_run_result(
+        status="FAILURE",
+        result=_build_task_result(
+            message,
+            context,
+            stage=stage,
+            traceback=einfo.traceback,
+        ),
+        transformer_raw=transformer_raw,
     )

     task_run.save(update_fields=["status", "finished_at", "result"])

From aefc1fb6bf132fd0b055be282a17a73a41319cbe Mon Sep 17 00:00:00 2001
From: Ken Lippold
Date: Wed, 11 Feb 2026 09:05:41 -0800
Subject: [PATCH 12/12] Fixed default data fixture paths.

---
 domains/etl/admin.py | 2 +-
 domains/iam/admin.py | 6 +++---
 domains/sta/admin.py | 26 +++++++++++++-------------
 3 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/domains/etl/admin.py b/domains/etl/admin.py
index 25d90070..9a4ab67c 100644
--- a/domains/etl/admin.py
+++ b/domains/etl/admin.py
@@ -23,7 +23,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:etl_orchestrationsystem_changelist",
-            ["etl/fixtures/default_orchestration_systems.yaml"],
+            ["domains/etl/fixtures/default_orchestration_systems.yaml"],
         )


diff --git a/domains/iam/admin.py b/domains/iam/admin.py
index 158f6c0e..8d1bc05c 100644
--- a/domains/iam/admin.py
+++ b/domains/iam/admin.py
@@ -46,7 +46,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:iam_usertype_changelist",
-            ["iam/fixtures/default_user_types.yaml"],
+            ["domains/iam/fixtures/default_user_types.yaml"],
         )


@@ -69,7 +69,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:iam_organizationtype_changelist",
-            ["iam/fixtures/default_organization_types.yaml"],
+            ["domains/iam/fixtures/default_organization_types.yaml"],
         )


@@ -98,7 +98,7 @@ def get_urls(self):

     def load_default_data(self, request):
         return self.load_fixtures(
-            request, "admin:iam_role_changelist", ["iam/fixtures/default_roles.yaml"]
+            request, "admin:iam_role_changelist", ["domains/iam/fixtures/default_roles.yaml"]
         )

     def delete_queryset(self, request, queryset):
diff --git a/domains/sta/admin.py b/domains/sta/admin.py
index 1861c78a..96b28d82 100644
--- a/domains/sta/admin.py
+++ b/domains/sta/admin.py
@@ -74,7 +74,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_observedproperty_changelist",
-            ["sta/fixtures/default_observed_properties.yaml"],
+            ["domains/sta/fixtures/default_observed_properties.yaml"],
         )


@@ -95,7 +95,7 @@ def get_urls(self):

     def load_default_data(self, request):
         return self.load_fixtures(
-            request, "admin:sta_unit_changelist", ["sta/fixtures/default_units.yaml"]
+            request, "admin:sta_unit_changelist", ["domains/sta/fixtures/default_units.yaml"]
         )


@@ -118,7 +118,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_processinglevel_changelist",
-            ["sta/fixtures/default_processing_levels.yaml"],
+            ["domains/sta/fixtures/default_processing_levels.yaml"],
         )


@@ -202,7 +202,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_sitetype_changelist",
-            ["sta/fixtures/default_site_types.yaml"],
+            ["domains/sta/fixtures/default_site_types.yaml"],
         )


@@ -225,7 +225,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_samplingfeaturetype_changelist",
-            ["sta/fixtures/default_sampling_feature_types.yaml"],
+            ["domains/sta/fixtures/default_sampling_feature_types.yaml"],
         )


@@ -248,7 +248,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_methodtype_changelist",
-            ["sta/fixtures/default_method_types.yaml"],
+            ["domains/sta/fixtures/default_method_types.yaml"],
         )


@@ -271,7 +271,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_sensorencodingtype_changelist",
-            ["sta/fixtures/default_sensor_encoding_types.yaml"],
+            ["domains/sta/fixtures/default_sensor_encoding_types.yaml"],
         )


@@ -294,7 +294,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_variabletype_changelist",
-            ["sta/fixtures/default_variable_types.yaml"],
+            ["domains/sta/fixtures/default_variable_types.yaml"],
         )


@@ -317,7 +317,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_unittype_changelist",
-            ["sta/fixtures/default_unit_types.yaml"],
+            ["domains/sta/fixtures/default_unit_types.yaml"],
         )


@@ -340,7 +340,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_datastreamaggregation_changelist",
-            ["sta/fixtures/default_datastream_aggregations.yaml"],
+            ["domains/sta/fixtures/default_datastream_aggregations.yaml"],
         )


@@ -363,7 +363,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_datastreamstatus_changelist",
-            ["sta/fixtures/default_datastream_statuses.yaml"],
+            ["domains/sta/fixtures/default_datastream_statuses.yaml"],
         )


@@ -386,7 +386,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_sampledmedium_changelist",
-            ["sta/fixtures/default_sampled_mediums.yaml"],
+            ["domains/sta/fixtures/default_sampled_mediums.yaml"],
         )


@@ -409,7 +409,7 @@ def load_default_data(self, request):
         return self.load_fixtures(
             request,
             "admin:sta_fileattachmenttype_changelist",
-            ["sta/fixtures/default_file_attachment_types.yaml"],
+            ["domains/sta/fixtures/default_file_attachment_types.yaml"],
         )
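
A minimal usage sketch of the normalize_task_run_result helper introduced in PATCH 11/12; the payloads, counts, and the PENDING status below are hypothetical, illustrative values, not output from the patched system.

    # Hypothetical inputs exercising domains/etl/run_result_normalizer.py
    from domains.etl.run_result_normalizer import normalize_task_run_result

    # A legacy success payload as an external orchestration system might report it.
    legacy = {"message": "Load complete. 120 rows were added to 3 datastreams."}
    normalized = normalize_task_run_result(status="SUCCESS", result=legacy)
    # normalized["message"] == "Loaded 120 total observations across 3 datastreams."
    # normalized["summary"] is set to the same normalized text.

    # Non-dict results are wrapped so API consumers always receive a dict.
    wrapped = normalize_task_run_result(status="PENDING", result="Run queued")
    # wrapped == {"message": "Run queued", "summary": "Run queued"}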