From bbade786dc6fa7229448ec98c358814e56126386 Mon Sep 17 00:00:00 2001 From: beniroquai Date: Tue, 2 Jun 2026 12:30:14 +0200 Subject: [PATCH 1/2] adding fan interface --- ...UC2Client-PinConfigurator-checkpoint.ipynb | 647 ------------------ uc2rest/UC2Client.py | 4 + uc2rest/fan.py | 86 +++ 3 files changed, 90 insertions(+), 647 deletions(-) delete mode 100644 DOCUMENTATION/.ipynb_checkpoints/DOC_UC2Client-PinConfigurator-checkpoint.ipynb create mode 100644 uc2rest/fan.py diff --git a/DOCUMENTATION/.ipynb_checkpoints/DOC_UC2Client-PinConfigurator-checkpoint.ipynb b/DOCUMENTATION/.ipynb_checkpoints/DOC_UC2Client-PinConfigurator-checkpoint.ipynb deleted file mode 100644 index 409a1a7..0000000 --- a/DOCUMENTATION/.ipynb_checkpoints/DOC_UC2Client-PinConfigurator-checkpoint.ipynb +++ /dev/null @@ -1,647 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "7b18ec0a", - "metadata": {}, - "source": [ - "\"Open" - ] - }, - { - "cell_type": "markdown", - "id": "0456a946", - "metadata": {}, - "source": [ - "# UC2 REST Tutorial\n", - "\n", - "Here we are going to teach you how to interact with the UC2 microcontroller and how you can add additional functionalities. \n", - "\n", - "In order to use the client in your python environment you need the following packages to be installed:\n", - "\n", - "(use the `!` to install it from within this jupyter notebook)\n", - "```py\n", - "!pip install UC2-REST\n", - "```\n", - "\n", - "This code has been tested with the ESP32 WEMOS D1 R32 + CNC shield v3, where 3 stepper are connected to the board and an LED Matrix (WS2812, adafruit) is connected to the FEED pin. \n", - "\n", - "If you find an error, please report it [here](https://github.com/openUC2/UC2-REST/issues/new) " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "37e5e2f1", - "metadata": {}, - "outputs": [], - "source": [ - "!pip install UC2-REST" - ] - }, - { - "cell_type": "markdown", - "id": "623bc1e4", - "metadata": {}, - "source": [ - "## Organize all imports\n", - "\n", - "First of all we need to import the `ESP32Client`. Since it is not yet a standalone pip package, we have to do that via a relaitve import, meaning that the file is in the same folder as this Jupyter Notebook" - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "id": "c5e2f6da", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "The autoreload extension is already loaded. To reload it, use:\n", - " %reload_ext autoreload\n" - ] - } - ], - "source": [ - "%load_ext autoreload \n", - "%autoreload 2\n", - "import uc2rest\n", - "import time\n", - "import numpy as np" - ] - }, - { - "cell_type": "markdown", - "id": "8db1ee0c", - "metadata": {}, - "source": [ - "# Connecting to the ESP32 via USB\n", - "\n", - "Now we want to initiliaze the USB-serial connection. Therefore, connect the ESP32 to your computer using a data (!) USB cable and establish the connection. You can leave the port as \"unknown\" as the portfinder may identify the ESP.\n", - "\n", - "**Important:** Close all applications that may be connected to the ESP (e.g. Arduino Serial Plotter)\n", - "\n", - "**IMPORTANT:** Install the USB serial driver for the ESP32: https://learn.sparkfun.com/tutorials/how-to-install-ch340-drivers/all" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "caadeb4f", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2022-10-22 23:39:46 DEBUG [UC2Client] /dev/cu.MALS\n", - "2022-10-22 23:39:46 DEBUG [UC2Client] /dev/cu.SOC\n", - "2022-10-22 23:39:46 DEBUG [UC2Client] /dev/cu.Bluetooth-Incoming-Port\n", - "2022-10-22 23:39:46 DEBUG [UC2Client] /dev/cu.MiTrueWirelessEBsBasic2\n", - "2022-10-22 23:39:46 DEBUG [UC2Client] /dev/cu.SLAB_USBtoUART\n", - "2022-10-22 23:39:49 DEBUG [UC2Client] We are connected: True on port: /dev/cu.SLAB_USBtoUART\n" - ] - } - ], - "source": [ - "ESP32 = uc2rest.UC2Client(serialport=\"unknown\")" - ] - }, - { - "cell_type": "markdown", - "id": "e492c7df", - "metadata": {}, - "source": [ - "# Moving the motor \n", - "\n", - "The following code snippets will help you moving the motors (XYZ) continously or at a known number of `steps` at a certain `speed` level (all measured in steps/s). \n", - "\n", - "The additional attributs \n", - "- `is_blocking` states if the action is performed in the background or not; if `False` no return message will be provided\n", - "- `is_absolute` says if we go relative or absolute steps \n", - "- `is_enabled` says if we want to \"unpower\" the motors once we are done (prevent overheating)" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "d6f65b85", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Current position: 0\n", - "Current position: 1000\n", - "Current position: 0\n", - "Don't worry that it's not going to zero... it actually does, but we are asking for it too fast..\n" - ] - } - ], - "source": [ - "# move and measure\n", - "print(\"Current position: \"+ str(ESP32.motor.get_position(axis=1)))\n", - "ESP32.motor.move_x(steps=1000, speed=1000, is_blocking=True, is_absolute=True, is_enabled=True)\n", - "print(\"Current position: \"+ str(ESP32.motor.get_position(axis=1)))\n", - "ESP32.motor.move_x(steps=0, speed=1000, is_blocking=True, is_absolute=True, is_enabled=False)\n", - "print(\"Current position: \"+ str(ESP32.motor.get_position(axis=1)))\n", - "\n", - "print(\"Don't worry that it's not going to zero... it actually does, but we are asking for it too fast..\")" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "2c95f5f1", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "''" - ] - }, - "execution_count": 5, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# for moving other motors\n", - "ESP32.motor.move_y(steps=1000, speed=1000, is_blocking=True, is_absolute=True, is_enabled=True)\n", - "ESP32.motor.move_z(steps=1000, speed=1000, is_blocking=True, is_absolute=True, is_enabled=True)\n", - "ESP32.motor.move_t(steps=1000, speed=1000, is_blocking=True, is_absolute=True, is_enabled=True)" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "e2f97425", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "''" - ] - }, - "execution_count": 6, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# moving multiple motors at different speed\n", - "ESP32.motor.move_xyz(steps=(1000,160,330), speed=(1000,100,10000), is_blocking=False, is_absolute=False, is_enabled=False) " - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "id": "b03b910e", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2022-10-22 23:42:34 DEBUG [UC2Client] Casting json string from serial to Python dict failed\n", - "2022-10-22 23:42:35 DEBUG [UC2Client] Casting json string from serial to Python dict failed\n" - ] - }, - { - "data": { - "text/plain": [ - "''" - ] - }, - "execution_count": 9, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# moving a motor forever and then stop it\n", - "ESP32.motor.move_forever(speed=(100,0,0), is_stop=False)\n", - "time.sleep(0.5)\n", - "ESP32.motor.move_forever(speed=(100,0,0), is_stop=True)" - ] - }, - { - "cell_type": "code", - "execution_count": 13, - "id": "0ec5e529", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "3111\n" - ] - }, - { - "data": { - "text/plain": [ - "{'return': 1}" - ] - }, - "execution_count": 13, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "ESP32.motor.set_motor_maxSpeed(axis=0, maxSpeed=10000)\n", - "ESP32.motor.set_motor_currentPosition(axis=0, currentPosition=10000)\n", - "ESP32.motor.set_motor_acceleration(axis=0, acceleration=10000)\n", - "ESP32.motor.set_motor_enable(is_enable=1)\n", - "ESP32.motor.set_direction(axis=1, sign=1, timeout=1)\n", - "position = ESP32.motor.get_position(axis=1, timeout=1)\n", - "print(position)\n", - "ESP32.motor.set_position(axis=1, position=0, timeout=1)" - ] - }, - { - "cell_type": "markdown", - "id": "ced099d5", - "metadata": {}, - "source": [ - "# ESP32 State" - ] - }, - { - "cell_type": "code", - "execution_count": 16, - "id": "e6db2308", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "{'identifier_name': 'UC2_Feather', 'identifier_id': 'V1.2', 'identifier_date': 'Sep 3 202205:33:18', 'identifier_author': 'BD', 'IDENTIFIER_NAME': ''}\n", - "0\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2022-10-22 23:47:05 DEBUG [UC2Client] Casting json string from serial to Python dict failed\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "False\n" - ] - } - ], - "source": [ - "# test state\n", - "_state = ESP32.state.get_state()\n", - "print(_state)\n", - "ESP32.state.set_state(debug=False)\n", - "_mode = ESP32.state.isControllerMode()\n", - "print(_mode)\n", - "ESP32.state.espRestart()\n", - "time.sleep(5)\n", - "ESP32.state.setControllerMode(isController=True)\n", - "_busy = ESP32.state.isBusy()\n", - "print(_busy)\n" - ] - }, - { - "cell_type": "markdown", - "id": "0d72ffa0", - "metadata": {}, - "source": [ - "# LED Matrix\n", - "\n", - "The LED matrix is connected to the \"HOLD\" pin and can be controlled through the PYTHON interface too" - ] - }, - { - "cell_type": "code", - "execution_count": 17, - "id": "7ecd5211", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2022-10-22 23:47:47 DEBUG [UC2Client] Setting LED Pattern (full): (255, 0, 0)\n" - ] - }, - { - "data": { - "text/plain": [ - "{'return': 1, 'LEDArrMode': 'full'}" - ] - }, - "execution_count": 17, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# first define the number of LEDs\n", - "ESP32.led.setLEDArrayConfig(ledArrPin=4, ledArrNum=25)\n", - "\n", - "# set all LEDs to a certain RGB value\n", - "ESP32.led.send_LEDMatrix_full(intensity=(255, 0, 0), timeout=1)" - ] - }, - { - "cell_type": "code", - "execution_count": 18, - "id": "6970eaea", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2022-10-22 23:48:10 DEBUG [UC2Client] Setting LED PAttern: 0 - (0, 255, 0)\n" - ] - }, - { - "data": { - "text/plain": [ - "{'return': 1, 'LEDArrMode': 'single'}" - ] - }, - "execution_count": 18, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# set a single LED to a certain RGB value\n", - "ESP32.led.send_LEDMatrix_single(indexled=0, intensity=(0, 255, 0), timeout=1)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c40d9ba8", - "metadata": {}, - "outputs": [], - "source": [ - "# set a special LED pattern to a certain RGB value (e.g. \"top\", \"bottom\", \"left\", \"right\")\n", - "ESP32.send_LEDMatrix_special(pattern=\"left\", intensity = (255,255,255),timeout=1)" - ] - }, - { - "cell_type": "code", - "execution_count": 19, - "id": "407942ef", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2022-10-22 23:48:53 DEBUG [UC2Client] Setting LED Pattern (full): (255, 255, 255)\n" - ] - }, - { - "data": { - "text/plain": [ - "{'return': 1, 'LEDArrMode': 'left'}" - ] - }, - "execution_count": 19, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# set a special LED pattern to a certain RGB value (e.g. \"top\", \"bottom\", \"left\", \"right\")\n", - "ESP32.led.send_LEDMatrix_special(pattern=\"left\", intensity = (255,255,255),timeout=1)\n" - ] - }, - { - "cell_type": "code", - "execution_count": 20, - "id": "24769ec2", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2022-10-22 23:49:38 DEBUG [UC2Client] Setting LED Pattern (array) \n" - ] - }, - { - "data": { - "text/plain": [ - "{'return': 1, 'LEDArrMode': 'array'}" - ] - }, - "execution_count": 20, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# set a funny pattern\n", - "import numpy as np\n", - "\n", - "Nx=8\n", - "Ny=8\n", - "led_pattern = np.abs(np.int8(np.random.randn(3,Nx*Ny)*255))\n", - "ESP32.led.send_LEDMatrix_array(led_pattern=led_pattern, timeout=1)" - ] - }, - { - "cell_type": "markdown", - "id": "a0ad184a", - "metadata": {}, - "source": [ - "# SLM" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "d79b1629", - "metadata": {}, - "outputs": [], - "source": [ - "ESP32.send_SLM_circle(posX=10, posY=20, radius=30, color=10000)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ffc9477f", - "metadata": {}, - "outputs": [], - "source": [ - "ESP32.send_SLM_clear()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a59c228a", - "metadata": {}, - "outputs": [], - "source": [ - "import matplotlib.pyplot as plt\n", - "import numpy as np\n", - "\n", - "\n", - "ESP32.send_SLM_clear()\n", - "\n", - "import time \n", - "\n", - "time.sleep(1)\n", - "\n", - "Nx=30\n", - "Ny=30\n", - "image = np.random.randint(0,255,(Nx,Ny))\n", - "startX = 0\n", - "startY = 0\n", - "\n", - "\n", - "\n", - "ESP32.send_SLM_image(image, startX, startY, timeout=3)\n", - "\n", - "'''\n", - "endX = startX+image.shape[0]\n", - "endY = startY+image.shape[1]\n", - "path = '/slm_act'\n", - "payload = {\n", - " \"task\": \"/act_slm\",\n", - " \"color\": image[:].flatten().tolist(),\n", - " \"startX\":startX,\n", - " \"startY\":startY,\n", - " \"endX\":endX,\n", - " \"endY\":endY, \n", - " \"slmMode\": \"image\"\n", - "}\n", - "\n", - "payload\n", - "'''\n", - "plt.imshow(image), plt.show()\n", - "\n", - "\n", - "\n", - "\n" - ] - }, - { - "cell_type": "markdown", - "id": "2f645a41", - "metadata": {}, - "source": [ - "# Galvos\n", - "\n", - "This is coming soon" - ] - }, - { - "cell_type": "markdown", - "id": "c9af0617", - "metadata": {}, - "source": [ - "# Lasers\n", - "\n", - "This is coming soon" - ] - }, - { - "cell_type": "code", - "execution_count": 14, - "id": "5727098a", - "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "aafe99e68bd0447aa10670341984c053", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "interactive(children=(IntSlider(value=557, description='val', max=1671, min=-557), Output()), _dom_classes=('w…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "\n", - "\n", - "%matplotlib notebook\n", - "from ipywidgets import *\n", - "import numpy as np\n", - "import matplotlib.pyplot as plt\n", - "\n", - "@widgets.interact(controlLasers=(0, 1000)) \n", - "def controlLasers(val=557):\n", - " print(val)\n", - " ESP32.laser.set_laser(channel=1, value=val, despeckleAmplitude=0.5, despecklePeriod=10, timeout=20, is_blocking = True)\n", - " ESP32.laser.set_laser(channel=2, value=val, despeckleAmplitude=0.5, despecklePeriod=10, timeout=20, is_blocking = True)\n", - " ESP32.laser.set_laser(channel=3, value=val, despeckleAmplitude=0.5, despecklePeriod=10, timeout=20, is_blocking = True)\n", - "\n" - ] - }, - { - "cell_type": "markdown", - "id": "d938a1be", - "metadata": {}, - "source": [ - "# LEDs\n", - "\n", - "This is coming soon" - ] - }, - { - "cell_type": "markdown", - "id": "7c1794b0", - "metadata": {}, - "source": [ - "# PID controller \n", - "\n", - "Create a feedback loop for constant pressure" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.5" - }, - "vscode": { - "interpreter": { - "hash": "7e0b4c7bcc57fa321d9ed93045b19436fc14ee561979f5319cd55a29d7ed0d86" - } - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/uc2rest/UC2Client.py b/uc2rest/UC2Client.py index dcc48a3..9b1ba0d 100644 --- a/uc2rest/UC2Client.py +++ b/uc2rest/UC2Client.py @@ -26,6 +26,7 @@ from .logger import Logger from .cmdrecorder import cmdRecorder from .temperature import Temperature +from .fan import Fan from .message import Message from .can import CAN from .canota import CANOTA @@ -131,6 +132,9 @@ def __init__(self, host=None, port=31950, serialport=None, identity="UC2_Feather # initialize temperature self.temperature = Temperature(self) + # initialize fan controller + self.fan = Fan(self) + # initialize laser self.state = State(self) diff --git a/uc2rest/fan.py b/uc2rest/fan.py new file mode 100644 index 0000000..ed56b9c --- /dev/null +++ b/uc2rest/fan.py @@ -0,0 +1,86 @@ +import time + +class Fan(object): + + def __init__(self, parent): + self._parent = parent + self._fan_state = {} + self._temp_state = {} + + if hasattr(self._parent, "serial"): + self._parent.serial.register_callback(self._callback_fan, pattern="fan") + + def _callback_fan(self, data): + try: + self._fan_state = data["fan"] + except Exception as e: + pass + + # ── getters ────────────────────────────────────────────────────────────── + + def get_fan(self, timeout=1, blocking=True): + """{"task":"/fan_get"} → {fan:{mode,wiper,manual,rpm,stalled,kick,tempC,curve}}""" + if blocking: + r = self._parent.get_json("/fan_get", timeout=timeout) + try: + self._fan_state = r[0]["fan"] + except Exception: + pass + return self._fan_state + return self._fan_state + + def get_temp(self, timeout=1): + """{"task":"/temp_get"} → {temp:{pcb,air,esp,pcb_ok,air_ok}}""" + r = self._parent.get_json("/temp_get", timeout=timeout) + try: + self._temp_state = r[0]["temp"] + except Exception: + pass + return self._temp_state + + # ── setters ─────────────────────────────────────────────────────────────── + + def set_mode(self, mode="auto", wiper=None): + """mode: 'auto' | 'manual' | 'off'. wiper 0-127 required for manual.""" + fan = {"mode": mode} + if wiper is not None: + fan["wiper"] = int(max(0, min(127, wiper))) + return self._act(fan) + + def set_wiper(self, wiper): + """Set fan wiper directly (implies manual mode), wiper 0-127.""" + return self._act({"mode": "manual", "wiper": int(max(0, min(127, wiper)))}) + + def set_kick(self, enabled): + """Enable (True/1) or disable (False/0) stall-kick-start.""" + return self._act({"kick": 1 if enabled else 0}) + + def set_curve(self, curve): + """curve: list of [thresholdC, wiper] pairs, e.g. [[35,127],[45,80]]""" + return self._act({"curve": curve}) + + def off(self): + return self._act({"mode": "off"}) + + # ── internal ────────────────────────────────────────────────────────────── + + def _act(self, fan_payload): + payload = {"task": "/fan_act", "fan": fan_payload} + return self._parent.post_json("/fan_act", payload, getReturn=False) + + +if __name__ == "__main__": + import uc2rest + port = "/dev/cu.SLAB_USBtoUART" + ESP32 = uc2rest.UC2Client(serialport=port, DEBUG=True) + + print("temp:", ESP32.fan.get_temp()) + print("fan:", ESP32.fan.get_fan()) + + ESP32.fan.set_mode("auto") + time.sleep(1) + ESP32.fan.set_wiper(48) + time.sleep(1) + ESP32.fan.set_mode("off") + + ESP32.close() From 8a02b283d2ad227bf3fbf94ea459c68afbe5ab00 Mon Sep 17 00:00:00 2001 From: beniroquai Date: Tue, 2 Jun 2026 14:50:56 +0200 Subject: [PATCH 2/2] Update mserial.py --- uc2rest/mserial.py | 83 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 68 insertions(+), 15 deletions(-) diff --git a/uc2rest/mserial.py b/uc2rest/mserial.py index 5e4ff8a..4b55b03 100644 --- a/uc2rest/mserial.py +++ b/uc2rest/mserial.py @@ -111,22 +111,26 @@ def openDevice(self, port=None, baud_rate=115200): self.serialdevice.close() except: pass - + # Honour the baudrate passed in — previously tryToConnect used + # self.baudrate, so any override from openDevice/reconnect was lost. + if baud_rate: + self.baudrate = baud_rate + try: - # check if port is string and if so within available ports + # check if port is string and if so within available ports if type(port)==str: port = self.get_port_info(port) if port is None: raise ValueError("Port not found") - + for i in range(2): # not good, but sometimes it needs a second attempt - isUC2 = self.tryToConnect(port) + isUC2 = self.tryToConnect(port, baudrate=self.baudrate) if isUC2: break if not isUC2: raise ValueError('Wrong Firmware.') else: - self._logger.debug(f"Connected to {port.device} at {baud_rate} baud.") + self._logger.debug(f"Connected to {port.device} at {self.baudrate} baud.") ser = self.serialdevice self.is_connected = True self.manufacturer = "UC2" @@ -135,7 +139,7 @@ def openDevice(self, port=None, baud_rate=115200): self._logger.error("[OpenDevice]: "+str(e)) ser = self.findCorrectSerialDevice() if ser is None: - ser = MockSerial(port, baud_rate, timeout=.1) + ser = MockSerial(port, self.baudrate, timeout=.1) self.is_connected = False ser.write_timeout = self.write_timeout if not ser.isOpen(): @@ -197,7 +201,9 @@ def findCorrectSerialDevice(self): self.manufacturer = "UC2Mock" return None - def tryToConnect(self, port): + def tryToConnect(self, port, baudrate=None): + if baudrate is not None: + self.baudrate = baudrate try: self.serialdevice = serial.Serial(port.device, baudrate=self.baudrate, timeout=self.read_timeout, write_timeout=self.write_timeout) # close the device - similar to hard reset @@ -301,8 +307,10 @@ def _process_commands(self): if self.serialdevice is None: self.is_connected = False # TODO: We have to indicate if the device is not connected or if it is just not ready yet - otherwise we will have a lot of "Failed to Send" messages in the beginning continue - else: - self.is_connected = True + # Note: do NOT flip is_connected to True merely because the + # serialdevice handle exists. The handle survives unplugs on + # Linux until the next read, so we only consider ourselves + # connected once a successful read happens below. # if we just want to send but not even wait for a response with self.serialReadLock: @@ -313,8 +321,20 @@ def _process_commands(self): if nFailedCommands > nFailedCommandsMax: raise Exception("Failed to read the line in serial: "+str(mReadline)) line = mReadline.decode('utf-8').strip() - if self.DEBUG and line!="": + if line != "": + # Real bytes from the device — mark the link live + # and reset the failure counter. + self.is_connected = True + nFailedCommands = 0 + if self.DEBUG and line!="": self._logger.debug("[ProcessLines]:"+str(line)) + except SerialException as e: + # Hardware-level error (port disappeared, IO error). + # Flip the flag immediately so callers see the truth. + self.is_connected = False + self._logger.error("SerialException in read loop: "+str(e)) + nFailedCommands += 1 + line = "" except Exception as e: self._logger.error("Failed to read the line in serial: "+str(e)) nFailedCommands += 1 @@ -554,21 +574,54 @@ def closeSerial(self): def close(self): self.closeSerial() - def reconnect(self, baudrate=None): - self._logger.debug("Reconnecting to the serial device") + def reconnect(self, port=None, baudrate=None): + """Reconnect the serial link. + + Either parameter may be omitted to keep the currently-configured value. + Returns True on success, False otherwise. + """ + self._logger.debug( + f"Reconnecting to the serial device (port={port or self.serialport}, " + f"baud={baudrate or self.baudrate})" + ) self.running = False if baudrate is not None: self.baudrate = baudrate + if port is not None: + self.serialport = port try: self.serialdevice.close() except: pass - self.serialdevice = self.openDevice(port = self.serialport, baud_rate = self.baudrate) - if self.serialdevice: - self.serialport = self.serialdevice.port + # Give the previous thread a moment to exit before we replace it. + if self.thread is not None and self.thread.is_alive(): + self.thread.join(timeout=1.0) + self.serialdevice = self.openDevice(port=self.serialport, baud_rate=self.baudrate) + if self.serialdevice: + try: + self.serialport = self.serialdevice.port + except Exception: + pass return True return False + def ping(self, timeout=0.5): + """Quick health check that asks the device for /state_get. + + Returns True if the device responds within ``timeout`` seconds. + Does NOT disturb the running command loop — uses sendMessage with + nResponses=1 and a tight timeout. Intended for the strict variant of + uc2_board_is_connected on the ImSwitch side. + """ + if not self.is_connected or self.manufacturer == "UC2Mock": + return False + try: + resp = self.sendMessage({"task": "/state_get"}, nResponses=1, timeout=timeout) + return isinstance(resp, list) and len(resp) > 0 + except Exception as e: + self._logger.debug(f"ping failed: {e}") + return False + if __name__ == "__main__": # Usage example