Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
f380299
Add tables needed to record information relevant to email notifications
d-j-hatton Feb 10, 2025
c122d56
Add method for setting up notification parameters from message sent b…
d-j-hatton Feb 10, 2025
cb61821
Add logic for determining when a notification should be raised
d-j-hatton Feb 10, 2025
5ec35bb
Add functionality to notify PATo that an email should be sent
d-j-hatton Feb 10, 2025
5a317c3
Fix database relationship
d-j-hatton Feb 10, 2025
9031b9a
Merged recent changes from 'main' branch
tieneupin Mar 11, 2025
e4db35d
typo
d-j-hatton Mar 11, 2025
c968f18
Typo
tieneupin Mar 11, 2025
5190331
Fixed issues with '_check_notifications' function
tieneupin Mar 12, 2025
e91fb86
Use 'add_all' instead of 'add' when committing a list/sequence of tables
tieneupin Mar 12, 2025
1dbf167
Notification tables now auto-generate ids
tieneupin Mar 12, 2025
4c5516b
Fixed indent level of if-block in '_check_notifications'
tieneupin Mar 12, 2025
d5f70d6
Merge branch 'email-notification-hookup' of github.com:DiamondLightSo…
tieneupin Mar 12, 2025
8368360
Sequence object should be able to be appended in-place
tieneupin Mar 12, 2025
e8006e1
Changed default notifications_queue value to singular form
tieneupin Mar 12, 2025
b79c0c3
Added logic to generate index number for first instance of a logged p…
tieneupin Mar 12, 2025
d95ed64
Added logic to increment counter for abnormal parameter instances; ad…
tieneupin Mar 13, 2025
5e47efe
Added logic to trigger warning email within first 500 messages receiv…
tieneupin Mar 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ murfey = "murfey.client:run"
"clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result"
"clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result"
"clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result"
"pato" = "murfey.workflows.notifications:notification_setup"
"picked_particles" = "murfey.workflows.spa.picking:particles_picked"
"spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess"

