24 commits
d628966
build: add flexible versions to most core dependencies and removed so…
georgeRobertson2 Jan 24, 2026
efe0fa0
feat: Change how error messages are generated (by writing in batches)…
stevenhsd Jan 26, 2026
f1fd8aa
style: run black, isort and resolve linting issues
stevenhsd Jan 27, 2026
243a319
style: linting and static typing now passing
stevenhsd Jan 28, 2026
233e18f
test: reenabled tests removed in error
stevenhsd Jan 28, 2026
c64da59
refactor: Modified business rule step to write feedback messages in b…
stevenhsd Jan 28, 2026
141e1b3
refactor: merge in main and resolve conflicts and linting issues
stevenhsd Feb 3, 2026
aaea473
style: Additional logging around file transformation, business rules …
stevenhsd Feb 6, 2026
fc120b1
refactor: merging logging additions from release branch
stevenhsd Feb 6, 2026
68f5686
feat: Add read of arrow ipc files to reference data loaders
stevenhsd Feb 6, 2026
70d2399
feat: added reference data loading of arrow ipc files including enhan…
stevenhsd Feb 9, 2026
d82ec35
style: Additional logging around file transformation, business rules …
georgeRobertson Feb 10, 2026
cc77eff
refactor: merging develop v06
stevenhsd Feb 10, 2026
5a5f3da
Merge branch 'feature/ndit-821_address_ddb_dc_performance_issues' of …
stevenhsd Feb 10, 2026
5a0b684
ci: deal with mypy error and remove redundant caching of poetry lock …
georgeRobertson2 Feb 10, 2026
3444690
ci: remove poetry lock from ci commands and refactored to install exa…
georgeRobertson2 Feb 10, 2026
61a70d8
ci: remove branches scope of ci as likely issue with it not triggering
georgeRobertson2 Feb 10, 2026
be0a630
Merge pull request #29 from georgeRobertson2/build/sort_dependencies
georgeRobertson Feb 10, 2026
a3b0ace
Merge branch 'develop_v06' of https://github.com/NHSDigital/data-vali…
stevenhsd Feb 10, 2026
2e2f236
refactor: address review comments
stevenhsd Feb 10, 2026
85e9693
refactor: amended process pool to use executor supplied to duckdb pip…
stevenhsd Feb 11, 2026
c2ad557
Merge pull request #37 from NHSDigital/feature/ndit-655_refdata_arrow…
stevenhsd Feb 12, 2026
9befcc0
refactor: removal of processing pool for duckdb data contract
stevenhsd Feb 13, 2026
0e36111
test: amend error utils test to load newline delimited json
stevenhsd Feb 13, 2026
12 changes: 1 addition & 11 deletions .github/workflows/ci_linting.yml
@@ -3,8 +3,6 @@ name: CI Formatting & Linting
on:
  pull_request:
    types: [opened, reopened, synchronize]
    branches:
      - main


jobs:
@@ -30,17 +28,9 @@ jobs:
      - name: ensure poetry using desired python version
        run: poetry env use $(asdf which python)

      - name: Cache Poetry virtualenv
        uses: actions/cache@v4
        with:
          path: ~/.cache/pypoetry
          key: ${{ runner.os }}-poetry-${{ hashFiles('**/poetry.lock') }}
          restore-keys: |
            ${{ runner.os }}-poetry-

      - name: Install lint dependencies
        run: |
          make install
          poetry install --sync --no-interaction --with lint

      - name: Run black
        run: poetry run black src
10 changes: 1 addition & 9 deletions .github/workflows/ci_testing.yml
@@ -30,18 +30,10 @@ jobs:

      - name: ensure poetry using desired python version
        run: poetry env use $(asdf which python)

      - name: Cache Poetry virtualenv
        uses: actions/cache@v4
        with:
          path: ~/.cache/pypoetry
          key: ${{ runner.os }}-poetry-${{ hashFiles('**/poetry.lock') }}
          restore-keys: |
            ${{ runner.os }}-poetry-

      - name: Install test dependencies
        run: |
          make install
          poetry install --sync --no-interaction --with test

      - name: Run pytest and coverage
        run: |
1 change: 0 additions & 1 deletion .gitignore
@@ -32,7 +32,6 @@ share/python-wheels/
.installed.cfg
*.egg
MANIFEST
poetry.lock

# PyInstaller
# Usually these files are written by a python script from a template
4 changes: 2 additions & 2 deletions docs/README.md
@@ -165,8 +165,8 @@ for entity in data_contract_config.schemas:

# Data contract step here
data_contract = SparkDataContract(spark_session=spark)
entities, validation_messages, success = data_contract.apply_data_contract(
    entities, data_contract_config
entities, feedback_errors_uri, success = data_contract.apply_data_contract(
    entities, None, data_contract_config
)
```
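The hunk above reflects the new `apply_data_contract` contract: instead of returning validation messages in memory, the step now returns a URI pointing at a JSON Lines feedback-errors file. A minimal sketch of reading those errors back, assuming the `load_feedback_messages` helper added in `src/dve/common/error_utils.py` below, and assuming the returned URI may be `None` when no errors were produced (the `None` second argument mirrors the README call):

```
from dve.common.error_utils import load_feedback_messages

entities, feedback_errors_uri, success = data_contract.apply_data_contract(
    entities, None, data_contract_config
)
if feedback_errors_uri is not None:
    for message in load_feedback_messages(feedback_errors_uri):
        ...  # inspect each UserMessage produced by the data contract step
```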

3,130 changes: 3,130 additions & 0 deletions poetry.lock

Large diffs are not rendered by default.

40 changes: 14 additions & 26 deletions pyproject.toml
@@ -18,29 +18,19 @@ classifiers = [

[tool.poetry.dependencies]
python = ">=3.10,<3.12"
boto3 = "1.34.162"
botocore = "1.34.162"
delta-spark = "2.4.0"
duckdb = "1.1.0" # mitigates security vuln in < 1.1.0
formulas = "1.2.4"
idna = "3.7" # Downstream dep of requests but has security vuln < 3.7
Jinja2 = "3.1.6" # mitigates security vuln in < 3.1.6
lxml = "4.9.1"
openpyxl = "3.1.0"
pandas = "2.2.2"
polars = "0.20.14"
pyarrow = "17.0.0"
pydantic = "1.10.15" # Mitigates security vuln in < 1.10.13
pymongo = "4.6.3"
pyspark = "3.4.4"
pytz = "2022.1"
PyYAML = "6.0.3"
requests = "2.32.4" # Mitigates security vuln in < 2.31.0
schedula = "1.2.19"
sqlalchemy = "2.0.19"
typing_extensions = "4.6.2"
urllib3 = "2.6.3" # Mitigates security vuln in < 2.6.0
xmltodict = "0.13.0"
boto3 = ">=1.34.162,<1.36" # breaking change beyond 1.36
botocore = ">=1.34.162,<1.36" # breaking change beyond 1.36
delta-spark = "2.4.*"
duckdb = "1.1.*" # breaking changes beyond 1.1
Jinja2 = "3.1.*"
lxml = "^4.9.1"
openpyxl = "^3.1"
pandas = "^2.2.2"
polars = "0.20.*"
pyarrow = "^17.0.0"
pydantic = "1.10.15"
pyspark = "3.4.*"
typing_extensions = "^4.6.2"

[tool.poetry.group.dev]
optional = true
@@ -62,7 +52,6 @@ behave = "1.3.3"
coverage = "7.11.0"
moto = {extras = ["s3"], version = "4.0.13"}
Werkzeug = "3.0.6" # Dependency of moto which needs 3.0.6 for security vuln mitigation
mongomock = "4.1.2"
pytest = "8.4.2"
pytest-lazy-fixtures = "1.4.0" # switched from https://github.com/TvoroG/pytest-lazy-fixture as it's no longer supported
xlsx2csv = "0.8.2"
@@ -98,10 +87,9 @@ ignore = ["B028", "D213", "D203", "D205", "D107", "D105"]

[tool.mypy]
plugins = ["pydantic.mypy"]
enable_recursive_aliases = true

[[tool.mypy.overrides]]
module = "polars"
module = "polars.*"
follow_imports = "skip"
# ^language server knows what's going on, but mypy can't find attributes on Self? type

Empty file added src/dve/common/__init__.py
189 changes: 189 additions & 0 deletions src/dve/common/error_utils.py
@@ -0,0 +1,189 @@
"""Utilities to support reporting"""

import datetime as dt
import json
import logging
from collections.abc import Iterable
from itertools import chain
from multiprocessing import Queue
from threading import Thread
from typing import Optional, Union

import dve.parser.file_handling as fh
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.loggers import get_logger
from dve.core_engine.message import UserMessage
from dve.core_engine.type_hints import URI, DVEStageName, Messages


def get_feedback_errors_uri(working_folder: URI, step_name: DVEStageName) -> URI:
"""Determine the location of json lines file containing all errors generated in a step"""
return fh.joinuri(working_folder, "errors", f"{step_name}_errors.jsonl")


def get_processing_errors_uri(working_folder: URI) -> URI:
"""Determine the location of json lines file containing all processing
errors generated from DVE run"""
return fh.joinuri(working_folder, "processing_errors", "processing_errors.jsonl")
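
# A worked illustration of the two helpers above (hypothetical URIs; this
# assumes fh.joinuri joins path segments with "/"):
#   get_feedback_errors_uri("s3://bucket/run-1", "business_rules")
#   -> "s3://bucket/run-1/errors/business_rules_errors.jsonl"
#   get_processing_errors_uri("s3://bucket/run-1")
#   -> "s3://bucket/run-1/processing_errors/processing_errors.jsonl"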


def dump_feedback_errors(
    working_folder: URI,
    step_name: DVEStageName,
    messages: Messages,
    key_fields: Optional[dict[str, list[str]]] = None,
) -> URI:
    """Write out captured feedback error messages."""
    if not working_folder:
        raise AttributeError("processed files path not passed")

    if not key_fields:
        key_fields = {}

    error_file = get_feedback_errors_uri(working_folder, step_name)
    processed = []

    for message in messages:
        if message.original_entity is not None:
            primary_keys = key_fields.get(message.original_entity, [])
        elif message.entity is not None:
            primary_keys = key_fields.get(message.entity, [])
        else:
            primary_keys = []

        error = message.to_dict(
            key_field=primary_keys,
            value_separator=" -- ",
            max_number_of_values=10,
            record_converter=None,
        )
        error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
        processed.append(error)

    with fh.open_stream(error_file, "a") as f:
        f.write("\n".join([json.dumps(rec, default=str) for rec in processed]) + "\n")
    return error_file


def dump_processing_errors(
    working_folder: URI, step_name: str, errors: list[CriticalProcessingError]
):
    """Write out critical processing errors"""
    if not working_folder:
        raise AttributeError("processed files path not passed")
    if not step_name:
        raise AttributeError("step name not passed")
    if not errors:
        raise AttributeError("errors list not passed")

    error_file: URI = get_processing_errors_uri(working_folder)
    processed = []

    for error in errors:
        processed.append(
            {
                "step_name": step_name,
                "error_location": "processing",
                "error_level": "integrity",
                "error_message": error.error_message,
                "error_traceback": error.messages,
            }
        )

    with fh.open_stream(error_file, "a") as f:
        f.write("\n".join([json.dumps(rec, default=str) for rec in processed]) + "\n")

    return error_file
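
# Each line appended to processing_errors.jsonl is one JSON object, e.g.
# (illustrative values only):
#   {"step_name": "business_rules", "error_location": "processing",
#    "error_level": "integrity", "error_message": "...", "error_traceback": [...]}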


def load_feedback_messages(feedback_messages_uri: URI) -> Iterable[UserMessage]:
"""Load user messages from jsonl file"""
if not fh.get_resource_exists(feedback_messages_uri):
return
with fh.open_stream(feedback_messages_uri) as errs:
yield from (UserMessage(**json.loads(err)) for err in errs.readlines())


def load_all_error_messages(error_directory_uri: URI) -> Iterable[UserMessage]:
"Load user messages from all jsonl files"
return chain.from_iterable(
[
load_feedback_messages(err_file)
for err_file, _ in fh.iter_prefix(error_directory_uri)
if err_file.endswith(".jsonl")
]
)
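
# Example (a sketch, assuming the errors directory layout produced by
# get_feedback_errors_uri):
#   errors_dir = fh.joinuri(working_folder, "errors")
#   for message in load_all_error_messages(errors_dir):
#       ...  # each JSON line is rehydrated into a UserMessage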


class BackgroundMessageWriter:
"""Controls batch writes to error jsonl files"""

def __init__(
self,
working_directory: URI,
dve_stage: DVEStageName,
key_fields: Optional[dict[str, list[str]]] = None,
logger: Optional[logging.Logger] = None,
):
self._working_directory = working_directory
self._dve_stage = dve_stage
self._feedback_message_uri = get_feedback_errors_uri(
self._working_directory, self._dve_stage
)
self._key_fields = key_fields
self.logger = logger or get_logger(type(self).__name__)
self._write_thread: Optional[Thread] = None
self._queue: Queue = Queue()

@property
def write_queue(self) -> Queue: # type: ignore
"""Queue for storing batches of messages to be written"""
return self._queue

@property
def write_thread(self) -> Thread: # type: ignore
"""Thread to write batches of messages to jsonl file"""
if not self._write_thread:
self._write_thread = Thread(target=self._write_process_wrapper)
return self._write_thread

def _write_process_wrapper(self):
"""Wrapper for dump feedback errors to run in background process"""
# writing thread will block if nothing in queue
while True:
if msgs := self.write_queue.get():
dump_feedback_errors(
self._working_directory, self._dve_stage, msgs, self._key_fields
)
else:
break

def __enter__(self) -> "BackgroundMessageWriter":
self.write_thread.start()
return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type:
            self.logger.exception(
                "Issue occurred during background write process:",
                exc_info=(exc_type, exc_value, traceback),
            )
        # a None value in the queue triggers the break in the target
        self.write_queue.put(None)
        self.write_thread.join()
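
# Typical usage as a context manager (a sketch; the stage name and batching
# scheme are illustrative, not prescribed by this module):
#   with BackgroundMessageWriter(working_dir, "business_rules") as writer:
#       for batch in message_batches:
#           writer.write_queue.put(batch)
#   # on exit a None sentinel is enqueued, which stops and joins the thread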


def conditional_cast(value, primary_keys: list[str], value_separator: str) -> Union[list[str], str]:
"""Determines what to do with a value coming back from the error list"""
if isinstance(value, list):
casts = [
conditional_cast(val, primary_keys, value_separator) for val in value
] # type: ignore
return value_separator.join(
[f"{pk}: {id}" if pk else "" for pk, id in zip(primary_keys, casts)]
)
if isinstance(value, dt.date):
return value.isoformat()
if isinstance(value, dict):
return ""
return str(value)
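
# Worked example: a list of key values is paired positionally with the
# primary-key names (field names here are illustrative):
#   conditional_cast(["123", "A01"], ["nhs_number", "org_code"], " -- ")
#   -> "nhs_number: 123 -- org_code: A01"
# Dates become ISO strings, dicts collapse to "", anything else is str()-cast.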