Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
354 changes: 8 additions & 346 deletions src/mdio/converters/segy.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/mdio/ingestion/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""MDIO ingestion helpers."""
77 changes: 77 additions & 0 deletions src/mdio/ingestion/coordinates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Generic coordinate population for ingestion."""

from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np

from mdio.segy.scalar import SCALE_COORDINATE_KEYS
from mdio.segy.scalar import _apply_coordinate_scalar

if TYPE_CHECKING:
from segy.arrays import HeaderArray as SegyHeaderArray
from xarray import Dataset as xr_Dataset

from mdio.core.grid import Grid


def populate_dim_coordinates(
dataset: xr_Dataset, grid: Grid, drop_vars_delayed: list[str]
) -> tuple[xr_Dataset, list[str]]:
"""Populate the xarray dataset with dimension coordinate variables."""
for dim in grid.dims:
dataset[dim.name].values[:] = dim.coords
drop_vars_delayed.append(dim.name)
return dataset, drop_vars_delayed


def populate_non_dim_coordinates(
dataset: xr_Dataset,
grid: Grid,
coordinates: dict[str, SegyHeaderArray],
drop_vars_delayed: list[str],
spatial_coordinate_scalar: int,
) -> tuple[xr_Dataset, list[str]]:
"""Populate the xarray dataset with coordinate variables.

Memory optimization: Processes coordinates one at a time and explicitly
releases intermediate arrays to reduce peak memory usage.
"""
non_data_domain_dims = grid.dim_names[:-1]

coord_names = list(coordinates.keys())
for coord_name in coord_names:
coord_values = coordinates.pop(coord_name)
da_coord = dataset[coord_name]

coord_shape = da_coord.shape

fill_value = da_coord.encoding.get("_FillValue") or da_coord.encoding.get("fill_value")
if fill_value is None:
fill_value = np.nan
tmp_coord_values = np.full(coord_shape, fill_value, dtype=da_coord.dtype)

coord_axes = tuple(non_data_domain_dims.index(coord_dim) for coord_dim in da_coord.dims)
coord_slices = tuple(slice(None) if idx in coord_axes else 0 for idx in range(len(non_data_domain_dims)))

coord_trace_indices = np.asarray(grid.map[coord_slices])

not_null = coord_trace_indices != grid.map.fill_value

if not_null.any():
valid_indices = coord_trace_indices[not_null]
tmp_coord_values[not_null] = coord_values[valid_indices]

if coord_name in SCALE_COORDINATE_KEYS:
tmp_coord_values = _apply_coordinate_scalar(tmp_coord_values, spatial_coordinate_scalar)

dataset[coord_name][:] = tmp_coord_values
drop_vars_delayed.append(coord_name)

del tmp_coord_values, coord_trace_indices, not_null, coord_values

# TODO(Altay): Add verification of reduced coordinates being the same as the first
# https://github.com/TGSAI/mdio-python/issues/645

return dataset, drop_vars_delayed
69 changes: 69 additions & 0 deletions src/mdio/ingestion/grid_qc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Grid sparsity quality control for ingestion."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import numpy as np

from mdio.converters.exceptions import GridTraceSparsityError
from mdio.core.config import MDIOSettings

if TYPE_CHECKING:
from mdio.core.grid import Grid

logger = logging.getLogger(__name__)


def grid_density_qc(grid: Grid, num_traces: int) -> None:
"""Quality control for sensible grid density during SEG-Y to MDIO conversion.

This function checks the density of the proposed grid by comparing the total possible traces
(`grid_traces`) to the actual number of traces in the SEG-Y file (`num_traces`). A warning is
logged if the sparsity ratio (`grid_traces / num_traces`) exceeds a configurable threshold,
indicating potential inefficiency or misconfiguration.

The warning threshold is set via the environment variable `MDIO__GRID__SPARSITY_RATIO_WARN`
(default 2), and the error threshold via `MDIO__GRID__SPARSITY_RATIO_LIMIT` (default 10). To
suppress the exception (but still log warnings), set `MDIO_IGNORE_CHECKS=1`.

Args:
grid: The Grid instance to check.
num_traces: Expected number of traces from the SEG-Y file.

Raises:
GridTraceSparsityError: If the sparsity ratio exceeds `MDIO__GRID__SPARSITY_RATIO_LIMIT`
and `MDIO_IGNORE_CHECKS` is not set to a truthy value (e.g., "1", "true").
"""
settings = MDIOSettings()
grid_traces = np.prod(grid.shape[:-1], dtype=np.uint64)

