From f38029992465a4f5002302b0d805e42127cfe75c Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Mon, 10 Feb 2025 14:39:46 +0000 Subject: [PATCH 01/16] Add tables needed to record information relevant to email notifications --- src/murfey/util/db.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index 4ba1f90fe..1f9257988 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -360,6 +360,10 @@ class DataCollectionGroup(SQLModel, table=True): # type: ignore back_populates="data_collection_group", sa_relationship_kwargs={"cascade": "delete"}, ) + notification_parameters: List["NotificationParameter"] = Relationship( + back_populates="data_collection_group", + sa_relationship_kwargs={"cascade": "delete"}, + ) tomography_preprocessing_parameters: List["TomographyPreprocessingParameters"] = ( Relationship( back_populates="data_collection_group", @@ -368,6 +372,34 @@ class DataCollectionGroup(SQLModel, table=True): # type: ignore ) +class NotificationParameter(SQLModel, table=True): # type: ignore + id: int = Field(primary_key=True, unique=True) + dcg_id: int = Field(foreign_key="datacollectiongroup.id", primary_key=True) + name: str + min_value: float + max_value: float + num_instances_since_triggered: int = 0 + notification_active: bool = False + data_collection_group: Optional[DataCollectionGroup] = Relationship( + back_populates="notification_parameters" + ) + notification_values: List["NotificationValue"] = Relationship( + back_populates="notification_parameter", + sa_relationship_kwargs={"cascade": "delete"}, + ) + + +class NotificationValue(SQLModel, table=True): # type: ignore + id: int = Field(primary_key=True, unique=True) + notification_parameter_id: int = Field( + foreign_key="notificationparameter.id", primary_key=True + ) + index: int + notification_parameter: Optional[DataCollectionGroup] = Relationship( + back_populates="notification_values" + ) + + class DataCollection(SQLModel, table=True): # type: ignore id: int = Field(primary_key=True, unique=True) tag: str = Field(primary_key=True) From c122d56f70c92a69a67d4aaaeda1d2e4d62946bb Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Mon, 10 Feb 2025 15:32:13 +0000 Subject: [PATCH 02/16] Add method for setting up notification parameters from message sent by PATo --- pyproject.toml | 1 + .../workflows/notifications/__init__.py | 48 +++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 src/murfey/workflows/notifications/__init__.py diff --git a/pyproject.toml b/pyproject.toml index da9f3381d..f71875aa3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,6 +107,7 @@ murfey = "murfey.client:run" "clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result" "clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" "clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" +"pato" = "murfey.workflows.notifications:notification_setup" "spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess" [tool.setuptools] diff --git a/src/murfey/workflows/notifications/__init__.py b/src/murfey/workflows/notifications/__init__.py new file mode 100644 index 000000000..5aea95bae --- /dev/null +++ b/src/murfey/workflows/notifications/__init__.py @@ -0,0 +1,48 @@ +from typing import Dict, Tuple + +from sqlmodel import Session, select + +from murfey.util.db import NotificationParameter + + +def notification_setup( + message: dict, murfey_db: Session, num_instances_between_triggers: int = 500 +) -> bool: + parameters: Dict[str, Tuple[float, float]] = {} + for k in message.keys(): + parameter_name = "" + if k.endswith(("Min", "Max")): + parameter_name = k[:-3] + else: + continue + if parameter_name in parameters.keys(): + continue + parameters[parameter_name] = ( + message.get(f"{parameter_name}Min", 0), + message.get(f"{parameter_name}Max", 10000), + ) + dcgid = message["dcg"] + existing_notification_parameters = murfey_db.exec( + select(NotificationParameter).where(NotificationParameter.dcg_id == dcgid) + ).all() + new_notification_parameters = [] + for k, v in parameters.items(): + for enp in existing_notification_parameters: + if enp.name == k: + enp.min_value = v[0] + enp.max_value = v[1] + break + else: + new_notification_parameters.append( + NotificationParameter( + dcg_id=dcgid, + name=k, + min_value=v[0], + max_value=v[1], + num_instances_since_triggered=num_instances_between_triggers, + ) + ) + murfey_db.add(existing_notification_parameters + new_notification_parameters) + murfey_db.commit() + murfey_db.close() + return True From cb6182195fcf6a7fbd77f5b43f4e95819c1f5b2d Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Mon, 10 Feb 2025 17:29:54 +0000 Subject: [PATCH 03/16] Add logic for determining when a notification should be raised There are some hard coded parameters in here at the moment --- pyproject.toml | 1 + src/murfey/server/__init__.py | 317 -------------- src/murfey/util/db.py | 1 + .../workflows/notifications/__init__.py | 6 +- src/murfey/workflows/spa/picking.py | 405 ++++++++++++++++++ 5 files changed, 411 insertions(+), 319 deletions(-) create mode 100644 src/murfey/workflows/spa/picking.py diff --git a/pyproject.toml b/pyproject.toml index f71875aa3..dac2fef54 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,6 +108,7 @@ murfey = "murfey.client:run" "clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" "clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" "pato" = "murfey.workflows.notifications:notification_setup" +"picked_particles" = "murfey.wrokflows.spa.picking:particles_picked" "spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess" [tool.setuptools] diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index caf38d049..5c060ca13 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -573,298 +573,6 @@ def _get_spa_params( return relion_params, feedback_params -def _register_picked_particles_use_diameter( - message: dict, _db=murfey_db, demo: bool = False -): - """Received picked particles from the autopick service""" - # Add this message to the table of seen messages - params_to_forward = message.get("extraction_parameters") - assert isinstance(params_to_forward, dict) - pj_id = _pj_id(message["program_id"], _db) - ctf_params = db.CtfParameters( - pj_id=pj_id, - micrographs_file=params_to_forward["micrographs_file"], - extract_file=params_to_forward["extract_file"], - coord_list_file=params_to_forward["coord_list_file"], - ctf_image=params_to_forward["ctf_values"]["CtfImage"], - ctf_max_resolution=params_to_forward["ctf_values"]["CtfMaxResolution"], - ctf_figure_of_merit=params_to_forward["ctf_values"]["CtfFigureOfMerit"], - defocus_u=params_to_forward["ctf_values"]["DefocusU"], - defocus_v=params_to_forward["ctf_values"]["DefocusV"], - defocus_angle=params_to_forward["ctf_values"]["DefocusAngle"], - ) - _db.add(ctf_params) - _db.commit() - _db.close() - - picking_db_len = _db.exec( - select(func.count(db.ParticleSizes.id)).where(db.ParticleSizes.pj_id == pj_id) - ).one() - if picking_db_len > default_spa_parameters.nr_picks_before_diameter: - # If there are enough particles to get a diameter - instrument_name = ( - _db.exec(select(db.Session).where(db.Session.id == message["session_id"])) - .one() - .instrument_name - ) - machine_config = get_machine_config(instrument_name=instrument_name)[ - instrument_name - ] - relion_params = _db.exec( - select(db.SPARelionParameters).where(db.SPARelionParameters.pj_id == pj_id) - ).one() - relion_options = dict(relion_params) - feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id - ) - ).one() - - particle_diameter = relion_params.particle_diameter - - if feedback_params.picker_ispyb_id is None: - if demo or not _transport_object: - feedback_params.picker_ispyb_id = 1000 - else: - assert feedback_params.picker_murfey_id is not None - feedback_params.picker_ispyb_id = _transport_object.do_buffer_lookup( - message["program_id"], feedback_params.picker_murfey_id - ) - if feedback_params.picker_ispyb_id is not None: - _flush_class2d(message["session_id"], message["program_id"], _db) - _db.add(feedback_params) - _db.commit() - selection_stash = _db.exec( - select(db.SelectionStash).where(db.SelectionStash.pj_id == pj_id) - ).all() - for s in selection_stash: - _register_class_selection( - { - "session_id": s.session_id, - "class_selection_score": s.class_selection_score or 0, - }, - _db=_db, - demo=demo, - ) - _db.delete(s) - _db.commit() - - if not particle_diameter: - # If the diameter has not been calculated then find it - picking_db = _db.exec( - select(db.ParticleSizes.particle_size).where( - db.ParticleSizes.pj_id == pj_id - ) - ).all() - particle_diameter = np.quantile(list(picking_db), 0.75) - relion_params.particle_diameter = particle_diameter - _db.add(relion_params) - _db.commit() - - ctf_db = _db.exec( - select(db.CtfParameters).where(db.CtfParameters.pj_id == pj_id) - ).all() - for saved_message in ctf_db: - # Send on all saved messages to extraction - _db.expunge(saved_message) - zocalo_message: dict = { - "parameters": { - "micrographs_file": saved_message.micrographs_file, - "coord_list_file": saved_message.coord_list_file, - "output_file": saved_message.extract_file, - "pixel_size": ( - relion_options["angpix"] - * relion_options["motion_corr_binning"] - ), - "ctf_image": saved_message.ctf_image, - "ctf_max_resolution": saved_message.ctf_max_resolution, - "ctf_figure_of_merit": saved_message.ctf_figure_of_merit, - "defocus_u": saved_message.defocus_u, - "defocus_v": saved_message.defocus_v, - "defocus_angle": saved_message.defocus_angle, - "particle_diameter": particle_diameter, - "downscale": relion_options["downscale"], - "kv": relion_options["voltage"], - "node_creator_queue": machine_config.node_creator_queue, - "session_id": message["session_id"], - "autoproc_program_id": _app_id( - _pj_id(message["program_id"], _db, recipe="em-spa-extract"), - _db, - ), - "batch_size": default_spa_parameters.batch_size_2d, - }, - "recipes": ["em-spa-extract"], - } - if _transport_object: - zocalo_message["parameters"][ - "feedback_queue" - ] = _transport_object.feedback_queue - _transport_object.send( - "processing_recipe", zocalo_message, new_connection=True - ) - else: - # If the diameter is known then just send the new message - particle_diameter = relion_params.particle_diameter - zocalo_message = { - "parameters": { - "micrographs_file": params_to_forward["micrographs_file"], - "coord_list_file": params_to_forward["coord_list_file"], - "output_file": params_to_forward["extract_file"], - "pixel_size": ( - relion_options["angpix"] * relion_options["motion_corr_binning"] - ), - "ctf_image": params_to_forward["ctf_values"]["CtfImage"], - "ctf_max_resolution": params_to_forward["ctf_values"][ - "CtfMaxResolution" - ], - "ctf_figure_of_merit": params_to_forward["ctf_values"][ - "CtfFigureOfMerit" - ], - "defocus_u": params_to_forward["ctf_values"]["DefocusU"], - "defocus_v": params_to_forward["ctf_values"]["DefocusV"], - "defocus_angle": params_to_forward["ctf_values"]["DefocusAngle"], - "particle_diameter": particle_diameter, - "downscale": relion_options["downscale"], - "kv": relion_options["voltage"], - "node_creator_queue": machine_config.node_creator_queue, - "session_id": message["session_id"], - "autoproc_program_id": _app_id( - _pj_id(message["program_id"], _db, recipe="em-spa-extract"), _db - ), - "batch_size": default_spa_parameters.batch_size_2d, - }, - "recipes": ["em-spa-extract"], - } - if _transport_object: - zocalo_message["parameters"][ - "feedback_queue" - ] = _transport_object.feedback_queue - _transport_object.send( - "processing_recipe", zocalo_message, new_connection=True - ) - if demo: - _register_incomplete_2d_batch( - { - "session_id": message["session_id"], - "program_id": message["program_id"], - "class2d_message": { - "particles_file": "Select/job009/particles_split_1.star", - "class2d_dir": "Class2D", - "batch_size": 50000, - }, - }, - _db=_db, - demo=demo, - ) - - else: - # If not enough particles then save the new sizes - particle_list = message.get("particle_diameters") - assert isinstance(particle_list, list) - for particle in particle_list: - new_particle = db.ParticleSizes(pj_id=pj_id, particle_size=particle) - _db.add(new_particle) - _db.commit() - _db.close() - - -def _register_picked_particles_use_boxsize(message: dict, _db=murfey_db): - """Received picked particles from the autopick service""" - # Add this message to the table of seen messages - params_to_forward = message.get("extraction_parameters") - assert isinstance(params_to_forward, dict) - - instrument_name = ( - _db.exec(select(db.Session).where(db.Session.id == message["session_id"])) - .one() - .instrument_name - ) - machine_config = get_machine_config(instrument_name=instrument_name)[ - instrument_name - ] - pj_id = _pj_id(message["program_id"], _db) - ctf_params = db.CtfParameters( - pj_id=pj_id, - micrographs_file=params_to_forward["micrographs_file"], - coord_list_file=params_to_forward["coord_list_file"], - extract_file=params_to_forward["extract_file"], - ctf_image=params_to_forward["ctf_values"]["CtfImage"], - ctf_max_resolution=params_to_forward["ctf_values"]["CtfMaxResolution"], - ctf_figure_of_merit=params_to_forward["ctf_values"]["CtfFigureOfMerit"], - defocus_u=params_to_forward["ctf_values"]["DefocusU"], - defocus_v=params_to_forward["ctf_values"]["DefocusV"], - defocus_angle=params_to_forward["ctf_values"]["DefocusAngle"], - ) - _db.add(ctf_params) - _db.commit() - _db.close() - - # Set particle diameter as zero and send box sizes - relion_params = _db.exec( - select(db.SPARelionParameters).where(db.SPARelionParameters.pj_id == pj_id) - ).one() - feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where(db.SPAFeedbackParameters.pj_id == pj_id) - ).one() - - if feedback_params.picker_ispyb_id is None and _transport_object: - assert feedback_params.picker_murfey_id is not None - feedback_params.picker_ispyb_id = _transport_object.do_buffer_lookup( - message["program_id"], feedback_params.picker_murfey_id - ) - if feedback_params.picker_ispyb_id is not None: - _flush_class2d(message["session_id"], message["program_id"], _db) - _db.add(feedback_params) - _db.commit() - selection_stash = _db.exec( - select(db.SelectionStash).where(db.SelectionStash.pj_id == pj_id) - ).all() - for s in selection_stash: - _register_class_selection( - { - "session_id": s.session_id, - "class_selection_score": s.class_selection_score or 0, - }, - _db=_db, - ) - _db.delete(s) - _db.commit() - - # Send the message to extraction with the box sizes - zocalo_message: dict = { - "parameters": { - "micrographs_file": params_to_forward["micrographs_file"], - "coord_list_file": params_to_forward["coord_list_file"], - "output_file": params_to_forward["extract_file"], - "pixel_size": relion_params.angpix * relion_params.motion_corr_binning, - "ctf_image": params_to_forward["ctf_values"]["CtfImage"], - "ctf_max_resolution": params_to_forward["ctf_values"]["CtfMaxResolution"], - "ctf_figure_of_merit": params_to_forward["ctf_values"]["CtfFigureOfMerit"], - "defocus_u": params_to_forward["ctf_values"]["DefocusU"], - "defocus_v": params_to_forward["ctf_values"]["DefocusV"], - "defocus_angle": params_to_forward["ctf_values"]["DefocusAngle"], - "particle_diameter": relion_params.particle_diameter, - "boxsize": relion_params.boxsize, - "small_boxsize": relion_params.small_boxsize, - "downscale": relion_params.downscale, - "kv": relion_params.voltage, - "node_creator_queue": machine_config.node_creator_queue, - "session_id": message["session_id"], - "autoproc_program_id": _app_id( - _pj_id(message["program_id"], _db, recipe="em-spa-extract"), _db - ), - "batch_size": default_spa_parameters.batch_size_2d, - }, - "recipes": ["em-spa-extract"], - } - if _transport_object: - zocalo_message["parameters"][ - "feedback_queue" - ] = _transport_object.feedback_queue - _transport_object.send("processing_recipe", zocalo_message, new_connection=True) - _db.close() - - def _release_2d_hold(message: dict, _db=murfey_db): relion_params, feedback_params = _get_spa_params(message["program_id"], _db) if not feedback_params.star_combination_job: @@ -2841,31 +2549,6 @@ def feedback_callback(header: dict, message: dict) -> None: if _transport_object: _transport_object.transport.ack(header) return None - elif message["register"] == "picked_particles": - movie = murfey_db.exec( - select(db.Movie).where( - db.Movie.murfey_id == message["motion_correction_id"] - ) - ).one() - movie.preprocessed = True - murfey_db.add(movie) - murfey_db.commit() - feedback_params = murfey_db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id - == _pj_id(message["program_id"], murfey_db) - ) - ).one() - if feedback_params.estimate_particle_diameter: - _register_picked_particles_use_diameter(message) - else: - _register_picked_particles_use_boxsize(message) - prom.preprocessed_movies.labels( - processing_job=_pj_id(message["program_id"], murfey_db) - ).inc() - if _transport_object: - _transport_object.transport.ack(header) - return None elif message["register"] == "done_incomplete_2d_batch": _release_2d_hold(message) if _transport_object: diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index 1f9257988..2502370d7 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -395,6 +395,7 @@ class NotificationValue(SQLModel, table=True): # type: ignore foreign_key="notificationparameter.id", primary_key=True ) index: int + within_bounds: bool notification_parameter: Optional[DataCollectionGroup] = Relationship( back_populates="notification_values" ) diff --git a/src/murfey/workflows/notifications/__init__.py b/src/murfey/workflows/notifications/__init__.py index 5aea95bae..fadeabc45 100644 --- a/src/murfey/workflows/notifications/__init__.py +++ b/src/murfey/workflows/notifications/__init__.py @@ -1,3 +1,4 @@ +import re from typing import Dict, Tuple from sqlmodel import Session, select @@ -15,9 +16,10 @@ def notification_setup( parameter_name = k[:-3] else: continue - if parameter_name in parameters.keys(): + snake_parameter_name = re.sub(r"(? default_spa_parameters.nr_picks_before_diameter: + # If there are enough particles to get a diameter + instrument_name = ( + _db.exec( + select(MurfeySession).where(MurfeySession.id == message["session_id"]) + ) + .one() + .instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + relion_params = _db.exec( + select(SPARelionParameters).where(SPARelionParameters.pj_id == pj_id) + ).one() + relion_options = dict(relion_params) + feedback_params = _db.exec( + select(SPAFeedbackParameters).where(SPAFeedbackParameters.pj_id == pj_id) + ).one() + + particle_diameter = relion_params.particle_diameter + + if feedback_params.picker_ispyb_id is None: + if demo or not _transport_object: + feedback_params.picker_ispyb_id = 1000 + else: + assert feedback_params.picker_murfey_id is not None + feedback_params.picker_ispyb_id = _transport_object.do_buffer_lookup( + message["program_id"], feedback_params.picker_murfey_id + ) + if feedback_params.picker_ispyb_id is not None: + _flush_class2d(message["session_id"], message["program_id"], _db) + _db.add(feedback_params) + _db.commit() + selection_stash = _db.exec( + select(SelectionStash).where(SelectionStash.pj_id == pj_id) + ).all() + for s in selection_stash: + _register_class_selection( + { + "session_id": s.session_id, + "class_selection_score": s.class_selection_score or 0, + }, + _db=_db, + demo=demo, + ) + _db.delete(s) + _db.commit() + + if not particle_diameter: + # If the diameter has not been calculated then find it + picking_db = _db.exec( + select(ParticleSizes.particle_size).where(ParticleSizes.pj_id == pj_id) + ).all() + particle_diameter = np.quantile(list(picking_db), 0.75) + relion_params.particle_diameter = particle_diameter + _db.add(relion_params) + _db.commit() + + ctf_db = _db.exec( + select(CtfParameters).where(CtfParameters.pj_id == pj_id) + ).all() + for saved_message in ctf_db: + # Send on all saved messages to extraction + _db.expunge(saved_message) + zocalo_message: dict = { + "parameters": { + "micrographs_file": saved_message.micrographs_file, + "coord_list_file": saved_message.coord_list_file, + "output_file": saved_message.extract_file, + "pixel_size": ( + relion_options["angpix"] + * relion_options["motion_corr_binning"] + ), + "ctf_image": saved_message.ctf_image, + "ctf_max_resolution": saved_message.ctf_max_resolution, + "ctf_figure_of_merit": saved_message.ctf_figure_of_merit, + "defocus_u": saved_message.defocus_u, + "defocus_v": saved_message.defocus_v, + "defocus_angle": saved_message.defocus_angle, + "particle_diameter": particle_diameter, + "downscale": relion_options["downscale"], + "kv": relion_options["voltage"], + "node_creator_queue": machine_config.node_creator_queue, + "session_id": message["session_id"], + "autoproc_program_id": _app_id( + _pj_id(message["program_id"], _db, recipe="em-spa-extract"), + _db, + ), + "batch_size": default_spa_parameters.batch_size_2d, + }, + "recipes": ["em-spa-extract"], + } + if _transport_object: + zocalo_message["parameters"][ + "feedback_queue" + ] = _transport_object.feedback_queue + _transport_object.send( + "processing_recipe", zocalo_message, new_connection=True + ) + else: + # If the diameter is known then just send the new message + particle_diameter = relion_params.particle_diameter + zocalo_message = { + "parameters": { + "micrographs_file": params_to_forward["micrographs_file"], + "coord_list_file": params_to_forward["coord_list_file"], + "output_file": params_to_forward["extract_file"], + "pixel_size": ( + relion_options["angpix"] * relion_options["motion_corr_binning"] + ), + "ctf_image": params_to_forward["ctf_values"]["CtfImage"], + "ctf_max_resolution": params_to_forward["ctf_values"][ + "CtfMaxResolution" + ], + "ctf_figure_of_merit": params_to_forward["ctf_values"][ + "CtfFigureOfMerit" + ], + "defocus_u": params_to_forward["ctf_values"]["DefocusU"], + "defocus_v": params_to_forward["ctf_values"]["DefocusV"], + "defocus_angle": params_to_forward["ctf_values"]["DefocusAngle"], + "particle_diameter": particle_diameter, + "downscale": relion_options["downscale"], + "kv": relion_options["voltage"], + "node_creator_queue": machine_config.node_creator_queue, + "session_id": message["session_id"], + "autoproc_program_id": _app_id( + _pj_id(message["program_id"], _db, recipe="em-spa-extract"), _db + ), + "batch_size": default_spa_parameters.batch_size_2d, + }, + "recipes": ["em-spa-extract"], + } + if _transport_object: + zocalo_message["parameters"][ + "feedback_queue" + ] = _transport_object.feedback_queue + _transport_object.send( + "processing_recipe", zocalo_message, new_connection=True + ) + if demo: + _register_incomplete_2d_batch( + { + "session_id": message["session_id"], + "program_id": message["program_id"], + "class2d_message": { + "particles_file": "Select/job009/particles_split_1.star", + "class2d_dir": "Class2D", + "batch_size": 50000, + }, + }, + _db=_db, + demo=demo, + ) + + else: + # If not enough particles then save the new sizes + particle_list = message.get("particle_diameters") + assert isinstance(particle_list, list) + for particle in particle_list: + new_particle = ParticleSizes(pj_id=pj_id, particle_size=particle) + _db.add(new_particle) + _db.commit() + _db.close() + + +def _register_picked_particles_use_boxsize(message: dict, _db: Session): + """Received picked particles from the autopick service""" + # Add this message to the table of seen messages + params_to_forward = message.get("extraction_parameters") + assert isinstance(params_to_forward, dict) + + instrument_name = ( + _db.exec(select(MurfeySession).where(MurfeySession.id == message["session_id"])) + .one() + .instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + pj_id = _pj_id(message["program_id"], _db) + ctf_params = CtfParameters( + pj_id=pj_id, + micrographs_file=params_to_forward["micrographs_file"], + coord_list_file=params_to_forward["coord_list_file"], + extract_file=params_to_forward["extract_file"], + ctf_image=params_to_forward["ctf_values"]["CtfImage"], + ctf_max_resolution=params_to_forward["ctf_values"]["CtfMaxResolution"], + ctf_figure_of_merit=params_to_forward["ctf_values"]["CtfFigureOfMerit"], + defocus_u=params_to_forward["ctf_values"]["DefocusU"], + defocus_v=params_to_forward["ctf_values"]["DefocusV"], + defocus_angle=params_to_forward["ctf_values"]["DefocusAngle"], + ) + _db.add(ctf_params) + _db.commit() + _db.close() + + # Set particle diameter as zero and send box sizes + relion_params = _db.exec( + select(SPARelionParameters).where(SPARelionParameters.pj_id == pj_id) + ).one() + feedback_params = _db.exec( + select(SPAFeedbackParameters).where(SPAFeedbackParameters.pj_id == pj_id) + ).one() + + if feedback_params.picker_ispyb_id is None and _transport_object: + assert feedback_params.picker_murfey_id is not None + feedback_params.picker_ispyb_id = _transport_object.do_buffer_lookup( + message["program_id"], feedback_params.picker_murfey_id + ) + if feedback_params.picker_ispyb_id is not None: + _flush_class2d(message["session_id"], message["program_id"], _db) + _db.add(feedback_params) + _db.commit() + selection_stash = _db.exec( + select(SelectionStash).where(SelectionStash.pj_id == pj_id) + ).all() + for s in selection_stash: + _register_class_selection( + { + "session_id": s.session_id, + "class_selection_score": s.class_selection_score or 0, + }, + _db=_db, + ) + _db.delete(s) + _db.commit() + + # Send the message to extraction with the box sizes + zocalo_message: dict = { + "parameters": { + "micrographs_file": params_to_forward["micrographs_file"], + "coord_list_file": params_to_forward["coord_list_file"], + "output_file": params_to_forward["extract_file"], + "pixel_size": relion_params.angpix * relion_params.motion_corr_binning, + "ctf_image": params_to_forward["ctf_values"]["CtfImage"], + "ctf_max_resolution": params_to_forward["ctf_values"]["CtfMaxResolution"], + "ctf_figure_of_merit": params_to_forward["ctf_values"]["CtfFigureOfMerit"], + "defocus_u": params_to_forward["ctf_values"]["DefocusU"], + "defocus_v": params_to_forward["ctf_values"]["DefocusV"], + "defocus_angle": params_to_forward["ctf_values"]["DefocusAngle"], + "particle_diameter": relion_params.particle_diameter, + "boxsize": relion_params.boxsize, + "small_boxsize": relion_params.small_boxsize, + "downscale": relion_params.downscale, + "kv": relion_params.voltage, + "node_creator_queue": machine_config.node_creator_queue, + "session_id": message["session_id"], + "autoproc_program_id": _app_id( + _pj_id(message["program_id"], _db, recipe="em-spa-extract"), _db + ), + "batch_size": default_spa_parameters.batch_size_2d, + }, + "recipes": ["em-spa-extract"], + } + if _transport_object: + zocalo_message["parameters"][ + "feedback_queue" + ] = _transport_object.feedback_queue + _transport_object.send("processing_recipe", zocalo_message, new_connection=True) + _db.close() + + +def _request_email(failed_params: List[str]) -> None: + return None + + +def _check_notifications(message: dict, murfey_db: Session) -> None: + data_collection_hierarchy = murfey_db.exec( + select(DataCollection, ProcessingJob, AutoProcProgram) + .where(ProcessingJob.dc_id == DataCollection.id) + .where(AutoProcProgram.pj_id == ProcessingJob.id) + .where(AutoProcProgram == message["program_id"]) + ).all() + dcgid = data_collection_hierarchy[0][0].dcg_id + notification_parameters = murfey_db.exec( + select(NotificationParameter).where(NotificationParameter.dcg_id == dcgid) + ).all() + failures = [] + for param in notification_parameters: + if message.get(param.name) is not None: + param_values = murfey_db.exec( + select(NotificationValue).where( + NotificationValue.notification_parameter_id == param.id + ) + ).all() + param_values.sort(ley=lambda x: x.index) + param_value_to_drop = None + if len(param_values) >= 25: + param_value_to_drop = param_values[0] + param_values = param_values[1:] + param_values = param_values.append( + NotificationValue( + notification_parameter_id=param.id, + index=param_values[-1].index + 1, + within_bounds=param.min_value + <= message[param.name] + <= param.max_value, + ) + ) + if ( + len(param_values) >= 25 + and sum(p.within_bounds for p in param_values) / len(param_values) + < 0.25 + and not param.notification_active + ): + if not param.notification_active: + param.notification_active = True + if param.num_instances_since_triggered >= 500: + failures.append(param.name) + param.num_instances_since_triggered = 0 + else: + if param.notification_active: + param.notification_active = False + if param_value_to_drop is not None: + murfey_db.delete(param_value_to_drop) + murfey_db.add(param_values[-1]) + murfey_db.add(notification_parameters) + murfey_db.commit() + murfey_db.close() + if failures: + _request_email(failures) + return None + + +def particles_picked(message: dict, murfey_db: Session) -> bool: + movie = murfey_db.exec( + select(Movie).where(Movie.murfey_id == message["motion_correction_id"]) + ).one() + movie.preprocessed = True + murfey_db.add(movie) + murfey_db.commit() + feedback_params = murfey_db.exec( + select(SPAFeedbackParameters).where( + SPAFeedbackParameters.pj_id == _pj_id(message["program_id"], murfey_db) + ) + ).one() + if feedback_params.estimate_particle_diameter: + _register_picked_particles_use_diameter(message, murfey_db) + else: + _register_picked_particles_use_boxsize(message, murfey_db) + prom.preprocessed_movies.labels( + processing_job=_pj_id(message["program_id"], murfey_db) + ).inc() + _check_notifications(message, murfey_db) + return True From 5ec35bbc2e2180b1b417e0a01d454748fac34859 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Mon, 10 Feb 2025 17:40:58 +0000 Subject: [PATCH 04/16] Add functionality to notify PATo that an email should be sent The email body is a little sparse at the moment but we're trying to avoid recording too much information in the database --- src/murfey/util/config.py | 2 ++ src/murfey/workflows/spa/picking.py | 21 +++++++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index ca8c65ed3..1aca838ab 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -70,6 +70,8 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore security_configuration_path: Optional[Path] = None auth_url: str = "" + notifications_queue: str = "pato_notifications" + def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, MachineConfig]: with open(config_file_path, "r") as config_stream: diff --git a/src/murfey/workflows/spa/picking.py b/src/murfey/workflows/spa/picking.py index 961d61205..8ffccb3d6 100644 --- a/src/murfey/workflows/spa/picking.py +++ b/src/murfey/workflows/spa/picking.py @@ -320,7 +320,24 @@ def _register_picked_particles_use_boxsize(message: dict, _db: Session): _db.close() -def _request_email(failed_params: List[str]) -> None: +def _request_email( + failed_params: List[str], session_id: int, murfey_db: Session +) -> None: + session = murfey_db.exec( + select(MurfeySession).where(MurfeySession.id == session_id) + ).one() + config = get_machine_config(instrument_name=session.instrument_name)[ + session.instrument_name + ] + if _transport_object: + _transport_object.send( + config.notifications_queue, + { + "session": session.visit, + "message": f"The following parameters consistently exceeded the user set bounds: {failed_params}", + }, + new_connection=True, + ) return None @@ -378,7 +395,7 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: murfey_db.commit() murfey_db.close() if failures: - _request_email(failures) + _request_email(failures, message["session_id"], murfey_db) return None From 5a317c30dbfce150d0f58026893baa491515f04a Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Mon, 10 Feb 2025 17:52:08 +0000 Subject: [PATCH 05/16] Fix database relationship --- src/murfey/util/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index 2502370d7..4a3d9f471 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -396,7 +396,7 @@ class NotificationValue(SQLModel, table=True): # type: ignore ) index: int within_bounds: bool - notification_parameter: Optional[DataCollectionGroup] = Relationship( + notification_parameter: Optional[NotificationParameter] = Relationship( back_populates="notification_values" ) From e4db35d6cd203311b1dd09cb657e141931ce26c9 Mon Sep 17 00:00:00 2001 From: Daniel Hatton Date: Tue, 11 Mar 2025 17:24:38 +0000 Subject: [PATCH 06/16] typo --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index dac2fef54..eae807041 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,7 +108,7 @@ murfey = "murfey.client:run" "clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" "clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" "pato" = "murfey.workflows.notifications:notification_setup" -"picked_particles" = "murfey.wrokflows.spa.picking:particles_picked" +"picked_particles" = "murfey.workflows.spa.picking:particles_picked" "spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess" [tool.setuptools] From c968f180cb46f295d1659f9895bc9711c214f07b Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 11 Mar 2025 18:01:16 +0000 Subject: [PATCH 07/16] Typo --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 07a771e4c..0b32e2610 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,7 +108,7 @@ murfey = "murfey.client:run" "clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" "clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" "pato" = "murfey.workflows.notifications:notification_setup" -"picked_particles" = "murfey.wrokflows.spa.picking:particles_picked" +"picked_particles" = "murfey.workflows.spa.picking:particles_picked" "spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess" [tool.setuptools] From 5190331824869472cb473a17c4f6cfb6e837a139 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 12 Mar 2025 14:26:18 +0000 Subject: [PATCH 08/16] Fixed issues with '_check_notifications' function --- src/murfey/workflows/spa/picking.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/murfey/workflows/spa/picking.py b/src/murfey/workflows/spa/picking.py index 8ffccb3d6..aa10edf40 100644 --- a/src/murfey/workflows/spa/picking.py +++ b/src/murfey/workflows/spa/picking.py @@ -346,7 +346,7 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: select(DataCollection, ProcessingJob, AutoProcProgram) .where(ProcessingJob.dc_id == DataCollection.id) .where(AutoProcProgram.pj_id == ProcessingJob.id) - .where(AutoProcProgram == message["program_id"]) + .where(AutoProcProgram.id == message["program_id"]) ).all() dcgid = data_collection_hierarchy[0][0].dcg_id notification_parameters = murfey_db.exec( @@ -354,6 +354,7 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: ).all() failures = [] for param in notification_parameters: + param_value_to_drop = None if message.get(param.name) is not None: param_values = murfey_db.exec( select(NotificationValue).where( @@ -361,7 +362,6 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: ) ).all() param_values.sort(ley=lambda x: x.index) - param_value_to_drop = None if len(param_values) >= 25: param_value_to_drop = param_values[0] param_values = param_values[1:] @@ -391,7 +391,7 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: if param_value_to_drop is not None: murfey_db.delete(param_value_to_drop) murfey_db.add(param_values[-1]) - murfey_db.add(notification_parameters) + murfey_db.add_all(notification_parameters) murfey_db.commit() murfey_db.close() if failures: From e91fb8619dad1284d2d291448cddbafb1a994c38 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 12 Mar 2025 14:27:31 +0000 Subject: [PATCH 09/16] Use 'add_all' instead of 'add' when committing a list/sequence of tables --- src/murfey/workflows/notifications/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/murfey/workflows/notifications/__init__.py b/src/murfey/workflows/notifications/__init__.py index fadeabc45..7c9709f21 100644 --- a/src/murfey/workflows/notifications/__init__.py +++ b/src/murfey/workflows/notifications/__init__.py @@ -27,7 +27,7 @@ def notification_setup( existing_notification_parameters = murfey_db.exec( select(NotificationParameter).where(NotificationParameter.dcg_id == dcgid) ).all() - new_notification_parameters = [] + new_notification_parameters: list[NotificationParameter] = [] for k, v in parameters.items(): for enp in existing_notification_parameters: if enp.name == k: @@ -44,7 +44,7 @@ def notification_setup( num_instances_since_triggered=num_instances_between_triggers, ) ) - murfey_db.add(existing_notification_parameters + new_notification_parameters) + murfey_db.add_all(existing_notification_parameters + new_notification_parameters) murfey_db.commit() murfey_db.close() return True From 1dbf1678faf193674fcd23cd8f6c70e40fd08c5d Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 12 Mar 2025 14:43:20 +0000 Subject: [PATCH 10/16] Notification tables now auto-generate ids --- src/murfey/util/db.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index 598e7ae6b..262f08c41 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -373,8 +373,8 @@ class DataCollectionGroup(SQLModel, table=True): # type: ignore class NotificationParameter(SQLModel, table=True): # type: ignore - id: int = Field(primary_key=True, unique=True) - dcg_id: int = Field(foreign_key="datacollectiongroup.id", primary_key=True) + id: Optional[int] = Field(default=None, primary_key=True) + dcg_id: int = Field(foreign_key="datacollectiongroup.id") name: str min_value: float max_value: float @@ -390,10 +390,8 @@ class NotificationParameter(SQLModel, table=True): # type: ignore class NotificationValue(SQLModel, table=True): # type: ignore - id: int = Field(primary_key=True, unique=True) - notification_parameter_id: int = Field( - foreign_key="notificationparameter.id", primary_key=True - ) + id: Optional[int] = Field(default=None, primary_key=True) + notification_parameter_id: int = Field(foreign_key="notificationparameter.id") index: int within_bounds: bool notification_parameter: Optional[NotificationParameter] = Relationship( From 4c5516b4e31adcb6b88114e1790655c033d1c420 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 12 Mar 2025 14:59:15 +0000 Subject: [PATCH 11/16] Fixed indent level of if-block in '_check_notifications' --- src/murfey/workflows/spa/picking.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/murfey/workflows/spa/picking.py b/src/murfey/workflows/spa/picking.py index aa10edf40..cd68b0309 100644 --- a/src/murfey/workflows/spa/picking.py +++ b/src/murfey/workflows/spa/picking.py @@ -354,7 +354,6 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: ).all() failures = [] for param in notification_parameters: - param_value_to_drop = None if message.get(param.name) is not None: param_values = murfey_db.exec( select(NotificationValue).where( @@ -362,6 +361,7 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: ) ).all() param_values.sort(ley=lambda x: x.index) + param_value_to_drop = None if len(param_values) >= 25: param_value_to_drop = param_values[0] param_values = param_values[1:] @@ -388,9 +388,9 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: else: if param.notification_active: param.notification_active = False - if param_value_to_drop is not None: - murfey_db.delete(param_value_to_drop) - murfey_db.add(param_values[-1]) + if param_value_to_drop is not None: + murfey_db.delete(param_value_to_drop) + murfey_db.add(param_values[-1]) murfey_db.add_all(notification_parameters) murfey_db.commit() murfey_db.close() From 8368360bb6cc49febf56948319e113a644de9466 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 12 Mar 2025 15:12:42 +0000 Subject: [PATCH 12/16] Sequence object should be able to be appended in-place --- src/murfey/workflows/spa/picking.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/workflows/spa/picking.py b/src/murfey/workflows/spa/picking.py index cd68b0309..63eed096e 100644 --- a/src/murfey/workflows/spa/picking.py +++ b/src/murfey/workflows/spa/picking.py @@ -365,7 +365,7 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: if len(param_values) >= 25: param_value_to_drop = param_values[0] param_values = param_values[1:] - param_values = param_values.append( + param_values.append( NotificationValue( notification_parameter_id=param.id, index=param_values[-1].index + 1, From e8006e170697e85b715a0baf2469d1964221008e Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 12 Mar 2025 16:20:29 +0000 Subject: [PATCH 13/16] Changed default notifications_queue value to singular form --- src/murfey/util/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index f07dc9689..22b993ca1 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -70,7 +70,7 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore security_configuration_path: Optional[Path] = None auth_url: str = "" - notifications_queue: str = "pato_notifications" + notifications_queue: str = "pato_notification" def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, MachineConfig]: From b79c0c3b3095fbb87226e7d011621aab49e9046a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 12 Mar 2025 16:59:30 +0000 Subject: [PATCH 14/16] Added logic to generate index number for first instance of a logged param in NotificationValue table --- src/murfey/workflows/spa/picking.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/murfey/workflows/spa/picking.py b/src/murfey/workflows/spa/picking.py index 63eed096e..371ed2b07 100644 --- a/src/murfey/workflows/spa/picking.py +++ b/src/murfey/workflows/spa/picking.py @@ -1,3 +1,4 @@ +from logging import getLogger from typing import List import numpy as np @@ -29,6 +30,8 @@ from murfey.util.db import SPAFeedbackParameters, SPARelionParameters from murfey.util.processing_params import default_spa_parameters +logger = getLogger("murfey.workflows.spa.picking") + def _register_picked_particles_use_diameter( message: dict, _db: Session, demo: bool = False @@ -355,25 +358,32 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: failures = [] for param in notification_parameters: if message.get(param.name) is not None: + # Load instances of current parameter from database param_values = murfey_db.exec( select(NotificationValue).where( NotificationValue.notification_parameter_id == param.id ) ).all() - param_values.sort(ley=lambda x: x.index) + param_values.sort(key=lambda x: x.index) + + # Drop oldest value if number of entries exceeds threshold param_value_to_drop = None if len(param_values) >= 25: param_value_to_drop = param_values[0] param_values = param_values[1:] + + # Add newest value to end of list param_values.append( NotificationValue( notification_parameter_id=param.id, - index=param_values[-1].index + 1, + index=param_values[-1].index + 1 if len(param_values) else 0, within_bounds=param.min_value <= message[param.name] <= param.max_value, ) ) + + # Trigger message if this param has consistently exceeded the set threshold if ( len(param_values) >= 25 and sum(p.within_bounds for p in param_values) / len(param_values) @@ -388,9 +398,14 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: else: if param.notification_active: param.notification_active = False + + # Delete oldest value if param_value_to_drop is not None: murfey_db.delete(param_value_to_drop) + + # Add newest value murfey_db.add(param_values[-1]) + murfey_db.add_all(notification_parameters) murfey_db.commit() murfey_db.close() From d95ed64d41e15baf0fd385385a06fac9b88b6a2a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Mar 2025 10:32:38 +0000 Subject: [PATCH 15/16] Added logic to increment counter for abnormal parameter instances; add logs to notify when notification is sent --- src/murfey/workflows/spa/picking.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/murfey/workflows/spa/picking.py b/src/murfey/workflows/spa/picking.py index 371ed2b07..706d886e4 100644 --- a/src/murfey/workflows/spa/picking.py +++ b/src/murfey/workflows/spa/picking.py @@ -341,6 +341,11 @@ def _request_email( }, new_connection=True, ) + logger.debug( + f"Sent notification to {config.notifications_queue!r} for " + f"visit {session.visit!r} about the following abnormal parameters: \n" + f"{', '.join([f'{p}' for p in failed_params])}" + ) return None @@ -392,9 +397,14 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: ): if not param.notification_active: param.notification_active = True - if param.num_instances_since_triggered >= 500: - failures.append(param.name) - param.num_instances_since_triggered = 0 + + if param.num_instances_since_triggered >= 500: + logger.debug( + f"Parameter {param.name!r} has consistently exceeded normal " + "operating thresholds" + ) + failures.append(param.name) + param.num_instances_since_triggered = 0 else: if param.notification_active: param.notification_active = False @@ -403,13 +413,18 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: if param_value_to_drop is not None: murfey_db.delete(param_value_to_drop) - # Add newest value + # Add newest value and increment record of instances murfey_db.add(param_values[-1]) + param.num_instances_since_triggered += 1 murfey_db.add_all(notification_parameters) murfey_db.commit() murfey_db.close() if failures: + logger.debug( + "Requested email notification for the following abnormal parameters: \n" + f"{', '.join([f'{p}' for p in failures])}" + ) _request_email(failures, message["session_id"], murfey_db) return None From 5e47efe3af930aec49aca02fbd5ac1bb4a36debf Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 13 Mar 2025 11:34:48 +0000 Subject: [PATCH 16/16] Added logic to trigger warning email within first 500 messages received; optimised logic for subsequent emails --- src/murfey/workflows/spa/picking.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/murfey/workflows/spa/picking.py b/src/murfey/workflows/spa/picking.py index 706d886e4..a5384b865 100644 --- a/src/murfey/workflows/spa/picking.py +++ b/src/murfey/workflows/spa/picking.py @@ -393,20 +393,30 @@ def _check_notifications(message: dict, murfey_db: Session) -> None: len(param_values) >= 25 and sum(p.within_bounds for p in param_values) / len(param_values) < 0.25 - and not param.notification_active ): + # If notifications disabled, enable them now + trigger = False if not param.notification_active: + # Use a variable to trigger the notification for the first + # time within the first 500 messages received + if param_values[-1].index < 500: + logger.debug( + f"First abnormal instance of parameter {param.name!r} detected" + ) + trigger = True param.notification_active = True - if param.num_instances_since_triggered >= 500: - logger.debug( - f"Parameter {param.name!r} has consistently exceeded normal " - "operating thresholds" - ) + if param.num_instances_since_triggered >= 500 or trigger: + if not trigger: + logger.debug( + f"Parameter {param.name!r} has exceeded normal operating thresholds" + ) failures.append(param.name) param.num_instances_since_triggered = 0 else: - if param.notification_active: + # Only reset to False if there are more than 500 instances + # to stop multiple triggers within the first 500 + if param.notification_active and param_values[-1].index > 500: param.notification_active = False # Delete oldest value