diff --git a/uc2rest/UC2Client.py b/uc2rest/UC2Client.py index 6041118..6856926 100644 --- a/uc2rest/UC2Client.py +++ b/uc2rest/UC2Client.py @@ -47,9 +47,9 @@ class UC2Client(object): is_serial = False BAUDRATE = 115200 - def __init__(self, host=None, port=31950, serialport=None, identity="UC2_Feather", baudrate=BAUDRATE, + def __init__(self, host=None, port=31950, serialport=None, identity="UC2_Feather", baudrate=BAUDRATE, NLeds=64, SerialManager=None, DEBUG=False, logger=None, skipFirmwareCheck=False, - isPyScript=False): + isPyScript=False, device_id=None, requireMaster=False): ''' This client connects to the UC2-REST microcontroller that can be found here https://github.com/openUC2/UC2-REST @@ -80,7 +80,7 @@ def __init__(self, host=None, port=31950, serialport=None, identity="UC2_Feather # initialize communication channel (serial only) if serialport is not None: # use USB connection - self.serial = Serial(serialport, baudrate, parent=self, identity=identity, DEBUG=DEBUG, skipFirmwareCheck=skipFirmwareCheck) + self.serial = Serial(serialport, baudrate, parent=self, identity=identity, DEBUG=DEBUG, skipFirmwareCheck=skipFirmwareCheck, device_id=device_id, requireMaster=requireMaster) self.is_serial = True self.is_connected = self.serial.is_connected self.serial.DEBUG = DEBUG diff --git a/uc2rest/motor.py b/uc2rest/motor.py index bbc2398..a02c88d 100644 --- a/uc2rest/motor.py +++ b/uc2rest/motor.py @@ -613,10 +613,15 @@ def move_stepper(self, steps=(0,0,0,0), speed=(1000,1000,1000,1000), is_absolute if isAbsoluteArray[iMotor]: # Compare current position (physical) with target (physical, already includes offset) self.currentDirection[iMotor] = 1 if (self.currentPosition[iMotor] > targetPositionPhysical[iMotor]) else -1 - # Calculate distance to travel in HARDWARE STEPS: - # Current position (physical) -> convert to steps, then subtract target (already in steps) - currentPosition_steps = self.currentPosition[iMotor] / stepSizes[iMotor] - absoluteDistances_steps[iMotor] = abs(currentPosition_steps - steps[iMotor]) + # Travel distance for the time estimate = |current - target| in + # PHYSICAL units, converted to hardware steps. Computing it in + # physical units keeps it direction-agnostic: the hardware `steps` + # target carries the per-axis direction sign, so subtracting it + # from an unsigned current-in-steps produced a *sum* (not a + # difference) on inverted axes, hugely inflating the estimate. + absoluteDistances_steps[iMotor] = abs( + self.currentPosition[iMotor] - targetPositionPhysical[iMotor] + ) / stepSizes[iMotor] else: self.currentDirection[iMotor] = np.sign(steps[iMotor]) # For relative motion, steps[iMotor] is already the distance in hardware steps @@ -629,19 +634,20 @@ def move_stepper(self, steps=(0,0,0,0), speed=(1000,1000,1000,1000), is_absolute if not isAbsoluteArray[iMotor]: absoluteDistances_steps[iMotor] = abs(steps[iMotor]) - # Convert speed and acceleration from physical units to steps/second + # Speed and acceleration are already in firmware step units (the same raw + # values sent to the device), and absoluteDistances_steps is in hardware + # steps too, so the time estimate is unit-consistent WITHOUT any stepSize + # division — dividing here would desync it from the distance and break the + # estimate. Just take magnitudes. speed_steps = np.zeros(4) acceleration_steps = np.zeros(4) for iMotor in range(4): if speed[iMotor] != 0: - # Speed: µm/s -> steps/s => divide by stepSize (µm/step) - speed_steps[iMotor] = abs(speed[iMotor]) # TODO: This is actually given in steps/s / stepSizes[iMotor] + speed_steps[iMotor] = abs(speed[iMotor]) if acceleration[iMotor] is not None and acceleration[iMotor] != 0: - # Acceleration: µm/s² -> steps/s² => divide by stepSize - acceleration_steps[iMotor] = abs(acceleration[iMotor]) # TODO: This is actually given in steps/s / stepSizes[iMotor] + acceleration_steps[iMotor] = abs(acceleration[iMotor]) else: - # Default acceleration in steps/s² - acceleration_steps[iMotor] = 20000 # This should also be converted, but we use a safe default + acceleration_steps[iMotor] = 20000 # safe default (firmware steps/s^2) # Calculate travel time using HARDWARE STEPS and converted speed/acceleration # Find the axis that will take the longest (limits overall movement time) @@ -696,8 +702,10 @@ def move_stepper(self, steps=(0,0,0,0), speed=(1000,1000,1000,1000), is_absolute "redu": int(is_reduced)} if acceleration[iMotor] is not None: motorProp["accel"] = int(acceleration[iMotor]) + motorProp["acceleration"] = int(acceleration[iMotor]) else: motorProp["accel"] = self.DEFAULT_ACCELERATION + motorProp["acceleleration"] = self.DEFAULT_ACCELERATION motorPropList.append(motorProp) if len(motorPropList)==0: return "{'return':-1}" @@ -1243,6 +1251,44 @@ def set_tmc_parameters(self, axis=0, msteps=None, rms_current=None, stall_value= r = self._parent.post_json(path, payload, timeout=timeout) return r + def get_tmc_parameters(self, axis=0, timeout=1): + ''' Read the TMC parameters for a specific axis back from the device. + + Sends {"task":"/tmc_get", "axis":} and parses the response. Returns a + dict with msteps/rms_current/sgthrs/semin/semax/blank_time/toff, or None + if the firmware does not implement TMC readback (older firmwares only + accept /tmc_act). Callers should fall back to their last-applied values + in that case. + ''' + if type(axis) == str: + axis = self.xyztTo1230(axis) + path = "/tmc_get" + payload = {"task": path} + if axis is not None: + payload["axis"] = axis + try: + r = self._parent.post_json(path, payload, timeout=timeout) + if isinstance(r, list): + r = r[0] if r else {} + if not isinstance(r, dict): + return None + # firmware may nest the values under "tmc" or return them flat + tmc = r.get("tmc", r) + if not isinstance(tmc, dict): + return None + keys = ("msteps", "rms_current", "sgthrs", "semin", "semax", "blank_time", "toff") + if not any(k in tmc for k in keys): + # nothing TMC-shaped came back -> readback unsupported + return None + return {k: tmc[k] for k in keys if k in tmc} + except Exception as e: + self._parent.logger.debug(f"get_tmc_parameters failed: {e}") + return None + + # camelCase alias used by the ImSwitch ESP32StageManager + def getTMCSettings(self, axis=0, timeout=1): + return self.get_tmc_parameters(axis=axis, timeout=timeout) + def set_hard_limits(self, axis=1, enabled=True, polarity=0, timeout=1): ''' Configure hard limits (emergency stop) for a motor axis. diff --git a/uc2rest/mserial.py b/uc2rest/mserial.py index 7eef755..5c9d2c0 100644 --- a/uc2rest/mserial.py +++ b/uc2rest/mserial.py @@ -16,8 +16,8 @@ class SerialException(Exception): T_SERIAL_WARMUP = 2.5 class Serial: def __init__(self, port, baudrate=115200, timeout=5, - identity="UC2_Feather", parent=None, DEBUG=False, - skipFirmwareCheck=False): + identity="UC2_Feather", parent=None, DEBUG=False, + skipFirmwareCheck=False, device_id=None, requireMaster=False): self.serialdevice = None self.serialport = port @@ -26,6 +26,15 @@ def __init__(self, port, baudrate=115200, timeout=5, self._parent = parent self.manufacturer = "" self.skipFirmwareCheck = skipFirmwareCheck + # Connect only to a specific board / only to the CANopen master. + # device_id pins a physical board by its USB serial number (or a + # substring of the port path / hwid). requireMaster makes auto-discovery + # skip any board whose firmware does NOT report a "*_master" pindef, so a + # motor/slave board (ESP32-S3, native USB) is never picked accidentally. + self.device_id = device_id + self.requireMaster = requireMaster + # identity of the board we actually connected to (filled by checkFirmware) + self.firmware_info = {} if self._parent is None: import logging self._logger = logging.getLogger(__name__) @@ -160,6 +169,64 @@ def openDevice(self, port=None, baud_rate=115200): return ser + def _portMatchesDeviceId(self, port): + '''True if `port` matches the configured device_id, or if no device_id + is set (then everything matches). The device_id is compared as a + case-insensitive substring against the USB serial number, the port path + and the hwid, so the user can pin a board by whichever is stable on + their OS.''' + if not self.device_id: + return True + did = str(self.device_id).lower() + candidates = [getattr(port, "serial_number", None), + getattr(port, "device", None), + getattr(port, "hwid", None)] + return any(c and did in str(c).lower() for c in candidates) + + def _probeDeviceIdentity(self, ser, timeout=2): + '''Send /state_get and parse the firmware identity block into + self.firmware_info: {name, version, date, author, pindef, isMaster}. + Used by requireMaster to reject motor/slave boards. Best-effort: returns + an empty dict (and leaves firmware_info empty) if nothing parses.''' + info = {} + try: + self._write(ser, {"task": "/state_get"}) + ser.write(b'\n') + buffer = "" + reading_json = False + t0 = time.time() + while time.time() - t0 < timeout: + raw = self._read(ser) + try: + line = raw.decode('utf-8').strip() + except Exception: + continue + if line == "": + continue + if line.find("++") >= 0: + reading_json = True + continue + if reading_json and line.find("--") >= 0: + break + if reading_json: + buffer += line + if buffer: + data = json.loads(buffer) + state = data.get("state", data) if isinstance(data, dict) else {} + pindef = state.get("pindef", "") + info = { + "name": state.get("identifier_name", ""), + "version": state.get("identifier_id", ""), + "date": state.get("identifier_date", ""), + "author": state.get("identifier_author", ""), + "pindef": pindef, + "isMaster": "master" in str(pindef).lower(), + } + except Exception as e: + self._logger.debug(f"_probeDeviceIdentity failed: {e}") + self.firmware_info = info + return info + def findCorrectSerialDevice(self): ''' This function tries to find the correct serial device from the list of available ports @@ -183,6 +250,9 @@ def findCorrectSerialDevice(self): descriptions_to_check = ["CH340", "CP2102", "USB2.0-Serial", "USB-Serial"] for port in _available_ports: + # If a specific board was requested, ignore everything else. + if not self._portMatchesDeviceId(port): + continue if any(port.device.startswith(p) for p in ports_to_check) or \ any(port.description.startswith(d) for d in descriptions_to_check): if current_os.startswith("darwin") and port.device.startswith("/dev/cu.usbserial-"): @@ -217,6 +287,22 @@ def tryToConnect(self, port, baudrate=None): #time.sleep(T_SERIAL_WARMUP) self._freeSerialBuffer(self.serialdevice, timeout=2, timeMinimum=1) if self.skipFirmwareCheck or self.checkFirmware(self.serialdevice): + # When only the master may be used, read the firmware identity and + # reject boards that are not a CANopen master (e.g. ESP32-S3 motor + # boards on native USB that also speak the UC2 protocol). + if self.requireMaster: + self._probeDeviceIdentity(self.serialdevice) + if not self.firmware_info.get("isMaster", False): + self._logger.debug( + f"Skipping non-master board on {getattr(port, 'device', '?')} " + f"(pindef={self.firmware_info.get('pindef', '?')})" + ) + try: + self.serialdevice.close() + except Exception: + pass + self.is_connected = False + return False self.is_connected = True self.NumberRetryReconnect = 0 return True diff --git a/uc2rest/state.py b/uc2rest/state.py index 5d973d1..6ed5cb8 100644 --- a/uc2rest/state.py +++ b/uc2rest/state.py @@ -137,6 +137,41 @@ def get_state(self, timeout=3): r = self._parent.get_json(path, timeout=timeout) return r + def get_firmware_info(self, timeout=3): + ''' + Return the firmware identity of the USB-connected ESP32 as a flat dict: + {"name", "version", "date", "author", "pindef", "isMaster"}. + + Parsed from /state_get, e.g. + {"state":{"identifier_name":"UC2_Feather","identifier_id":"V2.0", + "identifier_date":"Jun 17 2026 07:17:22","identifier_author":"BD", + "pindef":"UC2_canopen_master", ...},"qid":0} + The build date and pindef are the fields that matter for telling boards + apart. Returns an empty dict if nothing could be parsed. + ''' + r = self.get_state(timeout=timeout) + try: + if isinstance(r, list): + r = r[0] if r else {} + state = r.get("state", r) if isinstance(r, dict) else {} + pindef = state.get("pindef", "") + return { + "name": state.get("identifier_name", ""), + "version": state.get("identifier_id", ""), + "date": state.get("identifier_date", ""), + "author": state.get("identifier_author", ""), + "pindef": pindef, + "isMaster": "master" in str(pindef).lower(), + } + except Exception as e: + self._parent.logger.debug(f"get_firmware_info failed: {e}") + return {} + + def is_master(self, timeout=1): + '''True if the connected board reports a CANopen-master pindef.''' + info = self.get_firmware_info(timeout=timeout) + return bool(info.get("isMaster", False)) + def delay(self, delay=1, getReturn=True): path = "/state_act" payload = {