sparsity_ratio = float("inf") if num_traces == 0 else grid_traces / num_traces

warning_ratio = settings.grid_sparsity_ratio_warn
error_ratio = settings.grid_sparsity_ratio_limit
ignore_checks = settings.ignore_checks

should_warn = sparsity_ratio > warning_ratio
should_error = sparsity_ratio > error_ratio and not ignore_checks

if not should_warn and not should_error:
return

dims = dict(zip(grid.dim_names, grid.shape, strict=True))
msg = (
f"Ingestion grid is sparse. Sparsity ratio: {sparsity_ratio:.2f}. "
f"Ingestion grid: {dims}. "
f"SEG-Y trace count: {num_traces}, grid trace count: {grid_traces}."
)
for dim_name in grid.dim_names:
dim_min = grid.get_min(dim_name)
dim_max = grid.get_max(dim_name)
msg += f"\n{dim_name} min: {dim_min} max: {dim_max}"

if should_warn:
logger.warning(msg)

if should_error:
raise GridTraceSparsityError(grid.shape, num_traces, msg)
18 changes: 18 additions & 0 deletions src/mdio/ingestion/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Generic dataset metadata helpers for ingestion."""

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import Any

if TYPE_CHECKING:
from mdio.builder.schemas import Dataset


def _add_grid_override_to_metadata(dataset: Dataset, grid_overrides: dict[str, Any] | None) -> None:
"""Add grid override to Dataset metadata if needed."""
if dataset.metadata.attributes is None:
dataset.metadata.attributes = {}

if grid_overrides is not None:
dataset.metadata.attributes["gridOverrides"] = grid_overrides
1 change: 1 addition & 0 deletions src/mdio/ingestion/segy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""SEG-Y specific ingestion helpers."""
157 changes: 157 additions & 0 deletions src/mdio/ingestion/segy/coordinates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""Coordinate extraction and unit resolution for SEG-Y ingestion."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import numpy as np
from segy.standards.codes import MeasurementSystem as SegyMeasurementSystem
from segy.standards.fields import binary as binary_header_fields

from mdio.builder.schemas.v1.units import AngleUnitEnum
from mdio.builder.schemas.v1.units import AngleUnitModel
from mdio.builder.schemas.v1.units import LengthUnitEnum
from mdio.builder.schemas.v1.units import LengthUnitModel
from mdio.ingestion.coordinates import populate_dim_coordinates
from mdio.ingestion.coordinates import populate_non_dim_coordinates

if TYPE_CHECKING:
from segy.arrays import HeaderArray as SegyHeaderArray
from xarray import Dataset as xr_Dataset

from mdio.builder.templates.base import AbstractDatasetTemplate
from mdio.core.dimension import Dimension
from mdio.core.grid import Grid
from mdio.segy.file import SegyFileInfo

logger = logging.getLogger(__name__)


MEASUREMENT_SYSTEM_KEY = binary_header_fields.Rev0.MEASUREMENT_SYSTEM_CODE.model.name
ANGLE_UNIT_KEYS = ["angle", "azimuth"]
SPATIAL_UNIT_KEYS = [
"cdp_x",
"cdp_y",
"source_coord_x",
"source_coord_y",
"group_coord_x",
"group_coord_y",
"offset",
]


def _get_coordinates(
grid: Grid,
segy_headers: SegyHeaderArray,
mdio_template: AbstractDatasetTemplate,
) -> tuple[list[Dimension], dict[str, SegyHeaderArray]]:
"""Get the data dim and non-dim coordinates from the SEG-Y headers and MDIO template.

Select a subset of the segy_dimensions that corresponds to the MDIO dimensions
The dimensions are ordered as in the MDIO template.
The last dimension is always the vertical domain dimension

Args:
grid: Inferred MDIO grid for SEG-Y file.
segy_headers: Headers read in from SEG-Y file.
mdio_template: The MDIO template to use for the conversion.

Raises:
ValueError: If a dimension or coordinate name from the MDIO template is not found in
the SEG-Y headers.

Returns:
A tuple containing:
- A list of dimension coordinates (1-D arrays).
- A dict of non-dimension coordinates (str: N-D arrays).
"""
dimensions_coords = []
for dim_name in mdio_template.dimension_names:
if dim_name not in grid.dim_names:
err = f"Dimension '{dim_name}' was not found in SEG-Y dimensions."
raise ValueError(err)
dimensions_coords.append(grid.select_dim(dim_name))

