71 changes: 64 additions & 7 deletions src/hydroserverpy/api/models/etl/task.py
@@ -8,6 +8,7 @@
from datetime import datetime, timedelta, timezone
from pydantic import Field, AliasPath, AliasChoices, TypeAdapter
from hydroserverpy.etl.factories import extractor_factory, transformer_factory, loader_factory
from hydroserverpy.etl.loaders.hydroserver_loader import LoadSummary
from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, LoaderConfig, SourceTargetMapping, MappingPath
from ..base import HydroServerBaseModel
from .orchestration_system import OrchestrationSystem
@@ -183,6 +184,7 @@ def run_local(self):

task_run = self.create_task_run(status="RUNNING", started_at=datetime.now(timezone.utc))

runtime_source_uri: Optional[str] = None
try:
logging.info("Starting extract")

@@ -199,25 +201,39 @@
]

data = extractor_cls.extract(self, loader_cls)
runtime_source_uri = getattr(extractor_cls, "runtime_source_uri", None)
if self.is_empty(data):
self._update_status(
loader_cls, True, "No data returned from the extractor"
task_run,
True,
"No data returned from the extractor",
runtime_source_uri=runtime_source_uri,
)
return

logging.info("Starting transform")
data = transformer_cls.transform(data, task_mappings)
if self.is_empty(data):
self._update_status(
loader_cls, True, "No data returned from the transformer"
task_run,
True,
"No data returned from the transformer",
runtime_source_uri=runtime_source_uri,
)
return

logging.info("Starting load")
- loader_cls.load(data, self)
- self._update_status(task_run, True, "OK")
+ load_summary = loader_cls.load(data, self)
+ self._update_status(
+ task_run,
+ True,
+ self._success_message(load_summary),
+ runtime_source_uri=runtime_source_uri,
+ )
except Exception as e:
- self._update_status(task_run, False, str(e))
+ self._update_status(
+ task_run, False, str(e), runtime_source_uri=runtime_source_uri
+ )

@staticmethod
def is_empty(data):
@@ -229,15 +245,56 @@ def is_empty(data):

return False

- def _update_status(self, task_run: TaskRun, success: bool, msg: str):
+ def _update_status(
+ self,
+ task_run: TaskRun,
+ success: bool,
+ msg: str,
+ runtime_source_uri: Optional[str] = None,
+ ):
+ result = {"message": msg}
+ if runtime_source_uri:
+ result.update(
+ {
+ "runtimeSourceUri": runtime_source_uri,
+ "runtime_source_uri": runtime_source_uri,
+ "runtimeUrl": runtime_source_uri,
+ "runtime_url": runtime_source_uri,
+ }
+ )
+
self.update_task_run(
task_run.id,
status="SUCCESS" if success else "FAILURE",
result={"message": msg}
result=result
)
self.next_run_at = self._next_run()
self.save()

@staticmethod
def _success_message(load: Optional[LoadSummary]) -> str:
if not load:
return "OK"

loaded = load.observations_loaded
if loaded == 0:
if load.timestamps_total and load.timestamps_after_cutoff == 0:
if load.cutoff:
return (
"Already up to date - no new observations loaded "
f"(all timestamps were at or before {load.cutoff})."
)
return "Already up to date - no new observations loaded (all timestamps were at or before the cutoff)."
if load.observations_available == 0:
return "Already up to date - no new observations loaded."
return "No new observations were loaded."

if load.datastreams_loaded:
return (
f"Load completed successfully ({loaded} rows across {load.datastreams_loaded} datastreams)."
)
return f"Load completed successfully ({loaded} rows loaded)."

def _next_run(self) -> Optional[str]:
now = datetime.now(timezone.utc)
if cron := self.crontab:
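To make the new success-message logic easy to verify, here is a minimal, self-contained sketch of the same decision tree; `LoadSummaryStub` is a stand-in with the fields `_success_message` reads, not the real `LoadSummary` class from `hydroserver_loader`:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class LoadSummaryStub:
    # Stand-in for LoadSummary; fields mirror those read by _success_message.
    observations_loaded: int = 0
    observations_available: int = 0
    timestamps_total: int = 0
    timestamps_after_cutoff: int = 0
    datastreams_loaded: int = 0
    cutoff: Optional[datetime] = None


def success_message(load: Optional[LoadSummaryStub]) -> str:
    # Same branch order as Task._success_message above.
    if not load:
        return "OK"
    if load.observations_loaded == 0:
        if load.timestamps_total and load.timestamps_after_cutoff == 0:
            if load.cutoff:
                return (
                    "Already up to date - no new observations loaded "
                    f"(all timestamps were at or before {load.cutoff})."
                )
            return (
                "Already up to date - no new observations loaded "
                "(all timestamps were at or before the cutoff)."
            )
        if load.observations_available == 0:
            return "Already up to date - no new observations loaded."
        return "No new observations were loaded."
    if load.datastreams_loaded:
        return (
            f"Load completed successfully ({load.observations_loaded} rows "
            f"across {load.datastreams_loaded} datastreams)."
        )
    return f"Load completed successfully ({load.observations_loaded} rows loaded)."


print(success_message(LoadSummaryStub(observations_loaded=120, datastreams_loaded=3)))
# Load completed successfully (120 rows across 3 datastreams).
print(success_message(LoadSummaryStub(timestamps_total=50, cutoff=datetime(2024, 1, 1, tzinfo=timezone.utc))))
# Already up to date - no new observations loaded (all timestamps were at or before 2024-01-01 00:00:00+00:00).
```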
27 changes: 15 additions & 12 deletions src/hydroserverpy/api/services/etl/task.py
@@ -2,7 +2,6 @@
from typing import Literal, Union, Optional, List, Dict, Any, TYPE_CHECKING
from uuid import UUID
from datetime import datetime
- from pydantic import Field
from hydroserverpy.api.models import DataConnection, Task, TaskRun, TaskMapping, OrchestrationSystem
from hydroserverpy.api.utils import normalize_uuid
from ..base import HydroServerBaseService
@@ -79,16 +78,16 @@ def create(
workspace: Union["Workspace", UUID, str],
data_connection: Union["DataConnection", UUID, str],
orchestration_system: Union["OrchestrationSystem", UUID, str],
- extractor_variables: dict = Field(default_factory=dict),
- transformer_variables: dict = Field(default_factory=dict),
- loader_variables: dict = Field(default_factory=dict),
+ extractor_variables: Optional[dict] = None,
+ transformer_variables: Optional[dict] = None,
+ loader_variables: Optional[dict] = None,
paused: bool = False,
start_time: Optional[datetime] = None,
next_run_at: Optional[datetime] = None,
crontab: Optional[str] = None,
interval: Optional[int] = None,
interval_period: Optional[str] = None,
- mappings: List[dict] = Field(default_factory=list),
+ mappings: Optional[List[dict]] = None,
) -> "Task":
"""Create a new ETL task."""

@@ -97,19 +96,23 @@
"workspaceId": normalize_uuid(workspace),
"dataConnectionId": normalize_uuid(data_connection),
"orchestrationSystemId": normalize_uuid(orchestration_system),
"extractorVariables": extractor_variables,
"transformerVariables": transformer_variables,
"loaderVariables": loader_variables,
"schedule": {
"extractorVariables": extractor_variables or {},
"transformerVariables": transformer_variables or {},
"loaderVariables": loader_variables or {},
"mappings": mappings or [],
}

# Only include schedule if the caller provided scheduling information.
# Using Ellipsis here breaks JSON serialization.
if interval or crontab:
body["schedule"] = {
"paused": paused,
"startTime": start_time,
"nextRunAt": next_run_at,
"crontab": crontab,
"interval": interval,
"intervalPeriod": interval_period,
- } if interval or crontab else ...,
- "mappings": mappings if mappings else []
- }
+ }

return super().create(**body)

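The signature change above fixes a subtle bug worth spelling out: `pydantic.Field` only has meaning inside a pydantic model. Used as a default in a plain function, it is evaluated once at definition time, so a caller who omits the argument silently receives a `FieldInfo` object rather than a dict. A minimal demonstration (assuming pydantic v2):

```python
from typing import Optional

from pydantic import Field


def broken(extractor_variables: dict = Field(default_factory=dict)):
    # Outside a pydantic model, Field(...) is just a FieldInfo instance,
    # so the "default" is not a dict at all.
    return type(extractor_variables).__name__


def fixed(extractor_variables: Optional[dict] = None):
    # The None-plus-fallback idiom gives each call its own fresh dict
    # and sidesteps the classic mutable-default pitfall as well.
    extractor_variables = extractor_variables or {}
    return type(extractor_variables).__name__


print(broken())  # FieldInfo -- silently wrong
print(fixed())   # dict
```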
65 changes: 65 additions & 0 deletions src/hydroserverpy/etl/STATES_README.md
@@ -0,0 +1,65 @@
## Possible Needs Attention states:

These are the most important end-user messages the ETL system can return for a task run
that needs user action.

### Configuration / Setup

- Invalid extractor configuration. Tell the user exactly which field is missing or invalid.
- Invalid transformer configuration. Tell the user exactly which field is missing or invalid.
- A required configuration value is missing.
- A required configuration value is null where a value is expected.
- Missing required per-task extractor variable "<name>".
- Extractor source URI contains a placeholder "<name>", but it was not provided.
- Task configuration is missing required daylight savings offset (when using daylightSavings mode).

### Data Source (Connectivity / Authentication)

- Could not connect to the source system.
- The source system did not respond before the timeout.
- Authentication with the source system failed; credentials may be invalid or expired.
- The requested payload was not found on the source system.
- The source system returned no data.

### Source Data Did Not Match The Task

- The source returned a format different from what this job expects.
- The payload's expected fields were not found.
- One or more timestamps could not be read with the current settings.
- This job references a resource that no longer exists.
- The file structure does not match the configuration.

For CSV:
- The header row contained unexpected values and could not be processed.
- One or more data rows contained unexpected values and could not be processed.
- Timestamp column "<key>" was not found in the extracted data.
- A mapping source index is out of range for the extracted data.
- A mapping source column was not found in the extracted data.

For JSON:
- The timestamp or value key could not be found with the specified query.
- Transformer did not receive any extracted data to parse.

### Targets / HydroServer

- HydroServer rejected some or all of the data.
- The target data series (datastream) could not be found.
  - This may happen if the datastream was deleted or the mapping points to the wrong target.

### Unexpected System Error

- An internal system error occurred while processing the job.
- The job stopped before completion.

## Possible OK states:

These are the most important end-user messages the ETL system can return for a successful run.

- Load completed successfully.
- Load completed successfully (<n> rows loaded).
- Load completed successfully (<n> rows across <m> datastreams).
- Already up to date - no new observations loaded.
- No new observations were loaded.
- Already up to date - no new observations loaded (all timestamps were at or before <cutoff>).
- No data returned from the extractor. Nothing to load.
- Transform produced no rows. Nothing to load.
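These messages reach clients through the task run's `result` payload written by `_update_status` (first file above). A hypothetical update for a failed run might look like the dict below; the message, site code, and URL are illustrative only, and the duplicated keys are the compatibility aliases this PR writes:

```python
# Sketch of the payload shape only; values are made up for illustration.
task_run_update = {
    "status": "FAILURE",
    "result": {
        "message": "Timestamp column 'timestamp' was not found in the extracted data.",
        # Four spellings of the same resolved URI, kept for client compatibility:
        "runtimeSourceUri": "https://example.com/api?site=USGS-09380000",
        "runtime_source_uri": "https://example.com/api?site=USGS-09380000",
        "runtimeUrl": "https://example.com/api?site=USGS-09380000",
        "runtime_url": "https://example.com/api?site=USGS-09380000",
    },
}
```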
23 changes: 20 additions & 3 deletions src/hydroserverpy/etl/etl_configuration.py
@@ -2,6 +2,7 @@
from typing import Annotated, Dict, List, Literal, Optional, Union
from pydantic import BaseModel, Field, field_validator
from enum import Enum
from zoneinfo import ZoneInfo

WorkflowType = Literal["ETL", "Aggregation", "Virtual", "SDL"]
CSVDelimiterType = Literal[",", "|", "\t", ";", " "]
@@ -76,12 +77,28 @@ class Timestamp(BaseModel):

class Config:
populate_by_name = True
validate_default = True

@field_validator("timezone")
@field_validator("timezone", mode="after")
def check_timezone(cls, timezone_value, info):
mode = info.data.get("timezone_mode")
- if mode == TimezoneMode.fixedOffset and timezone_value is None:
- raise ValueError("`timezone` must be set when timezoneMode is fixedOffset")
+ if mode == TimezoneMode.fixedOffset:
+ if timezone_value is None:
+ raise ValueError(
+ "`timezone` must be set when timezoneMode is fixedOffset (e.g. '-0700')"
+ )
+ if mode == TimezoneMode.daylightSavings:
+ if timezone_value is None or str(timezone_value).strip() == "":
+ raise ValueError(
+ "Task configuration is missing required daylight savings offset (when using daylightSavings mode)."
+ )
+ # Validate it's a real IANA tz name early to avoid cryptic ZoneInfo errors later.
+ try:
+ ZoneInfo(str(timezone_value))
+ except Exception:
+ raise ValueError(
+ f"Invalid timezone {timezone_value!r}. Use an IANA timezone like 'America/Denver'."
+ )
return timezone_value


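To see what the tightened validator accepts and rejects, here is a standalone sketch of the same checks with the pydantic machinery stripped away; plain mode strings stand in for the TimezoneMode enum values:

```python
from zoneinfo import ZoneInfo


def check_timezone(timezone_mode: str, timezone_value):
    # Mirrors the field_validator above, outside the model.
    if timezone_mode == "fixedOffset" and timezone_value is None:
        raise ValueError(
            "`timezone` must be set when timezoneMode is fixedOffset (e.g. '-0700')"
        )
    if timezone_mode == "daylightSavings":
        if timezone_value is None or str(timezone_value).strip() == "":
            raise ValueError(
                "Task configuration is missing required daylight savings "
                "offset (when using daylightSavings mode)."
            )
        try:
            # Fail fast on bad IANA names instead of deep inside timestamp parsing.
            ZoneInfo(str(timezone_value))
        except Exception:
            raise ValueError(
                f"Invalid timezone {timezone_value!r}. "
                "Use an IANA timezone like 'America/Denver'."
            )
    return timezone_value


check_timezone("daylightSavings", "America/Denver")  # passes
try:
    check_timezone("daylightSavings", "Mountain Time")
except ValueError as e:
    print(e)  # Invalid timezone 'Mountain Time'. Use an IANA timezone like 'America/Denver'.
```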
55 changes: 47 additions & 8 deletions src/hydroserverpy/etl/extractors/base.py
@@ -4,28 +4,44 @@
from datetime import datetime
from ..etl_configuration import ExtractorConfig, Task
from ..timestamp_parser import TimestampParser
from ..logging_utils import summarize_list


logger = logging.getLogger(__name__)


class Extractor:
def __init__(self, extractor_config: ExtractorConfig):
self.cfg = extractor_config
self.runtime_source_uri = None

def resolve_placeholder_variables(self, task: Task, loader):
logging.info(f"Creating runtime variables...")
placeholders = list(self.cfg.placeholder_variables or [])
filled = {}
for placeholder in self.cfg.placeholder_variables:
runtime_names: set[str] = set()
task_names: set[str] = set()
for placeholder in placeholders:
name = placeholder.name

if placeholder.type == "runTime":
logging.info(f"Resolving runtime var: {name}")
logger.debug("Resolving runtime var: %s", name)
runtime_names.add(name)
if placeholder.run_time_value == "latestObservationTimestamp":
value = loader.earliest_begin_date(task)
elif placeholder.run_time_value == "jobExecutionTime":
value = pd.Timestamp.now(tz="UTC")
elif placeholder.type == "perTask":
logging.info(f"Resolving task var: {name}")
logger.debug("Resolving task var: %s", name)
task_names.add(name)
if name not in task.extractor_variables:
raise KeyError(f"Missing per-task variable '{name}'")
logger.error(
"Missing per-task extractor variable '%s'. Provided extractorVariables keys=%s",
name,
summarize_list(sorted((task.extractor_variables or {}).keys())),
)
raise ValueError(
f"Missing required per-task extractor variable '{name}'."
)
value = task.extractor_variables[name]
else:
continue
@@ -35,16 +51,39 @@ def resolve_placeholder_variables(self, task: Task, loader):
value = parser.utc_to_string(value)

filled[name] = value

if runtime_names:
names = ", ".join(sorted(runtime_names))
logger.debug(
"Runtime variables resolved (%s): %s", len(runtime_names), names
)
if task_names:
names = ", ".join(sorted(task_names))
logger.debug("Task variables resolved (%s): %s", len(task_names), names)

if not filled:
- return self.cfg.source_uri
- return self.format_uri(filled)
+ uri = self.cfg.source_uri
+ else:
+ uri = self.format_uri(filled)
+
+ self.runtime_source_uri = uri
+ # Keep a stable log prefix for downstream parsing.
+ logger.info("Resolved runtime source URI: %s", uri)
+ return uri

def format_uri(self, placeholder_variables):
try:
uri = self.cfg.source_uri.format(**placeholder_variables)
except KeyError as e:
missing_key = e.args[0]
raise KeyError(f"Missing placeholder variable: {missing_key}")
logger.error(
"Failed to format sourceUri: missing placeholder '%s'. Provided placeholders=%s",
missing_key,
summarize_list(sorted(placeholder_variables.keys())),
)
raise ValueError(
f"Extractor source URI contains a placeholder '{missing_key}', but it was not provided."
)
return uri

@abstractmethod
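The new error path in `format_uri` is easy to exercise in isolation. Below is a sketch under two stated assumptions: the URI and variable names are invented, and `summarize_list` is a hypothetical stand-in, since `logging_utils` itself is not part of this diff:

```python
import logging

logger = logging.getLogger(__name__)


def summarize_list(items, limit=5):
    # Hypothetical stand-in for hydroserverpy.etl.logging_utils.summarize_list;
    # assumed to shorten long lists so log lines stay readable.
    items = list(items)
    if len(items) > limit:
        return items[:limit] + [f"... (+{len(items) - limit} more)"]
    return items


def format_uri(source_uri: str, placeholder_variables: dict) -> str:
    # Mirrors Extractor.format_uri: a placeholder missing from the mapping
    # surfaces as a user-facing ValueError instead of a bare KeyError.
    try:
        return source_uri.format(**placeholder_variables)
    except KeyError as e:
        missing_key = e.args[0]
        logger.error(
            "Failed to format sourceUri: missing placeholder '%s'. Provided placeholders=%s",
            missing_key,
            summarize_list(sorted(placeholder_variables.keys())),
        )
        raise ValueError(
            f"Extractor source URI contains a placeholder '{missing_key}', "
            "but it was not provided."
        )


uri = "https://example.com/api?site={siteCode}&start={startTime}"
print(format_uri(uri, {"siteCode": "USGS-09380000", "startTime": "2024-01-01"}))
# format_uri(uri, {"siteCode": "USGS-09380000"})  # would raise ValueError about 'startTime'
```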