Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions uc2rest/UC2Client.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ class UC2Client(object):
is_serial = False
BAUDRATE = 115200

def __init__(self, host=None, port=31950, serialport=None, identity="UC2_Feather", baudrate=BAUDRATE,
def __init__(self, host=None, port=31950, serialport=None, identity="UC2_Feather", baudrate=BAUDRATE,
NLeds=64, SerialManager=None, DEBUG=False, logger=None, skipFirmwareCheck=False,
isPyScript=False):
isPyScript=False, device_id=None, requireMaster=False):
'''
This client connects to the UC2-REST microcontroller that can be found here
https://github.com/openUC2/UC2-REST
Expand Down Expand Up @@ -80,7 +80,7 @@ def __init__(self, host=None, port=31950, serialport=None, identity="UC2_Feather
# initialize communication channel (serial only)
if serialport is not None:
# use USB connection
self.serial = Serial(serialport, baudrate, parent=self, identity=identity, DEBUG=DEBUG, skipFirmwareCheck=skipFirmwareCheck)
self.serial = Serial(serialport, baudrate, parent=self, identity=identity, DEBUG=DEBUG, skipFirmwareCheck=skipFirmwareCheck, device_id=device_id, requireMaster=requireMaster)
self.is_serial = True
self.is_connected = self.serial.is_connected
self.serial.DEBUG = DEBUG
Expand Down
68 changes: 57 additions & 11 deletions uc2rest/motor.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,10 +613,15 @@ def move_stepper(self, steps=(0,0,0,0), speed=(1000,1000,1000,1000), is_absolute
if isAbsoluteArray[iMotor]:
# Compare current position (physical) with target (physical, already includes offset)
self.currentDirection[iMotor] = 1 if (self.currentPosition[iMotor] > targetPositionPhysical[iMotor]) else -1
# Calculate distance to travel in HARDWARE STEPS:
# Current position (physical) -> convert to steps, then subtract target (already in steps)
currentPosition_steps = self.currentPosition[iMotor] / stepSizes[iMotor]
absoluteDistances_steps[iMotor] = abs(currentPosition_steps - steps[iMotor])
# Travel distance for the time estimate = |current - target| in
# PHYSICAL units, converted to hardware steps. Computing it in
# physical units keeps it direction-agnostic: the hardware `steps`
# target carries the per-axis direction sign, so subtracting it
# from an unsigned current-in-steps produced a *sum* (not a
# difference) on inverted axes, hugely inflating the estimate.
absoluteDistances_steps[iMotor] = abs(
self.currentPosition[iMotor] - targetPositionPhysical[iMotor]
) / stepSizes[iMotor]
else:
self.currentDirection[iMotor] = np.sign(steps[iMotor])
# For relative motion, steps[iMotor] is already the distance in hardware steps
Expand All @@ -629,19 +634,20 @@ def move_stepper(self, steps=(0,0,0,0), speed=(1000,1000,1000,1000), is_absolute
if not isAbsoluteArray[iMotor]:
absoluteDistances_steps[iMotor] = abs(steps[iMotor])

# Convert speed and acceleration from physical units to steps/second
# Speed and acceleration are already in firmware step units (the same raw
# values sent to the device), and absoluteDistances_steps is in hardware
# steps too, so the time estimate is unit-consistent WITHOUT any stepSize
# division — dividing here would desync it from the distance and break the
# estimate. Just take magnitudes.
speed_steps = np.zeros(4)
acceleration_steps = np.zeros(4)
for iMotor in range(4):
if speed[iMotor] != 0:
# Speed: µm/s -> steps/s => divide by stepSize (µm/step)
speed_steps[iMotor] = abs(speed[iMotor]) # TODO: This is actually given in steps/s / stepSizes[iMotor]
speed_steps[iMotor] = abs(speed[iMotor])
if acceleration[iMotor] is not None and acceleration[iMotor] != 0:
# Acceleration: µm/s² -> steps/s² => divide by stepSize
acceleration_steps[iMotor] = abs(acceleration[iMotor]) # TODO: This is actually given in steps/s / stepSizes[iMotor]
acceleration_steps[iMotor] = abs(acceleration[iMotor])
else:
# Default acceleration in steps/s²
acceleration_steps[iMotor] = 20000 # This should also be converted, but we use a safe default
acceleration_steps[iMotor] = 20000 # safe default (firmware steps/s^2)

# Calculate travel time using HARDWARE STEPS and converted speed/acceleration
# Find the axis that will take the longest (limits overall movement time)
Expand Down Expand Up @@ -696,8 +702,10 @@ def move_stepper(self, steps=(0,0,0,0), speed=(1000,1000,1000,1000), is_absolute
"redu": int(is_reduced)}
if acceleration[iMotor] is not None:
motorProp["accel"] = int(acceleration[iMotor])
motorProp["acceleration"] = int(acceleration[iMotor])
else:
motorProp["accel"] = self.DEFAULT_ACCELERATION
motorProp["acceleleration"] = self.DEFAULT_ACCELERATION
motorPropList.append(motorProp)
if len(motorPropList)==0:
return "{'return':-1}"
Expand Down Expand Up @@ -1243,6 +1251,44 @@ def set_tmc_parameters(self, axis=0, msteps=None, rms_current=None, stall_value=
r = self._parent.post_json(path, payload, timeout=timeout)
return r

def get_tmc_parameters(self, axis=0, timeout=1):
''' Read the TMC parameters for a specific axis back from the device.