non_dim_coords: dict[str, SegyHeaderArray] = {}
for coord_name in mdio_template.coordinate_names:
if coord_name not in segy_headers.dtype.names:
err = f"Coordinate '{coord_name}' not found in SEG-Y dimensions."
raise ValueError(err)
non_dim_coords[coord_name] = np.array(segy_headers[coord_name])

return dimensions_coords, non_dim_coords


def _populate_coordinates(
dataset: xr_Dataset,
grid: Grid,
coords: dict[str, SegyHeaderArray],
spatial_coordinate_scalar: int,
) -> tuple[xr_Dataset, list[str]]:
"""Populate dim and non-dim coordinates in the xarray dataset and write to Zarr.

This will write the xr Dataset with coords and dimensions, but empty traces and headers.

Args:
dataset: The xarray dataset to populate.
grid: The grid object containing the grid map.
coords: The non-dim coordinates to populate.
spatial_coordinate_scalar: The X/Y coordinate scalar from the SEG-Y file.

Returns:
Xarray dataset with filled coordinates and updated variables to drop after writing
"""
drop_vars_delayed = []
dataset, drop_vars_delayed = populate_dim_coordinates(dataset, grid, drop_vars_delayed=drop_vars_delayed)

dataset, drop_vars_delayed = populate_non_dim_coordinates(
dataset,
grid,
coordinates=coords,
drop_vars_delayed=drop_vars_delayed,
spatial_coordinate_scalar=spatial_coordinate_scalar,
)

return dataset, drop_vars_delayed


def _get_spatial_coordinate_unit(segy_file_info: SegyFileInfo) -> LengthUnitModel | None:
"""Get the coordinate unit from the SEG-Y headers."""
measurement_system_code = int(segy_file_info.binary_header_dict[MEASUREMENT_SYSTEM_KEY])

if measurement_system_code not in (1, 2):
logger.warning(
"Unexpected value in coordinate unit (%s) header: %s. Can't extract coordinate unit and will "
"ingest without coordinate units.",
MEASUREMENT_SYSTEM_KEY,
measurement_system_code,
)
return None

if measurement_system_code == SegyMeasurementSystem.METERS:
unit = LengthUnitEnum.METER
if measurement_system_code == SegyMeasurementSystem.FEET:
unit = LengthUnitEnum.FOOT

return LengthUnitModel(length=unit)


def _update_template_units(template: AbstractDatasetTemplate, unit: LengthUnitModel | None) -> AbstractDatasetTemplate:
"""Update the template with dynamic and some pre-defined units."""
new_units = {key: AngleUnitModel(angle=AngleUnitEnum.DEGREES) for key in ANGLE_UNIT_KEYS}

if unit is None:
template.add_units(new_units)
return template

for key in SPATIAL_UNIT_KEYS:
current_value = template.get_unit_by_key(key)
if current_value is not None:
logger.warning("Unit for %s already in template. Will keep the original unit: %s", key, current_value)
continue

new_units[key] = unit

template.add_units(new_units)
return template
48 changes: 48 additions & 0 deletions src/mdio/ingestion/segy/file_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Attach SEG-Y text and binary file headers to the dataset."""

from __future__ import annotations

import base64
from typing import TYPE_CHECKING

from mdio.core.config import MDIOSettings

if TYPE_CHECKING:
from xarray import Dataset as xr_Dataset

from mdio.segy.file import SegyFileInfo


def _add_segy_file_headers(xr_dataset: xr_Dataset, segy_file_info: SegyFileInfo) -> xr_Dataset:
"""Attach the SEG-Y text and binary file headers as attrs on a scalar variable."""
settings = MDIOSettings()

if not settings.save_segy_file_header:
return xr_dataset

expected_rows = 40
expected_cols = 80

text_header_rows = segy_file_info.text_header.splitlines()
text_header_cols_bad = [len(row) != expected_cols for row in text_header_rows]

if len(text_header_rows) != expected_rows:
err = f"Invalid text header count: expected {expected_rows}, got {len(segy_file_info.text_header)}"
raise ValueError(err)

if any(text_header_cols_bad):
err = f"Invalid text header columns: expected {expected_cols} per line."
raise ValueError(err)

xr_dataset["segy_file_header"] = ((), "")
xr_dataset["segy_file_header"].attrs.update(
{
"textHeader": segy_file_info.text_header,
"binaryHeader": segy_file_info.binary_header_dict,
}
)
if settings.raw_headers:
raw_binary_base64 = base64.b64encode(segy_file_info.raw_binary_headers).decode("ascii")
xr_dataset["segy_file_header"].attrs.update({"rawBinaryHeader": raw_binary_base64})

return xr_dataset
Loading
Loading