diff --git a/src/murfey/client/contexts/spa_metadata.py b/src/murfey/client/contexts/spa_metadata.py index 87ca99946..a89555c0a 100644 --- a/src/murfey/client/contexts/spa_metadata.py +++ b/src/murfey/client/contexts/spa_metadata.py @@ -1,6 +1,6 @@ import logging from pathlib import Path -from typing import Optional +from typing import Dict, Optional import requests import xmltodict @@ -9,13 +9,66 @@ from murfey.client.contexts.spa import _get_source from murfey.client.instance_environment import MurfeyInstanceEnvironment, SampleInfo from murfey.util import authorised_requests, capture_post, get_machine_config_client -from murfey.util.spa_metadata import get_grid_square_atlas_positions +from murfey.util.spa_metadata import FoilHoleInfo, get_grid_square_atlas_positions logger = logging.getLogger("murfey.client.contexts.spa_metadata") requests.get, requests.post, requests.put, requests.delete = authorised_requests() +def _foil_hole_positions(xml_path: Path, grid_square: int) -> Dict[str, FoilHoleInfo]: + with open(xml_path, "r") as xml: + for_parsing = xml.read() + data = xmltodict.parse(for_parsing) + data = data["GridSquareXml"] + readout_area = data["MicroscopeImage"]["microscopeData"]["acquisition"]["camera"][ + "ReadoutArea" + ] + pixel_size = data["MicroscopeImage"]["SpatialScale"]["pixelSize"]["x"][ + "numericValue" + ] + full_size = (int(readout_area["a:width"]), int(readout_area["a:height"])) + serialization_array = data["TargetLocations"]["TargetLocationsEfficient"][ + "a:m_serializationArray" + ] + required_key = "" + for key in serialization_array.keys(): + if key.startswith("b:KeyValuePairOfintTargetLocation"): + required_key = key + break + if not required_key: + return {} + foil_holes = {} + for fh_block in serialization_array[required_key]: + if fh_block["b:value"]["IsNearGridBar"] == "false": + image_paths = list( + (xml_path.parent.parent).glob( + f"Images-Disc*/GridSquare_{grid_square}/FoilHoles/FoilHole_{fh_block['b:key']}_*.jpg" + ) + ) + image_paths.sort(key=lambda x: x.stat().st_ctime) + image_path: str = str(image_paths[-1]) if image_paths else "" + pix_loc = fh_block["b:value"]["PixelCenter"] + stage = fh_block["b:value"]["StagePosition"] + diameter = fh_block["b:value"]["PixelWidthHeight"]["c:width"] + foil_holes[fh_block["b:key"]] = FoilHoleInfo( + id=int(fh_block["b:key"]), + grid_square_id=grid_square, + x_location=int(float(pix_loc["c:x"])), + y_location=int(float(pix_loc["c:y"])), + x_stage_position=float(stage["c:X"]), + y_stage_position=float(stage["c:Y"]), + readout_area_x=full_size[0] if image_path else None, + readout_area_y=full_size[1] if image_path else None, + thumbnail_size_x=None, + thumbnail_size_y=None, + pixel_size=float(pixel_size) if image_path else None, + image=str(image_path), + diameter=int(float(diameter)), + ) + return foil_holes + + def _atlas_destination( environment: MurfeyInstanceEnvironment, source: Path, file_path: Path ) -> Path: @@ -109,36 +162,47 @@ def post_transfer( "atlas_pixel_size": atlas_pixel_size, } capture_post(url, json=dcg_data) - registered_grid_squares = ( - requests.get( - f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/grid_squares" - ) - .json() - .get(str(source), []) + gs_pix_positions = get_grid_square_atlas_positions( + _atlas_destination(environment, source, transferred_file) + / environment.samples[source].atlas + ) + for gs, pos_data in gs_pix_positions.items(): + if pos_data: + capture_post( + f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/grid_square/{gs}", + json={ + "tag": str(source), + "x_location": pos_data[0], + "y_location": pos_data[1], + "x_stage_position": pos_data[2], + "y_stage_position": pos_data[3], + "width": pos_data[4], + "height": pos_data[5], + "angle": pos_data[6], + }, + ) + + elif transferred_file.suffix == ".dm" and environment: + gs_name = transferred_file.name.split("_")[1] + fh_positions = _foil_hole_positions(transferred_file, int(gs_name)) + source = _get_source(transferred_file, environment=environment) + visitless_source = str(source).replace(f"/{environment.visit}", "") + for fh, fh_data in fh_positions.items(): + capture_post( + f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/grid_square/{gs_name}/foil_hole", + json={ + "name": fh, + "x_location": fh_data.x_location, + "y_location": fh_data.y_location, + "x_stage_position": fh_data.x_stage_position, + "y_stage_position": fh_data.y_stage_position, + "readout_area_x": fh_data.readout_area_x, + "readout_area_y": fh_data.readout_area_y, + "thumbnail_size_x": fh_data.thumbnail_size_x, + "thumbnail_size_y": fh_data.thumbnail_size_y, + "pixel_size": fh_data.pixel_size, + "diameter": fh_data.diameter, + "tag": visitless_source, + "image": fh_data.image, + }, ) - if registered_grid_squares: - gs_pix_positions = get_grid_square_atlas_positions( - source_visit_dir / partial_path - ) - for gs in registered_grid_squares: - pos_data = gs_pix_positions.get(str(gs["name"])) - if pos_data: - capture_post( - f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/grid_square/{gs['name']}", - json={ - "tag": gs["tag"], - "readout_area_x": gs["readout_area_x"], - "readout_area_y": gs["readout_area_y"], - "thumbnail_size_x": gs["thumbnail_size_x"], - "thumbnail_size_y": gs["thumbnail_size_y"], - "pixel_size": gs["pixel_size"], - "image": gs["image"], - "x_location": pos_data[0], - "y_location": pos_data[1], - "x_stage_position": pos_data[2], - "y_stage_position": pos_data[3], - "width": pos_data[4], - "height": pos_data[5], - "angle": pos_data[6], - }, - ) diff --git a/src/murfey/util/spa_metadata.py b/src/murfey/util/spa_metadata.py index 3db0a0bc6..fe82d1e0e 100644 --- a/src/murfey/util/spa_metadata.py +++ b/src/murfey/util/spa_metadata.py @@ -1,6 +1,6 @@ import logging from pathlib import Path -from typing import Dict, NamedTuple, Optional, Tuple +from typing import Dict, NamedTuple, Optional, Tuple, Union import xmltodict @@ -167,7 +167,7 @@ def foil_hole_data(xml_path: Path, foil_hole: int, grid_square: int) -> FoilHole ) ) image_paths.sort(key=lambda x: x.stat().st_ctime) - image_path: Path | str = image_paths[-1] if image_paths else "" + image_path: Union[Path, str] = image_paths[-1] if image_paths else "" if image_path: with open(Path(image_path).with_suffix(".xml")) as fh_xml: fh_xml_data = xmltodict.parse(fh_xml.read())