Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3841ed9
added osx-arm64 workspace. this branch should parallel main, but with…
dihenriksen Apr 28, 2026
3aa49bb
finished most of the logic for the attenuator bank
dihenriksen May 8, 2026
c44bd46
small cleanups
dihenriksen May 8, 2026
010e3ec
fixed formatting issues
dihenriksen May 8, 2026
f8e3473
moved xraylib dependency
dihenriksen May 8, 2026
f35622a
added open, close, and get_status methods with corresponding signals
dihenriksen May 8, 2026
bc3547d
added test, added test env dependency
dihenriksen May 9, 2026
542b679
fixed dependency structure, formatted
dihenriksen May 9, 2026
1881d81
simplified finding closest attenuator logic
dihenriksen May 11, 2026
3ef4648
added set_attenuation method
dihenriksen May 11, 2026
734a026
minor fixes
dihenriksen May 12, 2026
d2b1b0b
fixed style things
dihenriksen May 12, 2026
0dc3e90
add movable protocol to both Attenuator classes,
dihenriksen May 12, 2026
e7066c9
fixed read, write pvs for position
dihenriksen May 12, 2026
706af8d
more comments, use asyncmovable protocol, changed cmd to position
dihenriksen May 12, 2026
56803ef
realized that DeviceVector can have arbitrary integer keys, so does n…
dihenriksen May 13, 2026
d27711e
made thicknesses a constant
dihenriksen May 13, 2026
0745eb5
corrected terms transmission and attenuation, added some tests
dihenriksen May 14, 2026
dee2024
corrected terms transmission and attenuation, added some tests
dihenriksen May 14, 2026
aee0a9d
made photon_energy and units parameters to pass into calculations, us…
dihenriksen May 14, 2026
8a21bda
fully removed xraylib; minor refactoring
dihenriksen May 14, 2026
8f3d086
added tests
dihenriksen May 14, 2026
be8f3b4
pass energy object into AttenuatorBank on creation, so it can poll fo…
dihenriksen May 14, 2026
0c4b801
more comprehensive status for attenuator bank
dihenriksen May 15, 2026
cd07820
fixed terminology so it is clear that the value set on the attenuator…
dihenriksen May 15, 2026
1d6a948
made prefix an argument for bank creation
dihenriksen May 15, 2026
09138c1
made material for attenuators a parameter
dihenriksen May 15, 2026
22fa679
MNT: minor cleanups
dihenriksen May 18, 2026
fec4828
PRF: minor speed up
dihenriksen May 18, 2026
80917ca
made photon_energy and egu get methods instead of properties
dihenriksen May 18, 2026
6be3182
Update src/cditools/attenuator.py
dihenriksen Jun 18, 2026
525f87f
remove active_attenuators from status
dihenriksen Jun 18, 2026
4385f2c
added describe method to match read
dihenriksen Jun 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,6 @@ Thumbs.db
# IDEs
.vscode/
.cursor/

# Other
notes.*
21 changes: 13 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies = [
"ophyd",
"ophyd-async[ca] >=0.19",
"h5py",
"xrayutilities>=1.7.12,<2"
]

[project.optional-dependencies]
Expand All @@ -45,6 +46,7 @@ test = [
"tiled[minimal-client]",
"tiled[minimal-server]",
"ophyd >=v1.10.6",
"pytest-watcher",
]
dev = [
"caproto[standard] >=0.4.2rc1,!=1.2.0",
Expand Down Expand Up @@ -92,15 +94,12 @@ dev-dependencies = [

[tool.pytest.ini_options]
minversion = "6.0"
addopts = ["-ra", "--showlocals", "--strict-markers", "--strict-config"]
# TODO - fix the eiger_async module and tests
addopts = ["-ra", "--showlocals", "--strict-markers", "--strict-config", "--ignore=tests/test_eiger_async.py"]
xfail_strict = true
filterwarnings = [
"ignore",
]
filterwarnings = "ignore"
log_cli_level = "INFO"
testpaths = [
"tests",
]
testpaths = "tests"

[tool.coverage]
run.source = ["cditools"]
Expand Down Expand Up @@ -164,11 +163,17 @@ isort.required-imports = ["from __future__ import annotations"]

[tool.pixi.project]
channels = ["conda-forge"]
platforms = ["linux-64"]
platforms = ["linux-64", "osx-arm64"]

[tool.pixi.pypi-dependencies]
cditools = { path = ".", editable = true }

[tool.pixi.dependencies]
xrayutilities = ">=1.7.12,<2"

[tool.pixi.feature.dev.tasks]
format = "ruff format ."

[tool.pixi.environments]
default = { solve-group = "default" }
dev = { features = ["dev"], solve-group = "default" }
Expand Down
280 changes: 280 additions & 0 deletions src/cditools/attenuator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
from __future__ import annotations

import asyncio
import math
from collections import OrderedDict
from dataclasses import dataclass

import numpy as np
import xrayutilities as xu
from event_model import DataKey # type: ignore[import-untyped]
from ophyd_async.core import (
AsyncMovable,
AsyncStatus,
DeviceVector,
StandardReadable,
StrictEnum,
)
from ophyd_async.epics.core import EpicsDevice, epics_signal_r, epics_signal_rw

from cditools.motors import Energy


@dataclass
class AttenuatorCombination:
transmission: float
attenuators: list[int]

@property
def attenuation(self):
return 1 - self.transmission


class AttenuatorStatusEnum(StrictEnum):
LOW = "Low" # off / not obstructing
HIGH = "High" # on / obstructing


class Attenuator(EpicsDevice, AsyncMovable[AttenuatorStatusEnum]):
def __init__(self, prefix: str, num: int, material: str, thickness: float):
"""
Parameters
----------
prefix : str
The common prefix for the attenuator bank
num : int
An integer denoting which attenuator within the bank this is
thickness : float
The thickness of the attenuator in microns

Attributes
----------
position : SignalRW[AttenuatorStatusEnum]
The read / write PV to open and close the attenuator and get
the current state of the attenuator
mode : SignalRW[bool]
in_status : SignalR[AttenuatorStatusEnum]
"""
self.prefix = prefix
self.num = num
self.filter_material = getattr(xu.materials, material)
self.thickness = thickness # microns

self.position = epics_signal_rw(
AttenuatorStatusEnum,
f"{self.prefix}:DO{self.num}-Sts",
write_pv=f"{self.prefix}:DO{self.num}-Cmd",
)
self.mode = epics_signal_rw(bool, f"{self.prefix}:DIO{self.num}-Mode")
self.in_status = epics_signal_r(
AttenuatorStatusEnum, f"{self.prefix}:DI{self.num}-Sts"
)

super().__init__(prefix=self.prefix)

def __repr__(self):
return f"{self.thickness!s} microns, {self.filter_material}"

@property
def name(self):
return f"attenuator_{self.num}"

@AsyncStatus.wrap
async def set(self, value: AttenuatorStatusEnum):
await self.position.set(value)

async def open(self):
"""Open means open to allowing the beam to pass unobstructed"""
await self.position.set(AttenuatorStatusEnum.LOW)

async def close(self):
"""Closed means obstructing the beam"""
await self.position.set(AttenuatorStatusEnum.HIGH)

def attenuation(self, photon_energy: float, egu: str = "KeV"):
"""Attenuation is the fraction of the beam removed"""
return 1 - self.transmission(photon_energy, egu=egu)

def transmission(self, photon_energy: float, egu: str = "KeV"):
"""Transmission is the fraction of beam remaining"""
abs_len = self._absorption_length(photon_energy, egu=egu)
return np.exp(-self.thickness / abs_len)

def _absorption_length(self, photon_energy: float, egu: str = "KeV") -> float:
"""
Calculates L, the characteristic absorption length of this material,
at this beam energy.

Parameters
----------
photon energy : float
The beam energy
egu : {'KeV', 'eV'}
The engineering units of the beam energy

Returns
-------
float
The characteristic absorption length of the filter material (microns)
"""
if egu == "KeV":
photon_energy = photon_energy * 1e3
elif egu != "eV":
msg = f"Photon energy units must be eV or KeV (not {egu=})"
raise RuntimeError(msg)
return self.filter_material.absorption_length(photon_energy) # type: ignore[reportArgumentType]


class AttenuatorBank(StandardReadable, EpicsDevice, AsyncMovable[float]):
"""
The ioc for the iologik1 lives on xf09id1-inst-ioc1.nsls2.bnl.gov
"""

def __init__(
self, prefix: str, atten_configs: list[tuple[str, float]], energy: Energy
):
self.prefix = prefix
self.energy = energy

with self.add_children_as_readables():
self.attenuators = DeviceVector(
{
i: Attenuator(self.prefix, i, material, thickness)
for i, (material, thickness) in enumerate(atten_configs, start=1)
}
)
super().__init__(prefix=self.prefix)

def get_photon_energy(self):
return self.energy.energy.readback.get()

def get_egu(self):
return self.energy.egu

async def read(self): # type: ignore[reportUnknownParameterType]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a matched describe (unless ophyd async is providing more magic than I think it is)

"""
Polls the bluesky energy object for the current beam energy, and
returns that energy, each filter position, each transmission, and
the total transmission.
"""
status = OrderedDict()
active_attens = []
energy = self.get_photon_energy()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use async flavor here for access to the hardware.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

energy is a regular ophyd object, not an ophyd-async object.

egu = self.get_egu()
positions = await asyncio.gather(
*(a.position.get_value() for _, a in self.attenuators.items())
)
for i, pos in zip(self.attenuators, positions):
atten = self.attenuators[i]
is_active = pos == AttenuatorStatusEnum.HIGH
if is_active:
active_attens.append(atten)
transmission = atten.transmission(energy, egu) if is_active else 0
status[atten.name] = {"active": is_active, "transmission": transmission}
status["photon_energy"] = energy
status["egu"] = egu
status["total_transmission"] = self._calculate_total_transmission(
energy, *active_attens
)
return status

async def describe(self) -> OrderedDict[str, DataKey]:
"""Describe the structure of values returned by read()."""

description = OrderedDict()

for atten in self.attenuators.values():
description[atten.name] = DataKey(
source=atten.position.source,
dtype="string",
shape=[],
)
energy_source = getattr(
self.energy.energy.readback,
"source",
f"ca://{self.prefix}:photon_energy",
)
description["photon_energy"] = DataKey(
source=energy_source,
dtype="number",
shape=[],
)
description["egu"] = DataKey(
source=f"ca://{self.prefix}:egu",
dtype="string",
shape=[],
)
description["total_transmission"] = DataKey(
source=f"ca://{self.prefix}:total_transmission",
dtype="number",
shape=[],
)

return description

@AsyncStatus.wrap
async def set(self, value: float):
"""Set the transmission for the attenuator bank"""
attenuation_combination = self.find_closest_transmission(
self.get_photon_energy(), value
)
coros = []
for (
num,
atten,
) in self.attenuators.items():
if num in attenuation_combination.attenuators:
coros.append(atten.close())
else:
coros.append(atten.open())
await asyncio.gather(*coros)

def find_closest_transmission(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check with Garth if we want "closest" or "closest with at least" or "closest with at most" semantics on this. My knee-jerk guess would be "closest with at least" on the logic of if you are cutting the flux down it is because you want to protect something (sample or detector) so it is better to over attenuate than under attenuate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Garth said this is not necessary. That you can't be too picky with the attenuators, and always just pick the closest one. It would be easy to add though if we do ever need it.

self, photon_energy: float, target_transmission: float
) -> AttenuatorCombination:
available_attenuations = self._calculate_available_transmissions(photon_energy)
best_idx = np.argmin(
[abs(target_transmission - _.transmission) for _ in available_attenuations]
)
return available_attenuations[best_idx]

def _calculate_available_transmissions(
self, photon_energy: float
) -> list[AttenuatorCombination]:
"""
Calculates all possible transmissions for the attenuator bank, using
the powerset of the available attenuators. The result is not sorted,
as it is more efficient to scan linearly each time for the closest
match.
"""
available_transmissions = []
for combination in self._powerset():
attens = [self.attenuators[a] for a in self.attenuators if a in combination]
total_transmission = self._calculate_total_transmission(
photon_energy, *attens
)
available_transmissions.append(
AttenuatorCombination(total_transmission, combination)
)
return available_transmissions

def _calculate_total_transmission(
self, photon_energy: float, *attenuators: Attenuator
) -> float:
transmissions = [
a.transmission(photon_energy, self.get_egu()) for a in attenuators
]
return round(float(math.prod(transmissions)), 3)

def _powerset(self) -> list[list[int]]:
"""
This is a famously O(n*2^n) problem.
"""
powerset = []
for i in range(1 << len(self.attenuators)):
combination = []
for j in range(len(self.attenuators)):
if i & (1 << j):
combination.append(j + 1) # +1 because attenuators are 1-indexed
powerset.append(combination)
return powerset
Loading
Loading