Sends {"task":"/tmc_get", "axis":<n>} and parses the response. Returns a
dict with msteps/rms_current/sgthrs/semin/semax/blank_time/toff, or None
if the firmware does not implement TMC readback (older firmwares only
accept /tmc_act). Callers should fall back to their last-applied values
in that case.
'''
if type(axis) == str:
axis = self.xyztTo1230(axis)
path = "/tmc_get"
payload = {"task": path}
if axis is not None:
payload["axis"] = axis
try:
r = self._parent.post_json(path, payload, timeout=timeout)
if isinstance(r, list):
r = r[0] if r else {}
if not isinstance(r, dict):
return None
# firmware may nest the values under "tmc" or return them flat
tmc = r.get("tmc", r)
if not isinstance(tmc, dict):
return None
keys = ("msteps", "rms_current", "sgthrs", "semin", "semax", "blank_time", "toff")
if not any(k in tmc for k in keys):
# nothing TMC-shaped came back -> readback unsupported
return None
return {k: tmc[k] for k in keys if k in tmc}
except Exception as e:
self._parent.logger.debug(f"get_tmc_parameters failed: {e}")
return None

# camelCase alias used by the ImSwitch ESP32StageManager
def getTMCSettings(self, axis=0, timeout=1):
return self.get_tmc_parameters(axis=axis, timeout=timeout)

def set_hard_limits(self, axis=1, enabled=True, polarity=0, timeout=1):
'''
Configure hard limits (emergency stop) for a motor axis.
Expand Down
90 changes: 88 additions & 2 deletions uc2rest/mserial.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class SerialException(Exception):
T_SERIAL_WARMUP = 2.5
class Serial:
def __init__(self, port, baudrate=115200, timeout=5,
identity="UC2_Feather", parent=None, DEBUG=False,
skipFirmwareCheck=False):
identity="UC2_Feather", parent=None, DEBUG=False,
skipFirmwareCheck=False, device_id=None, requireMaster=False):

