diff --git a/roborock/cli.py b/roborock/cli.py index 9def26af..2b261afe 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -607,14 +607,19 @@ async def _await_q10_map_push( timeout: float = _Q10_MAP_PUSH_TIMEOUT, allow_cached_on_timeout: bool = False, ) -> bool: - """Nudge a Q10 to push its map/trace and wait for a fresh update. + """Nudge a Q10 to push its map/trace and wait until ``predicate`` holds. The Q10 map API is entirely push-driven: there is no synchronous get-map request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``, which the device's subscribe loop feeds into the map trait. Here we register - an update listener, send the request, and wait for a newly pushed update to - satisfy ``predicate``. Returns whether it did within ``timeout``. + an update listener, send the request, and wait for the pushed data to satisfy + ``predicate``. Returns whether it did within ``timeout``. When the predicate + already holds against cached content we return immediately without nudging. + If ``allow_cached_on_timeout`` is set, a timeout still returns ``True`` when + the predicate holds against the previously cached content. """ + if predicate(): + return True loop = asyncio.get_running_loop() updated: asyncio.Future[None] = loop.create_future() @@ -662,6 +667,59 @@ async def map_image(ctx, device_id: str, output_file: str): click.echo("No map image content available.") +@session.command() +@click.option("--device_id", required=True) +@click.option("--output-dir", default=None, help="If set, write one transparent PNG per layer here.") +@click.pass_context +@async_command +async def q10_map_layers(ctx, device_id: str, output_dir: str | None): + """List the Q10 map's separable layers (background/wall/floor/per-room). + + With --output-dir, also exports each layer as a transparent PNG that can be + stacked in a frontend (background, then floor, then walls, then each room). + """ + import os + + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + properties = device.b01_q10_properties + await _await_q10_map_push(properties, lambda: properties.map.layers is not None) + layers = properties.map.layers + if layers is None: + click.echo("No map layers available.") + return + + summary = { + "size": {"width": layers.width, "height": layers.height}, + "class_counts": layers.class_counts, + "rooms": [ + {"id": r.id, "name": r.name, "pixel_count": r.pixel_count, "bbox": list(r.bbox)} for r in layers.rooms + ], + } + click.echo(dump_json(summary)) + + if output_dir: + os.makedirs(output_dir, exist_ok=True) + exports = { + "background": layers.render_class("background", (210, 210, 215, 255), scale=2), + "floor": layers.render_class("floor", (70, 170, 95, 200), scale=2), + "wall": layers.render_class("wall", (20, 20, 25, 255), scale=2), + } + for name, png in exports.items(): + with open(os.path.join(output_dir, f"layer_{name}.png"), "wb") as f: + f.write(png) + for room in layers.rooms: + png = layers.render_room(room.id, (90, 140, 220, 200), scale=2) + safe = "".join(c if c.isalnum() else "_" for c in room.name) or f"room{room.id}" + with open(os.path.join(output_dir, f"room_{room.id}_{safe}.png"), "wb") as f: + f.write(png) + click.echo(f"Wrote {3 + len(layers.rooms)} layer PNGs to {output_dir}") + + @session.command() @click.option("--device_id", required=True) @click.option("--include_path", is_flag=True, default=False, help="Include path data in the output.") @@ -721,6 +779,42 @@ async def q10_position(ctx, device_id: str, include_path: bool): click.echo(dump_json(summary)) +@session.command() +@click.option("--device_id", required=True) +@click.option("--output-file", required=True, help="Path to save the map image with the path drawn.") +@click.pass_context +@async_command +async def q10_map_with_path(ctx, device_id: str, output_file: str): + """Render the Q10 map with the current cleaning path + robot position drawn. + + Needs the robot to be actively cleaning (the path/calibration come from the + live trace). Fetches the map and the path, solves the world<->pixel + calibration, and writes the annotated PNG. + """ + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + properties = device.b01_q10_properties + map_trait = properties.map + await _await_q10_map_push(properties, lambda: map_trait.image_content is not None) + got_path = await _await_q10_map_push(properties, lambda: bool(map_trait.path)) + if not got_path: + click.echo("No live path available (the robot only reports its path while cleaning).") + return + try: + image = map_trait.render_path_on_map() + except RoborockException as err: + click.echo(f"Could not render path on map: {err}") + return + with open(output_file, "wb") as f: + f.write(image) + cal = map_trait.calibration + click.echo(f"Saved map with {len(map_trait.path)}-point path to {output_file} (calibration: {cal})") + + @session.command() @click.option("--device_id", required=True) @click.pass_context diff --git a/roborock/data/b01_q10/b01_q10_containers.py b/roborock/data/b01_q10/b01_q10_containers.py index 7b0351a6..42eec1d0 100644 --- a/roborock/data/b01_q10/b01_q10_containers.py +++ b/roborock/data/b01_q10/b01_q10_containers.py @@ -167,6 +167,11 @@ class Q10Status(RoborockBase): cleaning_progress: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_PROGRESS}) fault: int | None = field(default=None, metadata={"dps": B01_Q10_DP.FAULT}) + # Raw base64 map-overlay blobs (decoded by roborock.map.b01_q10_overlays). + restricted_zone_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.RESTRICTED_ZONE_UP}) + virtual_wall_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.VIRTUAL_WALL_UP}) + zoned_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.ZONED_UP}) + # Additional state reported in the device's full status dump. clean_line: YXCleanLine | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_LINE}) carpet_clean_type: YXCarpetCleanType | None = field(default=None, metadata={"dps": B01_Q10_DP.CARPET_CLEAN_TYPE}) diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index d026052c..a9097967 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -14,6 +14,7 @@ from .child_lock import ChildLockTrait from .clean_history import CleanHistoryTrait from .command import CommandTrait +from .common import DpsUpdatable from .consumable import ConsumableTrait from .do_not_disturb import DoNotDisturbTrait from .dust_collection import DustCollectionTrait @@ -100,7 +101,7 @@ def __init__(self, channel: MqttChannel) -> None: self.map = MapContentTrait() self.clean_history = CleanHistoryTrait(self.command) # Read-model traits updated from the device's DPS push stream. - self._updatable_traits = [ + self._updatable_traits: list[DpsUpdatable] = [ self.status, self.volume, self.child_lock, @@ -109,6 +110,10 @@ def __init__(self, channel: MqttChannel) -> None: self.network_info, self.consumable, self.clean_history, + # The map trait owns the vector-overlay data points (no-go zones / + # virtual walls), which arrive as status DPs rather than in the map + # packet, so it updates from the DPS stream like any other read-model. + self.map, ] self._subscribe_task: asyncio.Task[None] | None = None @@ -152,7 +157,8 @@ def _handle_message(self, message: Q10Message) -> None: elif isinstance(message, Q10DpsUpdate): _LOGGER.debug("Received Q10 status update: %s", message.dps) # Notify all read-model traits about the new message; each trait - # only updates the fields that it is responsible for. + # only updates the fields that it is responsible for (the map trait + # picks out the vector-overlay data points it owns). for trait in self._updatable_traits: trait.update_from_dps(message.dps) diff --git a/roborock/devices/traits/b01/q10/common.py b/roborock/devices/traits/b01/q10/common.py index 6e9fff03..ccd2426f 100644 --- a/roborock/devices/traits/b01/q10/common.py +++ b/roborock/devices/traits/b01/q10/common.py @@ -11,7 +11,7 @@ """ import logging -from typing import Any, ClassVar, cast +from typing import Any, ClassVar, Protocol, cast from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.data.containers import RoborockBase @@ -20,7 +20,23 @@ from .command import CommandTrait -class UpdatableTrait(TraitUpdateListener): +class DpsUpdatable(Protocol): + """A trait that updates itself from the Q10 DPS push stream. + + The ``Q10PropertiesApi`` subscribe loop fans each push out to every trait in + its ``_updatable_traits`` list; each picks out the data points it is + responsible for and ignores the rest. That list is heterogeneous -- most + members are converter-backed :class:`UpdatableTrait` read-models, but the map + trait implements ``update_from_dps`` directly to decode the vector overlays it + owns without a read-model. This Protocol is the shared shape the loop needs; + :class:`UpdatableTrait` declares it explicitly and the map trait satisfies it + structurally. + """ + + def update_from_dps(self, decoded_dps: dict[B01_Q10_DP, Any]) -> None: ... + + +class UpdatableTrait(TraitUpdateListener, DpsUpdatable): """Base for Q10 traits backed by a read-model updated from the DPS stream. Concrete traits subclass both their ``RoborockBase`` read-model and this diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py index c132def4..fbce4623 100644 --- a/roborock/devices/traits/b01/q10/map.py +++ b/roborock/devices/traits/b01/q10/map.py @@ -5,106 +5,294 @@ - The device pushes its current map/path as protocol-301 ``MAP_RESPONSE`` messages (a ``dpRequestDps`` nudges it to do so). The protocol layer decodes - those into :class:`Q10MapPacket` / :class:`Q10TracePacket` objects and the - ``Q10PropertiesApi`` subscribe loop routes them to + each push into a typed :class:`~roborock.map.b01_q10_map_parser.Q10MapPacket` + or :class:`~roborock.map.b01_q10_map_parser.Q10TracePacket`, and the + ``Q10PropertiesApi`` subscribe loop routes those to :meth:`MapContentTrait.update_from_map_packet` / :meth:`MapContentTrait.update_from_trace_packet`. -- Those methods render/cache the content and notify update listeners (register - via :meth:`add_update_listener`). -- ``image_content``, ``map_data``, ``rooms``, ``path`` and ``robot_position`` - are readable and reflect the most recently pushed map. +- The no-go / no-mop zones and virtual walls arrive separately as status data + points, fanned in via :meth:`update_from_dps`. + +The trait is deliberately just state management: it accumulates those pushed +inputs (the map packet, the path, the overlays, a solved calibration) and, on +every change, asks :func:`~roborock.map.b01_q10_render.render_q10_map` to compose +them into one :class:`~roborock.map.b01_q10_render.Q10MapRender`. The +low-level pixel work (layer decomposition, erase blanking, world->pixel overlay +placement, path drawing) lives in that map module, not here. Consumers read the +cached fields (``image_content``, ``map_data``, ``rooms``, ``layers``, ``path``, +``robot_position``, ``zones``, ``virtual_walls``) or register a callback with +:meth:`add_update_listener` to learn when new content arrives. Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required. """ import logging -from dataclasses import dataclass, field +from typing import Any from vacuum_map_parser_base.map_data import MapData -from roborock.data import RoborockBase +from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.devices.traits.common import TraitUpdateListener +from roborock.exceptions import RoborockException +from roborock.map.b01_grid_layers import GridCalibration, GridLayers from roborock.map.b01_q10_map_parser import ( - B01Q10MapParser, B01Q10MapParserConfig, Q10MapPacket, Q10Point, Q10Room, Q10TracePacket, ) +from roborock.map.b01_q10_overlays import ( + Q10Zone, + parse_virtual_wall_blob, + parse_zone_blob, +) +from roborock.map.b01_q10_render import Q10MapRender, draw_path_on_map, render_q10_map, solve_q10_calibration _LOGGER = logging.getLogger(__name__) -_TRUNCATE_LENGTH = 20 +class MapContentTrait(TraitUpdateListener): + """Trait holding the most recently pushed parsed map content for Q10 devices. -@dataclass -class MapContent(RoborockBase): - """Dataclass representing Q10 map content.""" + Holds the pushed inputs plus the single composed + :class:`~roborock.map.b01_q10_render.Q10MapRender` derived from them (see the + module docstring). Every mutator updates one input, rebuilds the render in one + shot, and notifies listeners, so the exposed state is always consistent with + one set of inputs rather than a pile of independently-mutated fields. + """ - image_content: bytes | None = None - """The rendered image of the map in PNG format.""" + def __init__( + self, + *, + map_parser_config: B01Q10MapParserConfig | None = None, + ) -> None: + TraitUpdateListener.__init__(self, logger=_LOGGER) + self._config = map_parser_config or B01Q10MapParserConfig() + # Pushed inputs, accumulated from the device's map / trace / DPS streams. + self._packet: Q10MapPacket | None = None + self._path: list[Q10Point] = [] + self._robot_position: Q10Point | None = None + self._robot_heading: int | None = None + self._zones: list[Q10Zone] = [] + self._virtual_walls: list[Q10Zone] = [] + self._calibration: GridCalibration | None = None + # The single composed result, rebuilt from the inputs on every change. + self._render: Q10MapRender | None = None - map_data: MapData | None = None - """Parsed map data (image metadata + room names).""" + # -- Read-only view of the composed map -------------------------------------- - rooms: list[Q10Room] = field(default_factory=list) - """Rooms (segments) reported by the device, with ids and names.""" + @property + def image_content(self) -> bytes | None: + """The rendered base map (PNG), or ``None`` if no map has been pushed.""" + return self._render.image_content if self._render else None - path: list[Q10Point] = field(default_factory=list) - """Full path of the current cleaning session (oldest point first). + @property + def map_data(self) -> MapData | None: + """Parsed map data (image metadata, room names, placed overlays).""" + return self._render.map_data if self._render else None - The robot accumulates this server-side and serves the whole trajectory so - far in one packet, so it is complete even if we connect mid-session. Only - populated while a cleaning session is active.""" + @property + def rooms(self) -> list[Q10Room]: + """Rooms (segments) reported by the device, with ids and names.""" + return self._render.rooms if self._render else [] - robot_position: Q10Point | None = None - """Current robot position (the most recent path point), if known.""" + @property + def layers(self) -> GridLayers | None: + """Separable map layers (background / wall / floor / per-room) in + grid-pixel space, each renderable to a transparent PNG for compositing.""" + return self._render.layers if self._render else None - def __repr__(self) -> str: - img = self.image_content - if img and len(img) > _TRUNCATE_LENGTH: - img = img[: _TRUNCATE_LENGTH - 3] + b"..." - return f"MapContent(image_content={img!r}, rooms={self.rooms!r})" + @property + def calibration(self) -> GridCalibration | None: + """World<->pixel transform, solved from a cleaning path via + :meth:`solve_calibration` (``None`` until one has been fitted).""" + return self._calibration + @property + def path(self) -> list[Q10Point]: + """Full path of the current cleaning session (oldest point first). -class MapContentTrait(MapContent, TraitUpdateListener): - """Trait holding the most recently pushed parsed map content for Q10 devices. + The robot accumulates this server-side and serves the whole trajectory so + far in one packet, so it is complete even if we connect mid-session. Only + populated while a cleaning session is active.""" + return self._path - The Q10 has no synchronous get-map request; the device pushes map and trace - packets, which the protocol layer decodes and the ``Q10PropertiesApi`` - subscribe loop feeds into :meth:`update_from_map_packet` / - :meth:`update_from_trace_packet`. Consumers read the cached fields and/or - register a callback with :meth:`add_update_listener` to be notified when new - map content arrives. - """ + @property + def robot_position(self) -> Q10Point | None: + """Current robot position (the most recent path point), if known.""" + return self._robot_position - def __init__( - self, - *, - map_parser_config: B01Q10MapParserConfig | None = None, - ) -> None: - super().__init__() - TraitUpdateListener.__init__(self, logger=_LOGGER) - self._map_parser = B01Q10MapParser(map_parser_config) + @property + def robot_heading(self) -> int | None: + """Current robot heading in degrees from the trace packet (``0`` = +x, + ``+90`` = +y, ``±180`` = −x, ``−90`` = −y), if a trace has been pushed.""" + return self._robot_heading + + @property + def zones(self) -> list[Q10Zone]: + """Restricted zones (no-go / no-mop) in world coordinates, from the + device's ``dpRestrictedZoneUp``. See :meth:`load_overlays`.""" + return self._zones + + @property + def virtual_walls(self) -> list[Q10Zone]: + """Virtual walls (line segments) in world coordinates.""" + return self._virtual_walls + + # -- Push handlers ----------------------------------------------------------- def update_from_map_packet(self, packet: Q10MapPacket) -> None: - """Render a pushed full-map packet into the cached image/rooms. + """Render a pushed full-map packet into the cached map view. - Rendering failures are logged and skipped (listeners are not notified) so - a single bad push cannot tear down the subscribe loop. + Rendering failures are logged and skipped (the previous map and listeners + are left untouched) so a single bad push cannot tear down the subscribe + loop. """ - parsed = self._map_parser.parse_packet(packet) - if parsed.image_content is None: - _LOGGER.debug("Failed to render Q10 map image") + render = self._compose(packet) + if render is None: return - self.image_content = parsed.image_content - self.map_data = parsed.map_data - self.rooms = packet.rooms + self._packet = packet + self._render = render self._notify_update() def update_from_trace_packet(self, packet: Q10TracePacket) -> None: - """Cache the path/robot position from a pushed trace packet.""" - self.path = packet.points - self.robot_position = packet.robot_position + """Cache the path / robot position / heading from a pushed trace packet.""" + self._path = packet.points + self._robot_position = packet.robot_position + self._robot_heading = packet.heading + # The path/position only reach the rendered map_data once a calibration + # places them in pixel space, so there is nothing to recompose until then + # (skipping the rebuild keeps the frequent pre-calibration trace pushes + # cheap -- the raster is unchanged by the path). + if self._calibration is not None: + self._rebuild() + self._notify_update() + + def update_from_dps(self, decoded_dps: dict[B01_Q10_DP, Any]) -> None: + """Decode any vector-overlay data points present in a DPS push. + + The Q10 pushes no-go / no-mop zones (``dpRestrictedZoneUp``) and virtual + walls (``dpVirtualWallUp``) as status data points rather than inside the + map packet, so the map trait joins the ``Q10PropertiesApi`` DPS fan-out + like the other read-model traits instead of being special-cased by the + orchestrator. Data points absent from this push leave the existing + overlays untouched (a partial status push must not wipe them); a push + carrying neither is a no-op. + """ + if B01_Q10_DP.RESTRICTED_ZONE_UP not in decoded_dps and B01_Q10_DP.VIRTUAL_WALL_UP not in decoded_dps: + return + self.load_overlays( + restricted_zone_up=decoded_dps.get(B01_Q10_DP.RESTRICTED_ZONE_UP), + virtual_wall_up=decoded_dps.get(B01_Q10_DP.VIRTUAL_WALL_UP), + ) self._notify_update() + + def load_overlays( + self, + *, + restricted_zone_up: bytes | str | None = None, + virtual_wall_up: bytes | str | None = None, + ) -> None: + """Decode the device's vector-overlay blobs (from the status DPs). + + Pass the raw ``dpRestrictedZoneUp`` / ``dpVirtualWallUp`` values + (``Q10Status.restricted_zone_up`` / ``virtual_wall_up``). Stores them as + world-coordinate :attr:`zones` / :attr:`virtual_walls`, and -- once a + calibration is available -- the rebuild places them onto ``map_data`` as + ``no_go_areas`` / ``no_mopping_areas`` / ``walls`` in pixel space. + + ``None`` means "data point absent from this update" and leaves the + existing value untouched (a partial status push must not wipe overlays). + An explicit empty blob does clear them. Unlike the push handlers this does + not notify listeners; :meth:`update_from_dps` does after calling it. + """ + if restricted_zone_up is not None: + self._zones = parse_zone_blob(restricted_zone_up) + if virtual_wall_up is not None: + self._virtual_walls = parse_virtual_wall_blob(virtual_wall_up) + # As with the path, the zones/walls are only placed onto the map once a + # calibration exists, so there is nothing to recompose until then. + if self._calibration is not None: + self._rebuild() + + # -- Calibration + rendering ------------------------------------------------- + + def solve_calibration(self) -> GridCalibration | None: + """Fit and cache the world<->pixel calibration from the current path. + + When the map packet's grid-frame header carries a calibration origin + (ss07), only the resolution is fit -- around that fixed origin -- so a + short path suffices; otherwise the full origin + resolution fit is used, + which needs a reasonably dense cleaning path. Both inputs arrive as device + pushes (the path is only populated during an active clean). Caches and + returns the calibration (also stored on :attr:`calibration`), rebuilding + the map so the erase zones and overlays are applied, or ``None`` if there + is no map or the path is too short/featureless to fit. + """ + if self._render is None or self._packet is None: + return None + calibration = solve_q10_calibration(self._render.layers, self._packet.header_calibration, self._path) + if calibration is not None: + self._calibration = calibration + self._rebuild() + return calibration + + def render_path_on_map( + self, + *, + line_color: tuple[int, int, int, int] = (235, 64, 52, 255), + position_color: tuple[int, int, int, int] = (255, 211, 0, 255), + ) -> bytes: + """Return the map image (PNG) with the session path + robot position drawn. + + Solves the calibration on demand if not already cached. Raises + :class:`RoborockException` if there is no map, or no calibration can be + fitted (e.g. no cleaning path captured yet). + """ + if self._render is None: + raise RoborockException("No map available; no map has been pushed yet") + if self._calibration is None: + self.solve_calibration() + if self._render.calibration is None: + raise RoborockException( + "No calibration available; a cleaning path must be captured (pushed) during a clean" + ) + return draw_path_on_map( + self._render, + config=self._config, + path=self._path, + robot_position=self._robot_position, + robot_heading=self._robot_heading, + zones=self._zones, + virtual_walls=self._virtual_walls, + line_color=line_color, + position_color=position_color, + ) + + # -- Internals --------------------------------------------------------------- + + def _compose(self, packet: Q10MapPacket) -> Q10MapRender | None: + """Compose ``packet`` with the current inputs, or ``None`` on failure.""" + try: + return render_q10_map( + packet, + calibration=self._calibration, + path=self._path, + robot_position=self._robot_position, + zones=self._zones, + virtual_walls=self._virtual_walls, + config=self._config, + ) + except RoborockException as ex: + _LOGGER.debug("Failed to render Q10 map packet: %s", ex) + return None + + def _rebuild(self) -> None: + """Recompose the cached map packet with the current inputs (no-op if no + map has been pushed yet, e.g. overlays/trace arriving before the map).""" + if self._packet is None: + return + render = self._compose(self._packet) + if render is not None: + self._render = render diff --git a/roborock/devices/traits/b01/q7/map_content.py b/roborock/devices/traits/b01/q7/map_content.py index afc36d02..a3efeb27 100644 --- a/roborock/devices/traits/b01/q7/map_content.py +++ b/roborock/devices/traits/b01/q7/map_content.py @@ -17,7 +17,8 @@ from roborock.devices.rpc.b01_q7_channel import MapRpcChannel from roborock.devices.traits import Trait from roborock.exceptions import RoborockException -from roborock.map.b01_map_parser import B01MapParser, B01MapParserConfig +from roborock.map.b01_grid_layers import GridCalibration, GridLayers +from roborock.map.b01_map_parser import B01MapParser, B01MapParserConfig, decompose_q7_layers, q7_calibration from roborock.protocols.b01_q7_protocol import B01_Q7_DPS, Q7RequestMessage from roborock.roborock_typing import RoborockB01Q7Methods @@ -36,6 +37,16 @@ class MapContent(RoborockBase): map_data: MapData | None = None """Parsed map data (metadata for points on the map).""" + layers: GridLayers | None = None + """Separable map layers (background / wall / floor) in grid-pixel space. + + Q7's raster has no per-room segmentation, so ``layers.rooms`` is empty (room + ids/names are in the map metadata).""" + + calibration: GridCalibration | None = None + """World<->pixel transform, read directly from the SCMap ``mapHead`` + (``minX``/``minY``/``resolution``); world coordinates are in metres.""" + raw_api_response: bytes | None = None """Raw bytes of the map payload from the device. @@ -98,3 +109,9 @@ async def refresh(self) -> None: self.image_content = parsed_data.image_content self.map_data = parsed_data.map_data self.raw_api_response = raw_payload + try: + self.layers = decompose_q7_layers(raw_payload) + self.calibration = q7_calibration(raw_payload) + except RoborockException: + self.layers = None + self.calibration = None diff --git a/roborock/map/b01_grid_layers.py b/roborock/map/b01_grid_layers.py index 410d60fd..46026f17 100644 --- a/roborock/map/b01_grid_layers.py +++ b/roborock/map/b01_grid_layers.py @@ -200,6 +200,62 @@ def solve_calibration( return best[1] +def solve_calibration_with_origin( + layers: GridLayers, + points: list[tuple[float, float]], + origin: tuple[float, float], + *, + resolutions: Iterable[float], + y_signs: Iterable[int] = (1, -1), + min_on_floor: float = 0.5, +) -> GridCalibration | None: + """Fit resolution + Y orientation around a *known* grid-pixel origin. + + Unlike :func:`solve_calibration`, the pixel origin ``(ox, oy)`` is fixed -- + e.g. read straight from the Q10 grid-frame header -- so this only sweeps the + candidate ``resolutions`` and ``y_signs`` and keeps the placement landing the + most ``points`` on floor. With the expensive 2-D offset slide gone, far fewer + points are needed to confirm the fit, so it works from a short path rather + than a dense clean. Returns ``None`` if no candidate lands a ``min_on_floor`` + fraction of points on floor (e.g. the origin or points are bogus). + """ + if not points: + return None + w, h = layers.width, layers.height + ox, oy = origin + classify = layers.classifier + # 1 = floor, 2 = wall/background (blocked), 0 = other. Index by cell. + klass = bytes( + 1 if (c := classify(v)) == LAYER_FLOOR else 2 if c in (LAYER_WALL, LAYER_BACKGROUND) else 0 for v in layers.grid + ) + + best: tuple[float, GridCalibration] | None = None + for resolution in resolutions: + if resolution <= 0: + continue + for y_sign in y_signs: + on_floor = 0 + blocked = 0 + for x, y in points: + px = int(x / resolution + ox) + py = int(oy - y_sign * y / resolution) + if not (0 <= px < w and 0 <= py < h): + blocked += 1 + continue + k = klass[py * w + px] + if k == 1: + on_floor += 1 + elif k == 2: + blocked += 1 + score = on_floor - 1.5 * blocked + if best is None or score > best[0]: + best = (score, GridCalibration(float(resolution), float(ox), float(oy), y_sign)) + + if best is None or best[0] < len(points) * min_on_floor: + return None + return best[1] + + def decompose_grid( width: int, height: int, diff --git a/roborock/map/b01_map_parser.py b/roborock/map/b01_map_parser.py index b57912e4..30b5887e 100644 --- a/roborock/map/b01_map_parser.py +++ b/roborock/map/b01_map_parser.py @@ -15,11 +15,67 @@ from roborock.exceptions import RoborockException from roborock.map.proto.b01_scmap_pb2 import RobotMap # type: ignore[attr-defined] +from .b01_grid_layers import ( + LAYER_BACKGROUND, + LAYER_FLOOR, + LAYER_WALL, + GridCalibration, + GridLayers, + decompose_grid, +) from .map_parser import ParsedMapData _MAP_FILE_FORMAT = "PNG" +# The Q7 occupancy grid encodes only these classes (no per-room segmentation in +# the raster -- room ids/names live in the protobuf metadata, not the pixels). +_Q7_WALL_VALUE = 127 +_Q7_FLOOR_VALUE = 128 + + +def classify_q7_cell(value: int) -> str: + """Map a Q7 SCMap grid cell value to a canonical layer class.""" + if value == _Q7_WALL_VALUE: + return LAYER_WALL + if value == _Q7_FLOOR_VALUE: + return LAYER_FLOOR + return LAYER_BACKGROUND # 0 = outside / unknown + + +def decompose_q7_layers(payload: bytes) -> GridLayers: + """Split an inflated Q7 SCMap into background / wall / floor layers. + + Q7 has no per-room raster, so ``GridLayers.rooms`` is empty; room ids/names + are available separately via the map metadata. Reuses the same device-agnostic + decomposition as the Q10. + """ + parsed = _parse_scmap_payload(payload) + size_x, size_y, grid = _extract_grid(parsed) + return decompose_grid(size_x, size_y, grid, [], classify_q7_cell) + + +def q7_calibration(payload: bytes) -> GridCalibration | None: + """Build a world<->pixel calibration straight from the Q7 ``mapHead``. + + Unlike the Q10 (whose packet carries no calibration), the Q7 SCMap header + provides ``minX``/``minY``/``resolution`` directly, so no path fitting is + needed. World coordinates are in metres; resolution is metres-per-pixel. + """ + head = _parse_scmap_payload(payload).mapHead + if not head.HasField("resolution") or head.resolution <= 0 or not head.HasField("sizeY"): + return None + resolution = head.resolution + min_x = head.minX if head.HasField("minX") else 0.0 + min_y = head.minY if head.HasField("minY") else 0.0 + return GridCalibration( + resolution=resolution, + origin_x=-min_x / resolution, + origin_y=(head.sizeY - 1) + min_y / resolution, + y_sign=1, + ) + + @dataclass class B01MapParserConfig: """Configuration for the B01/Q7 map parser.""" diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py index 89dec083..52dfc76d 100644 --- a/roborock/map/b01_q10_map_parser.py +++ b/roborock/map/b01_q10_map_parser.py @@ -23,7 +23,7 @@ import io import math import statistics -from dataclasses import dataclass, field +from dataclasses import dataclass, field, replace from PIL import Image from vacuum_map_parser_base.config.image_config import ImageConfig @@ -31,10 +31,46 @@ from roborock.exceptions import RoborockException +from .b01_grid_layers import ( + LAYER_BACKGROUND, + LAYER_FLOOR, + LAYER_UNKNOWN, + LAYER_WALL, + GridLayers, + decompose_grid, +) from .map_parser import ParsedMapData _MAP_FILE_FORMAT = "PNG" +# Semantic raster classes, confirmed against real ss07 captures (rendered and +# eyeballed): 243 is the background outside the home (~half the grid), 240 is +# scanned floor not yet assigned to a room, other values >= 240 are walls, and +# 0 < value < 240 are per-room floor cells (value == room_id * 4). +_BACKGROUND_VALUE = 243 +_UNSEGMENTED_FLOOR_VALUE = 240 + + +def classify_q10_cell(value: int) -> str: + """Map a Q10 grid cell value to a canonical layer class.""" + if value == 0: + return LAYER_UNKNOWN + if value == _BACKGROUND_VALUE: + return LAYER_BACKGROUND + if value == _UNSEGMENTED_FLOOR_VALUE: + return LAYER_FLOOR + if value >= _WALL_THRESHOLD: + return LAYER_WALL + return LAYER_FLOOR + + +def decompose_layers(packet: "Q10MapPacket") -> GridLayers: + """Split a parsed Q10 map packet into separable grid-pixel layers.""" + rooms = [(room.id, room.name, room.pixel_value, room.pixel_count) for room in packet.rooms] + # The ss07 grid is stored top-down (row 0 = top), so no display flip is applied. + return decompose_grid(packet.width, packet.height, packet.grid, rooms, classify_q10_cell, flip=False) + + MAP_PACKET_MARKER = b"\x01\x01" TRACE_PACKET_MARKER = b"\x02\x01" @@ -52,6 +88,28 @@ _ROOM_RECORD_LENGTH = 47 _ROOM_NAME_LENGTH_OFFSET = 26 _MAX_ROOMS = 32 +# Sanity bound for the erase-zone vector section's vertices-per-polygon field. +_MAX_ERASE_ZONE_VERTICES = 16 + +# The 01 01 grid-frame header also carries the map's calibration, so a +# GridCalibration can be derived without fitting a cleaning path (i.e. docked / +# pre-clean). Absolute byte offsets in the frame, reported and verified by +# @andrewlyeats across independent ss07 (fw 03.11.24) captures and cross-checked +# with the ioBroker roborock adapter: +# - 11-12 x_min, 13-14 y_min (s16be): map origin in 5 mm units. The grid is +# 50 mm/px, so dividing by 10 yields the origin in grid pixels -- the (ox, oy) +# that solve_calibration otherwise recovers by sliding the path. +# - 15-16 resolution (u16be): reads 5 (= 0.05 m/px = 50 mm/px) universally. +# - 17-18 charger x, 19-20 charger y (s16be, 5 mm units), 21-22 charger phi. +_ORIGIN_X_OFFSET = 11 +_ORIGIN_Y_OFFSET = 13 +_HEADER_RESOLUTION_OFFSET = 15 +_CHARGER_X_OFFSET = 17 +_CHARGER_Y_OFFSET = 19 +_CHARGER_PHI_OFFSET = 21 +# The header origin/charger are in 5 mm units and the grid is 50 mm/px, so a +# header coordinate maps to grid pixels by dividing by this. +_HEADER_UNITS_PER_PIXEL = 10 # Grid cell values >= this are walls / borders rather than room segments. _WALL_THRESHOLD = 240 @@ -72,6 +130,60 @@ def name(self) -> str: return self.raw_name.removeprefix("rr_").replace("_", " ").strip().title() +@dataclass +class Q10EraseZone: + """A user-drawn "erase" area (polygon) carried in the map packet. + + These are the app's *Erase* tool rectangles -- regions the user marked to be + removed from the map (e.g. phantom floor the lidar mapped through windows). + Coordinates are world units (millimetres), same frame as the path/zones. + + Confirmed by a controlled diff: removing the two erase zones on a live device + dropped this section's count from 2 to 0 while the grid and the trailing + raster were byte-identical. (Earlier revisions misidentified this section as + "carpets"; it is the erase-zone list.) + """ + + vertices: list[tuple[int, int]] = field(default_factory=list) + + +@dataclass +class Q10HeaderCalibration: + """Calibration carried in the ``01 01`` grid-frame header (ss07). + + Lets a :class:`~roborock.map.b01_grid_layers.GridCalibration` origin be read + straight from the map packet -- no cleaning path / fit required, so it works + docked or pre-clean. See :meth:`origin_pixels`. + + ``origin_x`` / ``origin_y`` and the charger coordinates are in 5 mm units; + ``resolution`` is the raw header field (5 == 50 mm/px). ``charger_phi`` is the + raw dock heading field. Reported and verified by @andrewlyeats (ss07). + """ + + origin_x: int + origin_y: int + resolution: int + charger_x: int + charger_y: int + charger_phi: int + + @property + def is_keepalive(self) -> bool: + """True for null/keepalive frames (``x_min == y_min == 0``), which carry + no usable origin -- callers should fall back to a path-fit calibration.""" + return self.origin_x == 0 and self.origin_y == 0 + + def origin_pixels(self) -> tuple[float, float] | None: + """The grid-pixel origin ``(ox, oy)`` for a ``GridCalibration``. + + The header origin is in 5 mm units and the grid is 50 mm/px, so the + pixel origin is the header value divided by 10. Returns ``None`` for a + keepalive frame (no origin to use).""" + if self.is_keepalive: + return None + return (self.origin_x / _HEADER_UNITS_PER_PIXEL, self.origin_y / _HEADER_UNITS_PER_PIXEL) + + @dataclass class Q10MapPacket: """Decoded contents of a Q10 ``01 01`` map packet.""" @@ -81,6 +193,14 @@ class Q10MapPacket: height: int grid: bytes rooms: list[Q10Room] = field(default_factory=list) + erase_zones: list[Q10EraseZone] = field(default_factory=list) + """Erase areas decoded from the packet tail (world coordinates).""" + header_calibration: Q10HeaderCalibration | None = None + """Calibration read straight from the grid-frame header (ss07), or ``None``.""" + carpet_mask: bytes | None = None + """Carpet mask decoded from the packet tail: a full ``width*height`` grid in + the same (top-down) pixel space as :attr:`grid`, where a non-zero cell is + carpet (the value is the carpet kind). ``None`` if the packet carried none.""" @dataclass @@ -98,13 +218,14 @@ class Q10TracePacket: The robot accumulates the **full path of the current cleaning session** and serves it in a single packet: ``points`` holds the whole trajectory so far (oldest first), growing as the robot cleans. This was confirmed live -- a - corridor run produced packets of 1, then 3, then 15 points, each a strict - superset describing the path travelled. Because the robot keeps the path - server-side, a client that connects mid-session still receives the complete - path (this is how the app shows the trail even after a cold launch). - - The robot only emits these while a session is active, so an idle/docked robot - will not produce them. The most recent point is the current robot position. + corridor run produced packets of 0 (just a heading, docked), then 3, then 14 + points, each a strict superset describing the path travelled. Because the + robot keeps the path server-side, a client that connects mid-session still + receives the complete path (this is how the app shows the trail even after a + cold launch). + + A docked/idle robot can still emit a packet carrying only the ``heading`` + (zero points). The most recent point is the current robot position. """ points: list[Q10Point] = field(default_factory=list) @@ -112,30 +233,52 @@ class Q10TracePacket: """Session counter (byte 3); increments per cleaning session, tracking the device clean count. Not a per-packet sequence.""" + heading: int = 0 + """Robot heading from the 0201 SLAM field (bytes 10-11), in degrees: + ``0`` = +x, ``+90`` = +y, ``±180`` = −x, ``−90`` = −y. This is the current + orientation; pair it with :attr:`robot_position` to draw a facing robot. + + Convention (incl. the y-sign) ground-truthed on a live R1 clean: across + straight segments the reported heading equalled the direction of travel + ``atan2(dy, dx)`` -- +x read 0, −x read ±180, a slight −y drift read + negative.""" + @property def robot_position(self) -> Q10Point | None: """The current robot position (the most recent point).""" return self.points[-1] if self.points else None -# Trace packet (``02 01``): a 10-byte header followed by big-endian int16 (x, y) +# Trace packet (``02 01``): a 14-byte header followed by big-endian int16 (x, y) # point pairs forming the accumulated session path. Header layout confirmed -# against live ss07 captures: byte 3 is a session counter (tracks the device -# clean count); bytes 8-9 are a u16be point count minus one (verified: a 15-point -# packet carried 0x000e == 14). The parser reads all 4-byte pairs in the body -# rather than trusting the count field, so a truncated tail can't desync it. +# against live ss07 captures and cross-checked by @andrewlyeats: +# - byte 3: a session counter (tracks the device clean count). +# - bytes 8-9: a u16be point count -- the exact number of (x, y) pairs from +# byte 14 (verified: captures of 1417 / 2462 points carried 0x0589 / 0x099e). +# - bytes 10-11: the 0201 SLAM heading (s16be degrees; 0 = +x, +90 = +y, +# +-180 = -x, -90 = -y) -- the robot's current orientation. +# - bytes 12-13: a constant (0x0000). +# - byte 14 onward: the path points. +# An earlier revision used a 10-byte header, which folded the heading word into +# a phantom leading point ``(heading, 0)`` -- that is the "stray point" the +# heuristic below was papering over, and why the count read "one high". The +# parser reads all 4-byte pairs in the body rather than trusting the count +# field, so a truncated tail can't desync it. # NOTE: the format documented by roborock-qseries-map-bridge (18-byte header) -# did not match this firmware -- this 10-byte layout is what the device sent. -_TRACE_HEADER_LENGTH = 10 +# did not match this firmware -- this 14-byte layout is what the device sent. +_TRACE_HEADER_LENGTH = 14 _TRACE_SEQUENCE_OFFSET = 3 - -# Some cleans prepend a single stray point to the path, far outside the map -# (e.g. ~(0, -1907) when the real path starts near (-3760, -1920)); it skews the -# rendered start/bounding box and any path-based calibration. We drop points[0] -# only when its step to points[1] is a gross outlier (this multiple of the -# median step of the rest of the path), so a genuine first point is never lost. -# The current position (last point) is unaffected. Trigger and threshold -# reported and verified by @andrewlyeats across independent B01/Q10 captures. +_TRACE_HEADING_OFFSET = 10 + +# Some cleans still prepend a single near-origin sentinel as the first real +# point (e.g. ~(5, 76) / (-3, 0) when the path proper starts near (-1700, -800)); +# it skews the rendered start/bounding box and any path-based calibration. (This +# is distinct from the heading word the old 10-byte header used to surface as a +# phantom point -- that is now consumed by the header.) We drop points[0] only +# when its step to points[1] is a gross outlier (this multiple of the median step +# of the rest of the path), so a genuine first point is never lost. The current +# position (last point) is unaffected. Trigger and threshold reported and +# verified by @andrewlyeats across independent B01/Q10 captures. _STRAY_POINT_STEP_RATIO = 20 @@ -159,6 +302,7 @@ def parse_trace_packet(payload: bytes) -> Q10TracePacket: if len(body) % 4: raise RoborockException("Q10 trace points are not 4-byte (x, y) pairs") + heading = int.from_bytes(payload[_TRACE_HEADING_OFFSET : _TRACE_HEADING_OFFSET + 2], "big", signed=True) points = [ Q10Point( x=int.from_bytes(body[offset : offset + 2], "big", signed=True), @@ -167,7 +311,7 @@ def parse_trace_packet(payload: bytes) -> Q10TracePacket: for offset in range(0, len(body), 4) ] points = _drop_stray_leading_point(points) - return Q10TracePacket(points=points, sequence=payload[_TRACE_SEQUENCE_OFFSET]) + return Q10TracePacket(points=points, sequence=payload[_TRACE_SEQUENCE_OFFSET], heading=heading) def _drop_stray_leading_point(points: list[Q10Point]) -> list[Q10Point]: @@ -323,7 +467,132 @@ def parse_map_packet(payload: bytes) -> Q10MapPacket: else: height, grid, room_data = _infer_layout(decoded, width) rooms = _parse_rooms(room_data, grid) - return Q10MapPacket(map_id=map_id, width=width, height=height, grid=grid, rooms=rooms) + tail = payload[layout_end:] + erase_zones = _parse_erase_zones(tail) + carpet_mask = _parse_carpet_mask(tail, width, height) + header_calibration = _parse_header_calibration(payload) + return Q10MapPacket( + map_id=map_id, + width=width, + height=height, + grid=grid, + rooms=rooms, + erase_zones=erase_zones, + header_calibration=header_calibration, + carpet_mask=carpet_mask, + ) + + +def _parse_header_calibration(payload: bytes) -> Q10HeaderCalibration | None: + """Read the calibration fields from the ``01 01`` grid-frame header. + + All fields sit inside the fixed 29-byte header (already length-checked by + the caller). See the offset constants for the byte layout. Returns the + decoded :class:`Q10HeaderCalibration` (callers skip keepalive frames via + :attr:`Q10HeaderCalibration.is_keepalive`).""" + + def s16(offset: int) -> int: + return int.from_bytes(payload[offset : offset + 2], "big", signed=True) + + return Q10HeaderCalibration( + origin_x=s16(_ORIGIN_X_OFFSET), + origin_y=s16(_ORIGIN_Y_OFFSET), + resolution=int.from_bytes(payload[_HEADER_RESOLUTION_OFFSET : _HEADER_RESOLUTION_OFFSET + 2], "big"), + charger_x=s16(_CHARGER_X_OFFSET), + charger_y=s16(_CHARGER_Y_OFFSET), + charger_phi=s16(_CHARGER_PHI_OFFSET), + ) + + +def _parse_erase_zones(tail: bytes) -> list[Q10EraseZone]: + """Decode erase areas from the bytes after the compressed grid block. + + The tail begins with a vector section ``[count: u8][vertices_per: u8]`` then + ``count`` polygons of ``vertices_per`` int16-BE (x, y) pairs (axis-aligned + rectangles in practice). Identified by a controlled diff on a live ss07 + device: deleting the two app *Erase* zones dropped ``count`` 2->0 with the + rest of the packet byte-identical. The remaining tail (a run-length raster + + signature) is unrelated to erase and is not decoded here. + """ + if len(tail) < 2: + return [] + count = tail[0] + vertices_per = tail[1] + if count == 0 or not 1 <= vertices_per <= _MAX_ERASE_ZONE_VERTICES: + return [] + + erase_zones: list[Q10EraseZone] = [] + offset = 2 + for _ in range(count): + end = offset + vertices_per * 4 + if end > len(tail): + break + vertices = [ + ( + int.from_bytes(tail[offset + j * 4 : offset + j * 4 + 2], "big", signed=True), + int.from_bytes(tail[offset + j * 4 + 2 : offset + j * 4 + 4], "big", signed=True), + ) + for j in range(vertices_per) + ] + erase_zones.append(Q10EraseZone(vertices=vertices)) + offset = end + return erase_zones + + +def _carpet_offset(tail: bytes) -> int: + """Byte offset within the post-grid tail where the carpet mask begins. + + The erase section is ``[u8 count][u8 vertices_per]`` then ``count`` polygons + of ``vertices_per`` int16 (x, y) pairs, so it always occupies + ``2 + count*vertices_per*4`` bytes -- just the 2-byte header when count is 0. + """ + if len(tail) < 2: + return len(tail) + count, vertices_per = tail[0], tail[1] + return 2 + count * vertices_per * 4 + + +def _parse_carpet_mask(tail: bytes, width: int, height: int) -> bytes | None: + """Decode the carpet mask that follows the erase section in the packet tail. + + Framing matches the main grid block: ``[u32 uncompressed_len]`` + ``[u16 compressed_len][LZ4 block]``. The decompressed mask is a full + ``width*height`` grid in the same (top-down) pixel space as the main grid; a + non-zero cell is carpet (the value is the carpet kind). Confirmed byte-exact + on live ss07 captures (R1 / RDC), where ``uncompressed_len == width*height``. + + Returns the decompressed mask, or ``None`` if the section is absent or does + not line up (the ``uncompressed_len == width*height`` invariant is used as the + guard so a mis-located section yields no carpet rather than garbage). + """ + offset = _carpet_offset(tail) + if offset + 6 > len(tail): + return None + uncompressed_len = int.from_bytes(tail[offset : offset + 4], "big") + compressed_len = int.from_bytes(tail[offset + 4 : offset + 6], "big") + block_end = offset + 6 + compressed_len + if uncompressed_len != width * height or compressed_len <= 0 or block_end > len(tail): + return None + try: + mask = lz4_block_decompress(tail[offset + 6 : block_end]) + except RoborockException: + return None + return mask if len(mask) == width * height else None + + +def erased_packet(packet: "Q10MapPacket", cells: set[int]) -> "Q10MapPacket": + """Return a copy of ``packet`` with ``cells`` (grid indices) set to background. + + Used to apply the app's erase zones: cells inside an erase rectangle are blanked + to the background class so they drop out of the rendered map and every layer. + """ + if not cells: + return packet + grid = bytearray(packet.grid) + for cell in cells: + if 0 <= cell < len(grid): + grid[cell] = _BACKGROUND_VALUE + return replace(packet, grid=bytes(grid)) @dataclass @@ -340,16 +609,17 @@ class B01Q10MapParser: def __init__(self, config: B01Q10MapParserConfig | None = None) -> None: self._config = config or B01Q10MapParserConfig() + @property + def config(self) -> B01Q10MapParserConfig: + """The parser configuration (image scale, ...).""" + return self._config + def parse(self, payload: bytes) -> ParsedMapData: """Parse a raw Q10 map packet into a rendered PNG + ``MapData``.""" - return self.parse_packet(parse_map_packet(payload)) - - def parse_packet(self, packet: Q10MapPacket) -> ParsedMapData: - """Render an already-parsed Q10 map packet into a PNG + ``MapData``. + return self.parsed_from_packet(parse_map_packet(payload)) - The protocol layer parses the wire bytes into a :class:`Q10MapPacket`; - this renders that packet without re-parsing it. - """ + def parsed_from_packet(self, packet: Q10MapPacket) -> ParsedMapData: + """Render a (possibly erase-modified) packet into a PNG + ``MapData``.""" image = self._render(packet) map_data = MapData() @@ -367,6 +637,11 @@ def parse_packet(self, packet: Q10MapPacket) -> ParsedMapData: if room_names: map_data.additional_parameters["room_names"] = room_names + # Carpet cells are flat grid indices (y*width + x) in the same top-down + # pixel space as the rendered raster, so they line up with the image. + if packet.carpet_mask is not None: + map_data.carpet_map = {i for i, value in enumerate(packet.carpet_mask) if value} + image_bytes = io.BytesIO() image.save(image_bytes, format=_MAP_FILE_FORMAT) return ParsedMapData(image_content=image_bytes.getvalue(), map_data=map_data) @@ -378,7 +653,8 @@ def _render(self, packet: Q10MapPacket) -> Image.Image: for value in packet.grid: rgb.extend(palette[value]) img = Image.frombytes("RGB", (packet.width, packet.height), bytes(rgb)) - img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM) + # The ss07 grid is stored top-down (row 0 = top of the home), so it is + # rendered as-is -- unlike the V1/Q7 convention, no vertical flip. scale = self._config.map_scale if scale > 1: img = img.resize((packet.width * scale, packet.height * scale), resample=Image.Resampling.NEAREST) diff --git a/roborock/map/b01_q10_overlays.py b/roborock/map/b01_q10_overlays.py index 13e67491..34fcf71c 100644 --- a/roborock/map/b01_q10_overlays.py +++ b/roborock/map/b01_q10_overlays.py @@ -4,13 +4,21 @@ the map raster; the device reports them as base64-encoded blobs in separate data points (``dpRestrictedZoneUp`` 55, ``dpVirtualWallUp`` 57, ``dpZonedUp`` 59). -The blob format was reverse-engineered from a live ss07 (confirmed against 7 -real no-go zones): +The restricted-zone / zoned blob format (DP 55 and DP 59) was reverse-engineered +from a live ss07 (confirmed against 7 real no-go zones): [version: u8][count: u8] then ``count`` fixed-size records, each: [type: u8][vertex_count: u8] then vertex_count (x, y) int16-BE pairs, zero-padded to the record size. +Use :func:`parse_zone_blob` for those. Virtual walls (DP 57) use a *different +framing* -- a bare ``[count]`` and 8-byte ``(x, y)`` records, no version/type/pad +-- so they have their own :func:`parse_virtual_wall_blob`; feeding DP 57 to +:func:`parse_zone_blob` mis-frames it (the leading byte is read as a version and +the next, a coordinate, as a record count). The coordinate order matches the +zones (first wire word = x), confirmed against the app. Provenance and the +byte-level breakdown are in PR #850's review thread. + Coordinates are in the device's world units (the same space as the cleaning path), so a :class:`~roborock.map.b01_grid_layers.GridCalibration` maps them to map pixels. ``type`` distinguishes the restriction kind (2 = no-mop, 3 = door @@ -83,12 +91,67 @@ def parse_zone_blob(data: bytes | str | None) -> list[Q10Zone]: return zones +_WALL_RECORD_SIZE = 8 # two (x, y) int16-BE endpoints + + +def parse_virtual_wall_blob(data: bytes | str | None) -> list[Q10Zone]: + """Decode a Q10 virtual-wall overlay blob (``dpVirtualWallUp`` 57). + + Virtual walls use a *different framing* from the restricted-zone DPs handled + by :func:`parse_zone_blob`: a single ``[count: u8]`` byte (no version, no + per-record type/pad) followed by ``count`` 8-byte records, each two + ``(x, y)`` int16-BE endpoints. The *coordinate order is the same* as the + restricted zones (first wire word = x), so a wall and a no-go zone drawn on + the same line decode parallel rather than transposed. + + Each wall is returned as a :class:`Q10Zone` of type + :data:`ZONE_TYPE_VIRTUAL_WALL` with its two ``(x, y)`` endpoints, so callers + can place them onto the map through the same + :class:`~roborock.map.b01_grid_layers.GridCalibration` as the zones. + + Accepts raw bytes or the base64 string straight from the data point. Returns + ``[]`` for empty/absent/unparsable blobs (the device sends a single ``0x00`` + byte -- base64 ``AA==`` -- when there are none). + + The axis order was confirmed against the app: a horizontal wall drawn below + a room reads back with x varying and y constant (and the wide RDC no-go zone + reads back wide), so DP 57 shares DP 55's order. An earlier revision swapped + the wall axes to ``(y, x)`` -- following a misreading of PR #850's notes -- + which placed every wall transposed 90 degrees from where it was drawn. + """ + raw = _as_bytes(data) + if len(raw) < 1: + return [] + count = raw[0] + if count <= 0: + return [] + + body = raw[1:] + walls: list[Q10Zone] = [] + for index in range(count): + record = body[index * _WALL_RECORD_SIZE : (index + 1) * _WALL_RECORD_SIZE] + if len(record) < _WALL_RECORD_SIZE: + break # truncated trailing record; stop rather than misread + vertices = [ + ( + int.from_bytes(record[p * 4 : p * 4 + 2], "big", signed=True), + int.from_bytes(record[p * 4 + 2 : p * 4 + 4], "big", signed=True), + ) + for p in range(2) + ] + walls.append(Q10Zone(type=ZONE_TYPE_VIRTUAL_WALL, vertices=vertices)) + return walls + + # Observed ``type`` values, confirmed against an ss07 Q10 (firmware 03.11.24) and # cross-checked with the ioBroker roborock adapter: 2 = no-mop, 3 = door -# threshold, 1 = virtual wall. Any other value (including 0) is a no-go zone. -# In practice virtual walls arrive on a separate DP (VIRTUAL_WALL_UP 57), so this -# restricted-zone DP normally only carries 0 / 2 / 3. The raw value is also kept -# on ``Q10Zone.type`` for callers that recognise it. +# threshold. Any other value (including 0) is a no-go zone. The raw value is also +# kept on ``Q10Zone.type`` for callers that recognise it. +# +# Virtual walls arrive on a separate DP (VIRTUAL_WALL_UP 57) with their own frame +# (see :func:`parse_virtual_wall_blob`), so this restricted-zone DP only carries +# 0 / 2 / 3 -- never a 1. ``ZONE_TYPE_VIRTUAL_WALL`` is kept here only to tag the +# walls that :func:`parse_virtual_wall_blob` produces. # # Corrected from an earlier reading that treated type 3 as no-mop -- 3 is the # door-threshold rectangle; the no-mop area reads back as type 2. Reported and diff --git a/roborock/map/b01_q10_render.py b/roborock/map/b01_q10_render.py new file mode 100644 index 00000000..eecc5f33 --- /dev/null +++ b/roborock/map/b01_q10_render.py @@ -0,0 +1,338 @@ +"""Compose a Q10 (B01/ss07) map into a single rendered result. + +The :class:`~roborock.map.b01_q10_map_parser.B01Q10MapParser` turns wire bytes +into a :class:`~roborock.map.b01_q10_map_parser.Q10MapPacket`; this module takes +that packet plus the *other* inputs the device streams separately -- the cleaning +path, the vector overlays (no-go / no-mop zones, virtual walls) and a solved +world<->pixel calibration -- and composes them into one :class:`Q10MapRender` result object +(image + ``MapData`` + layers). + +It exists so the map trait stays about state management: the trait accumulates +the pushed inputs and calls :func:`render_q10_map` once per change, holding the +returned object rather than mutating a pile of derived fields itself. All the +low-level pixel work (erase-zone blanking, world->pixel overlay placement, path +drawing) and the calibration policy live here, next to the rest of the map code. +""" + +import io +import math +from collections.abc import Sequence +from dataclasses import dataclass + +from PIL import Image, ImageDraw +from vacuum_map_parser_base.map_data import Area, MapData, Path, Point, Wall + +from roborock.exceptions import RoborockException + +from .b01_grid_layers import ( + GridCalibration, + GridLayers, + solve_calibration, + solve_calibration_with_origin, +) +from .b01_q10_map_parser import ( + B01Q10MapParser, + B01Q10MapParserConfig, + Q10HeaderCalibration, + Q10MapPacket, + Q10Point, + Q10Room, + decompose_layers, + erased_packet, +) +from .b01_q10_overlays import ZONE_TYPE_NO_GO, ZONE_TYPE_NO_MOP, Q10Zone + +# Path-units-per-pixel candidates for calibration. A dense ss07 path lands a +# best fit of 20.0 around the header origin -- ground-truthed June 2026 on the +# R1: a corridor drive registered at 20 (matching the format author's +# independent "20 path-units/px"), and the dock->corridor span lined up with the +# ruler-measured 8.81 m corridor. With the header resolution=5 (50 mm/px grid) +# that makes one path-unit exactly 50/20 = 2.5 mm -- so a path-unit is NOT a +# millimetre (the open scale question). An earlier [10.0..18.0] range couldn't +# reach 20 (it railed at the bound), biasing the fit. A dense cleaning path +# selects the best fit within this bracket. +_Q10_RESOLUTIONS = [step * 0.5 for step in range(24, 53)] # 12.0 .. 26.0 +# A path needs enough shape to constrain a full (origin + resolution) fit; a few +# points cannot. +_MIN_CALIBRATION_POINTS = 20 +# When the grid-frame header supplies the origin, only the resolution is fit, so +# a much shorter path suffices to confirm it (early in a clean, not just a dense +# one). See :func:`solve_calibration_with_origin`. +_MIN_HEADER_CALIBRATION_POINTS = 4 + + +@dataclass +class Q10MapRender: + """The fully composed result of rendering a Q10 map packet. + + Built by :func:`render_q10_map` from the packet plus the current path, + overlays and calibration, so every derived field is consistent with one set + of inputs. Analogous to :class:`~roborock.map.map_parser.ParsedMapData`, but + also carrying the separable :attr:`layers` and the :attr:`calibration` used + to place the vector overlays. + """ + + image_content: bytes + """The rendered base map (PNG) with erase zones blanked, path not drawn.""" + + map_data: MapData + """Parsed map data: image metadata, room names, and -- once a calibration is + known -- the path / robot position / zones / walls placed in pixel space.""" + + layers: GridLayers + """Separable map layers (background / wall / floor / per-room) in grid-pixel + space, each renderable to a transparent PNG for frontend compositing.""" + + rooms: list[Q10Room] + """Rooms (segments) reported by the device, with ids and names.""" + + calibration: GridCalibration | None + """World<->pixel transform used to place the overlays, or ``None`` if no + calibration was available (the overlays are then absent from ``map_data``).""" + + +def render_q10_map( + packet: Q10MapPacket, + *, + calibration: GridCalibration | None, + path: Sequence[Q10Point], + robot_position: Q10Point | None, + zones: Sequence[Q10Zone], + virtual_walls: Sequence[Q10Zone], + config: B01Q10MapParserConfig, +) -> Q10MapRender: + """Compose a Q10 map packet and its overlays into a :class:`Q10MapRender`. + + With a ``calibration`` the erase zones are blanked out of the raster and the + path / robot position / restricted zones / virtual walls are placed onto + ``map_data`` in pixel space; without one only the base raster is rendered + (the overlays are world-coordinate only and can't be placed yet). Raises + :class:`RoborockException` if the packet fails to render. + """ + parser = B01Q10MapParser(config) + layers = decompose_layers(packet) + + render_packet = packet + if calibration is not None: + cells = _erased_cells(layers, packet.erase_zones, calibration) + if cells: + # Blank the erase-zone cells and re-derive the raster/layers from the + # modified packet so the phantom areas disappear (as the app shows). + render_packet = erased_packet(packet, cells) + layers = decompose_layers(render_packet) + + parsed = parser.parsed_from_packet(render_packet) + if parsed.image_content is None or parsed.map_data is None: + raise RoborockException("Failed to render Q10 map image") + map_data = parsed.map_data + + if calibration is not None: + _place_path(map_data, calibration, path, robot_position) + _place_zones(map_data, calibration, path, zones, virtual_walls) + + return Q10MapRender( + image_content=parsed.image_content, + map_data=map_data, + layers=layers, + rooms=packet.rooms, + calibration=calibration, + ) + + +def solve_q10_calibration( + layers: GridLayers, + header_calibration: Q10HeaderCalibration | None, + path: Sequence[Q10Point], +) -> GridCalibration | None: + """Fit the world<->pixel calibration from the current cleaning path. + + When the map packet's grid-frame header carries a calibration origin (ss07), + only the resolution is fit -- around that fixed origin -- so a short path + suffices and the origin is exact rather than recovered by a slide. Otherwise + the full origin + resolution fit is used, which needs a reasonably dense + cleaning path. Returns ``None`` if the path is too short/featureless to fit. + """ + points: list[tuple[float, float]] = [(point.x, point.y) for point in path] + return _calibration_from_header(layers, header_calibration, points) or _calibration_from_fit(layers, points) + + +def _calibration_from_header( + layers: GridLayers, + header_calibration: Q10HeaderCalibration | None, + points: list[tuple[float, float]], +) -> GridCalibration | None: + """Calibrate around the header-supplied origin (resolution fit to a path).""" + if header_calibration is None or len(points) < _MIN_HEADER_CALIBRATION_POINTS: + return None + origin = header_calibration.origin_pixels() + if origin is None: # keepalive frame -- no usable origin + return None + return solve_calibration_with_origin(layers, points, origin, resolutions=_Q10_RESOLUTIONS) + + +def _calibration_from_fit(layers: GridLayers, points: list[tuple[float, float]]) -> GridCalibration | None: + """Full origin + resolution fit; needs a reasonably dense path.""" + if len(points) < _MIN_CALIBRATION_POINTS: + return None + return solve_calibration(layers, points, resolutions=_Q10_RESOLUTIONS) + + +def _erased_cells(layers: GridLayers, erase_zones: Sequence, calibration: GridCalibration) -> set[int]: + """Grid-cell indices covered by the erase zones (axis-aligned bbox fill).""" + if not erase_zones: + return set() + width, height = layers.width, layers.height + cells: set[int] = set() + for zone in erase_zones: + pixels = [calibration.world_to_pixel(x, y) for x, y in zone.vertices] + xs = [p[0] for p in pixels] + ys = [p[1] for p in pixels] + x0, x1 = int(min(xs)), int(max(xs)) + y0, y1 = int(min(ys)), int(max(ys)) + for py in range(max(0, y0), min(height, y1 + 1)): + for px in range(max(0, x0), min(width, x1 + 1)): + cells.add(py * width + px) + return cells + + +def _place_path( + map_data: MapData, + calibration: GridCalibration, + path: Sequence[Q10Point], + robot_position: Q10Point | None, +) -> None: + """Fill ``MapData.path`` / ``vacuum_position`` in grid-pixel coords. + + Points are stored in grid-pixel space (origin top-left), matching the Q10's + top-down, un-flipped raster so they line up with the rendered image. + """ + pixels = [Point(*calibration.world_to_pixel(point.x, point.y)) for point in path] + map_data.path = Path(len(pixels), 1, 0, [pixels]) + if robot_position is not None: + px, py = calibration.world_to_pixel(robot_position.x, robot_position.y) + map_data.vacuum_position = Point(px, py) + + +def _place_zones( + map_data: MapData, + calibration: GridCalibration, + path: Sequence[Q10Point], + zones: Sequence[Q10Zone], + virtual_walls: Sequence[Q10Zone], +) -> None: + """Convert world-coordinate zones/walls into pixel-space ``MapData`` layers.""" + + def to_area(zone: Q10Zone) -> Area | None: + if len(zone.vertices) != 4: + return None # MapData.Area is a quad + pts = [calibration.world_to_pixel(x, y) for x, y in zone.vertices] + return Area(pts[0][0], pts[0][1], pts[1][0], pts[1][1], pts[2][0], pts[2][1], pts[3][0], pts[3][1]) + + no_go = [area for zone in zones if zone.type == ZONE_TYPE_NO_GO and (area := to_area(zone))] + no_mop = [area for zone in zones if zone.type == ZONE_TYPE_NO_MOP and (area := to_area(zone))] + map_data.no_go_areas = no_go or None + map_data.no_mopping_areas = no_mop or None + + walls: list[Wall] = [] + for zone in virtual_walls: + if len(zone.vertices) >= 2: + (x0, y0), (x1, y1) = zone.vertices[0], zone.vertices[1] + p0 = calibration.world_to_pixel(x0, y0) + p1 = calibration.world_to_pixel(x1, y1) + walls.append(Wall(p0[0], p0[1], p1[0], p1[1])) + map_data.walls = walls or None + + # The robot starts a session at its dock, so the path origin is the charger. + if path: + cx, cy = calibration.world_to_pixel(path[0].x, path[0].y) + map_data.charger = Point(cx, cy) + + +def draw_path_on_map( + render: Q10MapRender, + *, + config: B01Q10MapParserConfig, + path: Sequence[Q10Point], + robot_position: Q10Point | None, + robot_heading: int | None, + zones: Sequence[Q10Zone], + virtual_walls: Sequence[Q10Zone], + line_color: tuple[int, int, int, int] = (235, 64, 52, 255), + position_color: tuple[int, int, int, int] = (255, 211, 0, 255), +) -> bytes: + """Draw the session path + robot position + overlays onto the base map (PNG). + + ``render`` must carry a calibration (its :attr:`Q10MapRender.calibration`) -- + the caller is responsible for solving one first. Returns a fresh PNG; the + ``render.image_content`` base raster is left untouched. + """ + calibration = render.calibration + if calibration is None: + raise RoborockException("No calibration available; a cleaning path must be captured during a clean") + + scale = config.map_scale + base = Image.open(io.BytesIO(render.image_content)).convert("RGBA") + + def world_to_image(x: float, y: float) -> tuple[float, float]: + px, py = calibration.world_to_pixel(x, y) + # The ss07 grid renders top-down (no flip), so grid-pixel (px, py) maps + # straight to image space, only upscaled by ``scale``. + return (px * scale, py * scale) + + def to_image(point: Q10Point) -> tuple[float, float]: + return world_to_image(point.x, point.y) + + draw = ImageDraw.Draw(base, "RGBA") + + # Erase zones are applied to the raster itself (cells blanked), so they are + # not drawn here -- the base image already reflects them. + + # No-go (blue) and no-mop (magenta) zones beneath the path. + for zone in zones: + if len(zone.vertices) < 3: + continue + polygon = [world_to_image(x, y) for x, y in zone.vertices] + fill = (0, 120, 255, 70) if zone.type == ZONE_TYPE_NO_GO else (255, 0, 200, 70) + outline = (0, 80, 200, 255) if zone.type == ZONE_TYPE_NO_GO else (200, 0, 160, 255) + draw.polygon(polygon, fill=fill, outline=outline) + + # Virtual walls (line segments, not polygons) drawn over the zones. + for wall in virtual_walls: + if len(wall.vertices) < 2: + continue + draw.line( + [world_to_image(x, y) for x, y in wall.vertices[:2]], + fill=(255, 64, 64, 255), + width=max(2, scale), + ) + + if len(path) >= 2: + draw.line([to_image(point) for point in path], fill=line_color, width=max(1, scale // 2)) + if path: # path origin == dock / charger + dx, dy = to_image(path[0]) + draw.ellipse([dx - scale, dy - scale, dx + scale, dy + scale], outline=(40, 200, 40, 255), width=2) + if robot_position is not None: + cx, cy = to_image(robot_position) + radius = scale + draw.ellipse([cx - radius, cy - radius, cx + radius, cy + radius], fill=position_color) + if robot_heading is not None: + # Heading is world-space degrees (0 = +x, +90 = +y). Map a unit + # world-space facing vector through the same transform (so the + # Y-flip/scale match the marker), then normalize to a fixed + # pixel-length tick so it reads at any calibration resolution. + angle = math.radians(robot_heading) + hx, hy = world_to_image( + robot_position.x + math.cos(angle), + robot_position.y + math.sin(angle), + ) + norm = math.hypot(hx - cx, hy - cy) + if norm > 0: + tick = 4 * radius + draw.line( + [cx, cy, cx + (hx - cx) / norm * tick, cy + (hy - cy) / norm * tick], + fill=position_color, + width=max(1, scale // 2), + ) + buffer = io.BytesIO() + base.save(buffer, format="PNG") + return buffer.getvalue() diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py index 470f3f99..b9180065 100644 --- a/tests/devices/traits/b01/q10/test_map.py +++ b/tests/devices/traits/b01/q10/test_map.py @@ -1,26 +1,41 @@ """Tests for the Q10 B01 map content trait. The Q10 map API is push-driven: the device publishes ``MAP_RESPONSE`` messages -and the trait updates its cached state from them via ``update_from_map_response`` -(there is no synchronous get-map request). +which the protocol layer decodes into typed map/trace packets; the trait updates +its cached state from them via ``update_from_map_packet`` / +``update_from_trace_packet`` (there is no synchronous get-map request). These +tests cover that state management; the pixel/geometry work it drives is tested in +``tests/map/test_b01_q10_render.py``. """ import asyncio from collections.abc import AsyncGenerator +from dataclasses import replace from pathlib import Path -from typing import cast from unittest.mock import AsyncMock, Mock import pytest -from roborock.cli import _await_q10_map_push, cli +from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create from roborock.devices.traits.b01.q10.map import MapContentTrait -from roborock.map.b01_q10_map_parser import Q10Point, parse_map_packet, parse_trace_packet +from roborock.exceptions import RoborockException +from roborock.map.b01_grid_layers import GridCalibration +from roborock.map.b01_q10_map_parser import ( + Q10HeaderCalibration, + Q10Point, + Q10TracePacket, + parse_map_packet, + parse_trace_packet, +) from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") -TRACE_FIXTURE = Path("tests/map/testdata/b01_q10_trace.bin") +TRACE_SESSION_FIXTURE = Path("tests/map/testdata/b01_q10_trace_session.bin") + +# A header calibration whose pixel origin (0, 5) is usable (not a keepalive +# frame), so a short path can calibrate the fixture map. +_USABLE_HEADER = Q10HeaderCalibration(origin_x=0, origin_y=50, resolution=5, charger_x=0, charger_y=0, charger_phi=0) def _map_message( @@ -29,8 +44,28 @@ def _map_message( return RoborockMessage(protocol=protocol, payload=payload, version=b"B01") +def _trait_with_map() -> MapContentTrait: + """A trait with the fixture map already pushed into it.""" + trait = MapContentTrait() + trait.update_from_map_packet(parse_map_packet(FIXTURE.read_bytes())) + return trait + + +def _floor_world_points(trait: MapContentTrait, cal: GridCalibration, count: int) -> list[Q10Point]: + """``count`` world points lying on the map's floor under ``cal``.""" + layers = trait.layers + assert layers is not None + floor = [ + (px, py) + for py in range(layers.height) + for px in range(layers.width) + if layers.cell_class(layers.grid[py * layers.width + px]) == "floor" + ] + return [Q10Point(*(int(v) for v in cal.pixel_to_world(px, py))) for px, py in floor[:count]] + + def test_update_from_map_packet_populates_image_and_rooms() -> None: - """A parsed 01 01 map packet populates the image, rooms and map data.""" + """A pushed 01 01 map packet populates the image, rooms and map data.""" packet = parse_map_packet(FIXTURE.read_bytes()) trait = MapContentTrait() updates: list[None] = [] @@ -42,83 +77,33 @@ def test_update_from_map_packet_populates_image_and_rooms() -> None: assert trait.image_content[:8] == b"\x89PNG\r\n\x1a\n" assert {room.id: room.name for room in trait.rooms} == {2: "Living Room", 3: "Bedroom"} assert trait.map_data is not None + assert trait.layers is not None assert len(updates) == 1 def test_update_from_trace_packet_populates_path_and_position() -> None: - """A parsed 02 01 trace packet populates the path and robot position.""" - packet = parse_trace_packet(TRACE_FIXTURE.read_bytes()) + """A pushed 02 01 trace packet populates the path, position and heading.""" + trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) trait = MapContentTrait() updates: list[None] = [] trait.add_update_listener(lambda: updates.append(None)) - trait.update_from_trace_packet(packet) + trait.update_from_trace_packet(trace) - assert [(p.x, p.y) for p in trait.path] == [(169, 0)] + assert len(trait.path) == 14 + assert (trait.path[0].x, trait.path[0].y) == (41, 64) assert trait.robot_position is not None - assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) + assert (trait.robot_position.x, trait.robot_position.y) == (276, -1) + assert trait.robot_heading == -34 assert len(updates) == 1 -def test_q10_position_is_available_as_top_level_cli_command() -> None: - assert "q10-position" in cli.commands - - -# --- CLI push waiting -------------------------------------------------------- - - -class _FakeQ10Properties: - def __init__(self) -> None: - self.map = MapContentTrait() - self.refresh_count = 0 - - async def refresh(self) -> None: - self.refresh_count += 1 - - -class _FakeQ10PropertiesWithTrace(_FakeQ10Properties): - async def refresh(self) -> None: - await super().refresh() - self.map.update_from_trace_packet(parse_trace_packet(TRACE_FIXTURE.read_bytes())) - - -async def test_await_q10_map_push_waits_for_fresh_update() -> None: - """A cached trace alone is not treated as a successful new map push.""" - properties = _FakeQ10Properties() - properties.map.path = [Q10Point(1, 2)] - - got_trace = await _await_q10_map_push( - cast(Q10PropertiesApi, properties), lambda: bool(properties.map.path), timeout=0.01 - ) - - assert got_trace is False - assert properties.refresh_count == 1 - - -async def test_await_q10_map_push_returns_true_after_update() -> None: - properties = _FakeQ10PropertiesWithTrace() - - got_trace = await _await_q10_map_push( - cast(Q10PropertiesApi, properties), lambda: bool(properties.map.path), timeout=0.01 - ) - - assert got_trace is True - assert [(p.x, p.y) for p in properties.map.path] == [(169, 0)] - - -async def test_await_q10_map_push_can_fall_back_to_cached_map_on_timeout() -> None: - properties = _FakeQ10Properties() - properties.map.image_content = b"cached-png" - - got_map = await _await_q10_map_push( - cast(Q10PropertiesApi, properties), - lambda: properties.map.image_content is not None, - timeout=0.01, - allow_cached_on_timeout=True, - ) - - assert got_map is True - assert properties.refresh_count == 1 +def test_map_push_populates_layers() -> None: + """A pushed map is also decomposed into separable layers.""" + trait = _trait_with_map() + assert trait.layers is not None + assert trait.layers.class_counts.get("floor") == 26 + assert {room.id for room in trait.layers.rooms} == {2, 3} # --- Integration through the Q10PropertiesApi subscribe loop ----------------- @@ -174,7 +159,141 @@ async def test_subscribe_loop_routes_trace_push( """A trace pushed onto the stream is routed to the map trait by the loop.""" assert not q10_api.map.path - message_queue.put_nowait(_map_message(TRACE_FIXTURE.read_bytes())) + message_queue.put_nowait(_map_message(TRACE_SESSION_FIXTURE.read_bytes())) await _wait_for(lambda: bool(q10_api.map.path)) assert q10_api.map.robot_position is not None + + +# --- Calibration + rendering ------------------------------------------------- + + +def test_solve_calibration_needs_map_and_dense_path() -> None: + """No map -> no calibration, even with a path pushed.""" + trait = MapContentTrait() + trait.update_from_trace_packet(Q10TracePacket(points=[Q10Point(i, 0) for i in range(30)])) + assert trait.solve_calibration() is None # no map pushed yet + + +def test_solve_calibration_uses_header_origin_with_short_path() -> None: + """A grid-frame header origin lets a short path calibrate and caches it.""" + trait = MapContentTrait() + trait.update_from_map_packet(replace(parse_map_packet(FIXTURE.read_bytes()), header_calibration=_USABLE_HEADER)) + true = GridCalibration(resolution=20.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.update_from_trace_packet(Q10TracePacket(points=_floor_world_points(trait, true, 6))) + assert len(trait.path) < 20 # far too short for the full origin+resolution fit + + cal = trait.solve_calibration() + + assert cal is not None + assert (cal.origin_x, cal.origin_y) == (0.0, 5.0) # straight from the header + assert trait.calibration is cal + # The solved calibration is applied to the map so overlays are placed. + assert trait.map_data is not None + assert trait.map_data.path is not None + + +def test_solve_calibration_short_path_without_header_returns_none() -> None: + """Without a header origin a short path is still too sparse for the full fit.""" + trait = _trait_with_map() # the fixture header is a keepalive frame + true = GridCalibration(resolution=10.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.update_from_trace_packet(Q10TracePacket(points=_floor_world_points(trait, true, 6))) + assert trait.solve_calibration() is None + assert trait.calibration is None + + +def test_render_path_on_map_requires_map() -> None: + trait = MapContentTrait() + with pytest.raises(RoborockException, match="No map available"): + trait.render_path_on_map() + + +def test_render_path_on_map_solves_and_renders() -> None: + """render_path_on_map solves the calibration on demand and returns a PNG.""" + trait = MapContentTrait() + trait.update_from_map_packet(replace(parse_map_packet(FIXTURE.read_bytes()), header_calibration=_USABLE_HEADER)) + true = GridCalibration(resolution=20.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.update_from_trace_packet(Q10TracePacket(points=_floor_world_points(trait, true, 6))) + + png = trait.render_path_on_map() + + assert png[:8] == b"\x89PNG\r\n\x1a\n" + assert trait.calibration is not None # solved and cached on demand + + +def test_render_path_on_map_without_path_cannot_calibrate() -> None: + """A map but no cleaning path -> no calibration -> a clear error.""" + trait = _trait_with_map() + with pytest.raises(RoborockException, match="No calibration available"): + trait.render_path_on_map() + + +# --- Overlays ---------------------------------------------------------------- + + +def test_load_overlays_places_zones_after_calibration() -> None: + """Decoded no-go / no-mop zones are placed on MapData once calibrated.""" + trait = MapContentTrait() + trait.update_from_map_packet(replace(parse_map_packet(FIXTURE.read_bytes()), header_calibration=_USABLE_HEADER)) + true = GridCalibration(resolution=20.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.update_from_trace_packet(Q10TracePacket(points=_floor_world_points(trait, true, 6))) + assert trait.solve_calibration() is not None + + def rect(zone_type: int, corners: list[tuple[int, int]]) -> bytes: + out = bytes([zone_type, len(corners)]) + for x, y in corners: + out += int.to_bytes(x & 0xFFFF, 2, "big") + int.to_bytes(y & 0xFFFF, 2, "big") + return out.ljust(18, b"\x00") + + blob = bytes([1, 1]) + rect(0, [(0, 0), (40, 0), (40, 40), (0, 40)]) + trait.load_overlays(restricted_zone_up=blob) + + assert len(trait.zones) == 1 + assert trait.map_data is not None + assert len(trait.map_data.no_go_areas or []) == 1 + + +def test_load_overlays_partial_update_keeps_existing_zones() -> None: + """A status push without the zone DP (None) must not wipe loaded zones.""" + trait = MapContentTrait() + blob = ( + bytes([1, 1]) + + bytes([0, 4]) + + b"".join(int.to_bytes(v & 0xFFFF, 2, "big") for xy in [(0, 0), (4, 0), (4, 4), (0, 4)] for v in xy) + ) + trait.load_overlays(restricted_zone_up=blob) + assert len(trait.zones) == 1 + # A later partial update carrying only the (empty) virtual-wall DP. + trait.load_overlays(restricted_zone_up=None, virtual_wall_up=b"\x00") + assert len(trait.zones) == 1 # zones preserved + assert trait.virtual_walls == [] + + +def test_update_from_dps_decodes_overlay_data_points() -> None: + """The map trait picks the overlay DPs out of a DPS push and decodes them.""" + trait = MapContentTrait() + blob = ( + bytes([1, 1]) + + bytes([0, 4]) + + b"".join(int.to_bytes(v & 0xFFFF, 2, "big") for xy in [(0, 0), (4, 0), (4, 4), (0, 4)] for v in xy) + ) + notified = [] + trait.add_update_listener(lambda: notified.append(True)) + + trait.update_from_dps({B01_Q10_DP.RESTRICTED_ZONE_UP: blob}) + + assert len(trait.zones) == 1 + assert notified # listeners learn the overlays changed + + +def test_update_from_dps_without_overlay_data_points_is_noop() -> None: + """A DPS push carrying neither overlay DP leaves the trait untouched.""" + trait = MapContentTrait() + notified = [] + trait.add_update_listener(lambda: notified.append(True)) + + trait.update_from_dps({B01_Q10_DP.BATTERY: 50}) + + assert trait.zones == [] + assert trait.virtual_walls == [] + assert not notified diff --git a/tests/map/test_b01_grid_layers.py b/tests/map/test_b01_grid_layers.py index 2299ac3f..52954203 100644 --- a/tests/map/test_b01_grid_layers.py +++ b/tests/map/test_b01_grid_layers.py @@ -1,7 +1,9 @@ -"""Tests for the device-agnostic grid->layers decomposition + calibration.""" +"""Tests for the device-agnostic grid->layers decomposition + Q10 classifier.""" import io +from pathlib import Path +import pytest from PIL import Image from roborock.map.b01_grid_layers import ( @@ -11,7 +13,30 @@ GridCalibration, decompose_grid, solve_calibration, + solve_calibration_with_origin, ) +from roborock.map.b01_q10_map_parser import ( + classify_q10_cell, + decompose_layers, + parse_map_packet, +) + +FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" + + +@pytest.mark.parametrize( + ("value", "expected"), + [ + (0, "unknown"), + (8, LAYER_FLOOR), + (12, LAYER_FLOOR), + (240, LAYER_FLOOR), + (243, LAYER_BACKGROUND), + (249, LAYER_WALL), + ], +) +def test_classify_q10_cell(value: int, expected: str) -> None: + assert classify_q10_cell(value) == expected def test_decompose_grid_generic_classifier_and_bbox() -> None: @@ -72,6 +97,25 @@ def test_render_scale_upsamples() -> None: assert Image.open(io.BytesIO(png)).size == (6, 3) +def test_decompose_layers_on_q10_fixture() -> None: + """The Q10 synthetic fixture splits into floor + per-room layers.""" + layers = decompose_layers(parse_map_packet(FIXTURE.read_bytes())) + assert layers.class_counts.get(LAYER_FLOOR) == 26 + names = {room.id: room.name for room in layers.rooms} + assert names == {2: "Living Room", 3: "Bedroom"} + # Each room renders to a valid PNG and only its own pixels are opaque. + living = layers.render_room(2, (255, 0, 0, 255)) + img = Image.open(io.BytesIO(living)) + opaque = sum(1 for *_rgb, a in img.getdata() if a > 0) + assert opaque == next(r.pixel_count for r in layers.rooms if r.id == 2) + + +def test_render_room_unknown_id_raises() -> None: + layers = decompose_layers(parse_map_packet(FIXTURE.read_bytes())) + with pytest.raises(KeyError): + layers.render_room(999, (0, 0, 0, 255)) + + def test_calibration_roundtrip() -> None: cal = GridCalibration(resolution=2.0, origin_x=3.0, origin_y=8.0, y_sign=1) assert cal.world_to_pixel(0, 0) == (3.0, 8.0) @@ -117,3 +161,27 @@ def test_solve_calibration_returns_none_when_unfittable() -> None: # Points so far apart no resolution keeps them on the 6x6 floor block. points = [(0.0, 0.0), (1000.0, 0.0), (0.0, 1000.0)] assert solve_calibration(layers, points, resolutions=[2.0]) is None + + +def test_solve_calibration_with_origin_fits_resolution_from_short_path() -> None: + """With a known origin only resolution is fit, so a tiny path suffices.""" + layers = _floor_block_layers() + true = GridCalibration(2.0, 3.0, 8.0, 1) + points = [true.pixel_to_world(px, py) for px, py in [(4, 7), (6, 5)]] # two points + cal = solve_calibration_with_origin(layers, points, (true.origin_x, true.origin_y), resolutions=[1.0, 2.0, 3.0]) + assert cal is not None + assert (cal.resolution, cal.origin_x, cal.origin_y, cal.y_sign) == (2.0, 3.0, 8.0, 1) + + +def test_solve_calibration_with_origin_returns_none_off_floor() -> None: + """A wrong origin that lands the path off floor is rejected, not forced.""" + layers = _floor_block_layers() + true = GridCalibration(2.0, 3.0, 8.0, 1) + points = [true.pixel_to_world(px, py) for px, py in [(4, 7), (6, 5)]] + # Origin shoved into the background corner: every point lands off floor. + assert solve_calibration_with_origin(layers, points, (0.0, 0.0), resolutions=[2.0]) is None + + +def test_solve_calibration_with_origin_returns_none_without_points() -> None: + layers = _floor_block_layers() + assert solve_calibration_with_origin(layers, [], (3.0, 8.0), resolutions=[2.0]) is None diff --git a/tests/map/test_b01_map_parser.py b/tests/map/test_b01_map_parser.py index 0829182e..2e66cd4f 100644 --- a/tests/map/test_b01_map_parser.py +++ b/tests/map/test_b01_map_parser.py @@ -11,7 +11,14 @@ from PIL import Image from roborock.exceptions import RoborockException -from roborock.map.b01_map_parser import B01MapParser, _parse_scmap_payload +from roborock.map.b01_grid_layers import LAYER_BACKGROUND, LAYER_FLOOR, LAYER_WALL +from roborock.map.b01_map_parser import ( + B01MapParser, + _parse_scmap_payload, + classify_q7_cell, + decompose_q7_layers, + q7_calibration, +) from roborock.map.proto.b01_scmap_pb2 import RobotMap # type: ignore[attr-defined] from roborock.protocols.b01_q7_protocol import create_map_key, decode_map_payload @@ -126,6 +133,31 @@ def test_b01_scmap_parser_maps_observed_schema_fields() -> None: assert not parsed.roomDataInfo[1].HasField("roomName") +def test_classify_q7_cell() -> None: + assert classify_q7_cell(0) == LAYER_BACKGROUND + assert classify_q7_cell(127) == LAYER_WALL + assert classify_q7_cell(128) == LAYER_FLOOR + + +def test_q7_layers_and_calibration_from_fixture() -> None: + """Q7 reuses the shared grid decomposition + reads calibration from mapHead.""" + inflated = gzip.decompress(FIXTURE.read_bytes()) + + layers = decompose_q7_layers(inflated) + assert set(layers.class_counts) == {LAYER_BACKGROUND, LAYER_WALL, LAYER_FLOOR} + assert layers.class_counts[LAYER_FLOOR] > 0 + assert layers.rooms == [] # Q7 raster has no per-room segmentation + + cal = q7_calibration(inflated) + assert cal is not None + # mapHead gives minX=-5, minY=-7, resolution=0.05 -> origin from those. + assert cal.resolution == pytest.approx(0.05, abs=1e-4) + assert cal.origin_x == pytest.approx(5.0 / cal.resolution, abs=1.0) + # World origin (0,0) maps inside the grid. + px, py = cal.world_to_pixel(0.0, 0.0) + assert 0 <= px < layers.width and 0 <= py < layers.height + + def test_b01_map_parser_rejects_invalid_payload() -> None: parser = B01MapParser() with pytest.raises(RoborockException, match="Failed to parse B01 SCMap"): diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py index 53c3c63c..2ceb5e72 100644 --- a/tests/map/test_b01_q10_map_parser.py +++ b/tests/map/test_b01_q10_map_parser.py @@ -18,7 +18,7 @@ FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" TRACE_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace.bin" TRACE_MULTI_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_multi.bin" -# Real 15-point packet captured from an R1 corridor run (full session path). +# Real 14-point packet captured from an R1 corridor run (full session path). TRACE_SESSION_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_session.bin" @@ -75,10 +75,12 @@ def _full_header_map_payload(width: int, height: int, decoded_layout: bytes) -> return bytes(payload) -def _trace_payload(points: list[tuple[int, int]], sequence: int = 1) -> bytes: - header = bytearray(10) +def _trace_payload(points: list[tuple[int, int]], sequence: int = 1, heading: int = 0) -> bytes: + header = bytearray(14) # _TRACE_HEADER_LENGTH header[0:2] = b"\x02\x01" header[3] = sequence + header[8:10] = len(points).to_bytes(2, "big") # point count == number of (x, y) pairs + header[10:12] = heading.to_bytes(2, "big", signed=True) body = b"".join(x.to_bytes(2, "big", signed=True) + y.to_bytes(2, "big", signed=True) for x, y in points) return bytes(header) + body @@ -190,30 +192,37 @@ def test_packet_markers_are_distinct() -> None: def test_parse_trace_packet_real_single_point() -> None: - """A real ss07 packet captured early in a session has a single path point.""" + """A real ss07 packet captured while docked: a heading, but no path points. + + The 14-byte header carries point count 0 (bytes 8-9) and heading 169 (bytes + 10-11); the rest of the frame is empty. An earlier 10-byte-header revision + mis-decoded the heading word as a phantom path point ``(169, 0)``. + """ trace = parse_trace_packet(TRACE_FIXTURE.read_bytes()) assert trace.sequence == 9 - assert [(p.x, p.y) for p in trace.points] == [(169, 0)] - assert trace.robot_position is not None - assert (trace.robot_position.x, trace.robot_position.y) == (169, 0) + assert trace.heading == 169 + assert trace.points == [] + assert trace.robot_position is None def test_parse_trace_packet_real_session_path() -> None: - """A real 15-point packet (corridor run) decodes the full accumulated path. + """A real 14-point packet (corridor run) decodes the full accumulated path. Captured live from an R1: the same session emitted packets of 1, then 3, - then 15 points, proving the path accumulates rather than reporting only the - current position. The most recent point is the current robot position. + then 14 points, proving the path accumulates rather than reporting only the + current position. The most recent point is the current robot position, and + the header carries the robot heading (bytes 10-11). """ trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) points = [(p.x, p.y) for p in trace.points] - assert len(points) == 15 - assert points[0] == (-34, 0) # oldest + assert len(points) == 14 + assert trace.heading == -34 # robot orientation, not a path point + assert points[0] == (41, 64) # oldest real point (from byte 14) assert points[-1] == (276, -1) # most recent == current position - # After the initial repositioning, x marches steadily down the corridor. - tail_x = [p[0] for p in points[2:]] + # After the initial reposition, x marches steadily down the corridor. + tail_x = [p[0] for p in points[1:]] assert tail_x == sorted(tail_x) - assert points[-1][0] - points[0][0] > 300 # spans the corridor + assert points[-1][0] - points[0][0] > 200 # spans the corridor assert trace.robot_position is not None assert (trace.robot_position.x, trace.robot_position.y) == (276, -1) @@ -222,6 +231,7 @@ def test_parse_trace_packet_multi_point() -> None: """A multi-point packet decodes all points; position is the most recent.""" trace = parse_trace_packet(TRACE_MULTI_FIXTURE.read_bytes()) assert [(p.x, p.y) for p in trace.points] == [(100, 200), (150, 250), (-50, 300)] + assert trace.heading == 45 # Signed coordinates are supported (negative x). assert trace.robot_position is not None assert (trace.robot_position.x, trace.robot_position.y) == (-50, 300) @@ -243,18 +253,25 @@ def test_parse_trace_keeps_genuine_first_point() -> None: assert [(p.x, p.y) for p in trace.points] == points -def test_parse_trace_session_keeps_initial_reposition() -> None: - """The real corridor capture has a 4.8x first step -- well under the 20x cut.""" +def test_parse_trace_reads_heading_not_a_leading_point() -> None: + """The real corridor capture's heading word is decoded as orientation. + + Bytes 10-11 (``ff de`` == -34) are the SLAM heading, not a path point, so the + first genuine point ``(41, 64)`` is kept and no spurious near-origin point is + introduced (the old 10-byte header surfaced ``(-34, 0)`` here). + """ trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) - assert len(trace.points) == 15 - assert (trace.points[0].x, trace.points[0].y) == (-34, 0) + assert trace.heading == -34 + assert len(trace.points) == 14 + assert (trace.points[0].x, trace.points[0].y) == (41, 64) def test_parse_trace_empty_path_has_no_position() -> None: - header_only = b"\x02\x01" + b"\x00" * 8 # 10-byte header, no points + header_only = b"\x02\x01" + b"\x00" * 12 # 14-byte header, no points trace = parse_trace_packet(header_only) assert trace.points == [] assert trace.robot_position is None + assert trace.heading == 0 def test_parse_trace_rejects_non_trace_packet() -> None: @@ -264,7 +281,7 @@ def test_parse_trace_rejects_non_trace_packet() -> None: def test_parse_trace_rejects_misaligned_points() -> None: with pytest.raises(RoborockException, match="not 4-byte"): - parse_trace_packet(b"\x02\x01" + b"\x00" * 8 + b"\x01\x02\x03") + parse_trace_packet(b"\x02\x01" + b"\x00" * 12 + b"\x01\x02\x03") def test_parse_rejects_bad_layout_length() -> None: @@ -272,3 +289,114 @@ def test_parse_rejects_bad_layout_length() -> None: payload[27:29] = (0xFFFF).to_bytes(2, "big") # compressed length past the buffer with pytest.raises(RoborockException, match="invalid layout block length"): parse_map_packet(bytes(payload)) + + +def test_parse_erase_zones_from_map_packet_tail() -> None: + """Erase rectangles appended after the grid decode to world polygons.""" + rects = [ + [(100, 200), (300, 200), (300, 50), (100, 50)], + [(-40, -10), (10, -10), (10, -60), (-40, -60)], + ] + tail = bytes([len(rects), 4]) + for rect in rects: + for x, y in rect: + tail += int.to_bytes(x & 0xFFFF, 2, "big") + int.to_bytes(y & 0xFFFF, 2, "big") + packet = parse_map_packet(FIXTURE.read_bytes() + tail) + assert [z.vertices for z in packet.erase_zones] == rects # incl. signed coords + + +def test_parse_map_packet_without_erase_tail() -> None: + assert parse_map_packet(FIXTURE.read_bytes()).erase_zones == [] + + +def _carpet_tail(width: int, height: int, carpet: bytes, erase: bytes = bytes([0, 0])) -> bytes: + """Build a packet tail: an erase section followed by a carpet-mask section.""" + block = _literal_lz4_block(carpet) + return erase + (width * height).to_bytes(4, "big") + len(block).to_bytes(2, "big") + block + + +def test_parse_carpet_mask_from_map_packet_tail() -> None: + """A carpet mask after the erase section decodes to a same-dims grid. + + Framing confirmed byte-exact on live ss07 captures (R1 / RDC): the carpet + mask is a second LZ4 grid (``[u32 uncompressed][u16 compressed][block]``) + the same size as the main grid, with non-zero cells marking carpet. + """ + width, height = 8, 6 # the fixture's dimensions + carpet = bytearray(width * height) + carpet[10] = 4 # one carpet cell (kind 4) + carpet[20] = 3 # another carpet kind + # A non-empty erase section in front, so the carpet offset must skip it. + erase = bytes([1, 4]) + b"".join( + int.to_bytes(v & 0xFFFF, 2, "big") for x, y in [(0, 0), (4, 0), (4, 4), (0, 4)] for v in (x, y) + ) + packet = parse_map_packet(FIXTURE.read_bytes() + _carpet_tail(width, height, bytes(carpet), erase)) + + assert len(packet.erase_zones) == 1 # erase still parses + assert packet.carpet_mask == bytes(carpet) + map_data = B01Q10MapParser().parsed_from_packet(packet).map_data + assert map_data is not None + assert map_data.carpet_map == {10, 20} # flat grid indices of the carpet cells + + +def test_parse_map_packet_without_carpet() -> None: + assert parse_map_packet(FIXTURE.read_bytes()).carpet_mask is None + + +def test_carpet_mask_ignored_when_uncompressed_len_mismatches() -> None: + """If the section doesn't line up (uncompressed_len != w*h) carpet is dropped.""" + carpet = bytes([4] * 48) + block = _literal_lz4_block(carpet) + tail = bytes([0, 0]) + (999).to_bytes(4, "big") + len(block).to_bytes(2, "big") + block + assert parse_map_packet(FIXTURE.read_bytes() + tail).carpet_mask is None + + +def _calibrated_map_payload( + width: int, + height: int, + decoded_layout: bytes, + *, + origin: tuple[int, int], + resolution: int = 5, + charger: tuple[int, int, int] = (0, 0, 0), +) -> bytes: + """A map packet with the grid-frame header calibration fields populated.""" + payload = bytearray(_full_header_map_payload(width, height, decoded_layout)) + payload[11:13] = origin[0].to_bytes(2, "big", signed=True) + payload[13:15] = origin[1].to_bytes(2, "big", signed=True) + payload[15:17] = resolution.to_bytes(2, "big") + payload[17:19] = charger[0].to_bytes(2, "big", signed=True) + payload[19:21] = charger[1].to_bytes(2, "big", signed=True) + payload[21:23] = charger[2].to_bytes(2, "big", signed=True) + return bytes(payload) + + +def test_parse_header_calibration_fields() -> None: + """The 01 01 header's calibration fields decode to a usable origin (ss07).""" + grid = bytes([8]) * 6 + bytes([12]) * 6 # 4x3 grid, two rooms + layout = grid + b"\x01\x02" + _room_record(2, "rr_kitchen") + _room_record(3, "den") + payload = _calibrated_map_payload(4, 3, layout, origin=(-3760, 1920), resolution=5, charger=(-50, 30, 180)) + cal = parse_map_packet(payload).header_calibration + assert cal is not None + assert (cal.origin_x, cal.origin_y) == (-3760, 1920) + assert cal.resolution == 5 + assert (cal.charger_x, cal.charger_y, cal.charger_phi) == (-50, 30, 180) + assert not cal.is_keepalive + # 5 mm units / (50 mm/px) -> divide by 10 for grid pixels. + assert cal.origin_pixels() == (-376.0, 192.0) + + +def test_parse_header_calibration_keepalive_has_no_origin() -> None: + """A null/keepalive frame (x_min == y_min == 0) yields no usable origin.""" + grid = bytes([8]) * 6 + bytes([12]) * 6 + layout = grid + b"\x01\x02" + _room_record(2, "rr_kitchen") + _room_record(3, "den") + cal = parse_map_packet(_calibrated_map_payload(4, 3, layout, origin=(0, 0))).header_calibration + assert cal is not None + assert cal.is_keepalive + assert cal.origin_pixels() is None + + +def test_real_fixture_header_calibration_is_keepalive() -> None: + """The synthetic fixture carries no header origin, so callers fall back to a fit.""" + cal = parse_map_packet(FIXTURE.read_bytes()).header_calibration + assert cal is not None and cal.is_keepalive diff --git a/tests/map/test_b01_q10_overlays.py b/tests/map/test_b01_q10_overlays.py index 5af2acc8..1d13eb62 100644 --- a/tests/map/test_b01_q10_overlays.py +++ b/tests/map/test_b01_q10_overlays.py @@ -7,6 +7,7 @@ ZONE_TYPE_NO_MOP, ZONE_TYPE_THRESHOLD, ZONE_TYPE_VIRTUAL_WALL, + parse_virtual_wall_blob, parse_zone_blob, ) @@ -71,3 +72,106 @@ def test_parse_zone_blob_real_record_size_inferred() -> None: rect = _rect(ZONE_TYPE_NO_GO, [(100, 200), (300, 200), (300, 50), (100, 50)]) zones = parse_zone_blob(_blob(1, [rect], record_size=38)) assert len(zones) == 1 and zones[0].vertices[0] == (100, 200) + + +def test_parse_zone_blob_real_rdc_three_no_go() -> None: + """Real DP-55 read-back from our RDC ss07 with three No-Go Zones drawn. + + Captured live; exercises the 38-byte slot walk at count=3 against actual + device bytes (all type 0 = no-go). + """ + blob = ( + "AQMABP9A9ToAEPU6ABDzzv9A884AAAAAAAAAAAAAAAAAAAAAAAAAAAAE/Rb/mv5z/5r+c/3L/Rb9ywAAAA" + "AAAAAAAAAAAAAAAAAAAAAAAAQDpADEB3UAxAd1/GMDpPxjAAAAAAAAAAAAAAAAAAAAAAAAAAA=" + ) + zones = parse_zone_blob(blob) + assert [z.type for z in zones] == [ZONE_TYPE_NO_GO, ZONE_TYPE_NO_GO, ZONE_TYPE_NO_GO] + assert zones[2].vertices == [(932, 196), (1909, 196), (1909, -925), (932, -925)] + + +def test_parse_virtual_wall_blob_real_capture() -> None: + """Real DP-57 read-back from an ss07 (one wall drawn in the app). + + Frame is ``[count]`` + 8-byte ``(x, y)`` records -- no version byte, and the + same coordinate order as the restricted zones (first wire word = x). + Provenance: PR #850 review thread. + """ + walls = parse_virtual_wall_blob("Aflu+cX87PoO") + assert len(walls) == 1 + assert walls[0].type == ZONE_TYPE_VIRTUAL_WALL + assert walls[0].vertices == [(-1682, -1595), (-788, -1522)] + + +def test_parse_virtual_wall_blob_real_r1_horizontal() -> None: + """Real DP-57 read-back from the R1, ground-truthed against the app. + + The wall was drawn horizontally just below the Kids bedroom; it reads back + with x varying (377 -> 951) and y constant (-83), i.e. horizontal. This is + the capture that settled DP 57's axis order: an earlier revision swapped the + axes and would have placed this wall vertical (transposed 90 degrees). + """ + walls = parse_virtual_wall_blob("AQF5/60Dt/+t") + assert [(w.type, w.vertices) for w in walls] == [ + (ZONE_TYPE_VIRTUAL_WALL, [(377, -83), (951, -83)]), + ] + + +def test_parse_virtual_wall_blob_real_r1_mixed_orientation() -> None: + """Real DP-57 read-back from the R1 with one horizontal + one vertical wall. + + Ground-truthed against the app: a wall drawn horizontally and another drawn + (near-)vertically. They decode as such -- the first with y constant, the + second with x near-constant (the 4-unit x drift matches the wall not being + drawn perfectly vertical). This pins DP 57's axis order for *both* + orientations at once, so a mixed pair can't come back transposed. + """ + walls = parse_virtual_wall_blob("AgNyBGMFsARjAf8FAAH7CnE=") + assert [(w.type, w.vertices) for w in walls] == [ + (ZONE_TYPE_VIRTUAL_WALL, [(882, 1123), (1456, 1123)]), # horizontal: y constant + (ZONE_TYPE_VIRTUAL_WALL, [(511, 1280), (507, 2673)]), # vertical: x near-constant + ] + + +def test_parse_virtual_wall_blob_real_rdc_two_walls() -> None: + """Real DP-57 read-back from our RDC ss07 with two Invisible Walls drawn. + + Captured live after drawing the walls in the official app; the old + parse_zone_blob silently returned [] for this exact blob (the regression + this fix addresses). + """ + walls = parse_virtual_wall_blob("AgAz/QMCDv0D/83+Dv/O/OY=") + assert [(w.type, w.vertices) for w in walls] == [ + (ZONE_TYPE_VIRTUAL_WALL, [(51, -765), (526, -765)]), + (ZONE_TYPE_VIRTUAL_WALL, [(-51, -498), (-50, -794)]), + ] + + +def test_parse_virtual_wall_blob_empty_variants() -> None: + assert parse_virtual_wall_blob(None) == [] + assert parse_virtual_wall_blob(b"\x00") == [] # device's "no walls" sentinel + assert parse_virtual_wall_blob("AA==") == [] # base64 of 0x00 + + +def test_parse_virtual_wall_blob_multiple_walls() -> None: + """Two walls back-to-back; each is a separate 8-byte (x, y) record.""" + wall_a = bytes.fromhex("000a0014001e0028") # (x,y)=(10,20)->(30,40) + wall_b = bytes.fromhex("fffb0005fff6000a") # (x,y)=(-5,5)->(-10,10) + walls = parse_virtual_wall_blob(bytes([2]) + wall_a + wall_b) + assert [w.vertices for w in walls] == [[(10, 20), (30, 40)], [(-5, 5), (-10, 10)]] + + +def test_parse_virtual_wall_blob_truncated_record_dropped() -> None: + """A trailing record shorter than 8 bytes is dropped, not misread.""" + blob = bytes([2]) + bytes([0x00, 0x0A, 0x00, 0x14, 0x00, 0x1E, 0x00, 0x28]) + b"\x00\x00" + walls = parse_virtual_wall_blob(blob) + assert [w.vertices for w in walls] == [[(10, 20), (30, 40)]] + + +def test_zone_parser_misframes_virtual_wall_blob() -> None: + """The restricted-zone parser must NOT be used on DP 57 -- it mis-frames it. + + Regression guard for the original bug: a DP-57 blob fed to parse_zone_blob + reads the leading 0x01 as a version and the next coordinate byte as a record + count, yielding garbage (here: nothing), so DP 57 needs its own decoder. + """ + assert parse_zone_blob("Aflu+cX87PoO") != parse_virtual_wall_blob("Aflu+cX87PoO") diff --git a/tests/map/test_b01_q10_render.py b/tests/map/test_b01_q10_render.py new file mode 100644 index 00000000..927aa2bc --- /dev/null +++ b/tests/map/test_b01_q10_render.py @@ -0,0 +1,206 @@ +"""Tests for composing a Q10 map packet + overlays into a rendered result. + +The pixel-level machinery (erase blanking, world->pixel overlay placement, path +drawing, calibration fitting) lives in ``b01_q10_render``; these exercise it with +explicit calibrations so the geometry is deterministic. The map trait's own tests +cover the state management that drives this module. +""" + +import io +from dataclasses import replace +from pathlib import Path + +from PIL import Image + +from roborock.map.b01_grid_layers import GridCalibration +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParserConfig, + Q10EraseZone, + Q10HeaderCalibration, + Q10MapPacket, + Q10Point, + parse_map_packet, +) +from roborock.map.b01_q10_overlays import ZONE_TYPE_NO_GO, ZONE_TYPE_NO_MOP, Q10Zone +from roborock.map.b01_q10_render import ( + _Q10_RESOLUTIONS, + Q10MapRender, + draw_path_on_map, + render_q10_map, + solve_q10_calibration, +) + +FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") +CONFIG = B01Q10MapParserConfig() + +# identity-ish calibration used across the geometry tests: world (x, y) -> grid +# pixel (x, 5 - y) over the fixture's 8x6 grid (top-down, no flip). +IDENTITY = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + + +def _packet() -> Q10MapPacket: + return parse_map_packet(FIXTURE.read_bytes()) + + +def _render( + packet: Q10MapPacket | None = None, + *, + calibration: GridCalibration | None = None, + path: list[Q10Point] | None = None, + robot_position: Q10Point | None = None, + zones: list[Q10Zone] | None = None, + virtual_walls: list[Q10Zone] | None = None, +) -> Q10MapRender: + return render_q10_map( + packet if packet is not None else _packet(), + calibration=calibration, + path=path or [], + robot_position=robot_position, + zones=zones or [], + virtual_walls=virtual_walls or [], + config=CONFIG, + ) + + +def _floor_world_points(layers, cal: GridCalibration, count: int) -> list[Q10Point]: + """``count`` world points lying on the map's floor under ``cal``.""" + floor = [ + (px, py) + for py in range(layers.height) + for px in range(layers.width) + if layers.cell_class(layers.grid[py * layers.width + px]) == "floor" + ] + return [Q10Point(*(int(v) for v in cal.pixel_to_world(px, py))) for px, py in floor[:count]] + + +def test_render_base_map_without_calibration() -> None: + """Without a calibration only the base raster/layers/rooms are produced.""" + render = _render() + assert render.image_content[:8] == b"\x89PNG\r\n\x1a\n" + assert render.map_data is not None + assert render.calibration is None + assert {room.id: room.name for room in render.rooms} == {2: "Living Room", 3: "Bedroom"} + assert render.layers.class_counts.get("floor") == 26 + # Overlays are world-coordinate only, so nothing is placed yet. + assert render.map_data.path is None + + +def test_render_places_path_and_position() -> None: + """A calibration places the path + robot position onto MapData in pixels.""" + path = [Q10Point(1, 2), Q10Point(3, 2)] + render = _render(calibration=IDENTITY, path=path, robot_position=Q10Point(3, 2)) + assert render.map_data.path is not None + assert render.map_data.vacuum_position is not None + # world (3, 2) -> grid pixel (3, 5 - 2) = (3, 3) + assert (render.map_data.vacuum_position.x, render.map_data.vacuum_position.y) == (3.0, 3.0) + + +def test_render_places_zones_and_charger() -> None: + """Decoded no-go / no-mop zones become pixel-space MapData areas + charger.""" + zones = [ + Q10Zone(type=ZONE_TYPE_NO_GO, vertices=[(0, 0), (4, 0), (4, 4), (0, 4)]), + Q10Zone(type=ZONE_TYPE_NO_MOP, vertices=[(1, 1), (2, 1), (2, 2), (1, 2)]), + ] + render = _render(calibration=IDENTITY, path=[Q10Point(1, 1)], zones=zones) + assert len(render.map_data.no_go_areas or []) == 1 + assert len(render.map_data.no_mopping_areas or []) == 1 + # charger = path origin in pixels: (1, 5 - 1) = (1, 4) + assert render.map_data.charger is not None + assert (render.map_data.charger.x, render.map_data.charger.y) == (1.0, 4.0) + + +def test_render_applies_erase_zones() -> None: + """With a calibration, erase-zone cells are blanked from layers + image.""" + base = _render() + before_floor = base.layers.class_counts.get("floor") + assert before_floor and before_floor > 0 + + # A rectangle covering the whole grid in world coords erases every cell. + packet = replace(_packet(), erase_zones=[Q10EraseZone(vertices=[(0, 0), (7, 0), (7, 5), (0, 5)])]) + render = _render(packet, calibration=IDENTITY) + + assert render.layers.class_counts.get("floor", 0) == 0 # all floor erased + assert render.image_content != base.image_content # re-rendered + + +def test_render_partial_erase() -> None: + """An erase rectangle only blanks the cells it covers, leaving the rest.""" + before_floor = _render().layers.class_counts.get("floor", 0) + + # Cover only the top two grid rows (pixel y 0..1 -> world y 4..5). + packet = replace(_packet(), erase_zones=[Q10EraseZone(vertices=[(0, 4), (7, 4), (7, 5), (0, 5)])]) + render = _render(packet, calibration=IDENTITY) + + after_floor = render.layers.class_counts.get("floor", 0) + assert 0 < after_floor < before_floor # some, not all, floor removed + + +def test_draw_path_on_map_draws_position() -> None: + """The robot position is drawn at the mapped pixel.""" + path = [Q10Point(1, 2), Q10Point(3, 2)] + render = _render(calibration=IDENTITY, path=path, robot_position=Q10Point(3, 2)) + png = draw_path_on_map( + render, + config=CONFIG, + path=path, + robot_position=Q10Point(3, 2), + robot_heading=None, + zones=[], + virtual_walls=[], + position_color=(255, 211, 0, 255), + ) + img = Image.open(io.BytesIO(png)).convert("RGBA") + # world (3, 2) -> grid pixel (3, 3) -> image (12, 12) at scale 4 (no flip). + assert img.size == (8 * 4, 6 * 4) + assert img.getpixel((12, 12)) == (255, 211, 0, 255) + + +def test_draw_path_on_map_draws_heading_indicator() -> None: + """A known heading draws a facing tick from the robot marker. + + With heading 0 (= +x world) and the identity-ish calibration, the tick + extends to the right of the robot pixel; with the marker at image (12, 12) + the tick covers pixels at x > 12 along y == 12. + """ + path = [Q10Point(1, 2), Q10Point(3, 2)] + render = _render(calibration=IDENTITY, path=path, robot_position=Q10Point(3, 2)) + png = draw_path_on_map( + render, + config=CONFIG, + path=path, + robot_position=Q10Point(3, 2), + robot_heading=0, # facing +x + zones=[], + virtual_walls=[], + position_color=(255, 211, 0, 255), + ) + img = Image.open(io.BytesIO(png)).convert("RGBA") + # tick runs +x from the marker (4 * radius = 16 px at scale 4) + assert img.getpixel((20, 12)) == (255, 211, 0, 255) + # ...and not behind it (the marker is a small disc; sample well to the left) + assert img.getpixel((4, 12)) != (255, 211, 0, 255) + + +def test_solve_q10_calibration_uses_header_origin_with_short_path() -> None: + """A grid-frame header origin lets a short path calibrate (origin is exact).""" + layers = _render().layers + # Header origin in 5 mm units -> pixel origin (0, 5); not a keepalive frame. + header = Q10HeaderCalibration(origin_x=0, origin_y=50, resolution=5, charger_x=0, charger_y=0, charger_phi=0) + true = GridCalibration(resolution=20.0, origin_x=0.0, origin_y=5.0, y_sign=1) + path = _floor_world_points(layers, true, 6) + assert len(path) < 20 # far too short for the full origin+resolution fit + + cal = solve_q10_calibration(layers, header, path) + assert cal is not None + # Origin comes straight from the header (exact); only the resolution is fit, + # so it lands on one of the candidates (the exact pick is grid-quantized). + assert (cal.origin_x, cal.origin_y) == (0.0, 5.0) + assert cal.resolution in _Q10_RESOLUTIONS + + +def test_solve_q10_calibration_short_path_without_header_returns_none() -> None: + """Without a header origin a short path is too sparse for the full fit.""" + layers = _render().layers + true = GridCalibration(resolution=10.0, origin_x=0.0, origin_y=5.0, y_sign=1) + path = _floor_world_points(layers, true, 6) + assert solve_q10_calibration(layers, None, path) is None diff --git a/tests/map/testdata/b01_q10_trace_multi.bin b/tests/map/testdata/b01_q10_trace_multi.bin index 8377e6c0..c388ab0c 100644 Binary files a/tests/map/testdata/b01_q10_trace_multi.bin and b/tests/map/testdata/b01_q10_trace_multi.bin differ diff --git a/tests/protocols/test_b01_q10_protocol.py b/tests/protocols/test_b01_q10_protocol.py index e3617709..f5d916d1 100644 --- a/tests/protocols/test_b01_q10_protocol.py +++ b/tests/protocols/test_b01_q10_protocol.py @@ -54,7 +54,10 @@ def test_decode_message_trace_packet() -> None: message = _message(TRACE_FIXTURE.read_bytes(), RoborockMessageProtocol.MAP_RESPONSE) decoded = decode_message(message) assert isinstance(decoded, Q10TracePacket) - assert [(p.x, p.y) for p in decoded.points] == [(169, 0)] + # This docked capture carries only a heading (no path points); see the + # parser tests for the full byte-level decode. + assert decoded.points == [] + assert decoded.heading == 169 def test_decode_message_unknown_map_marker_returns_none() -> None: