From af61be2819038de638d6d707b8547b1428b3a612 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 08:32:58 +0000 Subject: [PATCH 01/18] Get 'feedback_queue' from security config or transport object instead of machine config --- src/murfey/server/demo_api.py | 27 +++++++++++++++---- src/murfey/workflows/clem/align_and_merge.py | 2 +- src/murfey/workflows/clem/process_raw_lifs.py | 5 +--- .../workflows/clem/process_raw_tiffs.py | 5 +--- 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/murfey/server/demo_api.py b/src/murfey/server/demo_api.py index 3c6b74886..08668ce18 100644 --- a/src/murfey/server/demo_api.py +++ b/src/murfey/server/demo_api.py @@ -40,7 +40,7 @@ from murfey.server.api import MurfeySessionID from murfey.server.api.auth import validate_token from murfey.server.murfey_db import murfey_db -from murfey.util.config import MachineConfig, from_file +from murfey.util.config import MachineConfig, from_file, security_from_file from murfey.util.db import ( AutoProcProgram, ClientEnvironment, @@ -962,9 +962,26 @@ def flush_spa_processing( return detached_ids = [c.id for c in collected_ids] - instrument_name = ( - db.exec(select(Session).where(Session.id == session_id)).one().instrument_name - ) + try: + instrument_name = ( + db.exec(select(Session).where(Session.id == session_id)) + .one() + .instrument_name + ) + except Exception: + log.error( + f"Unable to find a Murfey session associated with session ID {session_id}" + ) + return + + # Load the security config + security_config_file = machine_config[instrument_name].security_configuration_path + if not security_config_file: + log.error( + f"No security configuration file set for instrument {instrument_name!r}" + ) + return + security_config = security_from_file(security_config_file) murfey_ids = _murfey_id( detached_ids[3], db, number=2 * len(stashed_files), close=False @@ -986,7 +1003,7 @@ def flush_spa_processing( zocalo_message = { "recipes": ["em-spa-preprocess"], 
"parameters": { - "feedback_queue": machine_config[instrument_name].feedback_queue, + "feedback_queue": security_config.feedback_queue, "node_creator_queue": machine_config[ instrument_name ].node_creator_queue, diff --git a/src/murfey/workflows/clem/align_and_merge.py b/src/murfey/workflows/clem/align_and_merge.py index c5df9fecf..efe358e07 100644 --- a/src/murfey/workflows/clem/align_and_merge.py +++ b/src/murfey/workflows/clem/align_and_merge.py @@ -37,7 +37,7 @@ def submit_cluster_request( # Load feedback queue machine_config = get_machine_config()[instrument_name] - feedback_queue: str = machine_config.feedback_queue + feedback_queue: str = messenger.feedback_queue # Work out session directory from file path processed_folder = machine_config.processed_directory_name diff --git a/src/murfey/workflows/clem/process_raw_lifs.py b/src/murfey/workflows/clem/process_raw_lifs.py index d02e6ac1e..1d56bff68 100644 --- a/src/murfey/workflows/clem/process_raw_lifs.py +++ b/src/murfey/workflows/clem/process_raw_lifs.py @@ -6,8 +6,6 @@ from pathlib import Path from typing import Optional -from murfey.util.config import get_machine_config - try: from murfey.server.ispyb import TransportManager # Session except AttributeError: @@ -43,8 +41,7 @@ def zocalo_cluster_request( ) # Load machine config to get the feedback queue - machine_config = get_machine_config() - feedback_queue = machine_config[instrument_name].feedback_queue + feedback_queue: str = messenger.feedback_queue # Send the message # The keys under "parameters" will populate all the matching fields in {} diff --git a/src/murfey/workflows/clem/process_raw_tiffs.py b/src/murfey/workflows/clem/process_raw_tiffs.py index dbca8577e..52c371092 100644 --- a/src/murfey/workflows/clem/process_raw_tiffs.py +++ b/src/murfey/workflows/clem/process_raw_tiffs.py @@ -6,8 +6,6 @@ from pathlib import Path from typing import Optional -from murfey.util.config import get_machine_config - try: from murfey.server.ispyb import 
TransportManager # Session except AttributeError: @@ -50,8 +48,7 @@ def zocalo_cluster_request( metadata = tiff_list[0].parent / "Metadata" / (series_name + ".xlif") # Load machine config to get the feedback queue - machine_config = get_machine_config() - feedback_queue = machine_config[instrument_name].feedback_queue + feedback_queue: str = messenger.feedback_queue messenger.send( "processing_recipe", From dce6d93d90ea4bcc36fd7388b4103d3f6e05354b Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 08:44:02 +0000 Subject: [PATCH 02/18] Sanitised 'session_id' object in log --- src/murfey/server/demo_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/server/demo_api.py b/src/murfey/server/demo_api.py index 08668ce18..18f08797d 100644 --- a/src/murfey/server/demo_api.py +++ b/src/murfey/server/demo_api.py @@ -970,7 +970,7 @@ def flush_spa_processing( ) except Exception: log.error( - f"Unable to find a Murfey session associated with session ID {session_id}" + f"Unable to find a Murfey session associated with session ID {sanitise(str(session_id))}" ) return From f90f4ad3dd7fa8d90c0e20ec18f94e4b23f484fc Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 09:14:19 +0000 Subject: [PATCH 03/18] Added draft unit test for 'process_raw_lifs' workflow --- tests/workflows/clem/test_process_raw_lifs.py | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 tests/workflows/clem/test_process_raw_lifs.py diff --git a/tests/workflows/clem/test_process_raw_lifs.py b/tests/workflows/clem/test_process_raw_lifs.py new file mode 100644 index 000000000..284dbe3ea --- /dev/null +++ b/tests/workflows/clem/test_process_raw_lifs.py @@ -0,0 +1,75 @@ +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from murfey.server.ispyb import TransportManager +from murfey.workflows.clem.process_raw_lifs import zocalo_cluster_request + +# Set up variables +visit_name = "cm12345-6" +root_folder 
= "images" +session_id = 0 +instrument_name = "clem" +feedback_queue = "murfey_feedback" + + +@pytest.fixture +def raw_dir(tmp_path: Path): + raw_dir = tmp_path / visit_name / root_folder + raw_dir.mkdir(parents=True, exist_ok=True) + return raw_dir + + +@pytest.fixture +def lif_file(raw_dir: Path): + file = raw_dir / "test_file.lif" + if not file.exists(): + file.touch() + return file + + +def test_zocalo_cluster_request( + lif_file: Path, + raw_dir: Path, +): + + # Create a mock tranpsort object + mock_transport = MagicMock(spec=TransportManager) + mock_transport.feedback_queue = feedback_queue + + # Run the function with the listed parameters + zocalo_cluster_request( + file=lif_file, + root_folder=root_folder, + session_id=session_id, + instrument_name=instrument_name, + messenger=mock_transport, + ) + + # Construct the recipe that we expect to send + job_name = "--".join( + [ + p.replace(" ", "_") if " " in p else p + for p in (lif_file.relative_to(raw_dir).parent / lif_file.stem).parts + ] + ) + sent_recipe = { + "recipes": ["clem-lif-to-stack"], + "parameters": { + # Job parameters + "lif_file": f"{str(lif_file)}", + "root_folder": root_folder, + # Other recipe parameters + "session_dir": f"{str(raw_dir.parent)}", + "session_id": session_id, + "job_name": job_name, + "feedback_queue": feedback_queue, + }, + } + mock_transport.send.assert_called_once_with( + "processing_recipe", + sent_recipe, + new_connection=True, + ) + pass From 166d1420774c06d9d6d83c42d6b67dc2fc368397 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 09:18:55 +0000 Subject: [PATCH 04/18] Removed leftover 'pass' --- tests/workflows/clem/test_process_raw_lifs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/workflows/clem/test_process_raw_lifs.py b/tests/workflows/clem/test_process_raw_lifs.py index 284dbe3ea..5db0846ed 100644 --- a/tests/workflows/clem/test_process_raw_lifs.py +++ b/tests/workflows/clem/test_process_raw_lifs.py @@ -72,4 +72,3 @@ def 
test_zocalo_cluster_request( sent_recipe, new_connection=True, ) - pass From d593a561ee5ad483c6fafaa5b6f7a725fcc19d93 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 09:41:22 +0000 Subject: [PATCH 05/18] Added unit test for 'process_raw_tiffs' workflow --- tests/workflows/clem/test_process_raw_lifs.py | 2 + .../workflows/clem/test_process_raw_tiffs.py | 101 ++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 tests/workflows/clem/test_process_raw_tiffs.py diff --git a/tests/workflows/clem/test_process_raw_lifs.py b/tests/workflows/clem/test_process_raw_lifs.py index 5db0846ed..857b0979c 100644 --- a/tests/workflows/clem/test_process_raw_lifs.py +++ b/tests/workflows/clem/test_process_raw_lifs.py @@ -67,6 +67,8 @@ def test_zocalo_cluster_request( "feedback_queue": feedback_queue, }, } + + # Check that it sends the expected recipe mock_transport.send.assert_called_once_with( "processing_recipe", sent_recipe, diff --git a/tests/workflows/clem/test_process_raw_tiffs.py b/tests/workflows/clem/test_process_raw_tiffs.py new file mode 100644 index 000000000..e70874f53 --- /dev/null +++ b/tests/workflows/clem/test_process_raw_tiffs.py @@ -0,0 +1,101 @@ +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from murfey.server.ispyb import TransportManager +from murfey.workflows.clem.process_raw_tiffs import zocalo_cluster_request + +# Set up variables +session_id = 0 +instrument_name = "clem" +root_folder = "images" +visit_name = "cm12345-6" +area_name = "test_area" +feedback_queue = "murfey_feedback" + +# Properties for TIFF images +num_z = 5 +num_c = 3 + + +@pytest.fixture +def raw_dir(tmp_path: Path): + raw_dir = tmp_path / visit_name / root_folder + raw_dir.mkdir(parents=True, exist_ok=True) + return raw_dir + + +@pytest.fixture +def tiff_list(raw_dir: Path): + tiff_list = [ + raw_dir / area_name / f"test_series--Z{str(z).zfill(2)}--C{str(c).zfill(2)}.tif" + for z in range(num_z) + for c in range(num_c) + 
] + for file in tiff_list: + if not file.exists(): + file.touch() + return tiff_list + + +@pytest.fixture +def metadata(raw_dir: Path): + metadata = raw_dir / area_name / "Metadata" / "test_series.xlif" + if not metadata.exists(): + metadata.touch() + return metadata + + +def test_zocalo_cluster_request( + tiff_list: list[Path], + metadata: Path, + raw_dir: Path, +): + + # Create a mock tranpsort object + mock_transport = MagicMock(spec=TransportManager) + mock_transport.feedback_queue = feedback_queue + + # Run the function with the listed parameters + zocalo_cluster_request( + tiff_list=tiff_list, + root_folder=root_folder, + session_id=session_id, + instrument_name=instrument_name, + metadata=metadata, + messenger=mock_transport, + ) + + # Construct the recipe that we expect to send + job_name = "--".join( + [ + p.replace(" ", "_") if " " in p else p + for p in ( + tiff_list[0].parent.relative_to(raw_dir) + / tiff_list[0].stem.split("--")[0] + ).parts + ] + ) + sent_recipe = { + "recipes": ["clem-tiff-to-stack"], + "parameters": { + # Job parameters + "tiff_list": "null", + "tiff_file": f"{str(tiff_list[0])}", + "root_folder": root_folder, + "metadata": f"{str(metadata)}", + # Other recipe parameters + "session_dir": f"{str(raw_dir.parent)}", + "session_id": session_id, + "job_name": job_name, + "feedback_queue": feedback_queue, + }, + } + + # Check that it sends the expected recipe + mock_transport.send.assert_called_once_with( + "processing_recipe", + sent_recipe, + new_connection=True, + ) From fb4f5e97cdd82d7e996562801b48cf1727738ce4 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 09:45:42 +0000 Subject: [PATCH 06/18] Missed creating a directory level --- tests/workflows/clem/test_process_raw_tiffs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/workflows/clem/test_process_raw_tiffs.py b/tests/workflows/clem/test_process_raw_tiffs.py index e70874f53..85d1ad55d 100644 --- a/tests/workflows/clem/test_process_raw_tiffs.py +++ 
b/tests/workflows/clem/test_process_raw_tiffs.py @@ -28,6 +28,7 @@ def raw_dir(tmp_path: Path): @pytest.fixture def tiff_list(raw_dir: Path): + (raw_dir / area_name).mkdir(parents=True, exist_ok=True) tiff_list = [ raw_dir / area_name / f"test_series--Z{str(z).zfill(2)}--C{str(c).zfill(2)}.tif" for z in range(num_z) From 64cb6928a4a3b440ebb3f915791aa6f49896a2e9 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 09:52:20 +0000 Subject: [PATCH 07/18] Forgot to do the same for the metadata file --- tests/workflows/clem/test_process_raw_tiffs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/workflows/clem/test_process_raw_tiffs.py b/tests/workflows/clem/test_process_raw_tiffs.py index 85d1ad55d..885aa69a6 100644 --- a/tests/workflows/clem/test_process_raw_tiffs.py +++ b/tests/workflows/clem/test_process_raw_tiffs.py @@ -42,6 +42,7 @@ def tiff_list(raw_dir: Path): @pytest.fixture def metadata(raw_dir: Path): + (raw_dir / area_name / "Metadata").mkdir(parents=True, exist_ok=True) metadata = raw_dir / area_name / "Metadata" / "test_series.xlif" if not metadata.exists(): metadata.touch() From eea1674810150650d276791a169e7c8f47e01436 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 11:14:48 +0000 Subject: [PATCH 08/18] Added unit test for 'align_and_merge' workflow --- tests/workflows/clem/test_align_and_merge.py | 129 +++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 tests/workflows/clem/test_align_and_merge.py diff --git a/tests/workflows/clem/test_align_and_merge.py b/tests/workflows/clem/test_align_and_merge.py new file mode 100644 index 000000000..06e2f50f6 --- /dev/null +++ b/tests/workflows/clem/test_align_and_merge.py @@ -0,0 +1,129 @@ +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from murfey.server.ispyb import TransportManager +from murfey.util.config import MachineConfig +from murfey.workflows.clem.align_and_merge import submit_cluster_request + +# Folder 
and file settings +session_id = 0 +instrument_name = "clem" +raw_folder = "images" +processed_folder = "processed" +visit_name = "cm12345-6" +area_name = "test_area" +series_name = "test_series" +colors = [ + "gray", + "green", + "red", +] +feedback_queue = "murfey_feedback" + +# Align and merge settings +crop_to_n_frames = 20 +align_self = "enabled" +flatten = "max" +align_across = "enabled" + + +@pytest.fixture +def processed_dir(tmp_path: Path): + processed_dir = tmp_path / visit_name / processed_folder + processed_dir.mkdir(parents=True, exist_ok=True) + return processed_dir + + +@pytest.fixture +def image_stacks(processed_dir: Path): + + image_dir = processed_dir / area_name / series_name + image_dir.mkdir(parents=True, exist_ok=True) + + images = [image_dir / f"{color}.tiff" for color in colors] + for image in images: + if not image.exists(): + image.touch() + + return images + + +@pytest.fixture +def metadata(processed_dir: Path): + + metadata_dir = processed_dir / area_name / series_name / "metadata" + metadata_dir.mkdir(parents=True, exist_ok=True) + + metadata = metadata_dir / f"{series_name}.xml" + if not metadata.exists(): + metadata.touch() + + return metadata + + +@patch("murfey.workflows.clem.align_and_merge.get_machine_config") +def test_submit_cluster_request( + image_stacks: list[Path], + metadata: Path, + processed_dir: Path, + mock_get_machine_config, +): + + # Construct the long series name + series_name_long = "--".join( + image_stacks[0].parent.relative_to(processed_dir).parts + ) + + # Create a mock tranpsort object + mock_transport = MagicMock(spec=TransportManager) + mock_transport.feedback_queue = feedback_queue + + # Construct a mock MachineConfig object for use within the function + mock_machine_config = MagicMock(spec=MachineConfig) + mock_machine_config.return_value.processed_directory_name = processed_folder + mock_get_machine_config.return_value = { + instrument_name: mock_machine_config, + } + + # Run the function + 
submit_cluster_request( + session_id=session_id, + instrument_name=instrument_name, + series_name=series_name_long, + images=image_stacks, + metadata=metadata, + crop_to_n_frames=crop_to_n_frames, + align_self=align_self, + flatten=flatten, + align_across=align_across, + messenger=mock_transport, + ) + + # Construct expected recipe to be sent + sent_recipe = { + "recipes": ["clem-align-and-merge"], + "parameters": { + # Job parameters + "series_name": series_name_long, + "images": [str(file) for file in image_stacks], + "metadata": str(metadata), + "crop_to_n_frames": crop_to_n_frames, + "align_self": align_self, + "flatten": flatten, + "align_across": align_across, + # Other recipe parameters + "session_dir": str(processed_dir.parent), + "session_id": session_id, + "job_name": series_name, + "feedback_queue": feedback_queue, + }, + } + + # Check that it sends the expected recipe + mock_transport.send.assert_called_once_with( + "processing_recipe", + sent_recipe, + new_connection=True, + ) From 0da2cb4d680d41c0e21adf778853c39f65dd2558 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 11:18:09 +0000 Subject: [PATCH 09/18] Patched function placed in wrong order --- tests/workflows/clem/test_align_and_merge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/workflows/clem/test_align_and_merge.py b/tests/workflows/clem/test_align_and_merge.py index 06e2f50f6..de1b46f71 100644 --- a/tests/workflows/clem/test_align_and_merge.py +++ b/tests/workflows/clem/test_align_and_merge.py @@ -65,10 +65,10 @@ def metadata(processed_dir: Path): @patch("murfey.workflows.clem.align_and_merge.get_machine_config") def test_submit_cluster_request( + mock_get_machine_config, image_stacks: list[Path], metadata: Path, processed_dir: Path, - mock_get_machine_config, ): # Construct the long series name From 9b4a6aceeb75b9fb7e193127f83b73fb40c4c993 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 11:29:39 +0000 Subject: [PATCH 10/18] 
'.return_value' might not be needed when mocking class attribute --- tests/workflows/clem/test_align_and_merge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/workflows/clem/test_align_and_merge.py b/tests/workflows/clem/test_align_and_merge.py index de1b46f71..1905859f9 100644 --- a/tests/workflows/clem/test_align_and_merge.py +++ b/tests/workflows/clem/test_align_and_merge.py @@ -82,7 +82,7 @@ def test_submit_cluster_request( # Construct a mock MachineConfig object for use within the function mock_machine_config = MagicMock(spec=MachineConfig) - mock_machine_config.return_value.processed_directory_name = processed_folder + mock_machine_config.processed_directory_name = processed_folder mock_get_machine_config.return_value = { instrument_name: mock_machine_config, } From 6994141c9c0e61d2d389d720a83bb2d824af268c Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Feb 2025 11:35:30 +0000 Subject: [PATCH 11/18] Wrong job name --- tests/workflows/clem/test_align_and_merge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/workflows/clem/test_align_and_merge.py b/tests/workflows/clem/test_align_and_merge.py index 1905859f9..96e4cc430 100644 --- a/tests/workflows/clem/test_align_and_merge.py +++ b/tests/workflows/clem/test_align_and_merge.py @@ -116,7 +116,7 @@ def test_submit_cluster_request( # Other recipe parameters "session_dir": str(processed_dir.parent), "session_id": session_id, - "job_name": series_name, + "job_name": series_name_long, "feedback_queue": feedback_queue, }, } From 28ceb5e42222f68cda4d7d2f0a6a80a4c97a72fd Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 18 Feb 2025 16:03:15 +0000 Subject: [PATCH 12/18] Added logic to retry database refreshes and file path validation to avoid race conditions when transferring/registering items --- src/murfey/server/api/clem.py | 79 +++++++-- src/murfey/workflows/clem/__init__.py | 27 ++- .../clem/register_align_and_merge_results.py | 47 +++-- 
.../clem/register_preprocessing_results.py | 164 +++++++++++++----- 4 files changed, 245 insertions(+), 72 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 606e18496..500bfa4b5 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -1,6 +1,7 @@ from __future__ import annotations import re +import time import traceback from ast import literal_eval from importlib.metadata import EntryPoint # type hinting only @@ -94,7 +95,14 @@ def validate_and_sanitise( raise ValueError(f"{file} points to a directory that is not permitted") # Check that it's a file, not a directory - if full_path.is_file() is False: + # Make a couple of attempts to rule out race condition + attempts = 0 + while attempts < 50: + if full_path.is_file() is True: + break + attempts += 1 + time.sleep(0.1) + else: raise ValueError(f"{file} is not a file") # Check that it is of a permitted file type @@ -184,7 +192,23 @@ def get_db_entry( ) db.add(db_entry) db.commit() - db.refresh(db_entry) + + # Make multiple attempts to refresh data in case of race condition + attempts = 0 + while attempts < 50: + try: + db.refresh(db_entry) + break + except Exception: + pass + attempts += 1 + time.sleep(0.1) + else: + raise RuntimeError( + "Maximum number of attempts reached while trying to refresh database " + f"entry for {str(file_path if file_path else series_name)!r}" + ) + except Exception: raise Exception @@ -215,7 +239,10 @@ def register_lif_file( file_path=lif_file, ) except Exception: - logger.error(traceback.format_exc()) + logger.error( + f"Exception encountered while registering LIF file {str(lif_file)!r}: \n" + f"{traceback.format_exc()}" + ) return False # Add metadata information if provided @@ -224,7 +251,11 @@ def register_lif_file( master_metadata = validate_and_sanitise(master_metadata, session_id, db) clem_lif_file.master_metadata = str(master_metadata) except Exception: - logger.warning(traceback.format_exc()) + logger.warning( 
+ "Unable to add master metadata information to database entry for " + f"LIF file {str(lif_file)!r}: \n" + f"{traceback.format_exc()}" + ) # Register child metadata if provided for metadata in child_metadata: @@ -238,7 +269,11 @@ def register_lif_file( # Append to database entry clem_lif_file.child_metadata.append(metadata_db_entry) except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to add child metadata information to database entry for " + f"LIF file {str(lif_file)!r}: \n" + f"{traceback.format_exc()}" + ) continue # Register child image series if provided @@ -253,7 +288,11 @@ def register_lif_file( # Append to database entry clem_lif_file.child_series.append(series_db_entry) except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to add child series information to database entry for " + f"LIF file {str(lif_file)!r}: \n" + f"{traceback.format_exc()}" + ) continue # Register child image stacks if provided @@ -268,7 +307,11 @@ def register_lif_file( # Append to database entry clem_lif_file.child_stacks.append(stack_db_entry) except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to add child image stack information to database entry for " + f"LIF file {str(lif_file)!r}: \n" + f"{traceback.format_exc()}" + ) continue # Commit to database @@ -296,7 +339,10 @@ def register_tiff_file( file_path=tiff_file, ) except Exception: - logger.error(traceback.format_exc()) + logger.error( + f"Exception encountered while registering TIFF file {str(tiff_file)!r}: \n" + f"{traceback.format_exc()}" + ) return False # Add metadata if provided @@ -311,7 +357,10 @@ def register_tiff_file( # Link database entries clem_tiff_file.associated_metadata = metadata_db_entry except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + f"Unable to register metadata for TIFF file {str(tiff_file)!r}: \n" + f"{traceback.format_exc()}" + ) # Add series information if provided if 
associated_series is not None: @@ -325,7 +374,11 @@ def register_tiff_file( # Link database entries clem_tiff_file.child_series = series_db_entry except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register series information for TIFF file " + f"{str(tiff_file)!r}: \n" + f"{traceback.format_exc()}" + ) # Add image stack information if provided if associated_stack is not None: @@ -339,7 +392,11 @@ def register_tiff_file( # Link database entries clem_tiff_file.child_stack = stack_db_entry except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + f"Unable to register image stack {str(associated_stack)!r} " + f"in database for TIFF file {str(tiff_file)!r}: \n" + f"{traceback.format_exc()}" + ) # Commit to database db.add(clem_tiff_file) diff --git a/src/murfey/workflows/clem/__init__.py b/src/murfey/workflows/clem/__init__.py index 789fbabbe..965d7bad6 100644 --- a/src/murfey/workflows/clem/__init__.py +++ b/src/murfey/workflows/clem/__init__.py @@ -2,6 +2,7 @@ import logging import re +import time from pathlib import Path from typing import Optional, Type, Union @@ -90,7 +91,14 @@ def _validate_and_sanitise( raise ValueError(f"{file} points to a directory that is not permitted") # Check that it's a file, not a directory - if full_path.is_file() is False: + # Make a couple of attempts to rule out race condition + attempts = 0 + while attempts < 50: + if full_path.is_file() is True: + break + attempts += 1 + time.sleep(0.1) + else: raise ValueError(f"{file} is not a file") # Check that it is of a permitted file type @@ -180,7 +188,22 @@ def get_db_entry( ) db.add(db_entry) db.commit() - db.refresh(db_entry) + + # Make multiple attempts data retrieval attempts in case of race condition + attempts = 0 + while attempts < 50: + try: + db.refresh(db_entry) + break + except Exception: + pass + attempts += 1 + time.sleep(0.1) + else: + raise RuntimeError( + "Maximum number of attempts reached while trying to refresh 
database " + f"entry for {str(file_path if file_path else series_name)!r}" + ) except Exception: raise Exception diff --git a/src/murfey/workflows/clem/register_align_and_merge_results.py b/src/murfey/workflows/clem/register_align_and_merge_results.py index b02c56a30..0582be95c 100644 --- a/src/murfey/workflows/clem/register_align_and_merge_results.py +++ b/src/murfey/workflows/clem/register_align_and_merge_results.py @@ -2,6 +2,7 @@ import json import logging +import time import traceback from ast import literal_eval from pathlib import Path @@ -60,29 +61,22 @@ def register_align_and_merge_result( ) # Validate message and try and load results - if isinstance(message["result"], str): - try: + try: + if isinstance(message["result"], str): json_obj: dict = json.loads(message["result"]) result = AlignAndMergeResult(**json_obj) - except Exception: - logger.error(traceback.format_exc()) - logger.error( - "Exception encountered when parsing align-and-merge processing result" - ) - return False - elif isinstance(message["result"], dict): - try: + elif isinstance(message["result"], dict): result = AlignAndMergeResult(**message["result"]) - except Exception: - logger.error(traceback.format_exc()) + else: logger.error( - "Exception encountered when parsing align-and-merge processing result" + "Invalid type for align-and-merge processing result: " + f"{type(message['result'])}" ) return False - else: + except Exception: logger.error( - "Invalid type for align-and-merge processing result: " - f"{type(message['result'])}" + "Exception encountered when parsing align-and-merge processing result: \n" + f"{traceback.format_exc()}" ) return False @@ -100,7 +94,22 @@ def register_align_and_merge_result( clem_img_series.composite_created = True murfey_db.add(clem_img_series) murfey_db.commit() - murfey_db.refresh(clem_img_series) + + # Make multiple attempts to refresh data in case of race condition + attempts = 0 + while attempts < 50: + try: + murfey_db.refresh(clem_img_series) + 
break + except Exception: + pass + attempts += 1 + time.sleep(0.1) + else: + raise RuntimeError( + "Maximum number of attempts reached while trying to refresh database " + f"entry for {result.series_name!r}" + ) logger.info( "Align-and-merge processing result registered for " @@ -108,10 +117,10 @@ def register_align_and_merge_result( ) except Exception: - logger.error(traceback.format_exc()) logger.error( "Exception encountered when registering LIF preprocessing result for " - f"{result.series_name!r} {result.channel!r} image stack" + f"{result.series_name!r} {result.channel!r} image stack: \n" + f"{traceback.format_exc()}" ) return False diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 396f1109e..fc4ce1f4d 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -9,6 +9,7 @@ import json import logging +import time import traceback from ast import literal_eval from pathlib import Path @@ -62,24 +63,21 @@ def register_lif_preprocessing_result( ) # Validate message and try and load results - if isinstance(message["result"], str): - try: + try: + if isinstance(message["result"], str): json_obj: dict = json.loads(message["result"]) result = LIFPreprocessingResult(**json_obj) - except Exception: - logger.error(traceback.format_exc()) - logger.error("Exception encountered when parsing LIF preprocessing result") - return False - elif isinstance(message["result"], dict): - try: + elif isinstance(message["result"], dict): result = LIFPreprocessingResult(**message["result"]) - except Exception: - logger.error(traceback.format_exc()) - logger.error("Exception encountered when parsing LIF preprocessing result") + else: + logger.error( + f"Invalid type for LIF preprocessing result: {type(message['result'])}" + ) return False - else: + except Exception: logger.error( - f"Invalid type for LIF preprocessing 
result: {type(message['result'])}" + "Exception encountered when parsing LIF preprocessing result: \n" + f"{traceback.format_exc()}" ) return False @@ -122,19 +120,63 @@ def register_lif_preprocessing_result( clem_img_stk.channel_name = result.channel murfey_db.add(clem_img_stk) murfey_db.commit() - murfey_db.refresh(clem_img_stk) + + # Make multiple attempts to refresh data in case of race condition + attempts = 0 + while attempts < 50: + try: + murfey_db.refresh(clem_img_stk) + break + except Exception: + pass + attempts += 1 + time.sleep(0.1) + else: + raise RuntimeError( + "Maximum number of attempts reached while trying to refresh database " + f"entry for {str(result.image_stack)!r}" + ) clem_img_series.associated_metadata = clem_metadata clem_img_series.parent_lif = clem_lif_file clem_img_series.number_of_members = result.number_of_members murfey_db.add(clem_img_series) murfey_db.commit() - murfey_db.refresh(clem_img_series) + + # Make multiple attempts to refresh data in case of race condition + attempts = 0 + while attempts < 50: + try: + murfey_db.refresh(clem_img_series) + break + except Exception: + pass + attempts += 1 + time.sleep(0.1) + else: + raise RuntimeError( + "Maximum number of attempts reached while trying to refresh database " + f"entry for {str(result.series_name)!r}" + ) clem_metadata.parent_lif = clem_lif_file murfey_db.add(clem_metadata) murfey_db.commit() - murfey_db.refresh(clem_metadata) + + # Make multiple attempts to refresh data in case of race condition + while attempts < 50: + try: + murfey_db.refresh(clem_metadata) + break + except Exception: + pass + attempts += 1 + time.sleep(0.1) + else: + raise RuntimeError( + "Maximum number of attempts reached while trying to refresh database " + f"entry for {str(result.metadata)!r}" + ) logger.info( f"LIF preprocessing results registered for {result.series_name!r} " @@ -142,10 +184,10 @@ def register_lif_preprocessing_result( ) except Exception: - logger.error(traceback.format_exc()) 
logger.error( "Exception encountered when registering LIF preprocessing result for " - f"{result.series_name!r} {result.channel!r} image stack" + f"{result.series_name!r} {result.channel!r} image stack: \n" + f"{traceback.format_exc()}" ) return False @@ -170,9 +212,9 @@ def register_lif_preprocessing_result( .instrument_name ) except Exception: - logger.error(traceback.format_exc()) logger.error( - f"Error requesting data from database for {result.series_name!r} series" + f"Error requesting data from database for {result.series_name!r} series: \n" + f"{traceback.format_exc()}" ) return False @@ -247,24 +289,21 @@ def register_tiff_preprocessing_result( if not isinstance(message["session_id"], int) else message["session_id"] ) - if isinstance(message["result"], str): - try: + try: + if isinstance(message["result"], str): json_obj: dict = json.loads(message["result"]) result = TIFFPreprocessingResult(**json_obj) - except Exception: - logger.error(traceback.format_exc()) - logger.error("Exception encountered when parsing TIFF preprocessing result") - return False - elif isinstance(message["result"], dict): - try: + elif isinstance(message["result"], dict): result = TIFFPreprocessingResult(**message["result"]) - except Exception: - logger.error(traceback.format_exc()) - logger.error("Exception encountered when parsing TIFF preprocessing result") + else: + logger.error( + f"Invalid type for TIFF preprocessing result: {type(message['result'])}" + ) return False - else: + except Exception: logger.error( - f"Invalid type for TIFF preprocessing result: {type(message['result'])}" + "Exception encountered when parsing TIFF preprocessing result: \n" + f"{traceback.format_exc()}" ) return False @@ -305,20 +344,65 @@ def register_tiff_preprocessing_result( clem_tiff_file.child_stack = clem_img_stk murfey_db.add(clem_tiff_file) murfey_db.commit() - murfey_db.refresh(clem_tiff_file) + + # Make multiple attempts to refresh data in case of race condition + attempts = 0 + while 
attempts < 50: + try: + murfey_db.refresh(clem_tiff_file) + break + except Exception: + pass + attempts += 1 + time.sleep(0.1) + else: + raise RuntimeError( + "Maximum number of attempts reached while trying to refresh database " + f"entry for {str(file)!r}" + ) clem_img_stk.associated_metadata = clem_metadata clem_img_stk.parent_series = clem_img_series clem_img_stk.channel_name = result.channel murfey_db.add(clem_img_stk) murfey_db.commit() - murfey_db.refresh(clem_img_stk) + + # Make multiple attempts to refresh data in case of race condition + attempts = 0 + while attempts < 50: + try: + murfey_db.refresh(clem_img_stk) + break + except Exception: + pass + attempts += 1 + time.sleep(0.1) + else: + raise RuntimeError( + "Maximum number of attempts reached while trying to refresh database " + f"entry for {str(result.image_stack)!r}" + ) clem_img_series.associated_metadata = clem_metadata clem_img_series.number_of_members = result.number_of_members murfey_db.add(clem_img_series) murfey_db.commit() - murfey_db.refresh(clem_img_series) + + # Make multiple attempts to refresh data in case of race condition + attempts = 0 + while attempts < 50: + try: + murfey_db.refresh(clem_img_series) + break + except Exception: + pass + attempts += 1 + time.sleep(0.1) + else: + raise RuntimeError( + "Maximum number of attempts reached while trying to refresh database " + f"entry for {str(result.series_name)!r}" + ) logger.info( f"TIFF preprocessing results registered for {result.series_name!r} " @@ -326,10 +410,10 @@ def register_tiff_preprocessing_result( ) except Exception: - logger.error(traceback.format_exc()) logger.error( "Exception encountered when registering TIFF preprocessing result for " - f"{result.series_name!r} {result.channel!r} image stack" + f"{result.series_name!r} {result.channel!r} image stack: \n" + f"{traceback.format_exc()}" ) return False @@ -354,9 +438,9 @@ def register_tiff_preprocessing_result( .instrument_name ) except Exception: - 
logger.error(traceback.format_exc()) logger.error( - f"Error requesting data from database for {result.series_name!r} series" + f"Error requesting data from database for {result.series_name!r} series: \n" + f"{traceback.format_exc()}" ) return False From 33b3478bc4b4f471df98aff0d7928a23053a486a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 19 Feb 2025 05:17:34 +0000 Subject: [PATCH 13/18] Added logs to empty 'except:' blocks --- src/murfey/server/api/clem.py | 10 +++- src/murfey/workflows/clem/__init__.py | 11 +++- .../clem/register_align_and_merge_results.py | 10 +++- .../clem/register_preprocessing_results.py | 60 +++++++++++++------ 4 files changed, 64 insertions(+), 27 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 500bfa4b5..6ef3c4a77 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -200,9 +200,13 @@ def get_db_entry( db.refresh(db_entry) break except Exception: - pass - attempts += 1 - time.sleep(0.1) + logger.warning( + f"Attempt {attempts + 1} at refreshing database entry for " + f"{str(file_path if file_path else series_name)!r} failed: \n" + f"{traceback.format_exc()}" + ) + attempts += 1 + time.sleep(0.1) else: raise RuntimeError( "Maximum number of attempts reached while trying to refresh database " diff --git a/src/murfey/workflows/clem/__init__.py b/src/murfey/workflows/clem/__init__.py index 965d7bad6..5c7974029 100644 --- a/src/murfey/workflows/clem/__init__.py +++ b/src/murfey/workflows/clem/__init__.py @@ -3,6 +3,7 @@ import logging import re import time +import traceback from pathlib import Path from typing import Optional, Type, Union @@ -196,9 +197,13 @@ def get_db_entry( db.refresh(db_entry) break except Exception: - pass - attempts += 1 - time.sleep(0.1) + logger.warning( + f"Attempt {attempts + 1} at refreshing database entry for " + f"{str(file_path if file_path else series_name)!r} failed: \n" + f"{traceback.format_exc()}" + ) + attempts += 1 + 
time.sleep(0.1) else: raise RuntimeError( "Maximum number of attempts reached while trying to refresh database " diff --git a/src/murfey/workflows/clem/register_align_and_merge_results.py b/src/murfey/workflows/clem/register_align_and_merge_results.py index 0582be95c..d2a1ef435 100644 --- a/src/murfey/workflows/clem/register_align_and_merge_results.py +++ b/src/murfey/workflows/clem/register_align_and_merge_results.py @@ -102,9 +102,13 @@ def register_align_and_merge_result( murfey_db.refresh(clem_img_series) break except Exception: - pass - attempts += 1 - time.sleep(0.1) + logger.warning( + f"Attempt {attempts + 1} at refreshing database entry for " + f"{str(result.series_name)!r} failed: \n" + f"{traceback.format_exc()}" + ) + attempts += 1 + time.sleep(0.1) else: raise RuntimeError( "Maximum number of attempts reached while trying to refresh database " diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index fc4ce1f4d..a7a83b78e 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -128,9 +128,13 @@ def register_lif_preprocessing_result( murfey_db.refresh(clem_img_stk) break except Exception: - pass - attempts += 1 - time.sleep(0.1) + logger.warning( + f"Attempt {attempts + 1} at refreshing database entry for " + f"{str(result.image_stack)!r} failed: \n" + f"{traceback.format_exc()}" + ) + attempts += 1 + time.sleep(0.1) else: raise RuntimeError( "Maximum number of attempts reached while trying to refresh database " @@ -150,9 +154,13 @@ def register_lif_preprocessing_result( murfey_db.refresh(clem_img_series) break except Exception: - pass - attempts += 1 - time.sleep(0.1) + logger.warning( + f"Attempt {attempts + 1} at refreshing database entry for " + f"{str(result.series_name)!r} failed: \n" + f"{traceback.format_exc()}" + ) + attempts += 1 + time.sleep(0.1) else: raise RuntimeError( 
"Maximum number of attempts reached while trying to refresh database " @@ -169,9 +177,13 @@ def register_lif_preprocessing_result( murfey_db.refresh(clem_metadata) break except Exception: - pass - attempts += 1 - time.sleep(0.1) + logger.warning( + f"Attempt {attempts + 1} at refreshing database entry for " + f"{str(result.metadata)!r} failed: \n" + f"{traceback.format_exc()}" + ) + attempts += 1 + time.sleep(0.1) else: raise RuntimeError( "Maximum number of attempts reached while trying to refresh database " @@ -352,9 +364,13 @@ def register_tiff_preprocessing_result( murfey_db.refresh(clem_tiff_file) break except Exception: - pass - attempts += 1 - time.sleep(0.1) + logger.warning( + f"Attempt {attempts + 1} at refreshing database entry for " + f"{str(file)!r} failed: \n" + f"{traceback.format_exc()}" + ) + attempts += 1 + time.sleep(0.1) else: raise RuntimeError( "Maximum number of attempts reached while trying to refresh database " @@ -374,9 +390,13 @@ def register_tiff_preprocessing_result( murfey_db.refresh(clem_img_stk) break except Exception: - pass - attempts += 1 - time.sleep(0.1) + logger.warning( + f"Attempt {attempts + 1} at refreshing database entry for " + f"{str(result.image_stack)!r} failed: \n" + f"{traceback.format_exc()}" + ) + attempts += 1 + time.sleep(0.1) else: raise RuntimeError( "Maximum number of attempts reached while trying to refresh database " @@ -395,9 +415,13 @@ def register_tiff_preprocessing_result( murfey_db.refresh(clem_img_series) break except Exception: - pass - attempts += 1 - time.sleep(0.1) + logger.warning( + f"Attempt {attempts + 1} at refreshing database entry for " + f"{str(result.series_name)!r} failed: \n" + f"{traceback.format_exc()}" + ) + attempts += 1 + time.sleep(0.1) else: raise RuntimeError( "Maximum number of attempts reached while trying to refresh database " From b2d4a1e9b3efcb215edf599f4d0c344629f298ca Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 19 Feb 2025 06:39:56 +0000 Subject: [PATCH 14/18] 
More verbose logging for errors in CLEM API endpoints; sanitised variables in logs --- src/murfey/server/api/clem.py | 143 ++++++++++++++++++++++++++-------- 1 file changed, 112 insertions(+), 31 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 6ef3c4a77..174a2312f 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -17,6 +17,7 @@ from murfey.server import _transport_object from murfey.server.murfey_db import murfey_db +from murfey.util import sanitise from murfey.util.config import get_machine_config from murfey.util.db import ( CLEMImageMetadata, @@ -202,7 +203,7 @@ def get_db_entry( except Exception: logger.warning( f"Attempt {attempts + 1} at refreshing database entry for " - f"{str(file_path if file_path else series_name)!r} failed: \n" + f"{sanitise(str(file_path if file_path else series_name))!r} failed: \n" f"{traceback.format_exc()}" ) attempts += 1 @@ -210,7 +211,7 @@ def get_db_entry( else: raise RuntimeError( "Maximum number of attempts reached while trying to refresh database " - f"entry for {str(file_path if file_path else series_name)!r}" + f"entry for {sanitise(str(file_path if file_path else series_name))!r}" ) except Exception: @@ -244,7 +245,8 @@ def register_lif_file( ) except Exception: logger.error( - f"Exception encountered while registering LIF file {str(lif_file)!r}: \n" + "Exception encountered while registering " + f"LIF file {sanitise(str(lif_file))!r}: \n" f"{traceback.format_exc()}" ) return False @@ -257,7 +259,7 @@ def register_lif_file( except Exception: logger.warning( "Unable to add master metadata information to database entry for " - f"LIF file {str(lif_file)!r}: \n" + f"LIF file {sanitise(str(lif_file))!r}: \n" f"{traceback.format_exc()}" ) @@ -274,8 +276,9 @@ def register_lif_file( clem_lif_file.child_metadata.append(metadata_db_entry) except Exception: logger.warning( - "Unable to add child metadata information to database entry for " - f"LIF file 
{str(lif_file)!r}: \n" + "Unable to register " + f"metadata file {sanitise(str(metadata))!r} in association with " + f"LIF file {sanitise(str(lif_file))!r}: \n" f"{traceback.format_exc()}" ) continue @@ -293,8 +296,9 @@ def register_lif_file( clem_lif_file.child_series.append(series_db_entry) except Exception: logger.warning( - "Unable to add child series information to database entry for " - f"LIF file {str(lif_file)!r}: \n" + "Unable to register " + f"metadata file {sanitise(series)!r} in association with " + f"LIF file {sanitise(str(lif_file))!r}: \n" f"{traceback.format_exc()}" ) continue @@ -312,8 +316,9 @@ def register_lif_file( clem_lif_file.child_stacks.append(stack_db_entry) except Exception: logger.warning( - "Unable to add child image stack information to database entry for " - f"LIF file {str(lif_file)!r}: \n" + "Unable to register " + f"image stack {sanitise(str(stack))!r} in association with " + f"LIF file {sanitise(str(lif_file))!r}: \n" f"{traceback.format_exc()}" ) continue @@ -344,7 +349,8 @@ def register_tiff_file( ) except Exception: logger.error( - f"Exception encountered while registering TIFF file {str(tiff_file)!r}: \n" + "Exception encountered while registering " + f"TIFF file {sanitise(str(tiff_file))!r}: \n" f"{traceback.format_exc()}" ) return False @@ -362,7 +368,9 @@ def register_tiff_file( clem_tiff_file.associated_metadata = metadata_db_entry except Exception: logger.warning( - f"Unable to register metadata for TIFF file {str(tiff_file)!r}: \n" + "Unable to register " + f"metadata file {sanitise(str(associated_metadata))!r} in association with " + f"TIFF file {sanitise(str(tiff_file))!r}: \n" f"{traceback.format_exc()}" ) @@ -379,8 +387,9 @@ def register_tiff_file( clem_tiff_file.child_series = series_db_entry except Exception: logger.warning( - "Unable to register series information for TIFF file " - f"{str(tiff_file)!r}: \n" + "Unable to register " + f"CLEM series {sanitise(associated_series)!r} in association with " + f"TIFF file 
{sanitise(str(tiff_file))!r}: \n" f"{traceback.format_exc()}" ) @@ -397,8 +406,8 @@ def register_tiff_file( clem_tiff_file.child_stack = stack_db_entry except Exception: logger.warning( - f"Unable to register image stack {str(associated_stack)!r} " - f"in database for TIFF file {str(tiff_file)!r}: \n" + "Unable to register " + f"image stack {sanitise(str(associated_stack))!r} in association with " f"{traceback.format_exc()}" ) @@ -429,7 +438,11 @@ def register_clem_metadata( file_path=metadata_file, ) except Exception: - logger.error(traceback.format_exc()) + logger.error( + "Exception encountered while registering" + f"metadata file {sanitise(str(metadata_file))!r}" + f"{traceback.format_exc()}" + ) return False # Register a parent LIF file if provided @@ -444,7 +457,12 @@ def register_clem_metadata( # Link database entries clem_metadata.parent_lif = lif_db_entry except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"LIF file {sanitise(str(parent_lif))!r} in association with " + f"metadata file {sanitise(str(metadata_file))!r}: \n" + f"{traceback.format_exc()}" + ) # Register associated TIFF files if provided for tiff in associated_tiffs: @@ -458,7 +476,12 @@ def register_clem_metadata( # Append entry clem_metadata.associated_tiffs.append(tiff_db_entry) except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"TIFF file {sanitise(str(tiff))!r} in association with " + f"metadata file {sanitise(str(metadata_file))!r}: \n" + f"{traceback.format_exc()}" + ) continue # Register associated image series if provided @@ -475,7 +498,12 @@ def register_clem_metadata( db.add(series_db_entry) db.commit() except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"CLEM series {sanitise(associated_series)!r} in association with " + f"metadata file {sanitise(str(metadata_file))!r}: \n" + f"{traceback.format_exc()}" + ) # Register associated 
image stacks if provided for stack in associated_stacks: @@ -488,7 +516,12 @@ def register_clem_metadata( ) clem_metadata.associated_stacks.append(stack_db_entry) except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"image stack {sanitise(str(stack))!r} in association with " + f"metadata file {sanitise(str(metadata_file))!r}: \n" + f"{traceback.format_exc()}" + ) continue # Commit to database @@ -517,7 +550,11 @@ def register_image_series( series_name=series_name, ) except Exception: - logger.error(traceback.format_exc()) + logger.error( + "Exception encountered while registering " + f"CLEM series {sanitise(series_name)!r}: \n" + f"{traceback.format_exc()}" + ) return False # Register parent LIF file if provided @@ -532,7 +569,12 @@ def register_image_series( # Link entries clem_image_series.parent_lif = lif_db_entry except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"LIF file {sanitise(str(parent_lif))!r} in association with " + f"CLEM series {sanitise(series_name)!r}: \n" + f"{traceback.format_exc()}" + ) # Register parent TIFFs if provided for tiff in parent_tiffs: @@ -546,7 +588,12 @@ def register_image_series( # Append entry clem_image_series.parent_tiffs.append(tiff_db_entry) except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"TIFF file {sanitise(str(tiff))!r} in association with " + f"CLEM series {sanitise(series_name)!r}: \n" + f"{traceback.format_exc()}" + ) continue # Try next item in loop # Register associated metadata if provided @@ -561,7 +608,12 @@ def register_image_series( # Link entries clem_image_series.associated_metadata = metadata_db_entry except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"metadata file {sanitise(str(associated_metadata))!r} in association with " + f"CLEM series {sanitise(series_name)!r}: \n" + 
f"{traceback.format_exc()}" + ) # Register child image stacks if provided for stack in child_stacks: @@ -575,7 +627,12 @@ def register_image_series( # Append entry clem_image_series.child_stacks.append(stack_db_entry) except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"image stack {sanitise(str(stack))!r} in association with " + f"CLEM series {sanitise(series_name)!r}: \n" + f"{traceback.format_exc()}" + ) continue # Register @@ -605,7 +662,11 @@ def register_image_stack( file_path=image_stack, ) except Exception: - logger.error(traceback.format_exc()) + logger.error( + "Exception encountered while registering " + f"image stack {sanitise(str(image_stack))!r}: \n" + f"{traceback.format_exc()}" + ) return False # Register channel name if provided @@ -623,7 +684,12 @@ def register_image_stack( ) clem_image_stack.parent_lif = lif_db_entry except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"LIF file {sanitise(str(parent_lif))!r} in association with " + f"image stack {sanitise(str(image_stack))!r}: \n" + f"{traceback.format_exc()}" + ) # Register parent TIFF files if provided for tiff in parent_tiffs: @@ -637,7 +703,12 @@ def register_image_stack( # Append entry clem_image_stack.parent_tiffs.append(tiff_db_entry) except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"TIFF file {sanitise(str(tiff))!r} in association with " + f"image stack {sanitise(str(image_stack))!r}: \n" + f"{traceback.format_exc()}" + ) continue # Register associated metadata if provided @@ -652,7 +723,12 @@ def register_image_stack( # Link entries clem_image_stack.associated_metadata = metadata_db_entry except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"metadata file {sanitise(str(associated_metadata))!r} in association with " + f"image stack {sanitise(str(image_stack))!r}: \n" + 
f"{traceback.format_exc()}" + ) # Register parent series if provided if parent_series is not None: @@ -666,7 +742,12 @@ def register_image_stack( # Link entries clem_image_stack.parent_series = series_db_entry except Exception: - logger.warning(traceback.format_exc()) + logger.warning( + "Unable to register " + f"CLEM series {sanitise(parent_series)!r} in association with " + f"image stack {sanitise(str(image_stack))!r}: \n" + f"{traceback.format_exc()}" + ) # Register updates to entry db.add(clem_image_stack) From d50f92943ace4a17470d85a66f2754c01d5338b3 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 19 Feb 2025 06:52:49 +0000 Subject: [PATCH 15/18] Fixed incorrect canonical representation in boostrap API logs --- src/murfey/server/api/bootstrap.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/murfey/server/api/bootstrap.py b/src/murfey/server/api/bootstrap.py index 067e59b0a..5a9661b34 100644 --- a/src/murfey/server/api/bootstrap.py +++ b/src/murfey/server/api/bootstrap.py @@ -539,7 +539,7 @@ def get_msys2_package_file( # Validate environment if any(system in env[0] and environment in env[1] for env in valid_envs) is False: - raise ValueError(f"{system!r}/{environment!r} is not a valid msys2 environment") + raise ValueError(f"'{system}/{environment}' is not a valid msys2 environment") # Validate package name # MSYS2 package names contain: @@ -593,7 +593,7 @@ def _get_full_pypi_path_response(package: str) -> requests.Response: else: raise HTTPException(status_code=response.status_code) else: - raise ValueError(f"{package} is not a valid package name") + raise ValueError(f"{package!r} is not a valid package name") @pypi.get("/", response_class=Response) From 2d2eb40b394d7eada9eaa82218fa6db46c7000fd Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 20 Feb 2025 05:54:36 +0000 Subject: [PATCH 16/18] Removed database refresh instances from CLEM workflows, as they seem unneeded --- src/murfey/server/api/clem.py | 41 ++---- 
src/murfey/workflows/clem/__init__.py | 20 --- .../clem/register_align_and_merge_results.py | 21 --- .../clem/register_preprocessing_results.py | 120 ------------------ 4 files changed, 10 insertions(+), 192 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 174a2312f..3082595ae 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -1,7 +1,6 @@ from __future__ import annotations import re -import time import traceback from ast import literal_eval from importlib.metadata import EntryPoint # type hinting only @@ -95,16 +94,16 @@ def validate_and_sanitise( if not str(full_path).startswith(str(base_path)): raise ValueError(f"{file} points to a directory that is not permitted") - # Check that it's a file, not a directory - # Make a couple of attempts to rule out race condition - attempts = 0 - while attempts < 50: - if full_path.is_file() is True: - break - attempts += 1 - time.sleep(0.1) - else: - raise ValueError(f"{file} is not a file") + # # Check that it's a file, not a directory + # # Make a couple of attempts to rule out race condition + # attempts = 0 + # while attempts < 50: + # if full_path.is_file() is True: + # break + # attempts += 1 + # time.sleep(0.1) + # else: + # raise ValueError(f"{file} is not a file") # Check that it is of a permitted file type if f"{full_path.suffix}" not in valid_file_types: @@ -194,26 +193,6 @@ def get_db_entry( db.add(db_entry) db.commit() - # Make multiple attempts to refresh data in case of race condition - attempts = 0 - while attempts < 50: - try: - db.refresh(db_entry) - break - except Exception: - logger.warning( - f"Attempt {attempts + 1} at refreshing database entry for " - f"{sanitise(str(file_path if file_path else series_name))!r} failed: \n" - f"{traceback.format_exc()}" - ) - attempts += 1 - time.sleep(0.1) - else: - raise RuntimeError( - "Maximum number of attempts reached while trying to refresh database " - f"entry for {sanitise(str(file_path 
if file_path else series_name))!r}" - ) - except Exception: raise Exception diff --git a/src/murfey/workflows/clem/__init__.py b/src/murfey/workflows/clem/__init__.py index 5c7974029..da3e585e0 100644 --- a/src/murfey/workflows/clem/__init__.py +++ b/src/murfey/workflows/clem/__init__.py @@ -3,7 +3,6 @@ import logging import re import time -import traceback from pathlib import Path from typing import Optional, Type, Union @@ -190,25 +189,6 @@ def get_db_entry( db.add(db_entry) db.commit() - # Make multiple attempts data retrieval attempts in case of race condition - attempts = 0 - while attempts < 50: - try: - db.refresh(db_entry) - break - except Exception: - logger.warning( - f"Attempt {attempts + 1} at refreshing database entry for " - f"{str(file_path if file_path else series_name)!r} failed: \n" - f"{traceback.format_exc()}" - ) - attempts += 1 - time.sleep(0.1) - else: - raise RuntimeError( - "Maximum number of attempts reached while trying to refresh database " - f"entry for {str(file_path if file_path else series_name)!r}" - ) except Exception: raise Exception diff --git a/src/murfey/workflows/clem/register_align_and_merge_results.py b/src/murfey/workflows/clem/register_align_and_merge_results.py index d2a1ef435..f47c83bab 100644 --- a/src/murfey/workflows/clem/register_align_and_merge_results.py +++ b/src/murfey/workflows/clem/register_align_and_merge_results.py @@ -2,7 +2,6 @@ import json import logging -import time import traceback from ast import literal_eval from pathlib import Path @@ -95,26 +94,6 @@ def register_align_and_merge_result( murfey_db.add(clem_img_series) murfey_db.commit() - # Make multiple attempts to refresh data in case of race condition - attempts = 0 - while attempts < 50: - try: - murfey_db.refresh(clem_img_series) - break - except Exception: - logger.warning( - f"Attempt {attempts + 1} at refreshing database entry for " - f"{str(result.series_name)!r} failed: \n" - f"{traceback.format_exc()}" - ) - attempts += 1 - time.sleep(0.1) - 
else: - raise RuntimeError( - "Maximum number of attempts reached while trying to refresh database " - f"entry for {result.series_name!r}" - ) - logger.info( "Align-and-merge processing result registered for " f"{result.series_name!r} series" diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index a7a83b78e..642165bc2 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -9,7 +9,6 @@ import json import logging -import time import traceback from ast import literal_eval from pathlib import Path @@ -121,75 +120,16 @@ def register_lif_preprocessing_result( murfey_db.add(clem_img_stk) murfey_db.commit() - # Make multiple attempts to refresh data in case of race condition - attempts = 0 - while attempts < 50: - try: - murfey_db.refresh(clem_img_stk) - break - except Exception: - logger.warning( - f"Attempt {attempts + 1} at refreshing database entry for " - f"{str(result.image_stack)!r} failed: \n" - f"{traceback.format_exc()}" - ) - attempts += 1 - time.sleep(0.1) - else: - raise RuntimeError( - "Maximum number of attempts reached while trying to refresh database " - f"entry for {str(result.image_stack)!r}" - ) - clem_img_series.associated_metadata = clem_metadata clem_img_series.parent_lif = clem_lif_file clem_img_series.number_of_members = result.number_of_members murfey_db.add(clem_img_series) murfey_db.commit() - # Make multiple attempts to refresh data in case of race condition - attempts = 0 - while attempts < 50: - try: - murfey_db.refresh(clem_img_series) - break - except Exception: - logger.warning( - f"Attempt {attempts + 1} at refreshing database entry for " - f"{str(result.series_name)!r} failed: \n" - f"{traceback.format_exc()}" - ) - attempts += 1 - time.sleep(0.1) - else: - raise RuntimeError( - "Maximum number of attempts reached while trying to refresh database " - f"entry for 
{str(result.series_name)!r}" - ) - clem_metadata.parent_lif = clem_lif_file murfey_db.add(clem_metadata) murfey_db.commit() - # Make multiple attempts to refresh data in case of race condition - while attempts < 50: - try: - murfey_db.refresh(clem_metadata) - break - except Exception: - logger.warning( - f"Attempt {attempts + 1} at refreshing database entry for " - f"{str(result.metadata)!r} failed: \n" - f"{traceback.format_exc()}" - ) - attempts += 1 - time.sleep(0.1) - else: - raise RuntimeError( - "Maximum number of attempts reached while trying to refresh database " - f"entry for {str(result.metadata)!r}" - ) - logger.info( f"LIF preprocessing results registered for {result.series_name!r} " f"{result.channel!r} image stack" @@ -357,77 +297,17 @@ def register_tiff_preprocessing_result( murfey_db.add(clem_tiff_file) murfey_db.commit() - # Make multiple attempts to refresh data in case of race condition - attempts = 0 - while attempts < 50: - try: - murfey_db.refresh(clem_tiff_file) - break - except Exception: - logger.warning( - f"Attempt {attempts + 1} at refreshing database entry for " - f"{str(file)!r} failed: \n" - f"{traceback.format_exc()}" - ) - attempts += 1 - time.sleep(0.1) - else: - raise RuntimeError( - "Maximum number of attempts reached while trying to refresh database " - f"entry for {str(file)!r}" - ) - clem_img_stk.associated_metadata = clem_metadata clem_img_stk.parent_series = clem_img_series clem_img_stk.channel_name = result.channel murfey_db.add(clem_img_stk) murfey_db.commit() - # Make multiple attempts to refresh data in case of race condition - attempts = 0 - while attempts < 50: - try: - murfey_db.refresh(clem_img_stk) - break - except Exception: - logger.warning( - f"Attempt {attempts + 1} at refreshing database entry for " - f"{str(result.image_stack)!r} failed: \n" - f"{traceback.format_exc()}" - ) - attempts += 1 - time.sleep(0.1) - else: - raise RuntimeError( - "Maximum number of attempts reached while trying to refresh database " 
- f"entry for {str(result.image_stack)!r}" - ) - clem_img_series.associated_metadata = clem_metadata clem_img_series.number_of_members = result.number_of_members murfey_db.add(clem_img_series) murfey_db.commit() - # Make multiple attempts to refresh data in case of race condition - attempts = 0 - while attempts < 50: - try: - murfey_db.refresh(clem_img_series) - break - except Exception: - logger.warning( - f"Attempt {attempts + 1} at refreshing database entry for " - f"{str(result.series_name)!r} failed: \n" - f"{traceback.format_exc()}" - ) - attempts += 1 - time.sleep(0.1) - else: - raise RuntimeError( - "Maximum number of attempts reached while trying to refresh database " - f"entry for {str(result.series_name)!r}" - ) - logger.info( f"TIFF preprocessing results registered for {result.series_name!r} " f"{result.channel!r} image stack" From d85d9131df6ca8806532b100e01f838ed8450f3c Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 20 Feb 2025 05:56:28 +0000 Subject: [PATCH 17/18] Removed checks from 'validate_and_sanitise' functions for whether a Path object is a file --- src/murfey/server/api/clem.py | 11 ----------- src/murfey/workflows/clem/__init__.py | 12 ------------ 2 files changed, 23 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 3082595ae..5b7b93850 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -94,17 +94,6 @@ def validate_and_sanitise( if not str(full_path).startswith(str(base_path)): raise ValueError(f"{file} points to a directory that is not permitted") - # # Check that it's a file, not a directory - # # Make a couple of attempts to rule out race condition - # attempts = 0 - # while attempts < 50: - # if full_path.is_file() is True: - # break - # attempts += 1 - # time.sleep(0.1) - # else: - # raise ValueError(f"{file} is not a file") - # Check that it is of a permitted file type if f"{full_path.suffix}" not in valid_file_types: raise 
ValueError(f"{full_path.suffix} is not a permitted file format") diff --git a/src/murfey/workflows/clem/__init__.py b/src/murfey/workflows/clem/__init__.py index da3e585e0..58a042669 100644 --- a/src/murfey/workflows/clem/__init__.py +++ b/src/murfey/workflows/clem/__init__.py @@ -2,7 +2,6 @@ import logging import re -import time from pathlib import Path from typing import Optional, Type, Union @@ -90,17 +89,6 @@ def _validate_and_sanitise( if not str(full_path).startswith(str(base_path)): raise ValueError(f"{file} points to a directory that is not permitted") - # Check that it's a file, not a directory - # Make a couple of attempts to rule out race condition - attempts = 0 - while attempts < 50: - if full_path.is_file() is True: - break - attempts += 1 - time.sleep(0.1) - else: - raise ValueError(f"{file} is not a file") - # Check that it is of a permitted file type if f"{full_path.suffix}" not in valid_file_types: raise ValueError(f"{full_path.suffix} is not a permitted file format") From 5acdbfff84e190779c04cd85be4b5f73918311ea Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 20 Feb 2025 09:39:36 +0000 Subject: [PATCH 18/18] Use full rsync basepath when validating files --- src/murfey/server/api/clem.py | 4 ++-- src/murfey/workflows/clem/__init__.py | 13 ++----------- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 5b7b93850..b4e64327d 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -78,7 +78,7 @@ def validate_and_sanitise( machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] - base_path = machine_config.rsync_basepath.as_posix() + rsync_basepath = machine_config.rsync_basepath.resolve() # Check that full file path doesn't contain unallowed characters # Currently allows only: @@ -91,7 +91,7 @@ def validate_and_sanitise( raise ValueError(f"Unallowed characters present in {file}") # Check that it's not 
accessing somehwere it's not allowed - if not str(full_path).startswith(str(base_path)): + if not str(full_path).startswith(str(rsync_basepath)): raise ValueError(f"{file} points to a directory that is not permitted") # Check that it is of a permitted file type diff --git a/src/murfey/workflows/clem/__init__.py b/src/murfey/workflows/clem/__init__.py index 58a042669..16ff4e4f7 100644 --- a/src/murfey/workflows/clem/__init__.py +++ b/src/murfey/workflows/clem/__init__.py @@ -64,16 +64,7 @@ def _validate_and_sanitise( machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] - rsync_basepath = machine_config.rsync_basepath - try: - base_path = list(rsync_basepath.parents)[-2].as_posix() - except IndexError: - logger.warning(f"Base path {rsync_basepath!r} is too short") - base_path = rsync_basepath.as_posix() - except Exception as e: - raise Exception( - f"Unexpected exception encountered when loading the file base path: {e}" - ) + rsync_basepath = machine_config.rsync_basepath.resolve() # Check that full file path doesn't contain unallowed characters # Currently allows only: @@ -86,7 +77,7 @@ def _validate_and_sanitise( raise ValueError(f"Unallowed characters present in {file}") # Check that it's not accessing somehwere it's not allowed - if not str(full_path).startswith(str(base_path)): + if not str(full_path).startswith(str(rsync_basepath)): raise ValueError(f"{file} points to a directory that is not permitted") # Check that it is of a permitted file type