[tool.setuptools]
Expand Down
315 changes: 0 additions & 315 deletions src/murfey/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,296 +573,6 @@ def _get_spa_params(
return relion_params, feedback_params


def _register_picked_particles_use_diameter(
    message: dict, _db=murfey_db, demo: bool = False
) -> None:
    """
    Received picked particles from the autopick service.

    Persists the CTF/extraction parameters carried by *message*, then either:
    - if enough particle sizes have been logged for this processing job,
      ensures a particle diameter is set (computing it as the 75th percentile
      of the logged sizes when absent) and forwards the extraction message(s)
      to the 'em-spa-extract' recipe, or
    - otherwise, stores the newly reported particle sizes for later.

    :param message: feedback message; expected keys include "program_id",
        "session_id", "extraction_parameters" (dict containing "ctf_values")
        and, while sizes are still accumulating, "particle_diameters" (list).
    :param _db: database session used for all reads and writes.
    :param demo: when True, skip the ISPyB buffer lookup (use a placeholder
        picker id) and trigger a demo 2D classification batch after sending.
    """
    # Add this message to the table of seen messages
    params_to_forward = message.get("extraction_parameters")
    assert isinstance(params_to_forward, dict)
    pj_id = _pj_id(message["program_id"], _db)
    # Save the CTF values so the extraction request can be replayed later,
    # once a particle diameter has been determined
    ctf_params = db.CtfParameters(
        pj_id=pj_id,
        micrographs_file=params_to_forward["micrographs_file"],
        extract_file=params_to_forward["extract_file"],
        coord_list_file=params_to_forward["coord_list_file"],
        ctf_image=params_to_forward["ctf_values"]["CtfImage"],
        ctf_max_resolution=params_to_forward["ctf_values"]["CtfMaxResolution"],
        ctf_figure_of_merit=params_to_forward["ctf_values"]["CtfFigureOfMerit"],
        defocus_u=params_to_forward["ctf_values"]["DefocusU"],
        defocus_v=params_to_forward["ctf_values"]["DefocusV"],
        defocus_angle=params_to_forward["ctf_values"]["DefocusAngle"],
    )
    _db.add(ctf_params)
    _db.commit()
    # NOTE(review): the session is closed here but used again immediately
    # below; SQLAlchemy sessions transparently begin a new transaction on
    # next use — confirm this close() is intentional rather than stray
    _db.close()

    # Number of particle sizes logged so far for this processing job
    picking_db_len = _db.exec(
        select(func.count(db.ParticleSizes.id)).where(db.ParticleSizes.pj_id == pj_id)
    ).one()
    if picking_db_len > default_spa_parameters.nr_picks_before_diameter:
        # If there are enough particles to get a diameter
        instrument_name = (
            _db.exec(select(db.Session).where(db.Session.id == message["session_id"]))
            .one()
            .instrument_name
        )
        machine_config = get_machine_config(instrument_name=instrument_name)[
            instrument_name
        ]
        relion_params = _db.exec(
            select(db.SPARelionParameters).where(db.SPARelionParameters.pj_id == pj_id)
        ).one()
        relion_options = dict(relion_params)
        feedback_params = _db.exec(
            select(db.SPAFeedbackParameters).where(
                db.SPAFeedbackParameters.pj_id == pj_id
            )
        ).one()

        # Resolve the ISPyB id of the particle picker if it is not yet known
        if feedback_params.picker_ispyb_id is None:
            if demo or not _transport_object:
                # No transport available (or demo mode): use a placeholder id
                feedback_params.picker_ispyb_id = 1000
            else:
                assert feedback_params.picker_murfey_id is not None
                feedback_params.picker_ispyb_id = _transport_object.do_buffer_lookup(
                    message["program_id"], feedback_params.picker_murfey_id
                )
            if feedback_params.picker_ispyb_id is not None:
                # Picker id now available: release any held 2D classification
                _flush_class2d(message["session_id"], message["program_id"], _db)
            _db.add(feedback_params)
            _db.commit()
            # Replay class selections stashed while the picker id was unknown,
            # deleting each stash entry once it has been processed
            selection_stash = _db.exec(
                select(db.SelectionStash).where(db.SelectionStash.pj_id == pj_id)
            ).all()
            for s in selection_stash:
                _register_class_selection(
                    {
                        "session_id": s.session_id,
                        "class_selection_score": s.class_selection_score or 0,
                    },
                    _db=_db,
                    demo=demo,
                )
                _db.delete(s)
                _db.commit()

        # Calculate diameter if it wasn't provided
        if not relion_params.particle_diameter:
            # If the diameter has not been calculated then find it
            picking_db = _db.exec(
                select(db.ParticleSizes.particle_size).where(
                    db.ParticleSizes.pj_id == pj_id
                )
            ).all()
            # Use the 75th percentile of the observed sizes as the diameter
            relion_params.particle_diameter = np.quantile(list(picking_db), 0.75)
            _db.add(relion_params)
            _db.commit()

            ctf_db = _db.exec(
                select(db.CtfParameters).where(db.CtfParameters.pj_id == pj_id)
            ).all()
            for saved_message in ctf_db:
                # Send on all saved messages to extraction
                # (expunge detaches the row so its attributes stay readable)
                _db.expunge(saved_message)
                zocalo_message: dict = {
                    "parameters": {
                        "micrographs_file": saved_message.micrographs_file,
                        "coord_list_file": saved_message.coord_list_file,
                        "output_file": saved_message.extract_file,
                        "pixel_size": (
                            relion_options["angpix"]
                            * relion_options["motion_corr_binning"]
                        ),
                        "ctf_image": saved_message.ctf_image,
                        "ctf_max_resolution": saved_message.ctf_max_resolution,
                        "ctf_figure_of_merit": saved_message.ctf_figure_of_merit,
                        "defocus_u": saved_message.defocus_u,
                        "defocus_v": saved_message.defocus_v,
                        "defocus_angle": saved_message.defocus_angle,
                        "particle_diameter": relion_params.particle_diameter,
                        "downscale": relion_options["downscale"],
                        "kv": relion_options["voltage"],
                        "node_creator_queue": machine_config.node_creator_queue,
                        "session_id": message["session_id"],
                        "autoproc_program_id": _app_id(
                            _pj_id(message["program_id"], _db, recipe="em-spa-extract"),
                            _db,
                        ),
                        "batch_size": default_spa_parameters.batch_size_2d,
                    },
                    "recipes": ["em-spa-extract"],
                }
                if _transport_object:
                    zocalo_message["parameters"][
                        "feedback_queue"
                    ] = _transport_object.feedback_queue
                    _transport_object.send(
                        "processing_recipe", zocalo_message, new_connection=True
                    )
        # Use provided diameter for next step
        else:
            # If the diameter is known then just send the new message
            zocalo_message = {
                "parameters": {
                    "micrographs_file": params_to_forward["micrographs_file"],
                    "coord_list_file": params_to_forward["coord_list_file"],
                    "output_file": params_to_forward["extract_file"],
                    "pixel_size": (
                        relion_options["angpix"] * relion_options["motion_corr_binning"]
                    ),
                    "ctf_image": params_to_forward["ctf_values"]["CtfImage"],
                    "ctf_max_resolution": params_to_forward["ctf_values"][
                        "CtfMaxResolution"
                    ],
                    "ctf_figure_of_merit": params_to_forward["ctf_values"][
                        "CtfFigureOfMerit"
                    ],
                    "defocus_u": params_to_forward["ctf_values"]["DefocusU"],
                    "defocus_v": params_to_forward["ctf_values"]["DefocusV"],
                    "defocus_angle": params_to_forward["ctf_values"]["DefocusAngle"],
                    "particle_diameter": relion_params.particle_diameter,
                    "downscale": relion_options["downscale"],
                    "kv": relion_options["voltage"],
                    "node_creator_queue": machine_config.node_creator_queue,
                    "session_id": message["session_id"],
                    "autoproc_program_id": _app_id(
                        _pj_id(message["program_id"], _db, recipe="em-spa-extract"), _db
                    ),
                    "batch_size": default_spa_parameters.batch_size_2d,
                },
                "recipes": ["em-spa-extract"],
            }
            if _transport_object:
                zocalo_message["parameters"][
                    "feedback_queue"
                ] = _transport_object.feedback_queue
                _transport_object.send(
                    "processing_recipe", zocalo_message, new_connection=True
                )
            if demo:
                # Demo mode: immediately simulate an incomplete 2D batch so
                # the downstream classification path can be exercised
                _register_incomplete_2d_batch(
                    {
                        "session_id": message["session_id"],
                        "program_id": message["program_id"],
                        "class2d_message": {
                            "particles_file": "Select/job009/particles_split_1.star",
                            "class2d_dir": "Class2D",
                            "batch_size": 50000,
                        },
                    },
                    _db=_db,
                    demo=demo,
                )

    else:
        # If not enough particles then save the new sizes
        particle_list = message.get("particle_diameters")
        assert isinstance(particle_list, list)
        for particle in particle_list:
            new_particle = db.ParticleSizes(pj_id=pj_id, particle_size=particle)
            _db.add(new_particle)
            _db.commit()
    _db.close()


def _register_picked_particles_use_boxsize(message: dict, _db=murfey_db) -> None:
    """
    Received picked particles from the autopick service.

    Variant of particle registration used when box sizes are supplied by the
    Relion parameters rather than estimated from accumulated particle sizes:
    persists the CTF/extraction parameters, resolves the picker's ISPyB id
    (replaying any stashed class selections), and immediately forwards an
    extraction message with the configured box sizes.

    :param message: feedback message; expected keys include "program_id",
        "session_id" and "extraction_parameters" (dict containing
        "ctf_values").
    :param _db: database session used for all reads and writes.
    """
    # Add this message to the table of seen messages
    params_to_forward = message.get("extraction_parameters")
    assert isinstance(params_to_forward, dict)

    instrument_name = (
        _db.exec(select(db.Session).where(db.Session.id == message["session_id"]))
        .one()
        .instrument_name
    )
    machine_config = get_machine_config(instrument_name=instrument_name)[
        instrument_name
    ]
    pj_id = _pj_id(message["program_id"], _db)
    # Record the CTF values associated with this pick
    ctf_params = db.CtfParameters(
        pj_id=pj_id,
        micrographs_file=params_to_forward["micrographs_file"],
        coord_list_file=params_to_forward["coord_list_file"],
        extract_file=params_to_forward["extract_file"],
        ctf_image=params_to_forward["ctf_values"]["CtfImage"],
        ctf_max_resolution=params_to_forward["ctf_values"]["CtfMaxResolution"],
        ctf_figure_of_merit=params_to_forward["ctf_values"]["CtfFigureOfMerit"],
        defocus_u=params_to_forward["ctf_values"]["DefocusU"],
        defocus_v=params_to_forward["ctf_values"]["DefocusV"],
        defocus_angle=params_to_forward["ctf_values"]["DefocusAngle"],
    )
    _db.add(ctf_params)
    _db.commit()
    # NOTE(review): session closed then reused below; SQLAlchemy reopens a
    # transaction on next use — confirm this close() is intentional
    _db.close()

    # Set particle diameter as zero and send box sizes
    relion_params = _db.exec(
        select(db.SPARelionParameters).where(db.SPARelionParameters.pj_id == pj_id)
    ).one()
    feedback_params = _db.exec(
        select(db.SPAFeedbackParameters).where(db.SPAFeedbackParameters.pj_id == pj_id)
    ).one()

    # Resolve the picker's ISPyB id via a buffer lookup if not yet known
    if feedback_params.picker_ispyb_id is None and _transport_object:
        assert feedback_params.picker_murfey_id is not None
        feedback_params.picker_ispyb_id = _transport_object.do_buffer_lookup(
            message["program_id"], feedback_params.picker_murfey_id
        )
        if feedback_params.picker_ispyb_id is not None:
            # Picker id now available: release any held 2D classification
            _flush_class2d(message["session_id"], message["program_id"], _db)
        _db.add(feedback_params)
        _db.commit()
        # Replay class selections stashed while the picker id was unknown,
        # deleting each stash entry once it has been processed
        selection_stash = _db.exec(
            select(db.SelectionStash).where(db.SelectionStash.pj_id == pj_id)
        ).all()
        for s in selection_stash:
            _register_class_selection(
                {
                    "session_id": s.session_id,
                    "class_selection_score": s.class_selection_score or 0,
                },
                _db=_db,
            )
            _db.delete(s)
            _db.commit()

    # Send the message to extraction with the box sizes
    zocalo_message: dict = {
        "parameters": {
            "micrographs_file": params_to_forward["micrographs_file"],
            "coord_list_file": params_to_forward["coord_list_file"],
            "output_file": params_to_forward["extract_file"],
            "pixel_size": relion_params.angpix * relion_params.motion_corr_binning,
            "ctf_image": params_to_forward["ctf_values"]["CtfImage"],
            "ctf_max_resolution": params_to_forward["ctf_values"]["CtfMaxResolution"],
            "ctf_figure_of_merit": params_to_forward["ctf_values"]["CtfFigureOfMerit"],
            "defocus_u": params_to_forward["ctf_values"]["DefocusU"],
            "defocus_v": params_to_forward["ctf_values"]["DefocusV"],
            "defocus_angle": params_to_forward["ctf_values"]["DefocusAngle"],
            "particle_diameter": relion_params.particle_diameter,
            "boxsize": relion_params.boxsize,
            "small_boxsize": relion_params.small_boxsize,
            "downscale": relion_params.downscale,
            "kv": relion_params.voltage,
            "node_creator_queue": machine_config.node_creator_queue,
            "session_id": message["session_id"],
            "autoproc_program_id": _app_id(
                _pj_id(message["program_id"], _db, recipe="em-spa-extract"), _db
            ),
            "batch_size": default_spa_parameters.batch_size_2d,
        },
        "recipes": ["em-spa-extract"],
    }
    if _transport_object:
        zocalo_message["parameters"][
            "feedback_queue"
        ] = _transport_object.feedback_queue
        _transport_object.send("processing_recipe", zocalo_message, new_connection=True)
    _db.close()


def _release_2d_hold(message: dict, _db=murfey_db):
relion_params, feedback_params = _get_spa_params(message["program_id"], _db)
if not feedback_params.star_combination_job:
Expand Down Expand Up @@ -2851,31 +2561,6 @@ def feedback_callback(header: dict, message: dict) -> None:
if _transport_object:
_transport_object.transport.ack(header)
return None
elif message["register"] == "picked_particles":
movie = murfey_db.exec(
select(db.Movie).where(
db.Movie.murfey_id == message["motion_correction_id"]
)
).one()
movie.preprocessed = True
murfey_db.add(movie)
murfey_db.commit()
feedback_params = murfey_db.exec(
select(db.SPAFeedbackParameters).where(
db.SPAFeedbackParameters.pj_id
== _pj_id(message["program_id"], murfey_db)
)
).one()
if feedback_params.estimate_particle_diameter:
_register_picked_particles_use_diameter(message)
else:
_register_picked_particles_use_boxsize(message)
prom.preprocessed_movies.labels(
processing_job=_pj_id(message["program_id"], murfey_db)
).inc()
if _transport_object:
_transport_object.transport.ack(header)
return None
elif message["register"] == "done_incomplete_2d_batch":
_release_2d_hold(message)
if _transport_object:
Expand Down
2 changes: 2 additions & 0 deletions src/murfey/util/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore
security_configuration_path: Optional[Path] = None
auth_url: str = ""

notifications_queue: str = "pato_notification"


def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, MachineConfig]:
with open(config_file_path, "r") as config_stream:
Expand Down
31 changes: 31 additions & 0 deletions src/murfey/util/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,10 @@ class DataCollectionGroup(SQLModel, table=True): # type: ignore
back_populates="data_collection_group",
sa_relationship_kwargs={"cascade": "delete"},
)
notification_parameters: List["NotificationParameter"] = Relationship(
back_populates="data_collection_group",
sa_relationship_kwargs={"cascade": "delete"},
)
tomography_preprocessing_parameters: List["TomographyPreprocessingParameters"] = (
Relationship(
back_populates="data_collection_group",
Expand All @@ -368,6 +372,33 @@ class DataCollectionGroup(SQLModel, table=True): # type: ignore
)


class NotificationParameter(SQLModel, table=True): # type: ignore
    """
    A monitored processing parameter for a data collection group, together
    with the value bounds used to decide whether a notification should be
    raised and the running state of that notification.
    """

    # Auto-generated primary key
    id: Optional[int] = Field(default=None, primary_key=True)
    # Owning data collection group
    dcg_id: int = Field(foreign_key="datacollectiongroup.id")
    # Name of the monitored parameter
    name: str
    # Lower and upper bounds of the accepted range — boundary inclusivity is
    # decided by the notification-checking logic, not here; TODO confirm
    min_value: float
    max_value: float
    # Number of values recorded since the notification was last triggered
    num_instances_since_triggered: int = 0
    # Whether a notification is currently active for this parameter
    notification_active: bool = False
    data_collection_group: Optional[DataCollectionGroup] = Relationship(
        back_populates="notification_parameters"
    )
    # Individual recorded values; deleted along with this parameter
    notification_values: List["NotificationValue"] = Relationship(
        back_populates="notification_parameter",
        sa_relationship_kwargs={"cascade": "delete"},
    )


class NotificationValue(SQLModel, table=True): # type: ignore
    """
    A single recorded observation for a NotificationParameter, noting whether
    the observed value fell within the parameter's configured bounds.
    """

    # Auto-generated primary key
    id: Optional[int] = Field(default=None, primary_key=True)
    # The parameter this observation belongs to
    notification_parameter_id: int = Field(foreign_key="notificationparameter.id")
    # Sequence number of this observation within its parameter's history —
    # presumably assigned by the notification-checking logic; TODO confirm
    index: int
    # True if the observed value was inside [min_value, max_value]
    within_bounds: bool
    notification_parameter: Optional[NotificationParameter] = Relationship(
        back_populates="notification_values"
    )


class DataCollection(SQLModel, table=True): # type: ignore
id: int = Field(primary_key=True, unique=True)
tag: str = Field(primary_key=True)
Expand Down
Loading