From 2e2a7c0dc0f2b0a2950e3dc89de86a1588e11429 Mon Sep 17 00:00:00 2001
From: Eu Pin Tien
Date: Tue, 14 Jan 2025 14:44:27 +0000
Subject: [PATCH 01/10] Duplicated '_midpoint' function from 'murfey.client.contexts.tomo' to preserve client-server independence

---
 src/murfey/server/__init__.py | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py
index 823df3063..16dfce9ce 100644
--- a/src/murfey/server/__init__.py
+++ b/src/murfey/server/__init__.py
@@ -48,7 +48,6 @@
 import murfey.server.prometheus as prom
 import murfey.server.websocket
 import murfey.util.db as db
-from murfey.client.contexts.tomo import _midpoint
 from murfey.server.murfey_db import url # murfey_db
 from murfey.util import LogFilter
 from murfey.util.config import (
@@ -165,6 +164,24 @@ def _mc_path(mov_path: Path) -> str:
     return [_mc_path(Path(r.movie_path)) for r in results]
 
 
+def _midpoint(angles: List[float]) -> int:
+    """
+    Duplicate of the function in 'murfey.client.contexts.tomo', so as to preserve
+    client-server independence.
+    """
+    if not angles:
+        return 0
+    if len(angles) <= 2:
+        return round(angles[0])
+    sorted_angles = sorted(angles)
+    return round(
+        sorted_angles[len(sorted_angles) // 2]
+        if sorted_angles[len(sorted_angles) // 2]
+        and sorted_angles[len(sorted_angles) // 2 + 1]
+        else 0
+    )
+
+
 def get_job_ids(tilt_series_id: int, appid: int) -> JobIDs:
     results = murfey_db.exec(
         select(

From 69888076b841c3ad1ba9cbd294a54c2eda9cd003 Mon Sep 17 00:00:00 2001
From: Eu Pin Tien
Date: Tue, 14 Jan 2025 15:20:56 +0000
Subject: [PATCH 02/10] Moved 'midpoint' function to 'murfey.util.tomo' to avoid function duplication

---
 src/murfey/client/contexts/tomo.py | 17 ++---------------
 src/murfey/server/__init__.py | 21 ++-------------------
 src/murfey/server/api/__init__.py | 4 ++--
 src/murfey/util/tomo.py | 16 ++++++++++++++++
 4 files changed, 22 insertions(+), 36 deletions(-)
 create mode 100644 src/murfey/util/tomo.py

diff --git a/src/murfey/client/contexts/tomo.py b/src/murfey/client/contexts/tomo.py
index e9bcd2aaf..f09dad951 100644
--- a/src/murfey/client/contexts/tomo.py
+++ b/src/murfey/client/contexts/tomo.py
@@ -21,6 +21,7 @@
 )
 from murfey.util import authorised_requests, capture_post, get_machine_config_client
 from murfey.util.mdoc import get_block, get_global_data, get_num_blocks
+from murfey.util.tomo import midpoint
 
 logger = logging.getLogger("murfey.client.contexts.tomo")
 
@@ -64,20 +65,6 @@ def _construct_tilt_series_name(file_path: Path) -> str:
     return "_".join(split_name[:-5])
 
 
-def _midpoint(angles: List[float]) -> int:
-    if not angles:
-        return 0
-    if len(angles) <= 2:
-        return round(angles[0])
-    sorted_angles = sorted(angles)
-    return round(
-        sorted_angles[len(sorted_angles) // 2]
-        if sorted_angles[len(sorted_angles) // 2]
-        and sorted_angles[len(sorted_angles) // 2 + 1]
-        else 0
-    )
-
-
 class ProcessFileIncomplete(BaseModel):
     dest: Path
     source: Path
@@ -738,7 +725,7 @@ def gather_metadata(
             if environment
             else None
         )
-        mdoc_metadata["manual_tilt_offset"] = -_midpoint(
+        mdoc_metadata["manual_tilt_offset"] = -midpoint(
             [float(b["TiltAngle"]) for b in blocks]
         )
         mdoc_metadata["source"] = str(self._basepath)

diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py
index 16dfce9ce..57b10dcc7 100644
--- a/src/murfey/server/__init__.py
+++ b/src/murfey/server/__init__.py
@@ -59,6 +59,7 @@
 )
 from murfey.util.processing_params import default_spa_parameters
 from murfey.util.state import global_state
+from 
murfey.util.tomo import midpoint try: from murfey.server.ispyb import TransportManager # Session @@ -164,24 +165,6 @@ def _mc_path(mov_path: Path) -> str: return [_mc_path(Path(r.movie_path)) for r in results] -def _midpoint(angles: List[float]) -> int: - """ - Duplicate of the function in 'murfey.client.contexts.tomo', so as to preserve - client-server independence. - """ - if not angles: - return 0 - if len(angles) <= 2: - return round(angles[0]) - sorted_angles = sorted(angles) - return round( - sorted_angles[len(sorted_angles) // 2] - if sorted_angles[len(sorted_angles) // 2] - and sorted_angles[len(sorted_angles) // 2 + 1] - else 0 - ) - - def get_job_ids(tilt_series_id: int, appid: int) -> JobIDs: results = murfey_db.exec( select( @@ -2593,7 +2576,7 @@ def feedback_callback(header: dict, message: dict) -> None: ) if not stack_file.parent.exists(): stack_file.parent.mkdir(parents=True) - tilt_offset = _midpoint([float(get_angle(t)) for t in tilts]) + tilt_offset = midpoint([float(get_angle(t)) for t in tilts]) zocalo_message = { "recipes": ["em-tomo-align"], "parameters": { diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 6f5d411e9..d6232f9ac 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -33,7 +33,6 @@ import murfey.server.websocket as ws import murfey.util.eer from murfey.server import ( - _midpoint, _murfey_id, _transport_object, check_tilt_series_mc, @@ -108,6 +107,7 @@ ) from murfey.util.processing_params import default_spa_parameters from murfey.util.state import global_state +from murfey.util.tomo import midpoint log = logging.getLogger("murfey.server.api") @@ -840,7 +840,7 @@ def register_completed_tilt_series( ) if not stack_file.parent.exists(): stack_file.parent.mkdir(parents=True) - tilt_offset = _midpoint([float(get_angle(t)) for t in tilts]) + tilt_offset = midpoint([float(get_angle(t)) for t in tilts]) zocalo_message = { "recipes": ["em-tomo-align"], "parameters": { diff --git a/src/murfey/util/tomo.py b/src/murfey/util/tomo.py new file mode 100644 index 000000000..dc7314ec1 --- /dev/null +++ b/src/murfey/util/tomo.py @@ -0,0 +1,16 @@ +def midpoint(angles: list[float]) -> int: + """ + Utility function to calculate the midpoint of the angles used in a tilt series. + Used primarily in the tomography workflow. + """ + if not angles: + return 0 + if len(angles) <= 2: + return round(angles[0]) + sorted_angles = sorted(angles) + return round( + sorted_angles[len(sorted_angles) // 2] + if sorted_angles[len(sorted_angles) // 2] + and sorted_angles[len(sorted_angles) // 2 + 1] + else 0 + ) From a67ae92f4cfb75c7e1eb67b41ca4c9e7ea355f05 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 14 Jan 2025 15:40:56 +0000 Subject: [PATCH 03/10] Added new optional dependency key for client-side instrument server --- pyproject.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index d956932fa..44c532795 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,10 @@ developer = [ "pre-commit", # Formatting, linting, type checking, etc. 
"pytest", # Test code functionality ] +instrument-server = [ + "fastapi[standard]", + "uvicorn[standard]", +] server = [ # "matplotlib", # For visual statistical analysis of images "aiohttp", From 45590a9750768a3510ad936fd1b52015c07fe8e5 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 14 Jan 2025 15:55:28 +0000 Subject: [PATCH 04/10] Replaced remaining instances of 'procrunner' with 'subprocess' --- pyproject.toml | 1 - src/murfey/cli/transfer.py | 4 ++-- src/murfey/instrument_server/api.py | 4 ++-- src/murfey/util/rsync.py | 24 +++++++++++++----------- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 44c532795..620f8b37b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,6 @@ cicd = [ "pytest-cov", # Used by Azure Pipelines for PyTest coverage reports ] client = [ - "procrunner", "textual==0.42.0", "websocket-client", "xmltodict", diff --git a/src/murfey/cli/transfer.py b/src/murfey/cli/transfer.py index 3c03c8e05..4b23793c4 100644 --- a/src/murfey/cli/transfer.py +++ b/src/murfey/cli/transfer.py @@ -1,10 +1,10 @@ from __future__ import annotations import argparse +import subprocess from pathlib import Path from urllib.parse import urlparse -import procrunner import requests from rich.console import Console from rich.prompt import Confirm @@ -76,6 +76,6 @@ def run(): cmd.extend(list(Path(args.source or ".").glob("*"))) cmd.append(f"{murfey_url.hostname}::{args.destination}") - result = procrunner.run(cmd) + result = subprocess.run(cmd) if result.returncode: console.print(f"[red]rsync failed returning code {result.returncode}") diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index c2bee0516..2081e0a50 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -1,6 +1,7 @@ from __future__ import annotations import secrets +import subprocess import time from datetime import datetime from functools import partial @@ -9,7 +10,6 @@ from typing import Annotated, Dict, List, Optional, Union from urllib.parse import urlparse -import procrunner import requests from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer @@ -283,7 +283,7 @@ def upload_gain_reference(session_id: MurfeySessionID, gain_reference: GainRefer str(gain_reference.gain_path), f"{urlparse(_get_murfey_url(), allow_fragments=False).hostname}::{gain_reference.visit_path}/{gain_reference.gain_destination_dir}/{secure_filename(gain_reference.gain_path.name)}", ] - gain_rsync = procrunner.run(cmd) + gain_rsync = subprocess.run(cmd) if gain_rsync.returncode: safe_gain_path = ( str(gain_reference.gain_path).replace("\r\n", "").replace("\n", "") diff --git a/src/murfey/util/rsync.py b/src/murfey/util/rsync.py index 51836f692..1a9b14bb4 100644 --- a/src/murfey/util/rsync.py +++ b/src/murfey/util/rsync.py @@ -1,11 +1,10 @@ from __future__ import annotations import logging +import subprocess from pathlib import Path from typing import Callable, Dict, List, Optional, Tuple, Union -import procrunner - from murfey.util import Processor from murfey.util.file_monitor import Monitor @@ -32,7 +31,7 @@ def __init__( self.received_bytes = 0 self.byte_rate: float = 0 self.total_size = 0 - self.runner_return: List[procrunner.ReturnObject] = [] + self.runner_return: List[subprocess.CompletedProcess] = [] self._root = root self._sub_structure: Optional[Path] = None self._notify = notify or (lambda f: None) @@ -53,7 +52,7 @@ def _run_rsync( retry: bool = True, 
):
         """
-        Run rsync -v on a list of files using procrunner.
+        Run rsync -v on a list of files using subprocess.
         :param root: root path of files for transferring; structure below the root
         is preserved
         :type root: pathlib.Path object
@@ -109,17 +108,20 @@ def _single_rsync(
             else:
                 cmd.append(str(self._finaldir / sub_struct) + "/")
             self._transferring = True
-            runner = procrunner.run(
+            runner = subprocess.run(
                 cmd,
-                callback_stdout=self._parse_rsync_stdout,
-                callback_stderr=self._parse_rsync_stderr,
+                capture_output=True,
             )
+            for line in runner.stdout.decode("utf-8", "replace").split("\n"):
+                self._parse_rsync_stdout(line)
+            for line in runner.stderr.decode("utf-8", "replace").split("\n"):
+                self._parse_rsync_stderr(line)
             self.runner_return.append(runner)
             self.failed.extend(root / sub_struct / f for f in self._failed_tmp)
             if retry:
                 self._in.put(root / sub_struct / f for f in self._failed_tmp)
 
-    def _parse_rsync_stdout(self, stdout: bytes):
+    def _parse_rsync_stdout(self, stdout: bytes | str):
         """
         Parse rsync stdout to collect information such as the paths of transferred
         files and the amount of data transferred.
@@ -127,7 +129,7 @@ def _parse_rsync_stdout(self, stdout: bytes):
         :param stdout: stdout of rsync process
         :type stdout: bytes
         """
-        stringy_stdout = str(stdout)
+        stringy_stdout = str(stdout) if isinstance(stdout, bytes) else stdout
         if stringy_stdout:
             if self._transferring:
                 if stringy_stdout.startswith("sent"):
@@ -158,14 +160,14 @@ def _parse_rsync_stdout(self, stdout: bytes):
                     stringy_stdout.replace("total size", "").split()[1]
                 )
 
-    def _parse_rsync_stderr(self, stderr: bytes):
+    def _parse_rsync_stderr(self, stderr: bytes | str):
         """
         Parse rsync stderr to collect information on any files that failed to
         transfer.
 
         :param stderr: stderr of rsync process
         :type stderr: bytes
         """
-        stringy_stderr = str(stderr)
+        stringy_stderr = str(stderr) if isinstance(stderr, bytes) else stderr
         if stringy_stderr:
             if (
                 stringy_stderr.startswith("rsync: link_stat")
                 or stringy_stderr.startswith("rsync: [sender] link_stat")
             ) and "failed" in stringy_stderr:
                 failed_msg = stringy_stderr.split()
                 self._failed_tmp.append(
                     failed_msg[failed_msg.index("failed:") - 1].replace('"', "")
                 )

From 2bb132a8c40dcb0e6dd5c648e4044daf4aa41123 Mon Sep 17 00:00:00 2001
From: Eu Pin Tien
Date: Tue, 14 Jan 2025 15:59:29 +0000
Subject: [PATCH 05/10] Updated descriptive comments on 'pyproject.toml'

---
 pyproject.toml | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 620f8b37b..4bb53f9e8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -32,14 +32,14 @@ classifiers = [
 dependencies = [
     "backports.entry_points_selectable",
     "defusedxml", # For safely parsing XML files
-    "pydantic<2", # Locked to <2 by zocalo
+    "pydantic<2", # Locked to <2 by ispyb
     "requests",
     "rich",
     "werkzeug",
 ]
 [project.optional-dependencies]
 cicd = [
-    "pytest-cov", # Used by Azure Pipelines for PyTest coverage reports
+    "pytest-cov", # Used for generating PyTest coverage reports
 ]
 client = [
     "textual==0.42.0",
     "websocket-client",
     "xmltodict",
@@ -57,7 +57,6 @@ server = [
-    # "matplotlib", # For visual statistical analysis of images
     "aiohttp",
     "cryptography",
     "fastapi[standard]",

From bb0ec71601ec35e50904e770d0ddf566d91df199 Mon Sep 17 00:00:00 2001
From: Eu Pin Tien
Date: Wed, 15 Jan 2025 14:05:26 +0000
Subject: [PATCH 06/10] Implemented sanitisation to satisfy CodeQL warning on uncontrolled command line input

---
 src/murfey/instrument_server/api.py | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py
index 2081e0a50..229acfa9b 100644
--- a/src/murfey/instrument_server/api.py
+++ 
b/src/murfey/instrument_server/api.py @@ -21,7 +21,7 @@ from murfey.client.multigrid_control import MultigridController from murfey.client.rsync import RSyncer from murfey.client.watchdir_multigrid import MultigridDirWatcher -from murfey.util import sanitise_nonpath, secure_path +from murfey.util import sanitise, sanitise_nonpath, secure_path from murfey.util.instrument_models import MultigridWatcherSpec from murfey.util.models import File, Token @@ -278,19 +278,16 @@ class GainReference(BaseModel): @router.post("/sessions/{session_id}/upload_gain_reference") def upload_gain_reference(session_id: MurfeySessionID, gain_reference: GainReference): + safe_gain_path = sanitise(str(gain_reference.gain_path)) + safe_visit_path = sanitise(gain_reference.visit_path) + safe_destination_dir = sanitise(gain_reference.gain_destination_dir) cmd = [ "rsync", - str(gain_reference.gain_path), - f"{urlparse(_get_murfey_url(), allow_fragments=False).hostname}::{gain_reference.visit_path}/{gain_reference.gain_destination_dir}/{secure_filename(gain_reference.gain_path.name)}", + safe_gain_path, + f"{urlparse(_get_murfey_url(), allow_fragments=False).hostname}::{safe_visit_path}/{safe_destination_dir}/{secure_filename(gain_reference.gain_path.name)}", ] gain_rsync = subprocess.run(cmd) if gain_rsync.returncode: - safe_gain_path = ( - str(gain_reference.gain_path).replace("\r\n", "").replace("\n", "") - ) - safe_visit_path = gain_reference.visit_path.replace("\r\n", "").replace( - "\n", "" - ) logger.warning( f"Gain reference file {safe_gain_path} was not successfully transferred to {safe_visit_path}/processing" ) From b3c8e3dd98594561fcc203b2ded435a54d93df57 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 15 Jan 2025 14:05:26 +0000 Subject: [PATCH 07/10] Added 'jose' as an 'instrument-server' dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 4bb53f9e8..450042362 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ developer = [ ] instrument-server = [ "fastapi[standard]", + "jose", "uvicorn[standard]", ] server = [ From b55ae5fe5aa24fd55fc596a07366e3d01555f736 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 15 Jan 2025 14:06:02 +0000 Subject: [PATCH 08/10] Added 'jose' as a 'server' dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 450042362..2ea86545a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ server = [ "fastapi[standard]", "ispyb", # Responsible for setting requirements for SQLAlchemy and mysql-connector-python; v10.0.0: sqlalchemy <2, mysql-connector-python >=8.0.32 "jinja2", + "jose", "mrcfile", "numpy", "packaging", From 933cfe6090d9d9bf1d8a49bb87c98bf52e3338ff Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 15 Jan 2025 14:21:36 +0000 Subject: [PATCH 09/10] Package is 'python-jose[cryptography], not 'jose' --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2ea86545a..1ee9240d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,7 @@ developer = [ ] instrument-server = [ "fastapi[standard]", - "jose", + "python-jose[cryptography]", "uvicorn[standard]", ] server = [ @@ -63,7 +63,6 @@ server = [ "fastapi[standard]", "ispyb", # Responsible for setting requirements for SQLAlchemy and mysql-connector-python; v10.0.0: sqlalchemy <2, mysql-connector-python >=8.0.32 "jinja2", - "jose", "mrcfile", "numpy", "packaging", From 
72548e2bc1372c3a982a0bef16ce5767b6c1b1a0 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 15 Jan 2025 16:20:42 +0000 Subject: [PATCH 10/10] Updated '_parse_rsync_stdout/stderr()' functions to read stringified lines --- src/murfey/util/rsync.py | 74 ++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 41 deletions(-) diff --git a/src/murfey/util/rsync.py b/src/murfey/util/rsync.py index 1a9b14bb4..84953b39a 100644 --- a/src/murfey/util/rsync.py +++ b/src/murfey/util/rsync.py @@ -121,7 +121,7 @@ def _single_rsync( if retry: self._in.put(root / sub_struct / f for f in self._failed_tmp) - def _parse_rsync_stdout(self, stdout: bytes | str): + def _parse_rsync_stdout(self, line: str): """ Parse rsync stdout to collect information such as the paths of transferred files and the amount of data transferred. @@ -129,51 +129,43 @@ def _parse_rsync_stdout(self, stdout: bytes | str): :param stdout: stdout of rsync process :type stdout: bytes """ - stringy_stdout = str(stdout) if isinstance(stdout, bytes) else stdout - if stringy_stdout: - if self._transferring: - if stringy_stdout.startswith("sent"): - self._transferring = False - byte_info = stringy_stdout.split() - self.sent_bytes = int( - byte_info[byte_info.index("sent") + 1].replace(",", "") - ) - self.received_bytes = int( - byte_info[byte_info.index("received") + 1].replace(",", "") - ) - self.byte_rate = float( - byte_info[byte_info.index("bytes/sec") - 1].replace(",", "") - ) - elif len(stringy_stdout.split()) == 1: - if self._root and self._sub_structure: - self._notify( - self._finaldir / self._sub_structure / stringy_stdout - ) - self._out.put(self._root / self._sub_structure / stringy_stdout) - else: - logger.warning( - f"root or substructure not set for transfer of {stringy_stdout}" - ) - else: - if "total size" in stringy_stdout: - self.total_size = int( - stringy_stdout.replace("total size", "").split()[1] + if self._transferring: + if line.startswith("sent"): + self._transferring = False + byte_info = line.split() + self.sent_bytes = int( + byte_info[byte_info.index("sent") + 1].replace(",", "") + ) + self.received_bytes = int( + byte_info[byte_info.index("received") + 1].replace(",", "") + ) + self.byte_rate = float( + byte_info[byte_info.index("bytes/sec") - 1].replace(",", "") + ) + elif len(line.split()) == 1: + if self._root and self._sub_structure: + self._notify(self._finaldir / self._sub_structure / line) + self._out.put(self._root / self._sub_structure / line) + else: + logger.warning( + f"root or substructure not set for transfer of {line}" ) + else: + if "total size" in line: + self.total_size = int(line.replace("total size", "").split()[1]) - def _parse_rsync_stderr(self, stderr: bytes | str): + def _parse_rsync_stderr(self, line: str): """ Parse rsync stderr to collect information on any files that failed to transfer. :param stderr: stderr of rsync process :type stderr: bytes """ - stringy_stderr = str(stderr) if isinstance(stderr, bytes) else stderr - if stringy_stderr: - if ( - stringy_stderr.startswith("rsync: link_stat") - or stringy_stderr.startswith("rsync: [sender] link_stat") - ) and "failed" in stringy_stderr: - failed_msg = stringy_stderr.split() - self._failed_tmp.append( - failed_msg[failed_msg.index("failed:") - 1].replace('"', "") - ) + if ( + line.startswith("rsync: link_stat") + or line.startswith("rsync: [sender] link_stat") + ) and "failed" in line: + failed_msg = line.split() + self._failed_tmp.append( + failed_msg[failed_msg.index("failed:") - 1].replace('"', "") + )
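
Note (illustrative, not part of the patches): the behaviour of the relocated 'midpoint' helper can be checked standalone. The function body below is copied verbatim from the new 'src/murfey/util/tomo.py' introduced in PATCH 02; the tilt-angle lists are made-up example inputs rather than values taken from any patch.

def midpoint(angles: list[float]) -> int:
    """
    Utility function to calculate the midpoint of the angles used in a tilt series.
    Used primarily in the tomography workflow.
    """
    if not angles:
        return 0
    if len(angles) <= 2:
        return round(angles[0])
    sorted_angles = sorted(angles)
    return round(
        sorted_angles[len(sorted_angles) // 2]
        if sorted_angles[len(sorted_angles) // 2]
        and sorted_angles[len(sorted_angles) // 2 + 1]
        else 0
    )


print(midpoint([]))                               # 0: no angles collected
print(midpoint([-30.0]))                          # -30: with fewer than three angles the first angle is used
print(midpoint([10.0, 20.0, 30.0, 40.0, 50.0]))   # 30: middle value of the sorted angles
print(midpoint([-60.0, -30.0, 0.0, 30.0, 60.0]))  # 0: a middle angle of exactly 0.0 short-circuits to 0

Callers negate this value to derive the manual tilt offset, as in PATCH 02's 'mdoc_metadata["manual_tilt_offset"] = -midpoint(...)' in 'murfey.client.contexts.tomo'.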