diff --git a/.gitignore b/.gitignore index e56523b9..9ebb1f55 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ .env /frontend/.env +.venv/ +.venv*/ .pytest_cache __pycache__/ db.sqlite3 @@ -18,6 +20,8 @@ staticfiles/ coverage.txt photos/ csrf_test.py +tmp/ +.env # Elastic Beanstalk Files .elasticbeanstalk/* diff --git a/domains/etl/etl_errors.py b/domains/etl/etl_errors.py new file mode 100644 index 00000000..c24a4ff8 --- /dev/null +++ b/domains/etl/etl_errors.py @@ -0,0 +1,487 @@ +from __future__ import annotations + +import ast +import json +import re +from typing import Any, Iterable, Optional + +from pydantic import ValidationError + + +class EtlUserFacingError(Exception): + """ + Exception intended to be shown to end users (TaskDetails "run message"). + + Keep this as a single readable string. Avoid structured payloads. + """ + + +_EXTRACTOR_ALIAS_MAP: dict[str, str] = { + "source_uri": "sourceUri", + "placeholder_variables": "placeholderVariables", + "run_time_value": "runTimeValue", +} + +_TRANSFORMER_ALIAS_MAP: dict[str, str] = { + "header_row": "headerRow", + "data_start_row": "dataStartRow", + "identifier_type": "identifierType", + "custom_format": "customFormat", + "timezone_mode": "timezoneMode", + "run_time_value": "runTimeValue", + "jmespath": "JMESPath", + "target_identifier": "targetIdentifier", + "source_identifier": "sourceIdentifier", + "data_transformations": "dataTransformations", + "lookup_table_id": "lookupTableId", +} + + +def _alias(component: str, field: str) -> str: + if component == "extractor": + return _EXTRACTOR_ALIAS_MAP.get(field, field) + if component == "transformer": + return _TRANSFORMER_ALIAS_MAP.get(field, field) + return field + + +def _format_loc(component: str, loc: Iterable[Any]) -> str: + loc_list = list(loc) + # Strip pydantic union branch model names from the front. + if ( + component == "extractor" + and loc_list + and loc_list[0] + in ( + "HTTPExtractor", + "LocalFileExtractor", + ) + ): + loc_list = loc_list[1:] + if ( + component == "transformer" + and loc_list + and loc_list[0] + in ( + "JSONTransformer", + "CSVTransformer", + ) + ): + loc_list = loc_list[1:] + + parts: list[str] = [] + for item in loc_list: + if isinstance(item, int): + if not parts: + parts.append(f"[{item}]") + else: + parts[-1] = f"{parts[-1]}[{item}]" + continue + if isinstance(item, str): + parts.append(_alias(component, item)) + continue + parts.append(str(item)) + + if not parts: + return component + return ".".join([component] + parts) + + +def _jsonish(value: Any) -> str: + if value is None: + return "null" + if isinstance(value, str): + if value == "": + return '""' + return repr(value) + return repr(value) + + +def user_facing_error_from_validation_error( + component: str, + exc: ValidationError, + *, + raw: Optional[dict[str, Any]] = None, +) -> EtlUserFacingError: + """ + Convert pydantic's ValidationError into one readable, actionable sentence. + """ + errs = exc.errors(include_url=False) + + # Unions emit errors for every branch. Filter to the selected type when possible. 
+ if raw and component in ("extractor", "transformer"): + raw_type = raw.get("type") + type_to_model = { + "extractor": {"HTTP": "HTTPExtractor", "local": "LocalFileExtractor"}, + "transformer": {"JSON": "JSONTransformer", "CSV": "CSVTransformer"}, + } + selected_model = type_to_model.get(component, {}).get(raw_type) + if selected_model: + errs = [ + e for e in errs if not e.get("loc") or e["loc"][0] == selected_model + ] or errs + + if not errs: + return EtlUserFacingError(f"Invalid {component} configuration.") + + first = errs[0] + loc = first.get("loc") or () + msg = first.get("msg") or "Invalid value" + inp = first.get("input", None) + + path = _format_loc(component, loc) + if component == "transformer" and isinstance(raw, dict): + ts = raw.get("timestamp") + if isinstance(ts, dict): + tz_mode = ts.get("timezoneMode") or ts.get("timezone_mode") + tz_val = ts.get("timezone") + if ( + path.endswith("transformer.timestamp.timezone") + and str(tz_mode) == "daylightSavings" + ): + if tz_val is None or str(tz_val).strip() == "": + return EtlUserFacingError( + "Timezone information is required when daylight savings mode is enabled. " + "Select a valid timezone such as America/Denver and try again." + ) + if "Invalid timezone" in str(msg): + return EtlUserFacingError( + "The configured timezone is not recognized. " + "Use a valid IANA timezone such as America/Denver and run the job again." + ) + + message = ( + f"Invalid {component} configuration at {path}: {msg} (got {_jsonish(inp)}). " + f"Update the Data Connection {component} settings." + ) + return EtlUserFacingError(message) + + +_MISSING_PER_TASK_VAR_RE = re.compile(r"Missing per-task variable '([^']+)'") +_MISSING_PLACEHOLDER_VAR_RE = re.compile(r"Missing placeholder variable: (.+)$") +_TIMESTAMP_COL_NOT_FOUND_RE = re.compile( + r"Timestamp column '([^']*)' not found in data\." +) + +_MISSING_REQUIRED_TASK_VAR_RE = re.compile( + r"Missing required per-task extractor variable '([^']+)'" +) +_MISSING_URI_PLACEHOLDER_RE = re.compile( + r"Extractor source URI contains a placeholder '([^']+)', but it was not provided" +) +_SOURCE_INDEX_OOR_RE = re.compile( + r"Source index (\d+) is out of range for extracted data\." +) +_SOURCE_COL_NOT_FOUND_RE = re.compile( + r"Source column '([^']+)' not found in extracted data\." 
+) +_USECOLS_NOT_FOUND_RE = re.compile( + r"columns expected but not found:\s*(\[[^\]]*\])", + re.IGNORECASE, +) + + +def _iter_exception_chain(exc: Exception) -> Iterable[Exception]: + seen: set[int] = set() + current: Optional[Exception] = exc + while current is not None and id(current) not in seen: + seen.add(id(current)) + yield current + next_exc = current.__cause__ or current.__context__ + current = next_exc if isinstance(next_exc, Exception) else None + + +def _extract_missing_usecols(exc: Exception) -> list[str]: + for err in _iter_exception_chain(exc): + msg = str(err) + match = _USECOLS_NOT_FOUND_RE.search(msg) + if not match: + continue + + raw_list = match.group(1) + try: + parsed = ast.literal_eval(raw_list) + if isinstance(parsed, (list, tuple, set)): + cols = [str(c).strip() for c in parsed if str(c).strip()] + if cols: + return cols + except Exception: + pass + + inner = raw_list.strip()[1:-1] + if inner: + cols = [part.strip().strip("'\"") for part in inner.split(",")] + cols = [c for c in cols if c] + if cols: + return cols + return [] + + +def _format_cols(cols: list[str], max_cols: int = 4) -> str: + shown = [f"'{c}'" for c in cols[:max_cols]] + if len(cols) > max_cols: + shown.append(f"+{len(cols) - max_cols} more") + return ", ".join(shown) + + +def user_facing_error_from_exception( + exc: Exception, + *, + transformer_raw: Optional[dict[str, Any]] = None, +) -> Optional[EtlUserFacingError]: + """ + Map common ETL/hydroserverpy exceptions to a single readable message. + """ + if isinstance(exc, EtlUserFacingError): + return exc + + if isinstance(exc, ValidationError): + return None + + if isinstance(exc, KeyError): + msg = exc.args[0] if exc.args and isinstance(exc.args[0], str) else str(exc) + m = _MISSING_PER_TASK_VAR_RE.search(msg) + if m: + name = m.group(1) + return EtlUserFacingError( + f"A required task variable named '{name}' was not provided. " + "Add a value for it in the task configuration and run the job again." + ) + m = _MISSING_PLACEHOLDER_VAR_RE.search(msg) + if m: + name = m.group(1).strip() + return EtlUserFacingError( + f"The extractor URL includes a placeholder '{name}', but no value was supplied. " + "Provide the missing value in the task variables." + ) + + msg_str = str(exc) + + if isinstance(exc, TypeError) and "JSONTransformer received None" in msg_str: + return EtlUserFacingError( + "The transformer did not receive any extracted data to parse. " + "Confirm the extractor is returning a valid JSON payload." + ) + + if isinstance(exc, TypeError) and "CSVTransformer received None" in msg_str: + return EtlUserFacingError( + "The transformer did not receive any extracted data to parse. " + "Confirm the extractor is returning a valid CSV payload." + ) + + if ( + ("NoneType" in msg_str or "nonetype" in msg_str.lower()) + and "string" in msg_str.lower() + and "assign" in msg_str.lower() + ): + return EtlUserFacingError( + "A required configuration value is null where a string is expected. " + "Provide the missing value in your ETL configuration JSON." + ) + + # django-ninja HttpError (avoid importing ninja here to keep module import-safe) + status = getattr(exc, "status_code", None) + if status is not None and exc.__class__.__name__ == "HttpError": + message = getattr(exc, "message", None) or msg_str + if "Datastream does not exist" in message: + return EtlUserFacingError( + "One or more destination datastream identifiers could not be found in HydroServer. " + "Update the task mappings to use valid datastream IDs." 
+ ) + if status in (401, 403): + return EtlUserFacingError( + "HydroServer rejected the load due to authorization. " + "Confirm the target datastream(s) belong to this workspace and the job has permission to write." + ) + if status >= 400: + return EtlUserFacingError( + "HydroServer rejected some or all of the data. " + "Verify the transformed timestamps/values are valid and the target datastream mappings are correct." + ) + + if isinstance(exc, ValueError): + # Extractor placeholder/variable resolution + m = _MISSING_REQUIRED_TASK_VAR_RE.search(msg_str) + if m: + name = m.group(1) + return EtlUserFacingError( + f"A required task variable named '{name}' was not provided. " + "Add a value for it in the task configuration and run the job again." + ) + m = _MISSING_URI_PLACEHOLDER_RE.search(msg_str) + if m: + name = m.group(1) + return EtlUserFacingError( + f"The extractor URL includes a placeholder '{name}', but no value was supplied. " + "Provide the missing value in the task variables." + ) + + if "identifierType='index' requires timestamp.key" in msg_str: + return EtlUserFacingError( + "The timestamp column is set incorrectly. Index mode expects a 1-based column number (1 for the first column). " + "Update the timestamp setting to a valid column index." + ) + + if msg_str.startswith( + "One or more timestamps could not be read with the current settings" + ): + return EtlUserFacingError( + "One or more timestamps could not be read using the current format and timezone settings. " + "Confirm how dates appear in the source file and update the transformer configuration to match." + ) + + if ( + msg_str + == "One or more configured CSV columns were not found in the header row." + ): + missing_cols = _extract_missing_usecols(exc) + if len(missing_cols) > 1: + return EtlUserFacingError( + f"Configured CSV columns were not found in the file header ({_format_cols(missing_cols)}). " + "This often means the delimiter or headerRow setting is incorrect. " + "Verify the delimiter and headerRow settings, then run the job again." + ) + if len(missing_cols) == 1 and isinstance(transformer_raw, dict): + ts_cfg = transformer_raw.get("timestamp") + ts_key = ts_cfg.get("key") if isinstance(ts_cfg, dict) else None + if ts_key is not None and str(missing_cols[0]) == str(ts_key): + col = missing_cols[0] + return EtlUserFacingError( + f"The configured timestamp column '{col}' was not found in the file header. " + "Confirm the timestamp mapping and verify the delimiter/headerRow settings match the source file." + ) + return EtlUserFacingError( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." + ) + if ( + msg_str + == "The header row contained unexpected values and could not be processed." + ): + return EtlUserFacingError( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." + ) + if ( + msg_str + == "One or more data rows contained unexpected values and could not be processed." + ): + return EtlUserFacingError( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." 
+ ) + + # JSON transformer common configuration errors + if msg_str == "The payload's expected fields were not found.": + return EtlUserFacingError( + "Failed to find the timestamp or value using the current JSON query. " + "Confirm the JMESPath expression matches the structure returned by the source." + ) + if ( + msg_str + == "The timestamp or value key could not be found with the specified query." + ): + return EtlUserFacingError( + "Failed to find the timestamp or value using the current JSON query. " + "Confirm the JMESPath expression matches the structure returned by the source." + ) + + m = _TIMESTAMP_COL_NOT_FOUND_RE.search(msg_str) + if m: + col = m.group(1) + return EtlUserFacingError( + f"The configured timestamp column '{col}' was not found in the file header. " + "Confirm the timestamp mapping and verify the delimiter/headerRow settings match the source file." + ) + + m = _SOURCE_INDEX_OOR_RE.search(msg_str) + if m: + idx = m.group(1) + return EtlUserFacingError( + f"A mapping source index ({idx}) is out of range for the extracted data. " + "Update task.mappings sourceIdentifier values (or switch identifierType) to match the extracted columns." + ) + + m = _SOURCE_COL_NOT_FOUND_RE.search(msg_str) + if m: + col = m.group(1) + return EtlUserFacingError( + f"A mapped field named '{col}' was not found in the extracted data. " + "Update the task mapping so the source identifier matches the JSON." + ) + + # JSON decode failures (usually extractor returned HTML/text instead of JSON) + if isinstance(exc, json.JSONDecodeError): + return EtlUserFacingError( + "The source did not return valid JSON. " + "Verify the URL points to a JSON endpoint." + ) + + if msg_str == "Could not connect to the source system.": + return EtlUserFacingError( + "Failed to connect to the source system. This may be temporary; try again shortly. " + "If it persists, the source system may be offline." + ) + + if msg_str == "The requested data could not be found on the source system.": + return EtlUserFacingError( + "The requested data could not be found on the source system. " + "Verify the URL is correct and that the file or endpoint still exists." + ) + + if msg_str.startswith("Authentication with the source system failed."): + return EtlUserFacingError( + "Authentication with the source system failed. The username, password, or token may be incorrect or expired. " + "Update the credentials and try again." + ) + + if msg_str in ( + "The connection to the source worked but no observations were returned.", + ): + return EtlUserFacingError( + "No observations were returned from the source system. " + "Confirm the configured source system has observations available for the requested time range." + ) + + # Backward-compatible mappings for older hydroserverpy strings. + if msg_str == "The requested payload was not found on the source system.": + return EtlUserFacingError( + "The requested data could not be found on the source system. " + "Verify the URL is correct and that the file or endpoint still exists." + ) + + if msg_str == "The source system returned no data.": + return EtlUserFacingError( + "No observations were returned from the source system. " + "Confirm the configured source system has observations available for the requested time range." + ) + + if ( + msg_str + == "Authentication with the source system failed; credentials may be invalid or expired." + ): + return EtlUserFacingError( + "Authentication with the source system failed. The username, password, or token may be incorrect or expired. 
" + "Update the credentials and try again." + ) + + if "jmespath.exceptions" in msg_str or "Parse error at column" in msg_str: + return EtlUserFacingError( + "The JSON query used to extract timestamps or values is invalid or returned unexpected data. " + "Review and correct the JMESPath expression." + ) + + if msg_str in ( + "The target datastream could not be found.", + "The target data series (datastream) could not be found.", + "The target datastream was not found.", + ): + return EtlUserFacingError( + "One or more destination datastream identifiers could not be found in HydroServer. " + "Update the task mappings to use valid datastream IDs." + ) + + return None diff --git a/domains/etl/loader.py b/domains/etl/loader.py index 78289608..535312c3 100644 --- a/domains/etl/loader.py +++ b/domains/etl/loader.py @@ -1,5 +1,7 @@ from __future__ import annotations from uuid import UUID +from dataclasses import dataclass +from typing import Any from hydroserverpy.etl.loaders.base import Loader import logging @@ -15,6 +17,16 @@ observation_service = ObservationService() +@dataclass(frozen=True) +class LoadSummary: + cutoff: str + timestamps_total: int + timestamps_after_cutoff: int + observations_available: int + observations_loaded: int + datastreams_loaded: int + + class HydroServerInternalLoader(Loader): """ A class that extends the HydroServer client with ETL-specific functionalities. @@ -24,42 +36,66 @@ def __init__(self, task): self._begin_cache: dict[str, pd.Timestamp] = {} self.task = task - def load(self, data: pd.DataFrame, task: Task) -> None: + def load(self, data: pd.DataFrame, task: Task) -> LoadSummary: """ Load observations from a DataFrame to the HydroServer. """ - begin_date = self.earliest_begin_date(task) new_data = data[data["timestamp"] > begin_date] + + cutoff_value = ( + begin_date.isoformat() + if hasattr(begin_date, "isoformat") + else str(begin_date) + ) + timestamps_total = len(data) + timestamps_after_cutoff = len(new_data) + observations_available = 0 + observations_loaded = 0 + datastreams_loaded = 0 + for col in new_data.columns.difference(["timestamp"]): df = ( new_data[["timestamp", col]] .rename(columns={col: "value"}) .dropna(subset=["value"]) ) - if df.empty: - logging.warning(f"No new data for {col}, skipping.") + available = len(df) + observations_available += available + if available == 0: + logging.warning( + "No new observations for %s after filtering; skipping.", col + ) continue df = df.rename(columns={"timestamp": "phenomenonTime", "value": "result"}) + loaded = 0 # Chunked upload CHUNK_SIZE = 5000 total = len(df) + chunks = (total + CHUNK_SIZE - 1) // CHUNK_SIZE + logging.info( + "Uploading %s observation(s) to datastream %s (%s chunk(s), chunk_size=%s)", + total, + col, + chunks, + CHUNK_SIZE, + ) for start in range(0, total, CHUNK_SIZE): end = min(start + CHUNK_SIZE, total) chunk = df.iloc[start:end] - logging.info( - "Uploading %s rows (%s-%s) to datastream %s", - len(chunk), + logging.debug( + "Uploading chunk to datastream %s: rows %s-%s (%s rows)", + col, start, end - 1, - col, + len(chunk), ) chunk_data = ObservationBulkPostBody( fields=["phenomenonTime", "result"], - data=chunk.values.tolist() + data=chunk.values.tolist(), ) try: @@ -69,6 +105,7 @@ def load(self, data: pd.DataFrame, task: Task) -> None: datastream_id=UUID(col), mode="append", ) + loaded += len(chunk) except Exception as e: status = getattr(e, "status_code", None) or getattr( getattr(e, "response", None), "status_code", None @@ -83,17 +120,38 @@ def load(self, data: 
pd.DataFrame, task: Task) -> None: break raise + if loaded > 0: + datastreams_loaded += 1 + observations_loaded += loaded + + return LoadSummary( + cutoff=cutoff_value, + timestamps_total=timestamps_total, + timestamps_after_cutoff=timestamps_after_cutoff, + observations_available=observations_available, + observations_loaded=observations_loaded, + datastreams_loaded=datastreams_loaded, + ) + @staticmethod def _fetch_earliest_begin(task: Task) -> pd.Timestamp: - logging.info("Querying HydroServer for earliest begin date for payload...") - - return Datastream.objects.filter(id__in={ - path.target_identifier - for mapping in task.mappings.all() - for path in mapping.paths.all() - }).aggregate( - earliest_end=Coalesce(Min("phenomenon_end_time"), Value(datetime(1970, 1, 1))) - )["earliest_end"] + logging.info( + "Checking HydroServer for the most recent data already stored (so we only extract new observations)..." + ) + + return Datastream.objects.filter( + id__in={ + path.target_identifier + for mapping in task.mappings.all() + for path in mapping.paths.all() + } + ).aggregate( + earliest_end=Coalesce( + Min("phenomenon_end_time"), Value(datetime(1970, 1, 1)) + ) + )[ + "earliest_end" + ] def earliest_begin_date(self, task: Task) -> pd.Timestamp: """ diff --git a/domains/etl/models/run.py b/domains/etl/models/run.py index 8928fad2..3136d10d 100644 --- a/domains/etl/models/run.py +++ b/domains/etl/models/run.py @@ -1,6 +1,7 @@ import uuid6 from django.db import models from .task import Task +from domains.etl.run_result_normalizer import normalize_task_run_result, task_transformer_raw class TaskRun(models.Model): @@ -10,3 +11,12 @@ class TaskRun(models.Model): started_at = models.DateTimeField(auto_now_add=True) finished_at = models.DateTimeField(null=True, blank=True) result = models.JSONField(blank=True, null=True) + + def save(self, *args, **kwargs): + transformer_raw = task_transformer_raw(self.task) if self.task_id else None + self.result = normalize_task_run_result( + status=self.status, + result=self.result, + transformer_raw=transformer_raw, + ) + super().save(*args, **kwargs) diff --git a/domains/etl/run_result_normalizer.py b/domains/etl/run_result_normalizer.py new file mode 100644 index 00000000..f9f525af --- /dev/null +++ b/domains/etl/run_result_normalizer.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +import re +from typing import Any, Optional + +from .etl_errors import user_facing_error_from_exception + + +_SUCCESS_LOAD_RE = re.compile( + r"^Load complete\.\s*(\d+)\s+rows were added to\s+(\d+)\s+datastreams?\.$" +) +_SUCCESS_LOADED_RE = re.compile( + r"^Loaded\s+(\d+)\s+total observations\s+(?:into|across)\s+(\d+)\s+datastream(?:s|\(s\))\.$" +) +_FAILURE_STAGE_PREFIX_RE = re.compile( + r"^(?:Setup failed|Failed during [^:]+):\s*", + re.IGNORECASE, +) + + +def task_transformer_raw(task: Any) -> Optional[dict[str, Any]]: + if task is None: + return None + + data_connection = getattr(task, "data_connection", None) + if data_connection is None: + return None + + raw_settings = getattr(data_connection, "transformer_settings", None) or {} + if not isinstance(raw_settings, dict): + return None + + raw: dict[str, Any] = dict(raw_settings) + transformer_type = getattr(data_connection, "transformer_type", None) + if transformer_type and "type" not in raw: + raw["type"] = transformer_type + return raw + + +def _format_loaded_success_message(loaded: int, ds_count: int) -> str: + preposition = "into" if ds_count == 1 else "across" + ds_word = "datastream" if ds_count == 1 else 
"datastreams" + return f"Loaded {loaded} total observations {preposition} {ds_count} {ds_word}." + + +def _extract_message(result: dict[str, Any]) -> Optional[str]: + for key in ("message", "summary", "error", "detail"): + val = result.get(key) + if isinstance(val, str) and val.strip(): + return val.strip() + return None + + +def _normalize_success_message(message: str) -> str: + m = _SUCCESS_LOAD_RE.match(message) + if m: + return _format_loaded_success_message(int(m.group(1)), int(m.group(2))) + + m = _SUCCESS_LOADED_RE.match(message) + if m: + return _format_loaded_success_message(int(m.group(1)), int(m.group(2))) + + if ( + message + == "Already up to date. No new observations were loaded because all timestamps in the source are older than what is already stored." + ): + return "Already up to date. No new observations were loaded." + + return message + + +def _normalize_failure_message( + message: str, + *, + transformer_raw: Optional[dict[str, Any]] = None, +) -> str: + candidate = _FAILURE_STAGE_PREFIX_RE.sub("", message).strip() + + if candidate.startswith("Error reading CSV data:"): + candidate = candidate.split("Error reading CSV data:", 1)[1].strip() + + mapped = user_facing_error_from_exception( + ValueError(candidate), + transformer_raw=transformer_raw, + ) + if mapped: + return str(mapped) + return candidate or message + + +def normalize_task_run_result( + *, + status: str, + result: Any, + transformer_raw: Optional[dict[str, Any]] = None, +) -> Any: + if result is None: + return None + + normalized: dict[str, Any] + if isinstance(result, dict): + normalized = dict(result) + else: + normalized = {"message": str(result)} + + message = _extract_message(normalized) + if not message: + return normalized + + if status == "SUCCESS": + normalized_message = _normalize_success_message(message) + elif status == "FAILURE": + normalized_message = _normalize_failure_message( + message, + transformer_raw=transformer_raw, + ) + else: + normalized_message = message + + normalized["message"] = normalized_message + summary = normalized.get("summary") + if not isinstance(summary, str) or not summary.strip() or summary.strip() == message: + normalized["summary"] = normalized_message + + return normalized diff --git a/domains/etl/services/run.py b/domains/etl/services/run.py index 012ffa11..7d592dc8 100644 --- a/domains/etl/services/run.py +++ b/domains/etl/services/run.py @@ -6,6 +6,10 @@ from django.contrib.auth import get_user_model from domains.iam.models import APIKey from domains.etl.models import TaskRun +from domains.etl.run_result_normalizer import ( + normalize_task_run_result, + task_transformer_raw, +) from interfaces.api.schemas import TaskRunFields, TaskRunPostBody, TaskRunPatchBody, TaskRunOrderByFields from interfaces.api.service import ServiceUtils from .task import TaskService @@ -81,13 +85,18 @@ def create( task = task_service.get_task_for_action( principal=principal, uid=task_id, action="edit", expand_related=True ) + task_run_data = data.dict(include=set(TaskRunFields.model_fields.keys())) + task_run_data["result"] = normalize_task_run_result( + status=task_run_data["status"], + result=task_run_data.get("result"), + transformer_raw=task_transformer_raw(task), + ) try: - task_run = TaskRun.objects.create( - pk=data.id, - task=task, - **data.dict(include=set(TaskRunFields.model_fields.keys())), - ) + task_run = TaskRun.objects.create( + task=task, + **task_run_data, + ) except IntegrityError: raise HttpError(409, "The operation could not be completed due to a resource 
conflict.") @@ -121,6 +130,11 @@ def update( for field, value in task_run_data.items(): setattr(task_run, field, value) + task_run.result = normalize_task_run_result( + status=task_run.status, + result=task_run.result, + transformer_raw=task_transformer_raw(task), + ) task_run.save() return self.get( diff --git a/domains/etl/tasks.py b/domains/etl/tasks.py index 2de705ac..cddbc466 100644 --- a/domains/etl/tasks.py +++ b/domains/etl/tasks.py @@ -1,67 +1,464 @@ import logging -import pandas as pd +from contextlib import contextmanager +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone as dt_timezone +import os +from typing import Any, Optional from uuid import UUID -from datetime import timedelta +import pandas as pd from celery import shared_task -from pydantic import TypeAdapter +from pydantic import TypeAdapter, ValidationError from celery.signals import task_prerun, task_success, task_failure, task_postrun from django.utils import timezone from django.db.utils import IntegrityError from django.core.management import call_command from domains.etl.models import Task, TaskRun -from .loader import HydroServerInternalLoader +from .loader import HydroServerInternalLoader, LoadSummary +from .etl_errors import ( + EtlUserFacingError, + user_facing_error_from_exception, + user_facing_error_from_validation_error, +) +from .run_result_normalizer import normalize_task_run_result, task_transformer_raw from hydroserverpy.etl.factories import extractor_factory, transformer_factory -from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, SourceTargetMapping, MappingPath +from hydroserverpy.etl.etl_configuration import ( + ExtractorConfig, + TransformerConfig, + SourceTargetMapping, + MappingPath, +) + + +@dataclass +class TaskRunContext: + stage: str = "setup" + runtime_source_uri: Optional[str] = None + log_handler: Optional["TaskLogHandler"] = None + task_meta: dict[str, Any] = field(default_factory=dict) + + +class TaskLogFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + path = (record.pathname or "").replace("\\", "/") + return "/hydroserverpy/" in path or "/domains/etl/" in path + + +class TaskLogHandler(logging.Handler): + def __init__(self, context: TaskRunContext): + super().__init__(level=logging.INFO) + self.context = context + self.lines: list[str] = [] + self.entries: list[dict[str, Any]] = [] + self._formatter = logging.Formatter() + + def emit(self, record: logging.LogRecord) -> None: + if not self.filter(record): + return + + message = record.getMessage() + + timestamp = datetime.fromtimestamp( + record.created, tz=dt_timezone.utc + ).isoformat() + line = f"{timestamp} {record.levelname:<8} {message}" + if record.exc_info: + line = f"{line}\n{self._formatter.formatException(record.exc_info)}" + self.lines.append(line) + + entry: dict[str, Any] = { + "timestamp": timestamp, + "level": record.levelname, + "logger": record.name, + "message": message, + "pathname": record.pathname, + "lineno": record.lineno, + } + if record.exc_info: + entry["exception"] = self._formatter.formatException(record.exc_info) + self.entries.append(entry) + + self._capture_runtime_uri(message) + + def _capture_runtime_uri(self, message: str) -> None: + if self.context.runtime_source_uri: + return + if "Resolved runtime source URI:" in message: + self.context.runtime_source_uri = message.split( + "Resolved runtime source URI:", 1 + )[1].strip() + return + if "Requesting data from" in message: + if "→" in message: + 
self.context.runtime_source_uri = message.split("→", 1)[1].strip() + return + if "from" in message: + self.context.runtime_source_uri = message.split("from", 1)[1].strip() + + def as_text(self) -> str: + return "\n".join(self.lines).strip() + + +TASK_RUN_CONTEXT: dict[str, TaskRunContext] = {} + + +@contextmanager +def capture_task_logs(context: TaskRunContext): + logger = logging.getLogger() + handler = TaskLogHandler(context) + handler.addFilter(TaskLogFilter()) + context.log_handler = handler + + previous_level = logger.level + if previous_level > logging.INFO: + logger.setLevel(logging.INFO) + + logger.addHandler(handler) + try: + yield handler + finally: + logger.removeHandler(handler) + if previous_level > logging.INFO: + logger.setLevel(previous_level) + + +def _is_empty(data: Any) -> bool: + if data is None: + return True + if isinstance(data, pd.DataFrame) and data.empty: + return True + return False + + +def _describe_payload(data: Any) -> dict[str, Any]: + if isinstance(data, pd.DataFrame): + return { + "type": "DataFrame", + "rows": len(data), + "columns": len(data.columns), + } + info: dict[str, Any] = {"type": type(data).__name__} + if isinstance(data, (bytes, bytearray)): + info["bytes"] = len(data) + return info + # BytesIO and similar + try: + buf = getattr(data, "getbuffer", None) + if callable(buf): + info["bytes"] = len(data.getbuffer()) + return info + except Exception: + pass + # Real file handles + try: + fileno = getattr(data, "fileno", None) + if callable(fileno): + info["bytes"] = os.fstat(data.fileno()).st_size + return info + except Exception: + pass + return info + + +def _describe_transformed_data(data: Any) -> dict[str, Any]: + if not isinstance(data, pd.DataFrame): + return {"type": type(data).__name__} + datastreams = [col for col in data.columns if col != "timestamp"] + return { + "type": "DataFrame", + "rows": len(data), + "columns": len(data.columns), + "datastreams": len(datastreams), + } + +def _success_message(load: Optional[LoadSummary]) -> str: + if not load: + return "Load complete." + + loaded = load.observations_loaded + if loaded == 0: + if load.timestamps_total and load.timestamps_after_cutoff == 0: + return "Already up to date. No new observations were loaded." + # Otherwise, we don't have strong evidence for why nothing loaded beyond "no new observations". + return "No new observations were loaded." + + ds_count = load.datastreams_loaded or 0 + preposition = "into" if ds_count == 1 else "across" + ds_word = "datastream" if ds_count == 1 else "datastreams" + return f"Loaded {loaded} total observations {preposition} {ds_count} {ds_word}." 
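For reference, the logic above formats a LoadSummary into the run message like this; a minimal sketch with made-up numbers, not values from a real run:

# Illustrative only: feeds invented LoadSummary values through _success_message().
from domains.etl.loader import LoadSummary
from domains.etl.tasks import _success_message

summary = LoadSummary(
    cutoff="2024-01-01T00:00:00+00:00",  # hypothetical cutoff timestamp
    timestamps_total=120,
    timestamps_after_cutoff=48,
    observations_available=96,
    observations_loaded=96,
    datastreams_loaded=2,
)
print(_success_message(summary))
# -> Loaded 96 total observations across 2 datastreams.

stale = LoadSummary(
    cutoff="2024-01-01T00:00:00+00:00",
    timestamps_total=120,
    timestamps_after_cutoff=0,
    observations_available=0,
    observations_loaded=0,
    datastreams_loaded=0,
)
print(_success_message(stale))
# -> Already up to date. No new observations were loaded.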
+ + +def _apply_runtime_uri_aliases(result: dict[str, Any], runtime_source_uri: str) -> None: + result.setdefault("runtime_source_uri", runtime_source_uri) + result.setdefault("runtimeSourceUri", runtime_source_uri) + result.setdefault("runtime_url", runtime_source_uri) + result.setdefault("runtimeUrl", runtime_source_uri) + + +def _apply_log_aliases(result: dict[str, Any]) -> None: + if "log_entries" in result and "logEntries" not in result: + result["logEntries"] = result["log_entries"] + + +def _merge_result_with_context( + result: dict[str, Any], context: Optional[TaskRunContext] +) -> dict[str, Any]: + if "summary" not in result and "message" in result: + result["summary"] = result["message"] + + if context: + if context.runtime_source_uri and not ( + result.get("runtime_source_uri") + or result.get("runtimeSourceUri") + or result.get("runtime_url") + or result.get("runtimeUrl") + ): + _apply_runtime_uri_aliases(result, context.runtime_source_uri) + + if context.log_handler: + if "logs" not in result: + logs_text = context.log_handler.as_text() + if logs_text: + result["logs"] = logs_text + if "log_entries" not in result and context.log_handler.entries: + result["log_entries"] = context.log_handler.entries + + _apply_log_aliases(result) + return result + + +def _build_task_result( + message: str, + context: Optional[TaskRunContext] = None, + *, + stage: Optional[str] = None, + traceback: Optional[str] = None, +) -> dict[str, Any]: + result: dict[str, Any] = {"message": message, "summary": message} + if stage: + result["stage"] = stage + if traceback: + result["traceback"] = traceback + + if context and context.runtime_source_uri: + _apply_runtime_uri_aliases(result, context.runtime_source_uri) + + if context and context.task_meta and "task" not in result: + result["task"] = context.task_meta + + if context and context.log_handler: + logs_text = context.log_handler.as_text() + if logs_text: + result["logs"] = logs_text + if context.log_handler.entries: + result["log_entries"] = context.log_handler.entries + + _apply_log_aliases(result) + return result + + +def _last_logged_error(context: Optional[TaskRunContext]) -> Optional[str]: + if not context or not context.log_handler or not context.log_handler.entries: + return None + for entry in reversed(context.log_handler.entries): + if entry.get("level") == "ERROR": + msg = entry.get("message") + if msg: + return msg + return None + + +def _mapped_csv_error_from_log(last_err: str) -> Optional[str]: + prefix = "Error reading CSV data:" + if not last_err.startswith(prefix): + return None + + detail = last_err[len(prefix) :].strip() + if detail == "One or more configured CSV columns were not found in the header row.": + return ( + "Configured CSV columns were not found in the file header. " + "This often means the delimiter or headerRow setting is incorrect. " + "Verify the delimiter and headerRow settings, then run the job again." + ) + if ( + detail + == "The header row contained unexpected values and could not be processed." + ): + return ( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. " + "Confirm the file layout and update the column mappings if needed." + ) + if ( + detail + == "One or more data rows contained unexpected values and could not be processed." + ): + return ( + "A required column was not found in the file header. " + "The source file may have changed or the header row may be set incorrectly. 
" + "Confirm the file layout and update the column mappings if needed." + ) + return None + + +def _validate_component_config( + component: str, adapter: TypeAdapter, raw: dict[str, Any] +): + try: + return adapter.validate_python(raw) + except ValidationError as ve: + raise user_facing_error_from_validation_error(component, ve, raw=raw) from ve -@shared_task(bind=True, expires=10) + +@shared_task(bind=True, expires=10, name="etl.tasks.run_etl_task") def run_etl_task(self, task_id: str): """ Runs a HydroServer ETL task based on the task configuration provided. """ - task = Task.objects.select_related( - "data_connection" - ).prefetch_related( - "mappings", "mappings__paths" - ).get(pk=UUID(task_id)) - - extractor_cls = extractor_factory(TypeAdapter(ExtractorConfig).validate_python({ - "type": task.data_connection.extractor_type, - **task.data_connection.extractor_settings - })) - transformer_cls = transformer_factory(TypeAdapter(TransformerConfig).validate_python({ - "type": task.data_connection.transformer_type, - **task.data_connection.transformer_settings - })) - loader_cls = HydroServerInternalLoader(task) - - task_mappings = [ - SourceTargetMapping( - source_identifier=task_mapping.source_identifier, - paths=[ - MappingPath( - target_identifier=task_mapping_path.target_identifier, - data_transformations=task_mapping_path.data_transformations, - ) for task_mapping_path in task_mapping.paths.all() + task_run_id = self.request.id + context = TaskRunContext() + TASK_RUN_CONTEXT[task_run_id] = context + + with capture_task_logs(context): + try: + task = ( + Task.objects.select_related("data_connection") + .prefetch_related("mappings", "mappings__paths") + .get(pk=UUID(task_id)) + ) + + context.task_meta = { + "id": str(task.id), + "name": task.name, + "data_connection_id": str(task.data_connection_id), + "data_connection_name": task.data_connection.name, + } + + context.stage = "setup" + extractor_raw = { + "type": task.data_connection.extractor_type, + **(task.data_connection.extractor_settings or {}), + } + transformer_raw = { + "type": task.data_connection.transformer_type, + **(task.data_connection.transformer_settings or {}), + } + + timestamp_cfg = transformer_raw.get("timestamp") or {} + if isinstance(timestamp_cfg, dict): + tz_mode = timestamp_cfg.get("timezoneMode") + tz_value = timestamp_cfg.get("timezone") + if tz_mode == "daylightSavings" and not tz_value: + raise EtlUserFacingError( + "Timezone information is required when daylight savings mode is enabled. " + "Select a valid timezone such as America/Denver and try again." 
+ ) + + extractor_cfg = _validate_component_config( + "extractor", TypeAdapter(ExtractorConfig), extractor_raw + ) + transformer_cfg = _validate_component_config( + "transformer", TypeAdapter(TransformerConfig), transformer_raw + ) + + extractor_cls = extractor_factory(extractor_cfg) + transformer_cls = transformer_factory(transformer_cfg) + loader_cls = HydroServerInternalLoader(task) + + task_mappings = [ + SourceTargetMapping( + source_identifier=task_mapping.source_identifier, + paths=[ + MappingPath( + target_identifier=task_mapping_path.target_identifier, + data_transformations=task_mapping_path.data_transformations, + ) + for task_mapping_path in task_mapping.paths.all() + ], + ) + for task_mapping in task.mappings.all() ] - ) for task_mapping in task.mappings.all() - ] - - logging.info("Starting extract") - data = extractor_cls.extract(task, loader_cls) - if data is None or (isinstance(data, pd.DataFrame) and data.empty): - return {"message": f"No data returned from the extractor for task: {str(task.id)}"} - - logging.info("Starting transform") - data = transformer_cls.transform(data, task_mappings) - if data is None or (isinstance(data, pd.DataFrame) and data.empty): - return {"message": f"No data returned from the transformer for task: {str(task.id)}"} - logging.info("Starting load") - loader_cls.load(data, task) - - return {"message": f"Finished processing task: {str(task.id)}"} + context.stage = "extract" + logging.info("Starting extract") + data = extractor_cls.extract(task, loader_cls) + context.runtime_source_uri = ( + getattr(extractor_cls, "runtime_source_uri", None) + or context.runtime_source_uri + ) + extract_summary = _describe_payload(data) + logging.info("Extractor returned payload: %s", extract_summary) + if _is_empty(data): + if task.data_connection.extractor_type == "HTTP": + return _build_task_result( + "No observations were returned from the source system. " + "Confirm the configured source system has observations available for the requested time range.", + context, + stage=context.stage, + ) + return _build_task_result( + "The extractor returned no observations. Nothing to load.", + context, + stage=context.stage, + ) + + context.stage = "transform" + logging.info("Starting transform") + data = transformer_cls.transform(data, task_mappings) + transform_summary = _describe_transformed_data(data) + logging.info("Transform result: %s", transform_summary) + if isinstance(data, pd.DataFrame) and "timestamp" in data.columns: + bad = data["timestamp"].isna().sum() + if bad: + raise EtlUserFacingError( + "One or more timestamps could not be read using the current format and timezone settings. " + "Confirm how dates appear in the source file and update the transformer configuration to match." + ) + if _is_empty(data): + # hydroserverpy's CSVTransformer returns None on read errors (but logs ERROR). + # Treat that as a failure to avoid misleading "produced no rows" messaging. + last_err = _last_logged_error(context) + if last_err and last_err.startswith("Error reading CSV data:"): + mapped_csv_error = _mapped_csv_error_from_log(last_err) + if mapped_csv_error: + raise EtlUserFacingError(mapped_csv_error) + raise EtlUserFacingError( + f"{last_err}. Check delimiter/headerRow/dataStartRow/identifierType settings " + "and confirm the upstream CSV columns match your task mappings." + ) + return _build_task_result( + "Transform produced no rows. 
Nothing to load.", + context, + stage=context.stage, + ) + + context.stage = "load" + logging.info("Starting load") + load_summary = loader_cls.load(data, task) + logging.info( + "Load result: loaded=%s available=%s cutoff=%s", + getattr(load_summary, "observations_loaded", None), + getattr(load_summary, "observations_available", None), + getattr(load_summary, "cutoff", None), + ) + + return _build_task_result( + _success_message(load_summary), + context, + stage=context.stage, + ) + except Exception as e: + mapped = user_facing_error_from_exception( + e, transformer_raw=locals().get("transformer_raw") + ) + if mapped: + logging.error("%s", str(mapped)) + if mapped is e: + raise + raise mapped from e + logging.exception("ETL task failed during %s", context.stage) + raise @task_prerun.connect @@ -90,9 +487,7 @@ def update_next_run(sender, task_id, kwargs, **extra): return try: - task = Task.objects.select_related("periodic_task").get( - pk=kwargs["task_id"] - ) + task = Task.objects.select_related("periodic_task").get(pk=kwargs["task_id"]) except Task.DoesNotExist: return @@ -119,11 +514,27 @@ def mark_etl_task_success(sender, result, **extra): if sender != run_etl_task: return + context = TASK_RUN_CONTEXT.pop(sender.request.id, None) + try: task_run = TaskRun.objects.get(id=sender.request.id) except TaskRun.DoesNotExist: return + if not isinstance(result, dict): + result = {"message": str(result)} + + result = _merge_result_with_context(result, context) + if context and context.stage and "stage" not in result: + result["stage"] = context.stage + + transformer_raw = task_transformer_raw(task_run.task) + result = normalize_task_run_result( + status="SUCCESS", + result=result, + transformer_raw=transformer_raw, + ) + task_run.status = "SUCCESS" task_run.finished_at = timezone.now() task_run.result = result @@ -140,22 +551,42 @@ def mark_etl_task_failure(sender, task_id, einfo, exception, **extra): if sender != run_etl_task: return + context = TASK_RUN_CONTEXT.pop(task_id, None) + try: task_run = TaskRun.objects.get(id=task_id) except TaskRun.DoesNotExist: return + stage = context.stage if context else None + mapped = user_facing_error_from_exception(exception) + if mapped: + # User-facing errors are already stage-aware and readable; don't prepend robotic prefixes. + message = str(mapped) + else: + if stage and stage.lower() == "setup": + message = f"Setup failed: {exception}" + else: + message = f"Failed during {stage}: {exception}" if stage else f"{exception}" + task_run.status = "FAILURE" task_run.finished_at = timezone.now() - task_run.result = { - "error": str(exception), - "traceback": einfo.traceback, - } + transformer_raw = task_transformer_raw(task_run.task) + task_run.result = normalize_task_run_result( + status="FAILURE", + result=_build_task_result( + message, + context, + stage=stage, + traceback=einfo.traceback, + ), + transformer_raw=transformer_raw, + ) task_run.save(update_fields=["status", "finished_at", "result"]) -@shared_task(bind=True, expires=10) +@shared_task(bind=True, expires=10, name="etl.tasks.cleanup_etl_task_runs") def cleanup_etl_task_runs(self, days=14): """ Celery task to run the cleanup_etl_task_runs management command. 
diff --git a/interfaces/sensorthings/schemas/sensor.py b/interfaces/sensorthings/schemas/sensor.py index e2fcd365..659937dc 100644 --- a/interfaces/sensorthings/schemas/sensor.py +++ b/interfaces/sensorthings/schemas/sensor.py @@ -24,6 +24,6 @@ class SensorModel(Schema): "application/pdf", "http://www.opengis.net/doc/IS/SensorML/2.0", "text/html", + "text/plain", "application/json", - "text/plain", ]
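Finally, a minimal sketch of the run-result normalizer applied to an older stage-prefixed failure message; the input dict is invented and assumes the modules land exactly as shown in this diff:

# Illustrative only: normalizing a stored failure result into the TaskDetails message.
from domains.etl.run_result_normalizer import normalize_task_run_result

raw = {"message": "Failed during extract: Could not connect to the source system."}
normalized = normalize_task_run_result(status="FAILURE", result=raw)
print(normalized["message"])
# -> Failed to connect to the source system. This may be temporary; try again shortly.
#    If it persists, the source system may be offline.
# "summary" is backfilled with the same normalized text when the original result had none.
print(normalized["summary"])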