diff --git a/roborock/cli.py b/roborock/cli.py index e655ac6e..7482e6e5 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -541,14 +541,19 @@ async def _await_q10_map_push( timeout: float = _Q10_MAP_PUSH_TIMEOUT, allow_cached_on_timeout: bool = False, ) -> bool: - """Nudge a Q10 to push its map/trace and wait for a fresh update. + """Nudge a Q10 to push its map/trace and wait until ``predicate`` holds. The Q10 map API is entirely push-driven: there is no synchronous get-map request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``, which the device's subscribe loop feeds into the map trait. Here we register - an update listener, send the request, and wait for a newly pushed update to - satisfy ``predicate``. Returns whether it did within ``timeout``. + an update listener, send the request, and wait for the pushed data to satisfy + ``predicate``. Returns whether it did within ``timeout``. When the predicate + already holds against cached content we return immediately without nudging. + If ``allow_cached_on_timeout`` is set, a timeout still returns ``True`` when + the predicate holds against the previously cached content. """ + if predicate(): + return True loop = asyncio.get_running_loop() updated: asyncio.Future[None] = loop.create_future() @@ -596,6 +601,59 @@ async def map_image(ctx, device_id: str, output_file: str): click.echo("No map image content available.") +@session.command() +@click.option("--device_id", required=True) +@click.option("--output-dir", default=None, help="If set, write one transparent PNG per layer here.") +@click.pass_context +@async_command +async def q10_map_layers(ctx, device_id: str, output_dir: str | None): + """List the Q10 map's separable layers (background/wall/floor/per-room). + + With --output-dir, also exports each layer as a transparent PNG that can be + stacked in a frontend (background, then floor, then walls, then each room). + """ + import os + + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + properties = device.b01_q10_properties + await _await_q10_map_push(properties, lambda: properties.map.layers is not None) + layers = properties.map.layers + if layers is None: + click.echo("No map layers available.") + return + + summary = { + "size": {"width": layers.width, "height": layers.height}, + "class_counts": layers.class_counts, + "rooms": [ + {"id": r.id, "name": r.name, "pixel_count": r.pixel_count, "bbox": list(r.bbox)} for r in layers.rooms + ], + } + click.echo(dump_json(summary)) + + if output_dir: + os.makedirs(output_dir, exist_ok=True) + exports = { + "background": layers.render_class("background", (210, 210, 215, 255), scale=2), + "floor": layers.render_class("floor", (70, 170, 95, 200), scale=2), + "wall": layers.render_class("wall", (20, 20, 25, 255), scale=2), + } + for name, png in exports.items(): + with open(os.path.join(output_dir, f"layer_{name}.png"), "wb") as f: + f.write(png) + for room in layers.rooms: + png = layers.render_room(room.id, (90, 140, 220, 200), scale=2) + safe = "".join(c if c.isalnum() else "_" for c in room.name) or f"room{room.id}" + with open(os.path.join(output_dir, f"room_{room.id}_{safe}.png"), "wb") as f: + f.write(png) + click.echo(f"Wrote {3 + len(layers.rooms)} layer PNGs to {output_dir}") + + @session.command() @click.option("--device_id", required=True) @click.option("--include_path", is_flag=True, default=False, help="Include path data in the output.") @@ -655,6 +713,42 @@ async def q10_position(ctx, device_id: str, include_path: bool): click.echo(dump_json(summary)) +@session.command() +@click.option("--device_id", required=True) +@click.option("--output-file", required=True, help="Path to save the map image with the path drawn.") +@click.pass_context +@async_command +async def q10_map_with_path(ctx, device_id: str, output_file: str): + """Render the Q10 map with the current cleaning path + robot position drawn. + + Needs the robot to be actively cleaning (the path/calibration come from the + live trace). Fetches the map and the path, solves the world<->pixel + calibration, and writes the annotated PNG. + """ + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + properties = device.b01_q10_properties + map_trait = properties.map + await _await_q10_map_push(properties, lambda: map_trait.image_content is not None) + got_path = await _await_q10_map_push(properties, lambda: bool(map_trait.path)) + if not got_path: + click.echo("No live path available (the robot only reports its path while cleaning).") + return + try: + image = map_trait.render_path_on_map() + except RoborockException as err: + click.echo(f"Could not render path on map: {err}") + return + with open(output_file, "wb") as f: + f.write(image) + cal = map_trait.calibration + click.echo(f"Saved map with {len(map_trait.path)}-point path to {output_file} (calibration: {cal})") + + @session.command() @click.option("--device_id", required=True) @click.pass_context diff --git a/roborock/data/b01_q10/b01_q10_containers.py b/roborock/data/b01_q10/b01_q10_containers.py index 393eb231..86154b6b 100644 --- a/roborock/data/b01_q10/b01_q10_containers.py +++ b/roborock/data/b01_q10/b01_q10_containers.py @@ -108,3 +108,7 @@ class Q10Status(RoborockBase): back_type: YXBackType | None = field(default=None, metadata={"dps": B01_Q10_DP.BACK_TYPE}) cleaning_progress: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_PROGRESS}) fault: int | None = field(default=None, metadata={"dps": B01_Q10_DP.FAULT}) + # Raw base64 map-overlay blobs (decoded by roborock.map.b01_q10_overlays). + restricted_zone_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.RESTRICTED_ZONE_UP}) + virtual_wall_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.VIRTUAL_WALL_UP}) + zoned_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.ZONED_UP}) diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index 49758326..50ec8e5a 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -2,42 +2,41 @@ import logging from collections.abc import AsyncGenerator +from typing import Any from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException from roborock.protocols.b01_q10_protocol import ( ParamsType, - Q10Message, - decode_message, + decode_rpc_response, encode_mqtt_payload, ) _LOGGER = logging.getLogger(__name__) -async def stream_decoded_messages( +async def stream_decoded_responses( mqtt_channel: MqttChannel, -) -> AsyncGenerator[Q10Message, None]: - """Stream decoded Q10 messages received via MQTT. +) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]: + """Stream decoded DPS messages received via MQTT. - Each pushed ``RoborockMessage`` is decoded into a typed :data:`Q10Message` - (a DPS status update, a map packet, or a trace packet). Messages that fail - to decode or carry an unrecognized payload are skipped. + Messages that are not decodable DPS responses (e.g. protocol-301 + ``MAP_RESPONSE`` map pushes) are skipped; callers that need the raw + messages should subscribe to :meth:`MqttChannel.subscribe_stream` directly. """ - async for message in mqtt_channel.subscribe_stream(): + async for response_message in mqtt_channel.subscribe_stream(): try: - decoded = decode_message(message) + decoded_dps = decode_rpc_response(response_message) except RoborockException as ex: _LOGGER.debug( - "Failed to decode B01 Q10 message: %s: %s", - message, + "Failed to decode B01 Q10 RPC response: %s: %s", + response_message, ex, ) continue - if decoded is not None: - yield decoded + yield decoded_dps async def send_command( diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 799151a6..9aac42f7 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -4,11 +4,11 @@ import logging from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP -from roborock.devices.rpc.b01_q10_channel import stream_decoded_messages from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel -from roborock.map.b01_q10_map_parser import Q10MapPacket, Q10TracePacket -from roborock.protocols.b01_q10_protocol import Q10DpsUpdate, Q10Message +from roborock.exceptions import RoborockException +from roborock.protocols.b01_q10_protocol import decode_rpc_response +from roborock.roborock_message import RoborockMessage from .command import CommandTrait from .map import MapContentTrait @@ -73,25 +73,40 @@ async def refresh(self) -> None: await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) async def _subscribe_loop(self) -> None: - """Persistent loop dispatching decoded messages to the read-model traits.""" - async for message in stream_decoded_messages(self._channel): + """Persistent loop dispatching pushed messages to the read-model traits.""" + async for message in self._channel.subscribe_stream(): self._handle_message(message) - def _handle_message(self, message: Q10Message) -> None: - """Route a single decoded message to the trait responsible for it. + def _handle_message(self, message: RoborockMessage) -> None: + """Route a single pushed message to the trait responsible for it. - Map and trace packets arrive as protocol-301 ``MAP_RESPONSE`` pushes (the - Q10 is entirely push-driven: there is no synchronous get-map request, a - ``dpRequestDps`` just nudges the device to publish its current map). DPS - updates feed the status trait. More traits can be dispatched here below. + Map/trace pushes arrive as protocol-301 ``MAP_RESPONSE`` messages (not + DPS), so they are handled separately from the status DPS stream. The Q10 + is entirely push-driven: there is no synchronous get-map request, the + device just publishes its current map (a ``dpRequestDps`` nudges it to). """ - if isinstance(message, Q10MapPacket): - self.map.update_from_map_packet(message) - elif isinstance(message, Q10TracePacket): - self.map.update_from_trace_packet(message) - elif isinstance(message, Q10DpsUpdate): - _LOGGER.debug("Received Q10 status update: %s", message.dps) - self.status.update_from_dps(message.dps) + if self.map.update_from_map_response(message): + return + + try: + decoded_dps = decode_rpc_response(message) + except RoborockException as ex: + _LOGGER.debug("Failed to decode Q10 RPC response: %s: %s", message, ex) + return + + _LOGGER.debug("Received Q10 status update: %s", decoded_dps) + # Notify all traits about a new message and each trait will + # only update what fields that it is responsible for. + # More traits can be added here below. + self.status.update_from_dps(decoded_dps) + + # Feed the map's vector-overlay data points (no-go zones / virtual + # walls) to the map trait so they are decoded as they arrive. + if B01_Q10_DP.RESTRICTED_ZONE_UP in decoded_dps or B01_Q10_DP.VIRTUAL_WALL_UP in decoded_dps: + self.map.load_overlays( + restricted_zone_up=decoded_dps.get(B01_Q10_DP.RESTRICTED_ZONE_UP), + virtual_wall_up=decoded_dps.get(B01_Q10_DP.VIRTUAL_WALL_UP), + ) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py index c132def4..259c94e9 100644 --- a/roborock/devices/traits/b01/q10/map.py +++ b/roborock/devices/traits/b01/q10/map.py @@ -4,39 +4,82 @@ trait is purely push-driven and mirrors the Q10 ``StatusTrait`` contract: - The device pushes its current map/path as protocol-301 ``MAP_RESPONSE`` - messages (a ``dpRequestDps`` nudges it to do so). The protocol layer decodes - those into :class:`Q10MapPacket` / :class:`Q10TracePacket` objects and the - ``Q10PropertiesApi`` subscribe loop routes them to - :meth:`MapContentTrait.update_from_map_packet` / - :meth:`MapContentTrait.update_from_trace_packet`. -- Those methods render/cache the content and notify update listeners (register - via :meth:`add_update_listener`). -- ``image_content``, ``map_data``, ``rooms``, ``path`` and ``robot_position`` - are readable and reflect the most recently pushed map. + messages (a ``dpRequestDps`` nudges it to do so). The ``Q10PropertiesApi`` + subscribe loop routes those messages to :meth:`MapContentTrait.update_from_map_response`. +- ``update_from_map_response`` parses the payload, updates the cached fields and + notifies update listeners (register via :meth:`add_update_listener`). +- ``parse_map_content()`` reparses the cached raw bytes without I/O. +- ``image_content``, ``map_data``, ``rooms``, ``path``, ``robot_position`` and + ``raw_api_response`` are readable and reflect the most recently pushed map. Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required. """ +import io import logging +import math from dataclasses import dataclass, field -from vacuum_map_parser_base.map_data import MapData +from PIL import Image, ImageDraw +from vacuum_map_parser_base.map_data import Area, MapData, Path, Point, Wall from roborock.data import RoborockBase from roborock.devices.traits.common import TraitUpdateListener +from roborock.exceptions import RoborockException +from roborock.map.b01_grid_layers import ( + GridCalibration, + GridLayers, + solve_calibration, + solve_calibration_with_origin, +) from roborock.map.b01_q10_map_parser import ( B01Q10MapParser, B01Q10MapParserConfig, - Q10MapPacket, + Q10EraseZone, + Q10HeaderCalibration, Q10Point, Q10Room, - Q10TracePacket, + decompose_layers, + erased_packet, + parse_map_packet, + parse_trace_packet, +) +from roborock.map.b01_q10_overlays import ( + ZONE_TYPE_NO_GO, + ZONE_TYPE_NO_MOP, + Q10Zone, + parse_virtual_wall_blob, + parse_zone_blob, ) +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol _LOGGER = logging.getLogger(__name__) _TRUNCATE_LENGTH = 20 +# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the +# packet kind: a full map (``01 01``) or a live trace/path (``02 01``). +_MAP_PACKET_MARKER = b"\x01\x01" +_TRACE_PACKET_MARKER = b"\x02\x01" + +# Path-units-per-pixel candidates for calibration. A dense ss07 path lands a +# best fit of 20.0 around the header origin -- ground-truthed June 2026 on the +# R1: a corridor drive registered at 20 (matching the format author's +# independent "20 path-units/px"), and the dock->corridor span lined up with the +# ruler-measured 8.81 m corridor. With the header resolution=5 (50 mm/px grid) +# that makes one path-unit exactly 50/20 = 2.5 mm -- so a path-unit is NOT a +# millimetre (the open scale question). An earlier [10.0..18.0] range couldn't +# reach 20 (it railed at the bound), biasing the fit. A dense cleaning path +# selects the best fit within this bracket. +_Q10_RESOLUTIONS = [step * 0.5 for step in range(24, 53)] # 12.0 .. 26.0 +# A path needs enough shape to constrain a full (origin + resolution) fit; a few +# points cannot. +_MIN_CALIBRATION_POINTS = 20 +# When the grid-frame header supplies the origin, only the resolution is fit, so +# a much shorter path suffices to confirm it (early in a clean, not just a dense +# one). See :func:`solve_calibration_with_origin`. +_MIN_HEADER_CALIBRATION_POINTS = 4 + @dataclass class MapContent(RoborockBase): @@ -51,6 +94,10 @@ class MapContent(RoborockBase): rooms: list[Q10Room] = field(default_factory=list) """Rooms (segments) reported by the device, with ids and names.""" + layers: GridLayers | None = None + """Separable map layers (background / wall / floor / per-room) in grid-pixel + space, each renderable to a transparent PNG for frontend compositing.""" + path: list[Q10Point] = field(default_factory=list) """Full path of the current cleaning session (oldest point first). @@ -61,6 +108,36 @@ class MapContent(RoborockBase): robot_position: Q10Point | None = None """Current robot position (the most recent path point), if known.""" + robot_heading: int | None = None + """Current robot heading in degrees from the trace packet (``0`` = +x, + ``+90`` = +y, ``±180`` = −x, ``−90`` = −y), if a trace has been pushed.""" + + calibration: GridCalibration | None = None + """World<->pixel transform, solved from a cleaning path (see + :meth:`MapContentTrait.solve_calibration`). Required to place the path, + robot position and vector overlays onto the map raster.""" + + zones: list[Q10Zone] = field(default_factory=list) + """Restricted zones (no-go / no-mop) in world coordinates, from the device's + ``dpRestrictedZoneUp``. See :meth:`MapContentTrait.load_overlays`.""" + + virtual_walls: list[Q10Zone] = field(default_factory=list) + """Virtual walls (line segments) in world coordinates.""" + + erase_zones: list[Q10EraseZone] = field(default_factory=list) + """Erase areas (the app's *Erase* tool) in world coordinates, decoded from the + map packet tail. Once a calibration is available the cells inside them are + blanked from the rendered map and every layer (see :meth:`MapContentTrait`).""" + + raw_api_response: bytes | None = None + """Raw bytes of the map payload from the device (opaque blob for re-parsing).""" + + header_calibration: Q10HeaderCalibration | None = None + """Calibration read straight from the map packet's grid-frame header (ss07). + Supplies the world<->pixel origin without a fit, so :meth:`solve_calibration` + can calibrate from a short path instead of a dense clean. ``None`` if the + packet carried no header calibration or it was a keepalive frame.""" + def __repr__(self) -> str: img = self.image_content if img and len(img) > _TRUNCATE_LENGTH: @@ -72,9 +149,8 @@ class MapContentTrait(MapContent, TraitUpdateListener): """Trait holding the most recently pushed parsed map content for Q10 devices. The Q10 has no synchronous get-map request; the device pushes map and trace - packets, which the protocol layer decodes and the ``Q10PropertiesApi`` - subscribe loop feeds into :meth:`update_from_map_packet` / - :meth:`update_from_trace_packet`. Consumers read the cached fields and/or + packets, which the ``Q10PropertiesApi`` subscribe loop feeds into + :meth:`update_from_map_response`. Consumers read the cached fields and/or register a callback with :meth:`add_update_listener` to be notified when new map content arrives. """ @@ -88,23 +164,292 @@ def __init__( TraitUpdateListener.__init__(self, logger=_LOGGER) self._map_parser = B01Q10MapParser(map_parser_config) - def update_from_map_packet(self, packet: Q10MapPacket) -> None: - """Render a pushed full-map packet into the cached image/rooms. + def update_from_map_response(self, message: RoborockMessage) -> bool: + """Update cached map/trace state from a pushed ``MAP_RESPONSE`` message. - Rendering failures are logged and skipped (listeners are not notified) so - a single bad push cannot tear down the subscribe loop. + Returns ``True`` if the message was a recognized Q10 map (``01 01``) or + trace (``02 01``) packet (so the caller can stop processing it), and + ``False`` otherwise. Update listeners are notified only when a packet is + parsed successfully. """ - parsed = self._map_parser.parse_packet(packet) + if message.protocol != RoborockMessageProtocol.MAP_RESPONSE or not message.payload: + return False + marker = message.payload[:2] + if marker == _MAP_PACKET_MARKER: + self.raw_api_response = message.payload + try: + self.parse_map_content() + except RoborockException as ex: + _LOGGER.debug("Failed to parse Q10 map packet: %s", ex) + return True + self._notify_update() + return True + if marker == _TRACE_PACKET_MARKER: + try: + trace = parse_trace_packet(message.payload) + except RoborockException as ex: + _LOGGER.debug("Failed to parse Q10 trace packet: %s", ex) + return True + self.path = trace.points + self.robot_position = trace.robot_position + self.robot_heading = trace.heading + self._notify_update() + return True + return False + + def parse_map_content(self) -> None: + """Reparse the cached raw map payload without performing any I/O.""" + if self.raw_api_response is None: + raise RoborockException("No map payload available; no map has been pushed yet") + + try: + parsed = self._map_parser.parse(self.raw_api_response) + packet = parse_map_packet(self.raw_api_response) + except RoborockException: + raise + except Exception as ex: + raise RoborockException("Failed to parse Q10 map data") from ex + if parsed.image_content is None: - _LOGGER.debug("Failed to render Q10 map image") - return + raise RoborockException("Failed to render Q10 map image") + self.image_content = parsed.image_content self.map_data = parsed.map_data self.rooms = packet.rooms - self._notify_update() + self.erase_zones = packet.erase_zones + self.header_calibration = packet.header_calibration + self.layers = decompose_layers(packet) + if self.calibration is not None: + self._apply_erase(self.calibration) + self._populate_map_data_overlays(self.calibration) + self._place_zones_on_map_data(self.calibration) + + def solve_calibration(self) -> GridCalibration | None: + """Fit and cache the world<->pixel calibration from the current path. + + When the map packet's grid-frame header carries a calibration origin + (ss07), only the resolution is fit -- around that fixed origin -- so a + short path suffices and the origin is exact rather than recovered by a + slide. Otherwise the full origin + resolution fit is used, which needs a + reasonably dense cleaning path. Both inputs arrive as device pushes (the + path is only populated during an active clean). Returns the calibration + (also stored on :attr:`calibration`), or ``None`` if there is no map or + the path is too short/featureless to fit. + """ + if self.layers is None: + return None + points: list[tuple[float, float]] = [(point.x, point.y) for point in self.path] + calibration = self._calibration_from_header(points) or self._calibration_from_fit(points) + if calibration is not None: + self.calibration = calibration + self._apply_erase(calibration) + self._populate_map_data_overlays(calibration) + self._place_zones_on_map_data(calibration) + return calibration + + def _calibration_from_header(self, points: list[tuple[float, float]]) -> GridCalibration | None: + """Calibrate around the header-supplied origin (resolution fit to a path).""" + if self.layers is None or self.header_calibration is None or len(points) < _MIN_HEADER_CALIBRATION_POINTS: + return None + origin = self.header_calibration.origin_pixels() + if origin is None: # keepalive frame -- no usable origin + return None + return solve_calibration_with_origin(self.layers, points, origin, resolutions=_Q10_RESOLUTIONS) + + def _calibration_from_fit(self, points: list[tuple[float, float]]) -> GridCalibration | None: + """Full origin + resolution fit; needs a reasonably dense path.""" + if self.layers is None or len(points) < _MIN_CALIBRATION_POINTS: + return None + return solve_calibration(self.layers, points, resolutions=_Q10_RESOLUTIONS) + + def load_overlays( + self, + *, + restricted_zone_up: bytes | str | None = None, + virtual_wall_up: bytes | str | None = None, + ) -> None: + """Decode the device's vector-overlay blobs (from the status DPs). + + Pass the raw ``dpRestrictedZoneUp`` / ``dpVirtualWallUp`` values + (``Q10Status.restricted_zone_up`` / ``virtual_wall_up``). Stores them as + world-coordinate :attr:`zones` / :attr:`virtual_walls`, and -- if a + calibration is available -- places them onto ``map_data`` as + ``no_go_areas`` / ``no_mopping_areas`` / ``walls`` in pixel space. + + ``None`` means "data point absent from this update" and leaves the + existing value untouched (a partial status push must not wipe overlays). + An explicit empty blob does clear them. + """ + if restricted_zone_up is not None: + self.zones = parse_zone_blob(restricted_zone_up) + if virtual_wall_up is not None: + self.virtual_walls = parse_virtual_wall_blob(virtual_wall_up) + if self.calibration is not None: + self._place_zones_on_map_data(self.calibration) + + def _place_zones_on_map_data(self, calibration: GridCalibration) -> None: + """Convert world-coordinate zones/walls into pixel-space MapData layers.""" + if self.map_data is None: + return + + def to_area(zone: Q10Zone) -> Area | None: + if len(zone.vertices) != 4: + return None # MapData.Area is a quad + pts = [calibration.world_to_pixel(x, y) for x, y in zone.vertices] + return Area(pts[0][0], pts[0][1], pts[1][0], pts[1][1], pts[2][0], pts[2][1], pts[3][0], pts[3][1]) + + no_go = [area for zone in self.zones if zone.type == ZONE_TYPE_NO_GO and (area := to_area(zone))] + no_mop = [area for zone in self.zones if zone.type == ZONE_TYPE_NO_MOP and (area := to_area(zone))] + self.map_data.no_go_areas = no_go or None + self.map_data.no_mopping_areas = no_mop or None + + walls: list[Wall] = [] + for zone in self.virtual_walls: + if len(zone.vertices) >= 2: + (x0, y0), (x1, y1) = zone.vertices[0], zone.vertices[1] + p0 = calibration.world_to_pixel(x0, y0) + p1 = calibration.world_to_pixel(x1, y1) + walls.append(Wall(p0[0], p0[1], p1[0], p1[1])) + self.map_data.walls = walls or None + + # The robot starts a session at its dock, so the path origin is the charger. + if self.path: + cx, cy = calibration.world_to_pixel(self.path[0].x, self.path[0].y) + self.map_data.charger = Point(cx, cy) + + def _erased_cells(self, calibration: GridCalibration) -> set[int]: + """Grid-cell indices covered by the erase zones (axis-aligned bbox fill).""" + if not self.erase_zones or self.layers is None: + return set() + width, height = self.layers.width, self.layers.height + cells: set[int] = set() + for zone in self.erase_zones: + pixels = [calibration.world_to_pixel(x, y) for x, y in zone.vertices] + xs = [p[0] for p in pixels] + ys = [p[1] for p in pixels] + x0, x1 = int(min(xs)), int(max(xs)) + y0, y1 = int(min(ys)), int(max(ys)) + for py in range(max(0, y0), min(height, y1 + 1)): + for px in range(max(0, x0), min(width, x1 + 1)): + cells.add(py * width + px) + return cells + + def _apply_erase(self, calibration: GridCalibration) -> None: + """Blank erase-zone cells out of the rendered map, layers, and ``map_data``. + + The erase rectangles are world-coordinate areas the user marked for removal + (e.g. phantom floor seen through windows). With a calibration we can place + them in pixel space, blank those cells to background, and re-render so the + phantom areas disappear -- matching what the app shows. + """ + if self.layers is None or self.raw_api_response is None: + return + cells = self._erased_cells(calibration) + if not cells: + return + packet = erased_packet(parse_map_packet(self.raw_api_response), cells) + parsed = self._map_parser.parsed_from_packet(packet) + self.image_content = parsed.image_content + self.map_data = parsed.map_data + self.layers = decompose_layers(packet) + + def _populate_map_data_overlays(self, calibration: GridCalibration) -> None: + """Fill MapData.path / vacuum_position in grid-pixel coords. + + Points are stored in grid-pixel space (origin top-left), matching the + Q10's top-down, un-flipped raster so they line up with the rendered image. + """ + if self.map_data is None: + return + pixels = [Point(*calibration.world_to_pixel(point.x, point.y)) for point in self.path] + self.map_data.path = Path(len(pixels), 1, 0, [pixels]) + if self.robot_position is not None: + px, py = calibration.world_to_pixel(self.robot_position.x, self.robot_position.y) + self.map_data.vacuum_position = Point(px, py) + + def render_path_on_map( + self, + *, + line_color: tuple[int, int, int, int] = (235, 64, 52, 255), + position_color: tuple[int, int, int, int] = (255, 211, 0, 255), + ) -> bytes: + """Return the map image (PNG) with the session path + robot position drawn. + + Solves the calibration on demand if not already cached. Raises + :class:`RoborockException` if there is no map, or no calibration can be + fitted (e.g. no cleaning path captured yet). + """ + if self.image_content is None or self.layers is None: + raise RoborockException("No map available; no map has been pushed yet") + calibration = self.calibration or self.solve_calibration() + if calibration is None: + raise RoborockException( + "No calibration available; a cleaning path must be captured (pushed) during a clean" + ) + + scale = self._map_parser.config.map_scale + base = Image.open(io.BytesIO(self.image_content)).convert("RGBA") + + def world_to_image(x: float, y: float) -> tuple[float, float]: + px, py = calibration.world_to_pixel(x, y) + # The ss07 grid renders top-down (no flip), so grid-pixel (px, py) maps + # straight to image space, only upscaled by ``scale``. + return (px * scale, py * scale) + + def to_image(point: Q10Point) -> tuple[float, float]: + return world_to_image(point.x, point.y) + + draw = ImageDraw.Draw(base, "RGBA") + + # Erase zones are applied to the raster itself (cells blanked), so they are + # not drawn here -- the base image already reflects them. + + # No-go (blue) and no-mop (magenta) zones beneath the path. + for zone in self.zones: + if len(zone.vertices) < 3: + continue + polygon = [world_to_image(x, y) for x, y in zone.vertices] + fill = (0, 120, 255, 70) if zone.type == ZONE_TYPE_NO_GO else (255, 0, 200, 70) + outline = (0, 80, 200, 255) if zone.type == ZONE_TYPE_NO_GO else (200, 0, 160, 255) + draw.polygon(polygon, fill=fill, outline=outline) + + # Virtual walls (line segments, not polygons) drawn over the zones. + for wall in self.virtual_walls: + if len(wall.vertices) < 2: + continue + draw.line( + [world_to_image(x, y) for x, y in wall.vertices[:2]], + fill=(255, 64, 64, 255), + width=max(2, scale), + ) - def update_from_trace_packet(self, packet: Q10TracePacket) -> None: - """Cache the path/robot position from a pushed trace packet.""" - self.path = packet.points - self.robot_position = packet.robot_position - self._notify_update() + if len(self.path) >= 2: + draw.line([to_image(point) for point in self.path], fill=line_color, width=max(1, scale // 2)) + if self.path: # path origin == dock / charger + dx, dy = to_image(self.path[0]) + draw.ellipse([dx - scale, dy - scale, dx + scale, dy + scale], outline=(40, 200, 40, 255), width=2) + if self.robot_position is not None: + cx, cy = to_image(self.robot_position) + radius = scale + draw.ellipse([cx - radius, cy - radius, cx + radius, cy + radius], fill=position_color) + if self.robot_heading is not None: + # Heading is world-space degrees (0 = +x, +90 = +y). Map a unit + # world-space facing vector through the same transform (so the + # Y-flip/scale match the marker), then normalize to a fixed + # pixel-length tick so it reads at any calibration resolution. + angle = math.radians(self.robot_heading) + hx, hy = world_to_image( + self.robot_position.x + math.cos(angle), + self.robot_position.y + math.sin(angle), + ) + norm = math.hypot(hx - cx, hy - cy) + if norm > 0: + tick = 4 * radius + draw.line( + [cx, cy, cx + (hx - cx) / norm * tick, cy + (hy - cy) / norm * tick], + fill=position_color, + width=max(1, scale // 2), + ) + buffer = io.BytesIO() + base.save(buffer, format="PNG") + return buffer.getvalue() diff --git a/roborock/devices/traits/b01/q7/map_content.py b/roborock/devices/traits/b01/q7/map_content.py index afc36d02..a3efeb27 100644 --- a/roborock/devices/traits/b01/q7/map_content.py +++ b/roborock/devices/traits/b01/q7/map_content.py @@ -17,7 +17,8 @@ from roborock.devices.rpc.b01_q7_channel import MapRpcChannel from roborock.devices.traits import Trait from roborock.exceptions import RoborockException -from roborock.map.b01_map_parser import B01MapParser, B01MapParserConfig +from roborock.map.b01_grid_layers import GridCalibration, GridLayers +from roborock.map.b01_map_parser import B01MapParser, B01MapParserConfig, decompose_q7_layers, q7_calibration from roborock.protocols.b01_q7_protocol import B01_Q7_DPS, Q7RequestMessage from roborock.roborock_typing import RoborockB01Q7Methods @@ -36,6 +37,16 @@ class MapContent(RoborockBase): map_data: MapData | None = None """Parsed map data (metadata for points on the map).""" + layers: GridLayers | None = None + """Separable map layers (background / wall / floor) in grid-pixel space. + + Q7's raster has no per-room segmentation, so ``layers.rooms`` is empty (room + ids/names are in the map metadata).""" + + calibration: GridCalibration | None = None + """World<->pixel transform, read directly from the SCMap ``mapHead`` + (``minX``/``minY``/``resolution``); world coordinates are in metres.""" + raw_api_response: bytes | None = None """Raw bytes of the map payload from the device. @@ -98,3 +109,9 @@ async def refresh(self) -> None: self.image_content = parsed_data.image_content self.map_data = parsed_data.map_data self.raw_api_response = raw_payload + try: + self.layers = decompose_q7_layers(raw_payload) + self.calibration = q7_calibration(raw_payload) + except RoborockException: + self.layers = None + self.calibration = None diff --git a/roborock/map/b01_grid_layers.py b/roborock/map/b01_grid_layers.py index 410d60fd..46026f17 100644 --- a/roborock/map/b01_grid_layers.py +++ b/roborock/map/b01_grid_layers.py @@ -200,6 +200,62 @@ def solve_calibration( return best[1] +def solve_calibration_with_origin( + layers: GridLayers, + points: list[tuple[float, float]], + origin: tuple[float, float], + *, + resolutions: Iterable[float], + y_signs: Iterable[int] = (1, -1), + min_on_floor: float = 0.5, +) -> GridCalibration | None: + """Fit resolution + Y orientation around a *known* grid-pixel origin. + + Unlike :func:`solve_calibration`, the pixel origin ``(ox, oy)`` is fixed -- + e.g. read straight from the Q10 grid-frame header -- so this only sweeps the + candidate ``resolutions`` and ``y_signs`` and keeps the placement landing the + most ``points`` on floor. With the expensive 2-D offset slide gone, far fewer + points are needed to confirm the fit, so it works from a short path rather + than a dense clean. Returns ``None`` if no candidate lands a ``min_on_floor`` + fraction of points on floor (e.g. the origin or points are bogus). + """ + if not points: + return None + w, h = layers.width, layers.height + ox, oy = origin + classify = layers.classifier + # 1 = floor, 2 = wall/background (blocked), 0 = other. Index by cell. + klass = bytes( + 1 if (c := classify(v)) == LAYER_FLOOR else 2 if c in (LAYER_WALL, LAYER_BACKGROUND) else 0 for v in layers.grid + ) + + best: tuple[float, GridCalibration] | None = None + for resolution in resolutions: + if resolution <= 0: + continue + for y_sign in y_signs: + on_floor = 0 + blocked = 0 + for x, y in points: + px = int(x / resolution + ox) + py = int(oy - y_sign * y / resolution) + if not (0 <= px < w and 0 <= py < h): + blocked += 1 + continue + k = klass[py * w + px] + if k == 1: + on_floor += 1 + elif k == 2: + blocked += 1 + score = on_floor - 1.5 * blocked + if best is None or score > best[0]: + best = (score, GridCalibration(float(resolution), float(ox), float(oy), y_sign)) + + if best is None or best[0] < len(points) * min_on_floor: + return None + return best[1] + + def decompose_grid( width: int, height: int, diff --git a/roborock/map/b01_map_parser.py b/roborock/map/b01_map_parser.py index b57912e4..30b5887e 100644 --- a/roborock/map/b01_map_parser.py +++ b/roborock/map/b01_map_parser.py @@ -15,11 +15,67 @@ from roborock.exceptions import RoborockException from roborock.map.proto.b01_scmap_pb2 import RobotMap # type: ignore[attr-defined] +from .b01_grid_layers import ( + LAYER_BACKGROUND, + LAYER_FLOOR, + LAYER_WALL, + GridCalibration, + GridLayers, + decompose_grid, +) from .map_parser import ParsedMapData _MAP_FILE_FORMAT = "PNG" +# The Q7 occupancy grid encodes only these classes (no per-room segmentation in +# the raster -- room ids/names live in the protobuf metadata, not the pixels). +_Q7_WALL_VALUE = 127 +_Q7_FLOOR_VALUE = 128 + + +def classify_q7_cell(value: int) -> str: + """Map a Q7 SCMap grid cell value to a canonical layer class.""" + if value == _Q7_WALL_VALUE: + return LAYER_WALL + if value == _Q7_FLOOR_VALUE: + return LAYER_FLOOR + return LAYER_BACKGROUND # 0 = outside / unknown + + +def decompose_q7_layers(payload: bytes) -> GridLayers: + """Split an inflated Q7 SCMap into background / wall / floor layers. + + Q7 has no per-room raster, so ``GridLayers.rooms`` is empty; room ids/names + are available separately via the map metadata. Reuses the same device-agnostic + decomposition as the Q10. + """ + parsed = _parse_scmap_payload(payload) + size_x, size_y, grid = _extract_grid(parsed) + return decompose_grid(size_x, size_y, grid, [], classify_q7_cell) + + +def q7_calibration(payload: bytes) -> GridCalibration | None: + """Build a world<->pixel calibration straight from the Q7 ``mapHead``. + + Unlike the Q10 (whose packet carries no calibration), the Q7 SCMap header + provides ``minX``/``minY``/``resolution`` directly, so no path fitting is + needed. World coordinates are in metres; resolution is metres-per-pixel. + """ + head = _parse_scmap_payload(payload).mapHead + if not head.HasField("resolution") or head.resolution <= 0 or not head.HasField("sizeY"): + return None + resolution = head.resolution + min_x = head.minX if head.HasField("minX") else 0.0 + min_y = head.minY if head.HasField("minY") else 0.0 + return GridCalibration( + resolution=resolution, + origin_x=-min_x / resolution, + origin_y=(head.sizeY - 1) + min_y / resolution, + y_sign=1, + ) + + @dataclass class B01MapParserConfig: """Configuration for the B01/Q7 map parser.""" diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py index 89dec083..f04e7a6d 100644 --- a/roborock/map/b01_q10_map_parser.py +++ b/roborock/map/b01_q10_map_parser.py @@ -23,7 +23,7 @@ import io import math import statistics -from dataclasses import dataclass, field +from dataclasses import dataclass, field, replace from PIL import Image from vacuum_map_parser_base.config.image_config import ImageConfig @@ -31,10 +31,46 @@ from roborock.exceptions import RoborockException +from .b01_grid_layers import ( + LAYER_BACKGROUND, + LAYER_FLOOR, + LAYER_UNKNOWN, + LAYER_WALL, + GridLayers, + decompose_grid, +) from .map_parser import ParsedMapData _MAP_FILE_FORMAT = "PNG" +# Semantic raster classes, confirmed against real ss07 captures (rendered and +# eyeballed): 243 is the background outside the home (~half the grid), 240 is +# scanned floor not yet assigned to a room, other values >= 240 are walls, and +# 0 < value < 240 are per-room floor cells (value == room_id * 4). +_BACKGROUND_VALUE = 243 +_UNSEGMENTED_FLOOR_VALUE = 240 + + +def classify_q10_cell(value: int) -> str: + """Map a Q10 grid cell value to a canonical layer class.""" + if value == 0: + return LAYER_UNKNOWN + if value == _BACKGROUND_VALUE: + return LAYER_BACKGROUND + if value == _UNSEGMENTED_FLOOR_VALUE: + return LAYER_FLOOR + if value >= _WALL_THRESHOLD: + return LAYER_WALL + return LAYER_FLOOR + + +def decompose_layers(packet: "Q10MapPacket") -> GridLayers: + """Split a parsed Q10 map packet into separable grid-pixel layers.""" + rooms = [(room.id, room.name, room.pixel_value, room.pixel_count) for room in packet.rooms] + # The ss07 grid is stored top-down (row 0 = top), so no display flip is applied. + return decompose_grid(packet.width, packet.height, packet.grid, rooms, classify_q10_cell, flip=False) + + MAP_PACKET_MARKER = b"\x01\x01" TRACE_PACKET_MARKER = b"\x02\x01" @@ -52,6 +88,28 @@ _ROOM_RECORD_LENGTH = 47 _ROOM_NAME_LENGTH_OFFSET = 26 _MAX_ROOMS = 32 +# Sanity bound for the erase-zone vector section's vertices-per-polygon field. +_MAX_ERASE_ZONE_VERTICES = 16 + +# The 01 01 grid-frame header also carries the map's calibration, so a +# GridCalibration can be derived without fitting a cleaning path (i.e. docked / +# pre-clean). Absolute byte offsets in the frame, reported and verified by +# @andrewlyeats across independent ss07 (fw 03.11.24) captures and cross-checked +# with the ioBroker roborock adapter: +# - 11-12 x_min, 13-14 y_min (s16be): map origin in 5 mm units. The grid is +# 50 mm/px, so dividing by 10 yields the origin in grid pixels -- the (ox, oy) +# that solve_calibration otherwise recovers by sliding the path. +# - 15-16 resolution (u16be): reads 5 (= 0.05 m/px = 50 mm/px) universally. +# - 17-18 charger x, 19-20 charger y (s16be, 5 mm units), 21-22 charger phi. +_ORIGIN_X_OFFSET = 11 +_ORIGIN_Y_OFFSET = 13 +_HEADER_RESOLUTION_OFFSET = 15 +_CHARGER_X_OFFSET = 17 +_CHARGER_Y_OFFSET = 19 +_CHARGER_PHI_OFFSET = 21 +# The header origin/charger are in 5 mm units and the grid is 50 mm/px, so a +# header coordinate maps to grid pixels by dividing by this. +_HEADER_UNITS_PER_PIXEL = 10 # Grid cell values >= this are walls / borders rather than room segments. _WALL_THRESHOLD = 240 @@ -72,6 +130,60 @@ def name(self) -> str: return self.raw_name.removeprefix("rr_").replace("_", " ").strip().title() +@dataclass +class Q10EraseZone: + """A user-drawn "erase" area (polygon) carried in the map packet. + + These are the app's *Erase* tool rectangles -- regions the user marked to be + removed from the map (e.g. phantom floor the lidar mapped through windows). + Coordinates are world units (millimetres), same frame as the path/zones. + + Confirmed by a controlled diff: removing the two erase zones on a live device + dropped this section's count from 2 to 0 while the grid and the trailing + raster were byte-identical. (Earlier revisions misidentified this section as + "carpets"; it is the erase-zone list.) + """ + + vertices: list[tuple[int, int]] = field(default_factory=list) + + +@dataclass +class Q10HeaderCalibration: + """Calibration carried in the ``01 01`` grid-frame header (ss07). + + Lets a :class:`~roborock.map.b01_grid_layers.GridCalibration` origin be read + straight from the map packet -- no cleaning path / fit required, so it works + docked or pre-clean. See :meth:`origin_pixels`. + + ``origin_x`` / ``origin_y`` and the charger coordinates are in 5 mm units; + ``resolution`` is the raw header field (5 == 50 mm/px). ``charger_phi`` is the + raw dock heading field. Reported and verified by @andrewlyeats (ss07). + """ + + origin_x: int + origin_y: int + resolution: int + charger_x: int + charger_y: int + charger_phi: int + + @property + def is_keepalive(self) -> bool: + """True for null/keepalive frames (``x_min == y_min == 0``), which carry + no usable origin -- callers should fall back to a path-fit calibration.""" + return self.origin_x == 0 and self.origin_y == 0 + + def origin_pixels(self) -> tuple[float, float] | None: + """The grid-pixel origin ``(ox, oy)`` for a ``GridCalibration``. + + The header origin is in 5 mm units and the grid is 50 mm/px, so the + pixel origin is the header value divided by 10. Returns ``None`` for a + keepalive frame (no origin to use).""" + if self.is_keepalive: + return None + return (self.origin_x / _HEADER_UNITS_PER_PIXEL, self.origin_y / _HEADER_UNITS_PER_PIXEL) + + @dataclass class Q10MapPacket: """Decoded contents of a Q10 ``01 01`` map packet.""" @@ -81,6 +193,10 @@ class Q10MapPacket: height: int grid: bytes rooms: list[Q10Room] = field(default_factory=list) + erase_zones: list[Q10EraseZone] = field(default_factory=list) + """Erase areas decoded from the packet tail (world coordinates).""" + header_calibration: Q10HeaderCalibration | None = None + """Calibration read straight from the grid-frame header (ss07), or ``None``.""" @dataclass @@ -98,13 +214,14 @@ class Q10TracePacket: The robot accumulates the **full path of the current cleaning session** and serves it in a single packet: ``points`` holds the whole trajectory so far (oldest first), growing as the robot cleans. This was confirmed live -- a - corridor run produced packets of 1, then 3, then 15 points, each a strict - superset describing the path travelled. Because the robot keeps the path - server-side, a client that connects mid-session still receives the complete - path (this is how the app shows the trail even after a cold launch). - - The robot only emits these while a session is active, so an idle/docked robot - will not produce them. The most recent point is the current robot position. + corridor run produced packets of 0 (just a heading, docked), then 3, then 14 + points, each a strict superset describing the path travelled. Because the + robot keeps the path server-side, a client that connects mid-session still + receives the complete path (this is how the app shows the trail even after a + cold launch). + + A docked/idle robot can still emit a packet carrying only the ``heading`` + (zero points). The most recent point is the current robot position. """ points: list[Q10Point] = field(default_factory=list) @@ -112,30 +229,52 @@ class Q10TracePacket: """Session counter (byte 3); increments per cleaning session, tracking the device clean count. Not a per-packet sequence.""" + heading: int = 0 + """Robot heading from the 0201 SLAM field (bytes 10-11), in degrees: + ``0`` = +x, ``+90`` = +y, ``±180`` = −x, ``−90`` = −y. This is the current + orientation; pair it with :attr:`robot_position` to draw a facing robot. + + Convention (incl. the y-sign) ground-truthed on a live R1 clean: across + straight segments the reported heading equalled the direction of travel + ``atan2(dy, dx)`` -- +x read 0, −x read ±180, a slight −y drift read + negative.""" + @property def robot_position(self) -> Q10Point | None: """The current robot position (the most recent point).""" return self.points[-1] if self.points else None -# Trace packet (``02 01``): a 10-byte header followed by big-endian int16 (x, y) +# Trace packet (``02 01``): a 14-byte header followed by big-endian int16 (x, y) # point pairs forming the accumulated session path. Header layout confirmed -# against live ss07 captures: byte 3 is a session counter (tracks the device -# clean count); bytes 8-9 are a u16be point count minus one (verified: a 15-point -# packet carried 0x000e == 14). The parser reads all 4-byte pairs in the body -# rather than trusting the count field, so a truncated tail can't desync it. +# against live ss07 captures and cross-checked by @andrewlyeats: +# - byte 3: a session counter (tracks the device clean count). +# - bytes 8-9: a u16be point count -- the exact number of (x, y) pairs from +# byte 14 (verified: captures of 1417 / 2462 points carried 0x0589 / 0x099e). +# - bytes 10-11: the 0201 SLAM heading (s16be degrees; 0 = +x, +90 = +y, +# +-180 = -x, -90 = -y) -- the robot's current orientation. +# - bytes 12-13: a constant (0x0000). +# - byte 14 onward: the path points. +# An earlier revision used a 10-byte header, which folded the heading word into +# a phantom leading point ``(heading, 0)`` -- that is the "stray point" the +# heuristic below was papering over, and why the count read "one high". The +# parser reads all 4-byte pairs in the body rather than trusting the count +# field, so a truncated tail can't desync it. # NOTE: the format documented by roborock-qseries-map-bridge (18-byte header) -# did not match this firmware -- this 10-byte layout is what the device sent. -_TRACE_HEADER_LENGTH = 10 +# did not match this firmware -- this 14-byte layout is what the device sent. +_TRACE_HEADER_LENGTH = 14 _TRACE_SEQUENCE_OFFSET = 3 - -# Some cleans prepend a single stray point to the path, far outside the map -# (e.g. ~(0, -1907) when the real path starts near (-3760, -1920)); it skews the -# rendered start/bounding box and any path-based calibration. We drop points[0] -# only when its step to points[1] is a gross outlier (this multiple of the -# median step of the rest of the path), so a genuine first point is never lost. -# The current position (last point) is unaffected. Trigger and threshold -# reported and verified by @andrewlyeats across independent B01/Q10 captures. +_TRACE_HEADING_OFFSET = 10 + +# Some cleans still prepend a single near-origin sentinel as the first real +# point (e.g. ~(5, 76) / (-3, 0) when the path proper starts near (-1700, -800)); +# it skews the rendered start/bounding box and any path-based calibration. (This +# is distinct from the heading word the old 10-byte header used to surface as a +# phantom point -- that is now consumed by the header.) We drop points[0] only +# when its step to points[1] is a gross outlier (this multiple of the median step +# of the rest of the path), so a genuine first point is never lost. The current +# position (last point) is unaffected. Trigger and threshold reported and +# verified by @andrewlyeats across independent B01/Q10 captures. _STRAY_POINT_STEP_RATIO = 20 @@ -159,6 +298,7 @@ def parse_trace_packet(payload: bytes) -> Q10TracePacket: if len(body) % 4: raise RoborockException("Q10 trace points are not 4-byte (x, y) pairs") + heading = int.from_bytes(payload[_TRACE_HEADING_OFFSET : _TRACE_HEADING_OFFSET + 2], "big", signed=True) points = [ Q10Point( x=int.from_bytes(body[offset : offset + 2], "big", signed=True), @@ -167,7 +307,7 @@ def parse_trace_packet(payload: bytes) -> Q10TracePacket: for offset in range(0, len(body), 4) ] points = _drop_stray_leading_point(points) - return Q10TracePacket(points=points, sequence=payload[_TRACE_SEQUENCE_OFFSET]) + return Q10TracePacket(points=points, sequence=payload[_TRACE_SEQUENCE_OFFSET], heading=heading) def _drop_stray_leading_point(points: list[Q10Point]) -> list[Q10Point]: @@ -323,7 +463,88 @@ def parse_map_packet(payload: bytes) -> Q10MapPacket: else: height, grid, room_data = _infer_layout(decoded, width) rooms = _parse_rooms(room_data, grid) - return Q10MapPacket(map_id=map_id, width=width, height=height, grid=grid, rooms=rooms) + erase_zones = _parse_erase_zones(payload[layout_end:]) + header_calibration = _parse_header_calibration(payload) + return Q10MapPacket( + map_id=map_id, + width=width, + height=height, + grid=grid, + rooms=rooms, + erase_zones=erase_zones, + header_calibration=header_calibration, + ) + + +def _parse_header_calibration(payload: bytes) -> Q10HeaderCalibration | None: + """Read the calibration fields from the ``01 01`` grid-frame header. + + All fields sit inside the fixed 29-byte header (already length-checked by + the caller). See the offset constants for the byte layout. Returns the + decoded :class:`Q10HeaderCalibration` (callers skip keepalive frames via + :attr:`Q10HeaderCalibration.is_keepalive`).""" + + def s16(offset: int) -> int: + return int.from_bytes(payload[offset : offset + 2], "big", signed=True) + + return Q10HeaderCalibration( + origin_x=s16(_ORIGIN_X_OFFSET), + origin_y=s16(_ORIGIN_Y_OFFSET), + resolution=int.from_bytes(payload[_HEADER_RESOLUTION_OFFSET : _HEADER_RESOLUTION_OFFSET + 2], "big"), + charger_x=s16(_CHARGER_X_OFFSET), + charger_y=s16(_CHARGER_Y_OFFSET), + charger_phi=s16(_CHARGER_PHI_OFFSET), + ) + + +def _parse_erase_zones(tail: bytes) -> list[Q10EraseZone]: + """Decode erase areas from the bytes after the compressed grid block. + + The tail begins with a vector section ``[count: u8][vertices_per: u8]`` then + ``count`` polygons of ``vertices_per`` int16-BE (x, y) pairs (axis-aligned + rectangles in practice). Identified by a controlled diff on a live ss07 + device: deleting the two app *Erase* zones dropped ``count`` 2->0 with the + rest of the packet byte-identical. The remaining tail (a run-length raster + + signature) is unrelated to erase and is not decoded here. + """ + if len(tail) < 2: + return [] + count = tail[0] + vertices_per = tail[1] + if count == 0 or not 1 <= vertices_per <= _MAX_ERASE_ZONE_VERTICES: + return [] + + erase_zones: list[Q10EraseZone] = [] + offset = 2 + for _ in range(count): + end = offset + vertices_per * 4 + if end > len(tail): + break + vertices = [ + ( + int.from_bytes(tail[offset + j * 4 : offset + j * 4 + 2], "big", signed=True), + int.from_bytes(tail[offset + j * 4 + 2 : offset + j * 4 + 4], "big", signed=True), + ) + for j in range(vertices_per) + ] + erase_zones.append(Q10EraseZone(vertices=vertices)) + offset = end + return erase_zones + + +def erased_packet(packet: "Q10MapPacket", cells: set[int]) -> "Q10MapPacket": + """Return a copy of ``packet`` with ``cells`` (grid indices) set to background. + + Used to apply the app's erase zones: cells inside an erase rectangle are blanked + to the background class so they drop out of the rendered map and every layer. + """ + if not cells: + return packet + grid = bytearray(packet.grid) + for cell in cells: + if 0 <= cell < len(grid): + grid[cell] = _BACKGROUND_VALUE + return replace(packet, grid=bytes(grid)) @dataclass @@ -340,16 +561,17 @@ class B01Q10MapParser: def __init__(self, config: B01Q10MapParserConfig | None = None) -> None: self._config = config or B01Q10MapParserConfig() + @property + def config(self) -> B01Q10MapParserConfig: + """The parser configuration (image scale, ...).""" + return self._config + def parse(self, payload: bytes) -> ParsedMapData: """Parse a raw Q10 map packet into a rendered PNG + ``MapData``.""" - return self.parse_packet(parse_map_packet(payload)) - - def parse_packet(self, packet: Q10MapPacket) -> ParsedMapData: - """Render an already-parsed Q10 map packet into a PNG + ``MapData``. + return self.parsed_from_packet(parse_map_packet(payload)) - The protocol layer parses the wire bytes into a :class:`Q10MapPacket`; - this renders that packet without re-parsing it. - """ + def parsed_from_packet(self, packet: Q10MapPacket) -> ParsedMapData: + """Render a (possibly erase-modified) packet into a PNG + ``MapData``.""" image = self._render(packet) map_data = MapData() @@ -378,7 +600,8 @@ def _render(self, packet: Q10MapPacket) -> Image.Image: for value in packet.grid: rgb.extend(palette[value]) img = Image.frombytes("RGB", (packet.width, packet.height), bytes(rgb)) - img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM) + # The ss07 grid is stored top-down (row 0 = top of the home), so it is + # rendered as-is -- unlike the V1/Q7 convention, no vertical flip. scale = self._config.map_scale if scale > 1: img = img.resize((packet.width * scale, packet.height * scale), resample=Image.Resampling.NEAREST) diff --git a/roborock/map/b01_q10_overlays.py b/roborock/map/b01_q10_overlays.py index 13e67491..34fcf71c 100644 --- a/roborock/map/b01_q10_overlays.py +++ b/roborock/map/b01_q10_overlays.py @@ -4,13 +4,21 @@ the map raster; the device reports them as base64-encoded blobs in separate data points (``dpRestrictedZoneUp`` 55, ``dpVirtualWallUp`` 57, ``dpZonedUp`` 59). -The blob format was reverse-engineered from a live ss07 (confirmed against 7 -real no-go zones): +The restricted-zone / zoned blob format (DP 55 and DP 59) was reverse-engineered +from a live ss07 (confirmed against 7 real no-go zones): [version: u8][count: u8] then ``count`` fixed-size records, each: [type: u8][vertex_count: u8] then vertex_count (x, y) int16-BE pairs, zero-padded to the record size. +Use :func:`parse_zone_blob` for those. Virtual walls (DP 57) use a *different +framing* -- a bare ``[count]`` and 8-byte ``(x, y)`` records, no version/type/pad +-- so they have their own :func:`parse_virtual_wall_blob`; feeding DP 57 to +:func:`parse_zone_blob` mis-frames it (the leading byte is read as a version and +the next, a coordinate, as a record count). The coordinate order matches the +zones (first wire word = x), confirmed against the app. Provenance and the +byte-level breakdown are in PR #850's review thread. + Coordinates are in the device's world units (the same space as the cleaning path), so a :class:`~roborock.map.b01_grid_layers.GridCalibration` maps them to map pixels. ``type`` distinguishes the restriction kind (2 = no-mop, 3 = door @@ -83,12 +91,67 @@ def parse_zone_blob(data: bytes | str | None) -> list[Q10Zone]: return zones +_WALL_RECORD_SIZE = 8 # two (x, y) int16-BE endpoints + + +def parse_virtual_wall_blob(data: bytes | str | None) -> list[Q10Zone]: + """Decode a Q10 virtual-wall overlay blob (``dpVirtualWallUp`` 57). + + Virtual walls use a *different framing* from the restricted-zone DPs handled + by :func:`parse_zone_blob`: a single ``[count: u8]`` byte (no version, no + per-record type/pad) followed by ``count`` 8-byte records, each two + ``(x, y)`` int16-BE endpoints. The *coordinate order is the same* as the + restricted zones (first wire word = x), so a wall and a no-go zone drawn on + the same line decode parallel rather than transposed. + + Each wall is returned as a :class:`Q10Zone` of type + :data:`ZONE_TYPE_VIRTUAL_WALL` with its two ``(x, y)`` endpoints, so callers + can place them onto the map through the same + :class:`~roborock.map.b01_grid_layers.GridCalibration` as the zones. + + Accepts raw bytes or the base64 string straight from the data point. Returns + ``[]`` for empty/absent/unparsable blobs (the device sends a single ``0x00`` + byte -- base64 ``AA==`` -- when there are none). + + The axis order was confirmed against the app: a horizontal wall drawn below + a room reads back with x varying and y constant (and the wide RDC no-go zone + reads back wide), so DP 57 shares DP 55's order. An earlier revision swapped + the wall axes to ``(y, x)`` -- following a misreading of PR #850's notes -- + which placed every wall transposed 90 degrees from where it was drawn. + """ + raw = _as_bytes(data) + if len(raw) < 1: + return [] + count = raw[0] + if count <= 0: + return [] + + body = raw[1:] + walls: list[Q10Zone] = [] + for index in range(count): + record = body[index * _WALL_RECORD_SIZE : (index + 1) * _WALL_RECORD_SIZE] + if len(record) < _WALL_RECORD_SIZE: + break # truncated trailing record; stop rather than misread + vertices = [ + ( + int.from_bytes(record[p * 4 : p * 4 + 2], "big", signed=True), + int.from_bytes(record[p * 4 + 2 : p * 4 + 4], "big", signed=True), + ) + for p in range(2) + ] + walls.append(Q10Zone(type=ZONE_TYPE_VIRTUAL_WALL, vertices=vertices)) + return walls + + # Observed ``type`` values, confirmed against an ss07 Q10 (firmware 03.11.24) and # cross-checked with the ioBroker roborock adapter: 2 = no-mop, 3 = door -# threshold, 1 = virtual wall. Any other value (including 0) is a no-go zone. -# In practice virtual walls arrive on a separate DP (VIRTUAL_WALL_UP 57), so this -# restricted-zone DP normally only carries 0 / 2 / 3. The raw value is also kept -# on ``Q10Zone.type`` for callers that recognise it. +# threshold. Any other value (including 0) is a no-go zone. The raw value is also +# kept on ``Q10Zone.type`` for callers that recognise it. +# +# Virtual walls arrive on a separate DP (VIRTUAL_WALL_UP 57) with their own frame +# (see :func:`parse_virtual_wall_blob`), so this restricted-zone DP only carries +# 0 / 2 / 3 -- never a 1. ``ZONE_TYPE_VIRTUAL_WALL`` is kept here only to tag the +# walls that :func:`parse_virtual_wall_blob` produces. # # Corrected from an earlier reading that treated type 3 as no-mop -- 3 is the # door-threshold rectangle; the no-mop area reads back as type 2. Reported and diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py index 470f3f99..74f497ff 100644 --- a/tests/devices/traits/b01/q10/test_map.py +++ b/tests/devices/traits/b01/q10/test_map.py @@ -6,21 +6,25 @@ """ import asyncio +import io from collections.abc import AsyncGenerator from pathlib import Path -from typing import cast from unittest.mock import AsyncMock, Mock import pytest +from PIL import Image -from roborock.cli import _await_q10_map_push, cli from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create -from roborock.devices.traits.b01.q10.map import MapContentTrait -from roborock.map.b01_q10_map_parser import Q10Point, parse_map_packet, parse_trace_packet +from roborock.devices.traits.b01.q10.map import _Q10_RESOLUTIONS, MapContentTrait +from roborock.exceptions import RoborockException +from roborock.map.b01_grid_layers import GridCalibration +from roborock.map.b01_q10_map_parser import Q10EraseZone, Q10HeaderCalibration, Q10Point +from roborock.map.b01_q10_overlays import ZONE_TYPE_NO_GO, ZONE_TYPE_NO_MOP from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") TRACE_FIXTURE = Path("tests/map/testdata/b01_q10_trace.bin") +TRACE_SESSION_FIXTURE = Path("tests/map/testdata/b01_q10_trace_session.bin") def _map_message( @@ -29,15 +33,16 @@ def _map_message( return RoborockMessage(protocol=protocol, payload=payload, version=b"B01") -def test_update_from_map_packet_populates_image_and_rooms() -> None: - """A parsed 01 01 map packet populates the image, rooms and map data.""" - packet = parse_map_packet(FIXTURE.read_bytes()) +def test_update_from_map_response_populates_image_and_rooms() -> None: + """A pushed 01 01 map packet populates the image, rooms and map data.""" + payload = FIXTURE.read_bytes() trait = MapContentTrait() updates: list[None] = [] trait.add_update_listener(lambda: updates.append(None)) - trait.update_from_map_packet(packet) + assert trait.update_from_map_response(_map_message(payload)) is True + assert trait.raw_api_response == payload assert trait.image_content is not None assert trait.image_content[:8] == b"\x89PNG\r\n\x1a\n" assert {room.id: room.name for room in trait.rooms} == {2: "Living Room", 3: "Bedroom"} @@ -45,80 +50,43 @@ def test_update_from_map_packet_populates_image_and_rooms() -> None: assert len(updates) == 1 -def test_update_from_trace_packet_populates_path_and_position() -> None: - """A parsed 02 01 trace packet populates the path and robot position.""" - packet = parse_trace_packet(TRACE_FIXTURE.read_bytes()) +def test_update_from_map_response_populates_path_and_position() -> None: + """A pushed 02 01 trace packet populates the path, position and heading.""" trait = MapContentTrait() updates: list[None] = [] trait.add_update_listener(lambda: updates.append(None)) - trait.update_from_trace_packet(packet) + assert trait.update_from_map_response(_map_message(TRACE_SESSION_FIXTURE.read_bytes())) is True - assert [(p.x, p.y) for p in trait.path] == [(169, 0)] + assert len(trait.path) == 14 + assert (trait.path[0].x, trait.path[0].y) == (41, 64) assert trait.robot_position is not None - assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) + assert (trait.robot_position.x, trait.robot_position.y) == (276, -1) + assert trait.robot_heading == -34 assert len(updates) == 1 -def test_q10_position_is_available_as_top_level_cli_command() -> None: - assert "q10-position" in cli.commands - - -# --- CLI push waiting -------------------------------------------------------- - - -class _FakeQ10Properties: - def __init__(self) -> None: - self.map = MapContentTrait() - self.refresh_count = 0 - - async def refresh(self) -> None: - self.refresh_count += 1 - - -class _FakeQ10PropertiesWithTrace(_FakeQ10Properties): - async def refresh(self) -> None: - await super().refresh() - self.map.update_from_trace_packet(parse_trace_packet(TRACE_FIXTURE.read_bytes())) - - -async def test_await_q10_map_push_waits_for_fresh_update() -> None: - """A cached trace alone is not treated as a successful new map push.""" - properties = _FakeQ10Properties() - properties.map.path = [Q10Point(1, 2)] - - got_trace = await _await_q10_map_push( - cast(Q10PropertiesApi, properties), lambda: bool(properties.map.path), timeout=0.01 - ) - - assert got_trace is False - assert properties.refresh_count == 1 - - -async def test_await_q10_map_push_returns_true_after_update() -> None: - properties = _FakeQ10PropertiesWithTrace() - - got_trace = await _await_q10_map_push( - cast(Q10PropertiesApi, properties), lambda: bool(properties.map.path), timeout=0.01 - ) +def test_update_from_map_response_ignores_non_map_messages() -> None: + """Non-MAP_RESPONSE messages are left for the status path to handle.""" + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) - assert got_trace is True - assert [(p.x, p.y) for p in properties.map.path] == [(169, 0)] + rpc = _map_message(b"\x01\x01whatever", protocol=RoborockMessageProtocol.RPC_RESPONSE) + assert trait.update_from_map_response(rpc) is False + # An unrecognized MAP_RESPONSE marker is also not consumed. + assert trait.update_from_map_response(_map_message(b"\x09\x09junk")) is False -async def test_await_q10_map_push_can_fall_back_to_cached_map_on_timeout() -> None: - properties = _FakeQ10Properties() - properties.map.image_content = b"cached-png" + assert trait.image_content is None + assert not trait.path + assert not updates - got_map = await _await_q10_map_push( - cast(Q10PropertiesApi, properties), - lambda: properties.map.image_content is not None, - timeout=0.01, - allow_cached_on_timeout=True, - ) - assert got_map is True - assert properties.refresh_count == 1 +def test_parse_without_data_raises() -> None: + trait = MapContentTrait() + with pytest.raises(RoborockException, match="No map payload available"): + trait.parse_map_content() # --- Integration through the Q10PropertiesApi subscribe loop ----------------- @@ -174,7 +142,206 @@ async def test_subscribe_loop_routes_trace_push( """A trace pushed onto the stream is routed to the map trait by the loop.""" assert not q10_api.map.path - message_queue.put_nowait(_map_message(TRACE_FIXTURE.read_bytes())) + message_queue.put_nowait(_map_message(TRACE_SESSION_FIXTURE.read_bytes())) await _wait_for(lambda: bool(q10_api.map.path)) assert q10_api.map.robot_position is not None + + +# --- Layers / calibration / rendering ---------------------------------------- + + +def _trait_with_map() -> MapContentTrait: + """A trait with a map already pushed into it.""" + trait = MapContentTrait() + trait.update_from_map_response(_map_message(FIXTURE.read_bytes())) + return trait + + +def test_map_push_populates_layers() -> None: + """A pushed map is also decomposed into separable layers.""" + trait = _trait_with_map() + assert trait.layers is not None + assert trait.layers.class_counts.get("floor") == 26 + assert {room.id for room in trait.layers.rooms} == {2, 3} + + +def test_solve_calibration_needs_map_and_dense_path() -> None: + """No map or too-short a path -> no calibration.""" + trait = MapContentTrait() + trait.path = [Q10Point(i, 0) for i in range(30)] + assert trait.solve_calibration() is None # no layers yet + + +def _floor_world_points(trait: MapContentTrait, cal: GridCalibration, count: int) -> list[Q10Point]: + """``count`` world points lying on the map's floor under ``cal``.""" + assert trait.layers is not None + layers = trait.layers + floor = [ + (px, py) + for py in range(layers.height) + for px in range(layers.width) + if layers.cell_class(layers.grid[py * layers.width + px]) == "floor" + ] + return [Q10Point(*(int(v) for v in cal.pixel_to_world(px, py))) for px, py in floor[:count]] + + +def test_solve_calibration_uses_header_origin_with_short_path() -> None: + """A grid-frame header origin lets a short path calibrate (origin is exact).""" + trait = _trait_with_map() + # Header origin in 5 mm units -> pixel origin (0, 5); not a keepalive frame. + trait.header_calibration = Q10HeaderCalibration( + origin_x=0, origin_y=50, resolution=5, charger_x=0, charger_y=0, charger_phi=0 + ) + true = GridCalibration(resolution=20.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.path = _floor_world_points(trait, true, 6) + assert len(trait.path) < 20 # far too short for the full origin+resolution fit + cal = trait.solve_calibration() + assert cal is not None + # Origin comes straight from the header (exact); only the resolution is fit, + # so it lands on one of the candidates (the exact pick is grid-quantized). + assert (cal.origin_x, cal.origin_y) == (0.0, 5.0) + assert cal.resolution in _Q10_RESOLUTIONS + assert trait.calibration is cal + + +def test_solve_calibration_short_path_without_header_returns_none() -> None: + """Without a header origin a short path is still too sparse for the full fit.""" + trait = _trait_with_map() # the fixture header is a keepalive frame + assert trait.header_calibration is not None and trait.header_calibration.is_keepalive + true = GridCalibration(resolution=10.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.path = _floor_world_points(trait, true, 6) + assert trait.solve_calibration() is None + + +def test_render_path_on_map_requires_map() -> None: + trait = MapContentTrait() + with pytest.raises(RoborockException, match="No map available"): + trait.render_path_on_map() + + +def test_render_path_on_map_draws_position() -> None: + """With a calibration set, the robot position is drawn at the mapped pixel.""" + trait = _trait_with_map() + # identity-ish calibration: world (x,y) -> pixel (x, 5 - y) in the 8x6 grid. + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.path = [Q10Point(1, 2), Q10Point(3, 2)] + # world (3, 2) -> grid pixel (3, 3); the ss07 grid renders top-down (no flip), + # so that maps straight to image (12, 12) at scale 4. + trait.robot_position = Q10Point(3, 2) + png = trait.render_path_on_map(position_color=(255, 211, 0, 255)) + img = Image.open(io.BytesIO(png)).convert("RGBA") + assert img.size == (8 * 4, 6 * 4) + assert img.getpixel((12, 12)) == (255, 211, 0, 255) + + +def test_render_path_on_map_draws_heading_indicator() -> None: + """A known heading draws a facing tick from the robot marker. + + With heading 0 (= +x world) and the identity-ish calibration, the tick + extends to the right of the robot pixel; with the marker at image (12, 12) + the tick covers pixels at x > 12 along y == 12. + """ + trait = _trait_with_map() + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.path = [Q10Point(1, 2), Q10Point(3, 2)] + trait.robot_position = Q10Point(3, 2) + trait.robot_heading = 0 # facing +x + png = trait.render_path_on_map(position_color=(255, 211, 0, 255)) + img = Image.open(io.BytesIO(png)).convert("RGBA") + # tick runs +x from the marker (4 * radius = 16 px at scale 4) + assert img.getpixel((20, 12)) == (255, 211, 0, 255) + # ...and not behind it (the marker is a small disc; sample well to the left) + assert img.getpixel((4, 12)) != (255, 211, 0, 255) + + +def test_parse_map_content_preserves_path_overlays_after_calibration() -> None: + """Reparsing a calibrated map keeps path and vacuum position on MapData.""" + trait = _trait_with_map() + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.path = [Q10Point(1, 2), Q10Point(3, 2)] + trait.robot_position = Q10Point(3, 2) + + trait.parse_map_content() + + assert trait.map_data is not None + assert trait.map_data.path is not None + assert trait.map_data.vacuum_position is not None + assert (trait.map_data.vacuum_position.x, trait.map_data.vacuum_position.y) == (3.0, 3.0) + + +def test_load_overlays_places_zones_with_calibration() -> None: + """Decoded no-go / no-mop zones become pixel-space MapData areas.""" + trait = _trait_with_map() + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + trait.path = [Q10Point(1, 1)] # path origin -> charger + + def rect(zone_type: int, corners: list[tuple[int, int]]) -> bytes: + out = bytes([zone_type, len(corners)]) + for x, y in corners: + out += int.to_bytes(x & 0xFFFF, 2, "big") + int.to_bytes(y & 0xFFFF, 2, "big") + return out.ljust(18, b"\x00") + + blob = ( + bytes([1, 2]) + + rect(ZONE_TYPE_NO_GO, [(0, 0), (4, 0), (4, 4), (0, 4)]) + + rect(ZONE_TYPE_NO_MOP, [(1, 1), (2, 1), (2, 2), (1, 2)]) + ) + trait.load_overlays(restricted_zone_up=blob) + + assert len(trait.zones) == 2 + assert trait.map_data is not None + assert len(trait.map_data.no_go_areas or []) == 1 + assert len(trait.map_data.no_mopping_areas or []) == 1 + # charger = path origin in pixels: (1, 5-1) = (1, 4) + assert trait.map_data.charger is not None + assert (trait.map_data.charger.x, trait.map_data.charger.y) == (1.0, 4.0) + + +def test_apply_erase_blanks_cells_with_calibration() -> None: + """With a calibration, erase-zone cells are blanked from the layers + image.""" + trait = _trait_with_map() + assert trait.layers is not None + before_floor = trait.layers.class_counts.get("floor") + before_image = trait.image_content + assert before_floor and before_floor > 0 + + # identity-ish calibration: world (x, y) -> pixel (x, 5 - y) over the 8x6 grid. + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + # A rectangle covering the whole grid in world coords erases every cell. + trait.erase_zones = [Q10EraseZone(vertices=[(0, 0), (7, 0), (7, 5), (0, 5)])] + trait._apply_erase(trait.calibration) + + assert trait.layers.class_counts.get("floor", 0) == 0 # all floor erased + assert trait.image_content != before_image # re-rendered + + +def test_apply_erase_partial_rectangle() -> None: + """An erase rectangle only blanks the cells it covers, leaving the rest.""" + trait = _trait_with_map() + assert trait.layers is not None + before_floor = trait.layers.class_counts.get("floor", 0) + + trait.calibration = GridCalibration(resolution=1.0, origin_x=0.0, origin_y=5.0, y_sign=1) + # Cover only the top two grid rows (pixel y 0..1 -> world y 4..5). + trait.erase_zones = [Q10EraseZone(vertices=[(0, 4), (7, 4), (7, 5), (0, 5)])] + trait._apply_erase(trait.calibration) + + after_floor = trait.layers.class_counts.get("floor", 0) + assert 0 < after_floor < before_floor # some, not all, floor removed + + +def test_load_overlays_partial_update_keeps_existing_zones() -> None: + """A status push without the zone DP (None) must not wipe loaded zones.""" + trait = MapContentTrait() + blob = ( + bytes([1, 1]) + + bytes([0, 4]) + + b"".join(int.to_bytes(v & 0xFFFF, 2, "big") for xy in [(0, 0), (4, 0), (4, 4), (0, 4)] for v in xy) + ) + trait.load_overlays(restricted_zone_up=blob) + assert len(trait.zones) == 1 + # A later partial update carrying only the (empty) virtual-wall DP. + trait.load_overlays(restricted_zone_up=None, virtual_wall_up=b"\x00") + assert len(trait.zones) == 1 # zones preserved + assert trait.virtual_walls == [] diff --git a/tests/map/test_b01_grid_layers.py b/tests/map/test_b01_grid_layers.py index 2299ac3f..52954203 100644 --- a/tests/map/test_b01_grid_layers.py +++ b/tests/map/test_b01_grid_layers.py @@ -1,7 +1,9 @@ -"""Tests for the device-agnostic grid->layers decomposition + calibration.""" +"""Tests for the device-agnostic grid->layers decomposition + Q10 classifier.""" import io +from pathlib import Path +import pytest from PIL import Image from roborock.map.b01_grid_layers import ( @@ -11,7 +13,30 @@ GridCalibration, decompose_grid, solve_calibration, + solve_calibration_with_origin, ) +from roborock.map.b01_q10_map_parser import ( + classify_q10_cell, + decompose_layers, + parse_map_packet, +) + +FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" + + +@pytest.mark.parametrize( + ("value", "expected"), + [ + (0, "unknown"), + (8, LAYER_FLOOR), + (12, LAYER_FLOOR), + (240, LAYER_FLOOR), + (243, LAYER_BACKGROUND), + (249, LAYER_WALL), + ], +) +def test_classify_q10_cell(value: int, expected: str) -> None: + assert classify_q10_cell(value) == expected def test_decompose_grid_generic_classifier_and_bbox() -> None: @@ -72,6 +97,25 @@ def test_render_scale_upsamples() -> None: assert Image.open(io.BytesIO(png)).size == (6, 3) +def test_decompose_layers_on_q10_fixture() -> None: + """The Q10 synthetic fixture splits into floor + per-room layers.""" + layers = decompose_layers(parse_map_packet(FIXTURE.read_bytes())) + assert layers.class_counts.get(LAYER_FLOOR) == 26 + names = {room.id: room.name for room in layers.rooms} + assert names == {2: "Living Room", 3: "Bedroom"} + # Each room renders to a valid PNG and only its own pixels are opaque. + living = layers.render_room(2, (255, 0, 0, 255)) + img = Image.open(io.BytesIO(living)) + opaque = sum(1 for *_rgb, a in img.getdata() if a > 0) + assert opaque == next(r.pixel_count for r in layers.rooms if r.id == 2) + + +def test_render_room_unknown_id_raises() -> None: + layers = decompose_layers(parse_map_packet(FIXTURE.read_bytes())) + with pytest.raises(KeyError): + layers.render_room(999, (0, 0, 0, 255)) + + def test_calibration_roundtrip() -> None: cal = GridCalibration(resolution=2.0, origin_x=3.0, origin_y=8.0, y_sign=1) assert cal.world_to_pixel(0, 0) == (3.0, 8.0) @@ -117,3 +161,27 @@ def test_solve_calibration_returns_none_when_unfittable() -> None: # Points so far apart no resolution keeps them on the 6x6 floor block. points = [(0.0, 0.0), (1000.0, 0.0), (0.0, 1000.0)] assert solve_calibration(layers, points, resolutions=[2.0]) is None + + +def test_solve_calibration_with_origin_fits_resolution_from_short_path() -> None: + """With a known origin only resolution is fit, so a tiny path suffices.""" + layers = _floor_block_layers() + true = GridCalibration(2.0, 3.0, 8.0, 1) + points = [true.pixel_to_world(px, py) for px, py in [(4, 7), (6, 5)]] # two points + cal = solve_calibration_with_origin(layers, points, (true.origin_x, true.origin_y), resolutions=[1.0, 2.0, 3.0]) + assert cal is not None + assert (cal.resolution, cal.origin_x, cal.origin_y, cal.y_sign) == (2.0, 3.0, 8.0, 1) + + +def test_solve_calibration_with_origin_returns_none_off_floor() -> None: + """A wrong origin that lands the path off floor is rejected, not forced.""" + layers = _floor_block_layers() + true = GridCalibration(2.0, 3.0, 8.0, 1) + points = [true.pixel_to_world(px, py) for px, py in [(4, 7), (6, 5)]] + # Origin shoved into the background corner: every point lands off floor. + assert solve_calibration_with_origin(layers, points, (0.0, 0.0), resolutions=[2.0]) is None + + +def test_solve_calibration_with_origin_returns_none_without_points() -> None: + layers = _floor_block_layers() + assert solve_calibration_with_origin(layers, [], (3.0, 8.0), resolutions=[2.0]) is None diff --git a/tests/map/test_b01_map_parser.py b/tests/map/test_b01_map_parser.py index 0829182e..2e66cd4f 100644 --- a/tests/map/test_b01_map_parser.py +++ b/tests/map/test_b01_map_parser.py @@ -11,7 +11,14 @@ from PIL import Image from roborock.exceptions import RoborockException -from roborock.map.b01_map_parser import B01MapParser, _parse_scmap_payload +from roborock.map.b01_grid_layers import LAYER_BACKGROUND, LAYER_FLOOR, LAYER_WALL +from roborock.map.b01_map_parser import ( + B01MapParser, + _parse_scmap_payload, + classify_q7_cell, + decompose_q7_layers, + q7_calibration, +) from roborock.map.proto.b01_scmap_pb2 import RobotMap # type: ignore[attr-defined] from roborock.protocols.b01_q7_protocol import create_map_key, decode_map_payload @@ -126,6 +133,31 @@ def test_b01_scmap_parser_maps_observed_schema_fields() -> None: assert not parsed.roomDataInfo[1].HasField("roomName") +def test_classify_q7_cell() -> None: + assert classify_q7_cell(0) == LAYER_BACKGROUND + assert classify_q7_cell(127) == LAYER_WALL + assert classify_q7_cell(128) == LAYER_FLOOR + + +def test_q7_layers_and_calibration_from_fixture() -> None: + """Q7 reuses the shared grid decomposition + reads calibration from mapHead.""" + inflated = gzip.decompress(FIXTURE.read_bytes()) + + layers = decompose_q7_layers(inflated) + assert set(layers.class_counts) == {LAYER_BACKGROUND, LAYER_WALL, LAYER_FLOOR} + assert layers.class_counts[LAYER_FLOOR] > 0 + assert layers.rooms == [] # Q7 raster has no per-room segmentation + + cal = q7_calibration(inflated) + assert cal is not None + # mapHead gives minX=-5, minY=-7, resolution=0.05 -> origin from those. + assert cal.resolution == pytest.approx(0.05, abs=1e-4) + assert cal.origin_x == pytest.approx(5.0 / cal.resolution, abs=1.0) + # World origin (0,0) maps inside the grid. + px, py = cal.world_to_pixel(0.0, 0.0) + assert 0 <= px < layers.width and 0 <= py < layers.height + + def test_b01_map_parser_rejects_invalid_payload() -> None: parser = B01MapParser() with pytest.raises(RoborockException, match="Failed to parse B01 SCMap"): diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py index 53c3c63c..658879aa 100644 --- a/tests/map/test_b01_q10_map_parser.py +++ b/tests/map/test_b01_q10_map_parser.py @@ -18,7 +18,7 @@ FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" TRACE_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace.bin" TRACE_MULTI_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_multi.bin" -# Real 15-point packet captured from an R1 corridor run (full session path). +# Real 14-point packet captured from an R1 corridor run (full session path). TRACE_SESSION_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_session.bin" @@ -75,10 +75,12 @@ def _full_header_map_payload(width: int, height: int, decoded_layout: bytes) -> return bytes(payload) -def _trace_payload(points: list[tuple[int, int]], sequence: int = 1) -> bytes: - header = bytearray(10) +def _trace_payload(points: list[tuple[int, int]], sequence: int = 1, heading: int = 0) -> bytes: + header = bytearray(14) # _TRACE_HEADER_LENGTH header[0:2] = b"\x02\x01" header[3] = sequence + header[8:10] = len(points).to_bytes(2, "big") # point count == number of (x, y) pairs + header[10:12] = heading.to_bytes(2, "big", signed=True) body = b"".join(x.to_bytes(2, "big", signed=True) + y.to_bytes(2, "big", signed=True) for x, y in points) return bytes(header) + body @@ -190,30 +192,37 @@ def test_packet_markers_are_distinct() -> None: def test_parse_trace_packet_real_single_point() -> None: - """A real ss07 packet captured early in a session has a single path point.""" + """A real ss07 packet captured while docked: a heading, but no path points. + + The 14-byte header carries point count 0 (bytes 8-9) and heading 169 (bytes + 10-11); the rest of the frame is empty. An earlier 10-byte-header revision + mis-decoded the heading word as a phantom path point ``(169, 0)``. + """ trace = parse_trace_packet(TRACE_FIXTURE.read_bytes()) assert trace.sequence == 9 - assert [(p.x, p.y) for p in trace.points] == [(169, 0)] - assert trace.robot_position is not None - assert (trace.robot_position.x, trace.robot_position.y) == (169, 0) + assert trace.heading == 169 + assert trace.points == [] + assert trace.robot_position is None def test_parse_trace_packet_real_session_path() -> None: - """A real 15-point packet (corridor run) decodes the full accumulated path. + """A real 14-point packet (corridor run) decodes the full accumulated path. Captured live from an R1: the same session emitted packets of 1, then 3, - then 15 points, proving the path accumulates rather than reporting only the - current position. The most recent point is the current robot position. + then 14 points, proving the path accumulates rather than reporting only the + current position. The most recent point is the current robot position, and + the header carries the robot heading (bytes 10-11). """ trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) points = [(p.x, p.y) for p in trace.points] - assert len(points) == 15 - assert points[0] == (-34, 0) # oldest + assert len(points) == 14 + assert trace.heading == -34 # robot orientation, not a path point + assert points[0] == (41, 64) # oldest real point (from byte 14) assert points[-1] == (276, -1) # most recent == current position - # After the initial repositioning, x marches steadily down the corridor. - tail_x = [p[0] for p in points[2:]] + # After the initial reposition, x marches steadily down the corridor. + tail_x = [p[0] for p in points[1:]] assert tail_x == sorted(tail_x) - assert points[-1][0] - points[0][0] > 300 # spans the corridor + assert points[-1][0] - points[0][0] > 200 # spans the corridor assert trace.robot_position is not None assert (trace.robot_position.x, trace.robot_position.y) == (276, -1) @@ -222,6 +231,7 @@ def test_parse_trace_packet_multi_point() -> None: """A multi-point packet decodes all points; position is the most recent.""" trace = parse_trace_packet(TRACE_MULTI_FIXTURE.read_bytes()) assert [(p.x, p.y) for p in trace.points] == [(100, 200), (150, 250), (-50, 300)] + assert trace.heading == 45 # Signed coordinates are supported (negative x). assert trace.robot_position is not None assert (trace.robot_position.x, trace.robot_position.y) == (-50, 300) @@ -243,18 +253,25 @@ def test_parse_trace_keeps_genuine_first_point() -> None: assert [(p.x, p.y) for p in trace.points] == points -def test_parse_trace_session_keeps_initial_reposition() -> None: - """The real corridor capture has a 4.8x first step -- well under the 20x cut.""" +def test_parse_trace_reads_heading_not_a_leading_point() -> None: + """The real corridor capture's heading word is decoded as orientation. + + Bytes 10-11 (``ff de`` == -34) are the SLAM heading, not a path point, so the + first genuine point ``(41, 64)`` is kept and no spurious near-origin point is + introduced (the old 10-byte header surfaced ``(-34, 0)`` here). + """ trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) - assert len(trace.points) == 15 - assert (trace.points[0].x, trace.points[0].y) == (-34, 0) + assert trace.heading == -34 + assert len(trace.points) == 14 + assert (trace.points[0].x, trace.points[0].y) == (41, 64) def test_parse_trace_empty_path_has_no_position() -> None: - header_only = b"\x02\x01" + b"\x00" * 8 # 10-byte header, no points + header_only = b"\x02\x01" + b"\x00" * 12 # 14-byte header, no points trace = parse_trace_packet(header_only) assert trace.points == [] assert trace.robot_position is None + assert trace.heading == 0 def test_parse_trace_rejects_non_trace_packet() -> None: @@ -264,7 +281,7 @@ def test_parse_trace_rejects_non_trace_packet() -> None: def test_parse_trace_rejects_misaligned_points() -> None: with pytest.raises(RoborockException, match="not 4-byte"): - parse_trace_packet(b"\x02\x01" + b"\x00" * 8 + b"\x01\x02\x03") + parse_trace_packet(b"\x02\x01" + b"\x00" * 12 + b"\x01\x02\x03") def test_parse_rejects_bad_layout_length() -> None: @@ -272,3 +289,72 @@ def test_parse_rejects_bad_layout_length() -> None: payload[27:29] = (0xFFFF).to_bytes(2, "big") # compressed length past the buffer with pytest.raises(RoborockException, match="invalid layout block length"): parse_map_packet(bytes(payload)) + + +def test_parse_erase_zones_from_map_packet_tail() -> None: + """Erase rectangles appended after the grid decode to world polygons.""" + rects = [ + [(100, 200), (300, 200), (300, 50), (100, 50)], + [(-40, -10), (10, -10), (10, -60), (-40, -60)], + ] + tail = bytes([len(rects), 4]) + for rect in rects: + for x, y in rect: + tail += int.to_bytes(x & 0xFFFF, 2, "big") + int.to_bytes(y & 0xFFFF, 2, "big") + packet = parse_map_packet(FIXTURE.read_bytes() + tail) + assert [z.vertices for z in packet.erase_zones] == rects # incl. signed coords + + +def test_parse_map_packet_without_erase_tail() -> None: + assert parse_map_packet(FIXTURE.read_bytes()).erase_zones == [] + + +def _calibrated_map_payload( + width: int, + height: int, + decoded_layout: bytes, + *, + origin: tuple[int, int], + resolution: int = 5, + charger: tuple[int, int, int] = (0, 0, 0), +) -> bytes: + """A map packet with the grid-frame header calibration fields populated.""" + payload = bytearray(_full_header_map_payload(width, height, decoded_layout)) + payload[11:13] = origin[0].to_bytes(2, "big", signed=True) + payload[13:15] = origin[1].to_bytes(2, "big", signed=True) + payload[15:17] = resolution.to_bytes(2, "big") + payload[17:19] = charger[0].to_bytes(2, "big", signed=True) + payload[19:21] = charger[1].to_bytes(2, "big", signed=True) + payload[21:23] = charger[2].to_bytes(2, "big", signed=True) + return bytes(payload) + + +def test_parse_header_calibration_fields() -> None: + """The 01 01 header's calibration fields decode to a usable origin (ss07).""" + grid = bytes([8]) * 6 + bytes([12]) * 6 # 4x3 grid, two rooms + layout = grid + b"\x01\x02" + _room_record(2, "rr_kitchen") + _room_record(3, "den") + payload = _calibrated_map_payload(4, 3, layout, origin=(-3760, 1920), resolution=5, charger=(-50, 30, 180)) + cal = parse_map_packet(payload).header_calibration + assert cal is not None + assert (cal.origin_x, cal.origin_y) == (-3760, 1920) + assert cal.resolution == 5 + assert (cal.charger_x, cal.charger_y, cal.charger_phi) == (-50, 30, 180) + assert not cal.is_keepalive + # 5 mm units / (50 mm/px) -> divide by 10 for grid pixels. + assert cal.origin_pixels() == (-376.0, 192.0) + + +def test_parse_header_calibration_keepalive_has_no_origin() -> None: + """A null/keepalive frame (x_min == y_min == 0) yields no usable origin.""" + grid = bytes([8]) * 6 + bytes([12]) * 6 + layout = grid + b"\x01\x02" + _room_record(2, "rr_kitchen") + _room_record(3, "den") + cal = parse_map_packet(_calibrated_map_payload(4, 3, layout, origin=(0, 0))).header_calibration + assert cal is not None + assert cal.is_keepalive + assert cal.origin_pixels() is None + + +def test_real_fixture_header_calibration_is_keepalive() -> None: + """The synthetic fixture carries no header origin, so callers fall back to a fit.""" + cal = parse_map_packet(FIXTURE.read_bytes()).header_calibration + assert cal is not None and cal.is_keepalive diff --git a/tests/map/test_b01_q10_overlays.py b/tests/map/test_b01_q10_overlays.py index 5af2acc8..1d13eb62 100644 --- a/tests/map/test_b01_q10_overlays.py +++ b/tests/map/test_b01_q10_overlays.py @@ -7,6 +7,7 @@ ZONE_TYPE_NO_MOP, ZONE_TYPE_THRESHOLD, ZONE_TYPE_VIRTUAL_WALL, + parse_virtual_wall_blob, parse_zone_blob, ) @@ -71,3 +72,106 @@ def test_parse_zone_blob_real_record_size_inferred() -> None: rect = _rect(ZONE_TYPE_NO_GO, [(100, 200), (300, 200), (300, 50), (100, 50)]) zones = parse_zone_blob(_blob(1, [rect], record_size=38)) assert len(zones) == 1 and zones[0].vertices[0] == (100, 200) + + +def test_parse_zone_blob_real_rdc_three_no_go() -> None: + """Real DP-55 read-back from our RDC ss07 with three No-Go Zones drawn. + + Captured live; exercises the 38-byte slot walk at count=3 against actual + device bytes (all type 0 = no-go). + """ + blob = ( + "AQMABP9A9ToAEPU6ABDzzv9A884AAAAAAAAAAAAAAAAAAAAAAAAAAAAE/Rb/mv5z/5r+c/3L/Rb9ywAAAA" + "AAAAAAAAAAAAAAAAAAAAAAAAQDpADEB3UAxAd1/GMDpPxjAAAAAAAAAAAAAAAAAAAAAAAAAAA=" + ) + zones = parse_zone_blob(blob) + assert [z.type for z in zones] == [ZONE_TYPE_NO_GO, ZONE_TYPE_NO_GO, ZONE_TYPE_NO_GO] + assert zones[2].vertices == [(932, 196), (1909, 196), (1909, -925), (932, -925)] + + +def test_parse_virtual_wall_blob_real_capture() -> None: + """Real DP-57 read-back from an ss07 (one wall drawn in the app). + + Frame is ``[count]`` + 8-byte ``(x, y)`` records -- no version byte, and the + same coordinate order as the restricted zones (first wire word = x). + Provenance: PR #850 review thread. + """ + walls = parse_virtual_wall_blob("Aflu+cX87PoO") + assert len(walls) == 1 + assert walls[0].type == ZONE_TYPE_VIRTUAL_WALL + assert walls[0].vertices == [(-1682, -1595), (-788, -1522)] + + +def test_parse_virtual_wall_blob_real_r1_horizontal() -> None: + """Real DP-57 read-back from the R1, ground-truthed against the app. + + The wall was drawn horizontally just below the Kids bedroom; it reads back + with x varying (377 -> 951) and y constant (-83), i.e. horizontal. This is + the capture that settled DP 57's axis order: an earlier revision swapped the + axes and would have placed this wall vertical (transposed 90 degrees). + """ + walls = parse_virtual_wall_blob("AQF5/60Dt/+t") + assert [(w.type, w.vertices) for w in walls] == [ + (ZONE_TYPE_VIRTUAL_WALL, [(377, -83), (951, -83)]), + ] + + +def test_parse_virtual_wall_blob_real_r1_mixed_orientation() -> None: + """Real DP-57 read-back from the R1 with one horizontal + one vertical wall. + + Ground-truthed against the app: a wall drawn horizontally and another drawn + (near-)vertically. They decode as such -- the first with y constant, the + second with x near-constant (the 4-unit x drift matches the wall not being + drawn perfectly vertical). This pins DP 57's axis order for *both* + orientations at once, so a mixed pair can't come back transposed. + """ + walls = parse_virtual_wall_blob("AgNyBGMFsARjAf8FAAH7CnE=") + assert [(w.type, w.vertices) for w in walls] == [ + (ZONE_TYPE_VIRTUAL_WALL, [(882, 1123), (1456, 1123)]), # horizontal: y constant + (ZONE_TYPE_VIRTUAL_WALL, [(511, 1280), (507, 2673)]), # vertical: x near-constant + ] + + +def test_parse_virtual_wall_blob_real_rdc_two_walls() -> None: + """Real DP-57 read-back from our RDC ss07 with two Invisible Walls drawn. + + Captured live after drawing the walls in the official app; the old + parse_zone_blob silently returned [] for this exact blob (the regression + this fix addresses). + """ + walls = parse_virtual_wall_blob("AgAz/QMCDv0D/83+Dv/O/OY=") + assert [(w.type, w.vertices) for w in walls] == [ + (ZONE_TYPE_VIRTUAL_WALL, [(51, -765), (526, -765)]), + (ZONE_TYPE_VIRTUAL_WALL, [(-51, -498), (-50, -794)]), + ] + + +def test_parse_virtual_wall_blob_empty_variants() -> None: + assert parse_virtual_wall_blob(None) == [] + assert parse_virtual_wall_blob(b"\x00") == [] # device's "no walls" sentinel + assert parse_virtual_wall_blob("AA==") == [] # base64 of 0x00 + + +def test_parse_virtual_wall_blob_multiple_walls() -> None: + """Two walls back-to-back; each is a separate 8-byte (x, y) record.""" + wall_a = bytes.fromhex("000a0014001e0028") # (x,y)=(10,20)->(30,40) + wall_b = bytes.fromhex("fffb0005fff6000a") # (x,y)=(-5,5)->(-10,10) + walls = parse_virtual_wall_blob(bytes([2]) + wall_a + wall_b) + assert [w.vertices for w in walls] == [[(10, 20), (30, 40)], [(-5, 5), (-10, 10)]] + + +def test_parse_virtual_wall_blob_truncated_record_dropped() -> None: + """A trailing record shorter than 8 bytes is dropped, not misread.""" + blob = bytes([2]) + bytes([0x00, 0x0A, 0x00, 0x14, 0x00, 0x1E, 0x00, 0x28]) + b"\x00\x00" + walls = parse_virtual_wall_blob(blob) + assert [w.vertices for w in walls] == [[(10, 20), (30, 40)]] + + +def test_zone_parser_misframes_virtual_wall_blob() -> None: + """The restricted-zone parser must NOT be used on DP 57 -- it mis-frames it. + + Regression guard for the original bug: a DP-57 blob fed to parse_zone_blob + reads the leading 0x01 as a version and the next coordinate byte as a record + count, yielding garbage (here: nothing), so DP 57 needs its own decoder. + """ + assert parse_zone_blob("Aflu+cX87PoO") != parse_virtual_wall_blob("Aflu+cX87PoO") diff --git a/tests/map/testdata/b01_q10_trace_multi.bin b/tests/map/testdata/b01_q10_trace_multi.bin index 8377e6c0..c388ab0c 100644 Binary files a/tests/map/testdata/b01_q10_trace_multi.bin and b/tests/map/testdata/b01_q10_trace_multi.bin differ diff --git a/tests/protocols/test_b01_q10_protocol.py b/tests/protocols/test_b01_q10_protocol.py index 8e42f07e..bf5ca29d 100644 --- a/tests/protocols/test_b01_q10_protocol.py +++ b/tests/protocols/test_b01_q10_protocol.py @@ -52,7 +52,10 @@ def test_decode_message_trace_packet() -> None: message = _message(TRACE_FIXTURE.read_bytes(), RoborockMessageProtocol.MAP_RESPONSE) decoded = decode_message(message) assert isinstance(decoded, Q10TracePacket) - assert [(p.x, p.y) for p in decoded.points] == [(169, 0)] + # This docked capture carries only a heading (no path points); see the + # parser tests for the full byte-level decode. + assert decoded.points == [] + assert decoded.heading == 169 def test_decode_message_unknown_map_marker_returns_none() -> None: