
Commit fff45e9

refactor: further logging details

1 parent 98cea40 · commit fff45e9

6 files changed: +24 -6 lines changed

src/dve/core_engine/backends/base/rules.py

Lines changed: 7 additions & 3 deletions
@@ -413,8 +413,12 @@ def apply_sync_filters(
         if not success:
             return messages, False
 
+        self.logger.info(f"Filter {rule.reporting.code} found {len(temp_messages)} issues")
+
         if filter_column_names:
-            self.logger.info("Filtering records where validation is record level")
+            self.logger.info(
+                f"Filtering records from entity {entity_name} for error code {rule.reporting.code}"  # pylint: disable=line-too-long
+            )
             success_condition = " AND ".join(
                 [f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
             )
@@ -476,7 +480,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
         rules_and_locals = rule_metadata
 
         messages: Messages = []
-
+
         self.logger.info("Applying pre-sync steps")
         for rule, local_variables in rules_and_locals:
             for step in rule.pre_sync_steps:
@@ -505,7 +509,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
             return messages
 
         self.logger.info("Applying post-sync steps")
-
+
         for rule, local_variables in rules_and_locals:
             for step in rule.post_sync_steps:
                 if rule_metadata.templating_strategy == "runtime":
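
For context on the filter the new log line describes, here is a minimal sketch of how the record-level success condition is assembled from the per-rule filter columns; the column names are made up for illustration, only the join expression is taken from the hunk above.

# Sketch with hypothetical column names: each filter column must be present
# (IS NOT NULL) and truthy for a record to count as passing.
filter_column_names = ["rule_abc_passed", "rule_xyz_passed"]  # hypothetical
success_condition = " AND ".join(
    [f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
)
print(success_condition)
# (rule_abc_passed IS NOT NULL AND rule_abc_passed) AND (rule_xyz_passed IS NOT NULL AND rule_xyz_passed)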

src/dve/core_engine/backends/implementations/duckdb/contract.py

Lines changed: 3 additions & 1 deletion
@@ -102,7 +102,6 @@ def apply_data_contract(
         self, entities: DuckDBEntities, contract_metadata: DataContractMetadata
     ) -> tuple[DuckDBEntities, Messages, StageSuccessful]:
         """Apply the data contract to the duckdb relations"""
-        self.logger.info("Applying data contracts")
         all_messages: Messages = []
 
         successful = True
@@ -131,6 +130,9 @@ def apply_data_contract(
             coerce_inferred_numpy_array_to_list(relation.df()).apply(
                 application_helper, axis=1
             )  # pandas uses eager evaluation so potential memory issue here?
+            self.logger.info(
+                f"Data contract found {len(application_helper.errors)} issues in {entity_name}"
+            )
             all_messages.extend(application_helper.errors)
 
             casting_statements = [
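
A minimal, self-contained sketch of the row-wise application pattern the new log line counts. The ApplicationHelper class below is an assumed stand-in, not the project's class; only the apply(..., axis=1) call and the errors attribute are taken from the diff.

# Hypothetical stand-in for application_helper: a callable that collects one
# error per failing row when applied eagerly with DataFrame.apply(axis=1).
import pandas as pd

class ApplicationHelper:
    def __init__(self) -> None:
        self.errors: list[str] = []

    def __call__(self, row: pd.Series) -> None:
        # toy check: 'age' must be an integer-like string
        if not str(row["age"]).isdigit():
            self.errors.append(f"row {row.name}: invalid age {row['age']!r}")

application_helper = ApplicationHelper()
df = pd.DataFrame({"age": ["34", "unknown", "7"]})
df.apply(application_helper, axis=1)  # eager: the whole frame is walked immediately
print(f"Data contract found {len(application_helper.errors)} issues")  # -> 1 issue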

src/dve/core_engine/backends/implementations/spark/contract.py

Lines changed: 3 additions & 0 deletions
@@ -113,7 +113,10 @@ def apply_data_contract(
                 # .persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
             )
             messages = validated.flatMap(lambda row: row[1]).filter(bool)
+            messages.cache()
+            self.logger.info(f"Data contract found {messages.count()} issues in {entity_name}")
             all_messages.extend(messages.collect())
+            messages.unpersist()
 
             try:
                 record_df = record_df.select(
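
A minimal sketch (local Spark session, made-up messages) of why this change caches the RDD before logging: count() and collect() are separate actions, and without cache() each one would re-run the upstream validation lineage.

# Sketch of the cache -> count -> collect -> unpersist pattern from the hunk above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("cache-sketch").getOrCreate()
sc = spark.sparkContext

# stand-in for the validated messages RDD; filter(bool) drops empty entries
messages = sc.parallelize(["bad type in col_a", "", "missing col_b", ""]).filter(bool)
messages.cache()                                          # materialise once
print(f"Data contract found {messages.count()} issues")   # action 1: populates the cache
all_messages = messages.collect()                         # action 2: served from the cache
messages.unpersist()                                      # free the cached partitions
spark.stop()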

src/dve/pipeline/duckdb_pipeline.py

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@ class DDBDVEPipeline(BaseDVEPipeline):
     """
     Modified Pipeline class for running a DVE Pipeline with Spark
     """
+
     # pylint: disable=R0913
     def __init__(
         self,

src/dve/pipeline/pipeline.py

Lines changed: 9 additions & 2 deletions
@@ -231,6 +231,7 @@ def audit_received_file_step(
         self, pool: ThreadPoolExecutor, submitted_files: Iterable[tuple[FileURI, InfoURI]]
     ) -> tuple[list[SubmissionInfo], list[SubmissionInfo]]:
         """Set files as being received and mark them for file transformation"""
+        self._logger.info("Starting audit received file service")
         audit_received_futures: list[tuple[str, FileURI, Future]] = []
         for submission_file in submitted_files:
             data_uri, metadata_uri = submission_file
@@ -292,7 +293,7 @@ def file_transformation(
         """Transform a file from its original format into a 'stringified' parquet file"""
         if not self.processed_files_path:
             raise AttributeError("processed files path not provided")
-
+        self._logger.info(f"Applying file transformation to {submission_info.submission_id}")
         errors: list[FeedbackMessage] = []
         submission_status: SubmissionStatus = SubmissionStatus()
         submission_file_uri: URI = fh.joinuri(
@@ -327,6 +328,7 @@ def file_transformation_step(
         list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
     ]:
         """Step to transform files from their original format into parquet files"""
+        self._logger.info("Starting file transformation service")
         file_transform_futures: list[tuple[SubmissionInfo, Future]] = []
 
         for submission_info in submissions_to_process:
@@ -398,6 +400,7 @@ def apply_data_contract(
         self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
     ) -> tuple[SubmissionInfo, SubmissionStatus]:
         """Method for applying the data contract given a submission_info"""
+        self._logger.info(f"Applying data contract to {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "contract", submission_info.submission_id
@@ -451,6 +454,7 @@ def data_contract_step(
         list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
     ]:
         """Step to validate the types of an untyped (stringly typed) parquet file"""
+        self._logger.info("Starting data contract service")
         processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = []
         failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = []
         dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
@@ -518,6 +522,7 @@ def apply_business_rules(
         """Apply the business rules to a given submission, the submission may have failed at the
         data_contract step so this should be passed in as a bool
         """
+        self._logger.info(f"Applying business rules to {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "business_rules", submission_info.submission_id
@@ -607,6 +612,7 @@ def business_rule_step(
         list[tuple[SubmissionInfo, SubmissionStatus]],
     ]:
         """Step to apply business rules (Step impl) to a typed parquet file"""
+        self._logger.info("Starting business rules service")
         future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
 
         for submission_info, submission_status in files:
@@ -748,7 +754,7 @@ def error_report(
         SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]
     ]:
         """Creates the error reports given a submission info and submission status"""
-        self._logger.info("Generating error report")
+        self._logger.info(f"Generating error report for {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "error_report", submission_info.submission_id
@@ -816,6 +822,7 @@ def error_report_step(
         """Step to produce error reports
         takes processed files and files that failed file transformation
         """
+        self._logger.info("Starting error reports service")
         futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
         reports: list[
             tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI]
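
For orientation only: a compressed sketch of the step shape these hunks add logging to. Each *_step method announces the service, fans submissions out to a ThreadPoolExecutor, then partitions the futures' results into processed and failed lists. The names and the apply_data_contract stand-in below are illustrative assumptions, not the project's API.

# Illustrative sketch (not the project's code) of the step pattern in pipeline.py.
from concurrent.futures import Future, ThreadPoolExecutor

def apply_data_contract(submission_id: str) -> bool:
    """Stand-in for the real per-submission work."""
    return not submission_id.endswith("bad")

def data_contract_step(
    pool: ThreadPoolExecutor, submissions: list[str]
) -> tuple[list[str], list[str]]:
    print("Starting data contract service")  # the kind of log line the commit adds
    dc_futures: list[tuple[str, Future]] = [
        (sub, pool.submit(apply_data_contract, sub)) for sub in submissions
    ]
    processed, failed = [], []
    for sub, future in dc_futures:
        (processed if future.result() else failed).append(sub)
    return processed, failed

with ThreadPoolExecutor(max_workers=4) as pool:
    ok, bad = data_contract_step(pool, ["sub-001", "sub-002-bad", "sub-003"])
print(ok, bad)  # ['sub-001', 'sub-003'] ['sub-002-bad']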

src/dve/pipeline/spark_pipeline.py

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ class SparkDVEPipeline(BaseDVEPipeline):
     """
     Polymorphed Pipeline class for running a DVE Pipeline with Spark
     """
+
     # pylint: disable=R0913
     def __init__(
         self,
