diff --git a/pyproject.toml b/pyproject.toml index d956932fa..1ee9240d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,17 +32,16 @@ classifiers = [ dependencies = [ "backports.entry_points_selectable", "defusedxml", # For safely parsing XML files - "pydantic<2", # Locked to <2 by zocalo + "pydantic<2", # Locked to <2 by cygwin terminal "requests", "rich", "werkzeug", ] [project.optional-dependencies] cicd = [ - "pytest-cov", # Used by Azure Pipelines for PyTest coverage reports + "pytest-cov", # Used for generating PyTest coverage reports ] client = [ - "procrunner", "textual==0.42.0", "websocket-client", "xmltodict", @@ -53,8 +52,12 @@ developer = [ "pre-commit", # Formatting, linting, type checking, etc. "pytest", # Test code functionality ] +instrument-server = [ + "fastapi[standard]", + "python-jose[cryptography]", + "uvicorn[standard]", +] server = [ - # "matplotlib", # For visual statistical analysis of images "aiohttp", "cryptography", "fastapi[standard]", diff --git a/src/murfey/cli/transfer.py b/src/murfey/cli/transfer.py index 3c03c8e05..4b23793c4 100644 --- a/src/murfey/cli/transfer.py +++ b/src/murfey/cli/transfer.py @@ -1,10 +1,10 @@ from __future__ import annotations import argparse +import subprocess from pathlib import Path from urllib.parse import urlparse -import procrunner import requests from rich.console import Console from rich.prompt import Confirm @@ -76,6 +76,6 @@ def run(): cmd.extend(list(Path(args.source or ".").glob("*"))) cmd.append(f"{murfey_url.hostname}::{args.destination}") - result = procrunner.run(cmd) + result = subprocess.run(cmd) if result.returncode: console.print(f"[red]rsync failed returning code {result.returncode}") diff --git a/src/murfey/client/contexts/tomo.py b/src/murfey/client/contexts/tomo.py index e9bcd2aaf..f09dad951 100644 --- a/src/murfey/client/contexts/tomo.py +++ b/src/murfey/client/contexts/tomo.py @@ -21,6 +21,7 @@ ) from murfey.util import authorised_requests, capture_post, get_machine_config_client from murfey.util.mdoc import get_block, get_global_data, get_num_blocks +from murfey.util.tomo import midpoint logger = logging.getLogger("murfey.client.contexts.tomo") @@ -64,20 +65,6 @@ def _construct_tilt_series_name(file_path: Path) -> str: return "_".join(split_name[:-5]) -def _midpoint(angles: List[float]) -> int: - if not angles: - return 0 - if len(angles) <= 2: - return round(angles[0]) - sorted_angles = sorted(angles) - return round( - sorted_angles[len(sorted_angles) // 2] - if sorted_angles[len(sorted_angles) // 2] - and sorted_angles[len(sorted_angles) // 2 + 1] - else 0 - ) - - class ProcessFileIncomplete(BaseModel): dest: Path source: Path @@ -738,7 +725,7 @@ def gather_metadata( if environment else None ) - mdoc_metadata["manual_tilt_offset"] = -_midpoint( + mdoc_metadata["manual_tilt_offset"] = -midpoint( [float(b["TiltAngle"]) for b in blocks] ) mdoc_metadata["source"] = str(self._basepath) diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py index c2bee0516..229acfa9b 100644 --- a/src/murfey/instrument_server/api.py +++ b/src/murfey/instrument_server/api.py @@ -1,6 +1,7 @@ from __future__ import annotations import secrets +import subprocess import time from datetime import datetime from functools import partial @@ -9,7 +10,6 @@ from typing import Annotated, Dict, List, Optional, Union from urllib.parse import urlparse -import procrunner import requests from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer @@ -21,7 +21,7 @@ from murfey.client.multigrid_control import MultigridController from murfey.client.rsync import RSyncer from murfey.client.watchdir_multigrid import MultigridDirWatcher -from murfey.util import sanitise_nonpath, secure_path +from murfey.util import sanitise, sanitise_nonpath, secure_path from murfey.util.instrument_models import MultigridWatcherSpec from murfey.util.models import File, Token @@ -278,19 +278,16 @@ class GainReference(BaseModel): @router.post("/sessions/{session_id}/upload_gain_reference") def upload_gain_reference(session_id: MurfeySessionID, gain_reference: GainReference): + safe_gain_path = sanitise(str(gain_reference.gain_path)) + safe_visit_path = sanitise(gain_reference.visit_path) + safe_destination_dir = sanitise(gain_reference.gain_destination_dir) cmd = [ "rsync", - str(gain_reference.gain_path), - f"{urlparse(_get_murfey_url(), allow_fragments=False).hostname}::{gain_reference.visit_path}/{gain_reference.gain_destination_dir}/{secure_filename(gain_reference.gain_path.name)}", + safe_gain_path, + f"{urlparse(_get_murfey_url(), allow_fragments=False).hostname}::{safe_visit_path}/{safe_destination_dir}/{secure_filename(gain_reference.gain_path.name)}", ] - gain_rsync = procrunner.run(cmd) + gain_rsync = subprocess.run(cmd) if gain_rsync.returncode: - safe_gain_path = ( - str(gain_reference.gain_path).replace("\r\n", "").replace("\n", "") - ) - safe_visit_path = gain_reference.visit_path.replace("\r\n", "").replace( - "\n", "" - ) logger.warning( f"Gain reference file {safe_gain_path} was not successfully transferred to {safe_visit_path}/processing" ) diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index 823df3063..57b10dcc7 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -48,7 +48,6 @@ import murfey.server.prometheus as prom import murfey.server.websocket import murfey.util.db as db -from murfey.client.contexts.tomo import _midpoint from murfey.server.murfey_db import url # murfey_db from murfey.util import LogFilter from murfey.util.config import ( @@ -60,6 +59,7 @@ ) from murfey.util.processing_params import default_spa_parameters from murfey.util.state import global_state +from murfey.util.tomo import midpoint try: from murfey.server.ispyb import TransportManager # Session @@ -2576,7 +2576,7 @@ def feedback_callback(header: dict, message: dict) -> None: ) if not stack_file.parent.exists(): stack_file.parent.mkdir(parents=True) - tilt_offset = _midpoint([float(get_angle(t)) for t in tilts]) + tilt_offset = midpoint([float(get_angle(t)) for t in tilts]) zocalo_message = { "recipes": ["em-tomo-align"], "parameters": { diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 6f5d411e9..d6232f9ac 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -33,7 +33,6 @@ import murfey.server.websocket as ws import murfey.util.eer from murfey.server import ( - _midpoint, _murfey_id, _transport_object, check_tilt_series_mc, @@ -108,6 +107,7 @@ ) from murfey.util.processing_params import default_spa_parameters from murfey.util.state import global_state +from murfey.util.tomo import midpoint log = logging.getLogger("murfey.server.api") @@ -840,7 +840,7 @@ def register_completed_tilt_series( ) if not stack_file.parent.exists(): stack_file.parent.mkdir(parents=True) - tilt_offset = _midpoint([float(get_angle(t)) for t in tilts]) + tilt_offset = midpoint([float(get_angle(t)) for t in tilts]) zocalo_message = { "recipes": ["em-tomo-align"], "parameters": { diff --git a/src/murfey/util/rsync.py b/src/murfey/util/rsync.py index 51836f692..84953b39a 100644 --- a/src/murfey/util/rsync.py +++ b/src/murfey/util/rsync.py @@ -1,11 +1,10 @@ from __future__ import annotations import logging +import subprocess from pathlib import Path from typing import Callable, Dict, List, Optional, Tuple, Union -import procrunner - from murfey.util import Processor from murfey.util.file_monitor import Monitor @@ -32,7 +31,7 @@ def __init__( self.received_bytes = 0 self.byte_rate: float = 0 self.total_size = 0 - self.runner_return: List[procrunner.ReturnObject] = [] + self.runner_return: List[subprocess.CompletedProcess] = [] self._root = root self._sub_structure: Optional[Path] = None self._notify = notify or (lambda f: None) @@ -53,7 +52,7 @@ def _run_rsync( retry: bool = True, ): """ - Run rsync -v on a list of files using procrunner. + Run rsync -v on a list of files using subprocess. :param root: root path of files for transferring; structure below the root is preserved :type root: pathlib.Path object @@ -109,17 +108,20 @@ def _single_rsync( else: cmd.append(str(self._finaldir / sub_struct) + "/") self._transferring = True - runner = procrunner.run( + runner = subprocess.run( cmd, - callback_stdout=self._parse_rsync_stdout, - callback_stderr=self._parse_rsync_stderr, + capture_output=True, ) + for line in runner.stdout.decode("utf-8", "replace").split("\n"): + self._parse_rsync_stdout(line) + for line in runner.stderr.decode("utf-8", "replace").split("\n"): + self._parse_rsync_stderr(line) self.runner_return.append(runner) self.failed.extend(root / sub_struct / f for f in self._failed_tmp) if retry: self._in.put(root / sub_struct / f for f in self._failed_tmp) - def _parse_rsync_stdout(self, stdout: bytes): + def _parse_rsync_stdout(self, line: str): """ Parse rsync stdout to collect information such as the paths of transferred files and the amount of data transferred. @@ -127,51 +129,43 @@ def _parse_rsync_stdout(self, stdout: bytes): :param stdout: stdout of rsync process :type stdout: bytes """ - stringy_stdout = str(stdout) - if stringy_stdout: - if self._transferring: - if stringy_stdout.startswith("sent"): - self._transferring = False - byte_info = stringy_stdout.split() - self.sent_bytes = int( - byte_info[byte_info.index("sent") + 1].replace(",", "") - ) - self.received_bytes = int( - byte_info[byte_info.index("received") + 1].replace(",", "") - ) - self.byte_rate = float( - byte_info[byte_info.index("bytes/sec") - 1].replace(",", "") - ) - elif len(stringy_stdout.split()) == 1: - if self._root and self._sub_structure: - self._notify( - self._finaldir / self._sub_structure / stringy_stdout - ) - self._out.put(self._root / self._sub_structure / stringy_stdout) - else: - logger.warning( - f"root or substructure not set for transfer of {stringy_stdout}" - ) - else: - if "total size" in stringy_stdout: - self.total_size = int( - stringy_stdout.replace("total size", "").split()[1] + if self._transferring: + if line.startswith("sent"): + self._transferring = False + byte_info = line.split() + self.sent_bytes = int( + byte_info[byte_info.index("sent") + 1].replace(",", "") + ) + self.received_bytes = int( + byte_info[byte_info.index("received") + 1].replace(",", "") + ) + self.byte_rate = float( + byte_info[byte_info.index("bytes/sec") - 1].replace(",", "") + ) + elif len(line.split()) == 1: + if self._root and self._sub_structure: + self._notify(self._finaldir / self._sub_structure / line) + self._out.put(self._root / self._sub_structure / line) + else: + logger.warning( + f"root or substructure not set for transfer of {line}" ) + else: + if "total size" in line: + self.total_size = int(line.replace("total size", "").split()[1]) - def _parse_rsync_stderr(self, stderr: bytes): + def _parse_rsync_stderr(self, line: str): """ Parse rsync stderr to collect information on any files that failed to transfer. :param stderr: stderr of rsync process :type stderr: bytes """ - stringy_stderr = str(stderr) - if stringy_stderr: - if ( - stringy_stderr.startswith("rsync: link_stat") - or stringy_stderr.startswith("rsync: [sender] link_stat") - ) and "failed" in stringy_stderr: - failed_msg = stringy_stderr.split() - self._failed_tmp.append( - failed_msg[failed_msg.index("failed:") - 1].replace('"', "") - ) + if ( + line.startswith("rsync: link_stat") + or line.startswith("rsync: [sender] link_stat") + ) and "failed" in line: + failed_msg = line.split() + self._failed_tmp.append( + failed_msg[failed_msg.index("failed:") - 1].replace('"', "") + ) diff --git a/src/murfey/util/tomo.py b/src/murfey/util/tomo.py new file mode 100644 index 000000000..dc7314ec1 --- /dev/null +++ b/src/murfey/util/tomo.py @@ -0,0 +1,16 @@ +def midpoint(angles: list[float]) -> int: + """ + Utility function to calculate the midpoint of the angles used in a tilt series. + Used primarily in the tomography workflow. + """ + if not angles: + return 0 + if len(angles) <= 2: + return round(angles[0]) + sorted_angles = sorted(angles) + return round( + sorted_angles[len(sorted_angles) // 2] + if sorted_angles[len(sorted_angles) // 2] + and sorted_angles[len(sorted_angles) // 2 + 1] + else 0 + )