self.serialdevice = None
self.serialport = port
Expand All @@ -26,6 +26,15 @@ def __init__(self, port, baudrate=115200, timeout=5,
self._parent = parent
self.manufacturer = ""
self.skipFirmwareCheck = skipFirmwareCheck
# Connect only to a specific board / only to the CANopen master.
# device_id pins a physical board by its USB serial number (or a
# substring of the port path / hwid). requireMaster makes auto-discovery
# skip any board whose firmware does NOT report a "*_master" pindef, so a
# motor/slave board (ESP32-S3, native USB) is never picked accidentally.
self.device_id = device_id
self.requireMaster = requireMaster
# identity of the board we actually connected to (filled by checkFirmware)
self.firmware_info = {}
if self._parent is None:
import logging
self._logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -160,6 +169,64 @@ def openDevice(self, port=None, baud_rate=115200):

return ser

def _portMatchesDeviceId(self, port):
'''True if `port` matches the configured device_id, or if no device_id
is set (then everything matches). The device_id is compared as a
case-insensitive substring against the USB serial number, the port path
and the hwid, so the user can pin a board by whichever is stable on
their OS.'''
if not self.device_id:
return True
did = str(self.device_id).lower()
candidates = [getattr(port, "serial_number", None),
getattr(port, "device", None),
getattr(port, "hwid", None)]
return any(c and did in str(c).lower() for c in candidates)

def _probeDeviceIdentity(self, ser, timeout=2):
'''Send /state_get and parse the firmware identity block into
self.firmware_info: {name, version, date, author, pindef, isMaster}.
Used by requireMaster to reject motor/slave boards. Best-effort: returns
an empty dict (and leaves firmware_info empty) if nothing parses.'''
info = {}
try:
self._write(ser, {"task": "/state_get"})
ser.write(b'\n')
buffer = ""
reading_json = False
t0 = time.time()
while time.time() - t0 < timeout:
raw = self._read(ser)
try:
line = raw.decode('utf-8').strip()
except Exception:
continue
if line == "":
continue
if line.find("++") >= 0:
reading_json = True
continue
if reading_json and line.find("--") >= 0:
break
if reading_json:
buffer += line
if buffer:
data = json.loads(buffer)
state = data.get("state", data) if isinstance(data, dict) else {}
pindef = state.get("pindef", "")
info = {
"name": state.get("identifier_name", ""),
"version": state.get("identifier_id", ""),
"date": state.get("identifier_date", ""),
"author": state.get("identifier_author", ""),
"pindef": pindef,
"isMaster": "master" in str(pindef).lower(),
}
except Exception as e:
self._logger.debug(f"_probeDeviceIdentity failed: {e}")
self.firmware_info = info
return info

def findCorrectSerialDevice(self):
'''
This function tries to find the correct serial device from the list of available ports
Expand All @@ -183,6 +250,9 @@ def findCorrectSerialDevice(self):
descriptions_to_check = ["CH340", "CP2102", "USB2.0-Serial", "USB-Serial"]

for port in _available_ports:
# If a specific board was requested, ignore everything else.
if not self._portMatchesDeviceId(port):
continue
if any(port.device.startswith(p) for p in ports_to_check) or \
any(port.description.startswith(d) for d in descriptions_to_check):
if current_os.startswith("darwin") and port.device.startswith("/dev/cu.usbserial-"):
Expand Down Expand Up @@ -217,6 +287,22 @@ def tryToConnect(self, port, baudrate=None):
#time.sleep(T_SERIAL_WARMUP)
self._freeSerialBuffer(self.serialdevice, timeout=2, timeMinimum=1)
if self.skipFirmwareCheck or self.checkFirmware(self.serialdevice):
# When only the master may be used, read the firmware identity and
# reject boards that are not a CANopen master (e.g. ESP32-S3 motor
# boards on native USB that also speak the UC2 protocol).
if self.requireMaster:
self._probeDeviceIdentity(self.serialdevice)
if not self.firmware_info.get("isMaster", False):
self._logger.debug(
f"Skipping non-master board on {getattr(port, 'device', '?')} "
f"(pindef={self.firmware_info.get('pindef', '?')})"
)
try:
self.serialdevice.close()
except Exception:
pass
self.is_connected = False
return False
self.is_connected = True
self.NumberRetryReconnect = 0
return True
Expand Down
35 changes: 35 additions & 0 deletions uc2rest/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,41 @@ def get_state(self, timeout=3):
r = self._parent.get_json(path, timeout=timeout)
return r

def get_firmware_info(self, timeout=3):
'''
Return the firmware identity of the USB-connected ESP32 as a flat dict:
{"name", "version", "date", "author", "pindef", "isMaster"}.

Parsed from /state_get, e.g.
{"state":{"identifier_name":"UC2_Feather","identifier_id":"V2.0",
"identifier_date":"Jun 17 2026 07:17:22","identifier_author":"BD",
"pindef":"UC2_canopen_master", ...},"qid":0}
The build date and pindef are the fields that matter for telling boards
apart. Returns an empty dict if nothing could be parsed.
'''
Comment on lines +149 to +151
r = self.get_state(timeout=timeout)
try:
if isinstance(r, list):
r = r[0] if r else {}
state = r.get("state", r) if isinstance(r, dict) else {}
pindef = state.get("pindef", "")
return {
"name": state.get("identifier_name", ""),
"version": state.get("identifier_id", ""),
"date": state.get("identifier_date", ""),
"author": state.get("identifier_author", ""),
"pindef": pindef,
"isMaster": "master" in str(pindef).lower(),
}
except Exception as e:
self._parent.logger.debug(f"get_firmware_info failed: {e}")
return {}

def is_master(self, timeout=1):
'''True if the connected board reports a CANopen-master pindef.'''
info = self.get_firmware_info(timeout=timeout)
return bool(info.get("isMaster", False))

def delay(self, delay=1, getReturn=True):
path = "/state_act"
payload = {
Expand Down