diff --git a/mypy.ini b/mypy.ini index f6f59369..ebf1c85a 100644 --- a/mypy.ini +++ b/mypy.ini @@ -3,3 +3,6 @@ check_untyped_defs = True [mypy-construct] ignore_missing_imports = True + +[mypy-roborock.cli] +ignore_errors = True diff --git a/roborock/cli.py b/roborock/cli.py index 6ae92b3a..e655ac6e 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -56,6 +56,7 @@ from roborock.devices.device import RoborockDevice from roborock.devices.device_manager import DeviceManager, UserParams, create_device_manager from roborock.devices.traits import Trait +from roborock.devices.traits.b01.q10 import Q10PropertiesApi from roborock.devices.traits.b01.q10.vacuum import VacuumTrait from roborock.devices.traits.v1 import V1TraitMixin from roborock.devices.traits.v1.consumeable import ConsumableAttribute @@ -527,6 +528,45 @@ async def maps(ctx, device_id: str): await _display_v1_trait(context, device_id, lambda v1: v1.maps) +# The Q10 pushes its map ~9s after a dpRequestDps; firmware throttles pushes to +# ~once per 60-70s, so a single request is answered quickly but rapid re-requests +# may not be. This bounds how long a one-shot CLI command waits for that push. +_Q10_MAP_PUSH_TIMEOUT = 30.0 + + +async def _await_q10_map_push( + properties: Q10PropertiesApi, + predicate: Callable[[], bool], + *, + timeout: float = _Q10_MAP_PUSH_TIMEOUT, + allow_cached_on_timeout: bool = False, +) -> bool: + """Nudge a Q10 to push its map/trace and wait for a fresh update. + + The Q10 map API is entirely push-driven: there is no synchronous get-map + request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``, + which the device's subscribe loop feeds into the map trait. Here we register + an update listener, send the request, and wait for a newly pushed update to + satisfy ``predicate``. Returns whether it did within ``timeout``. + """ + loop = asyncio.get_running_loop() + updated: asyncio.Future[None] = loop.create_future() + + def on_update() -> None: + if predicate() and not updated.done(): + updated.set_result(None) + + unsub = properties.map.add_update_listener(on_update) + try: + await properties.refresh() + await asyncio.wait_for(updated, timeout=timeout) + return True + except TimeoutError: + return allow_cached_on_timeout and predicate() + finally: + unsub() + + @session.command() @click.option("--device_id", required=True) @click.option("--output-file", required=True, help="Path to save the map image.") @@ -535,10 +575,22 @@ async def maps(ctx, device_id: str): async def map_image(ctx, device_id: str, output_file: str): """Get device map image and save it to a file.""" context: RoborockContext = ctx.obj - trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) - if trait.image_content: + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is not None: + properties = device.b01_q10_properties + await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + allow_cached_on_timeout=True, + ) + image_content = properties.map.image_content + else: + v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) + image_content = v1_trait.image_content + if image_content: with open(output_file, "wb") as f: - f.write(trait.image_content) + f.write(image_content) click.echo(f"Map image saved to {output_file}") else: click.echo("No map image content available.") @@ -570,6 +622,39 @@ async def map_data(ctx, device_id: str, include_path: bool): click.echo(dump_json(data_summary)) +@session.command() +@click.option("--device_id", required=True) +@click.option("--include_path", is_flag=True, default=False, help="Include all path points in the output.") +@click.pass_context +@async_command +async def q10_position(ctx, device_id: str, include_path: bool): + """Get the current Q10 robot position and live cleaning path. + + The Q10 only streams its position/path while it is actively cleaning, so this + will report that no live trace is available for an idle/docked robot. + """ + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + properties = device.b01_q10_properties + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path)) + if not got_trace: + click.echo("No live trace available (the robot only reports position while cleaning).") + return + map_trait = properties.map + position = map_trait.robot_position + summary: dict[str, Any] = { + "robot_position": {"x": position.x, "y": position.y} if position else None, + "path_points": len(map_trait.path), + } + if include_path: + summary["path"] = [[p.x, p.y] for p in map_trait.path] + click.echo(dump_json(summary)) + + @session.command() @click.option("--device_id", required=True) @click.pass_context @@ -711,7 +796,20 @@ async def set_child_lock(ctx, device_id: str, enabled: bool): async def rooms(ctx, device_id: str): """Get device room mapping info.""" context: RoborockContext = ctx.obj - await _display_v1_trait(context, device_id, lambda v1: v1.rooms) + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is not None: + properties = device.b01_q10_properties + # A valid map may have no room records, so wait on the map arriving + # (image_content) rather than on rooms being non-empty. + await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + allow_cached_on_timeout=True, + ) + click.echo(dump_json({room.id: room.name for room in properties.map.rooms})) + else: + await _display_v1_trait(context, device_id, lambda v1: v1.rooms) @session.command() @@ -1200,6 +1298,7 @@ def write_markdown_table(product_features: dict[str, dict[str, any]], all_featur cli.add_command(maps) cli.add_command(map_image) cli.add_command(map_data) +cli.add_command(q10_position) cli.add_command(consumables) cli.add_command(reset_consumable) cli.add_command(rooms) diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index 1e0510ba..49758326 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -2,36 +2,42 @@ import logging from collections.abc import AsyncGenerator -from typing import Any from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException from roborock.protocols.b01_q10_protocol import ( ParamsType, - decode_rpc_response, + Q10Message, + decode_message, encode_mqtt_payload, ) _LOGGER = logging.getLogger(__name__) -async def stream_decoded_responses( +async def stream_decoded_messages( mqtt_channel: MqttChannel, -) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]: - """Stream decoded DPS messages received via MQTT.""" +) -> AsyncGenerator[Q10Message, None]: + """Stream decoded Q10 messages received via MQTT. - async for response_message in mqtt_channel.subscribe_stream(): + Each pushed ``RoborockMessage`` is decoded into a typed :data:`Q10Message` + (a DPS status update, a map packet, or a trace packet). Messages that fail + to decode or carry an unrecognized payload are skipped. + """ + + async for message in mqtt_channel.subscribe_stream(): try: - decoded_dps = decode_rpc_response(response_message) + decoded = decode_message(message) except RoborockException as ex: _LOGGER.debug( - "Failed to decode B01 Q10 RPC response: %s: %s", - response_message, + "Failed to decode B01 Q10 message: %s: %s", + message, ex, ) continue - yield decoded_dps + if decoded is not None: + yield decoded async def send_command( diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 184de2d2..799151a6 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -4,17 +4,21 @@ import logging from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP -from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses +from roborock.devices.rpc.b01_q10_channel import stream_decoded_messages from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.map.b01_q10_map_parser import Q10MapPacket, Q10TracePacket +from roborock.protocols.b01_q10_protocol import Q10DpsUpdate, Q10Message from .command import CommandTrait +from .map import MapContentTrait from .remote import RemoteTrait from .status import StatusTrait from .vacuum import VacuumTrait __all__ = [ "Q10PropertiesApi", + "MapContentTrait", ] _LOGGER = logging.getLogger(__name__) @@ -35,6 +39,9 @@ class Q10PropertiesApi(Trait): remote: RemoteTrait """Trait for sending remote control related commands to Q10 devices.""" + map: MapContentTrait + """Trait for fetching the current parsed map (image + rooms).""" + def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" self._channel = channel @@ -42,6 +49,7 @@ def __init__(self, channel: MqttChannel) -> None: self.vacuum = VacuumTrait(self.command) self.remote = RemoteTrait(self.command) self.status = StatusTrait() + self.map = MapContentTrait() self._subscribe_task: asyncio.Task[None] | None = None async def start(self) -> None: @@ -65,14 +73,25 @@ async def refresh(self) -> None: await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) async def _subscribe_loop(self) -> None: - """Persistent loop to listen for status updates.""" - async for decoded_dps in stream_decoded_responses(self._channel): - _LOGGER.debug("Received Q10 status update: %s", decoded_dps) - - # Notify all traits about a new message and each trait will - # only update what fields that it is responsible for. - # More traits can be added here below. - self.status.update_from_dps(decoded_dps) + """Persistent loop dispatching decoded messages to the read-model traits.""" + async for message in stream_decoded_messages(self._channel): + self._handle_message(message) + + def _handle_message(self, message: Q10Message) -> None: + """Route a single decoded message to the trait responsible for it. + + Map and trace packets arrive as protocol-301 ``MAP_RESPONSE`` pushes (the + Q10 is entirely push-driven: there is no synchronous get-map request, a + ``dpRequestDps`` just nudges the device to publish its current map). DPS + updates feed the status trait. More traits can be dispatched here below. + """ + if isinstance(message, Q10MapPacket): + self.map.update_from_map_packet(message) + elif isinstance(message, Q10TracePacket): + self.map.update_from_trace_packet(message) + elif isinstance(message, Q10DpsUpdate): + _LOGGER.debug("Received Q10 status update: %s", message.dps) + self.status.update_from_dps(message.dps) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py new file mode 100644 index 00000000..c132def4 --- /dev/null +++ b/roborock/devices/traits/b01/q10/map.py @@ -0,0 +1,110 @@ +"""Map content trait for B01 Q10 devices. + +Unlike the v1 / Q7 maps, the Q10 has no synchronous "get map" command, so this +trait is purely push-driven and mirrors the Q10 ``StatusTrait`` contract: + +- The device pushes its current map/path as protocol-301 ``MAP_RESPONSE`` + messages (a ``dpRequestDps`` nudges it to do so). The protocol layer decodes + those into :class:`Q10MapPacket` / :class:`Q10TracePacket` objects and the + ``Q10PropertiesApi`` subscribe loop routes them to + :meth:`MapContentTrait.update_from_map_packet` / + :meth:`MapContentTrait.update_from_trace_packet`. +- Those methods render/cache the content and notify update listeners (register + via :meth:`add_update_listener`). +- ``image_content``, ``map_data``, ``rooms``, ``path`` and ``robot_position`` + are readable and reflect the most recently pushed map. + +Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required. +""" + +import logging +from dataclasses import dataclass, field + +from vacuum_map_parser_base.map_data import MapData + +from roborock.data import RoborockBase +from roborock.devices.traits.common import TraitUpdateListener +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParser, + B01Q10MapParserConfig, + Q10MapPacket, + Q10Point, + Q10Room, + Q10TracePacket, +) + +_LOGGER = logging.getLogger(__name__) + +_TRUNCATE_LENGTH = 20 + + +@dataclass +class MapContent(RoborockBase): + """Dataclass representing Q10 map content.""" + + image_content: bytes | None = None + """The rendered image of the map in PNG format.""" + + map_data: MapData | None = None + """Parsed map data (image metadata + room names).""" + + rooms: list[Q10Room] = field(default_factory=list) + """Rooms (segments) reported by the device, with ids and names.""" + + path: list[Q10Point] = field(default_factory=list) + """Full path of the current cleaning session (oldest point first). + + The robot accumulates this server-side and serves the whole trajectory so + far in one packet, so it is complete even if we connect mid-session. Only + populated while a cleaning session is active.""" + + robot_position: Q10Point | None = None + """Current robot position (the most recent path point), if known.""" + + def __repr__(self) -> str: + img = self.image_content + if img and len(img) > _TRUNCATE_LENGTH: + img = img[: _TRUNCATE_LENGTH - 3] + b"..." + return f"MapContent(image_content={img!r}, rooms={self.rooms!r})" + + +class MapContentTrait(MapContent, TraitUpdateListener): + """Trait holding the most recently pushed parsed map content for Q10 devices. + + The Q10 has no synchronous get-map request; the device pushes map and trace + packets, which the protocol layer decodes and the ``Q10PropertiesApi`` + subscribe loop feeds into :meth:`update_from_map_packet` / + :meth:`update_from_trace_packet`. Consumers read the cached fields and/or + register a callback with :meth:`add_update_listener` to be notified when new + map content arrives. + """ + + def __init__( + self, + *, + map_parser_config: B01Q10MapParserConfig | None = None, + ) -> None: + super().__init__() + TraitUpdateListener.__init__(self, logger=_LOGGER) + self._map_parser = B01Q10MapParser(map_parser_config) + + def update_from_map_packet(self, packet: Q10MapPacket) -> None: + """Render a pushed full-map packet into the cached image/rooms. + + Rendering failures are logged and skipped (listeners are not notified) so + a single bad push cannot tear down the subscribe loop. + """ + parsed = self._map_parser.parse_packet(packet) + if parsed.image_content is None: + _LOGGER.debug("Failed to render Q10 map image") + return + self.image_content = parsed.image_content + self.map_data = parsed.map_data + self.rooms = packet.rooms + self._notify_update() + + def update_from_trace_packet(self, packet: Q10TracePacket) -> None: + """Cache the path/robot position from a pushed trace packet.""" + self.path = packet.points + self.robot_position = packet.robot_position + self._notify_update() diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py new file mode 100644 index 00000000..89dec083 --- /dev/null +++ b/roborock/map/b01_q10_map_parser.py @@ -0,0 +1,399 @@ +"""Parser for Roborock Q10 (B01/ss07) map packets. + +Q10 devices deliver map data as a protocol-301 ``MAP_RESPONSE`` message (pushed a +few seconds after a ``dpRequestDps`` request). Unlike the Q7 ``SCMap`` protobuf +format, the Q10 uses a custom, unencrypted binary packet: + +- ``01 01`` marker, then a ``u32be`` map id (bytes 2-5) and two consecutive + ``u16be`` dimensions: grid width (bytes 7-8) and grid height (bytes 9-10). +- A header field at offset 27 (``u16be``) giving the compressed layout length. +- An LZ4-block-compressed occupancy grid starting at offset 29. Once inflated it + is ``width * height`` cells of grid data followed by room metadata records. +- Room metadata begins with ``01 `` followed by fixed 47-byte + records (id, hints, ascii name). Each room paints cells with value + ``room_id * 4`` in the grid. + +The packet layout was confirmed against live Q10 captures. The format +documentation that informed this clean-room implementation comes from the +``roborock-qseries-map-bridge`` project (GPL-3.0-or-later): +https://github.com/v1b3c0d3x3r/roborock-qseries-map-bridge +""" + +import colorsys +import io +import math +import statistics +from dataclasses import dataclass, field + +from PIL import Image +from vacuum_map_parser_base.config.image_config import ImageConfig +from vacuum_map_parser_base.map_data import ImageData, MapData + +from roborock.exceptions import RoborockException + +from .map_parser import ParsedMapData + +_MAP_FILE_FORMAT = "PNG" + +MAP_PACKET_MARKER = b"\x01\x01" +TRACE_PACKET_MARKER = b"\x02\x01" + +_MAP_ID_OFFSET = 2 +# Width and height are two consecutive u16be fields. An earlier revision read the +# width as u16le at offset 8; that high byte is actually the height's high byte, +# so it only matched the true width when width and height fell in the same +# 256-band -- e.g. a 222x261 map decoded its width as 478 and failed to split. +# Reported and diagnosed by @andrewlyeats (independent B01/Q10 decoder), and +# corroborated by the ioBroker roborock adapter (both read these as u16be). +_WIDTH_OFFSET = 7 +_HEIGHT_OFFSET = 9 +_COMPRESSED_LAYOUT_LENGTH_OFFSET = 27 +_LAYOUT_COMPRESSED_OFFSET = 29 +_ROOM_RECORD_LENGTH = 47 +_ROOM_NAME_LENGTH_OFFSET = 26 +_MAX_ROOMS = 32 + +# Grid cell values >= this are walls / borders rather than room segments. +_WALL_THRESHOLD = 240 + + +@dataclass +class Q10Room: + """A room (segment) described in a Q10 map packet.""" + + id: int + raw_name: str + pixel_value: int + pixel_count: int + + @property + def name(self) -> str: + """User friendly room name (firmware ``rr_`` defaults are normalized).""" + return self.raw_name.removeprefix("rr_").replace("_", " ").strip().title() + + +@dataclass +class Q10MapPacket: + """Decoded contents of a Q10 ``01 01`` map packet.""" + + map_id: int + width: int + height: int + grid: bytes + rooms: list[Q10Room] = field(default_factory=list) + + +@dataclass +class Q10Point: + """A single point in Q10 map/trace coordinate space.""" + + x: int + y: int + + +@dataclass +class Q10TracePacket: + """Decoded contents of a Q10 ``02 01`` cleaning-path packet. + + The robot accumulates the **full path of the current cleaning session** and + serves it in a single packet: ``points`` holds the whole trajectory so far + (oldest first), growing as the robot cleans. This was confirmed live -- a + corridor run produced packets of 1, then 3, then 15 points, each a strict + superset describing the path travelled. Because the robot keeps the path + server-side, a client that connects mid-session still receives the complete + path (this is how the app shows the trail even after a cold launch). + + The robot only emits these while a session is active, so an idle/docked robot + will not produce them. The most recent point is the current robot position. + """ + + points: list[Q10Point] = field(default_factory=list) + sequence: int = 0 + """Session counter (byte 3); increments per cleaning session, tracking the + device clean count. Not a per-packet sequence.""" + + @property + def robot_position(self) -> Q10Point | None: + """The current robot position (the most recent point).""" + return self.points[-1] if self.points else None + + +# Trace packet (``02 01``): a 10-byte header followed by big-endian int16 (x, y) +# point pairs forming the accumulated session path. Header layout confirmed +# against live ss07 captures: byte 3 is a session counter (tracks the device +# clean count); bytes 8-9 are a u16be point count minus one (verified: a 15-point +# packet carried 0x000e == 14). The parser reads all 4-byte pairs in the body +# rather than trusting the count field, so a truncated tail can't desync it. +# NOTE: the format documented by roborock-qseries-map-bridge (18-byte header) +# did not match this firmware -- this 10-byte layout is what the device sent. +_TRACE_HEADER_LENGTH = 10 +_TRACE_SEQUENCE_OFFSET = 3 + +# Some cleans prepend a single stray point to the path, far outside the map +# (e.g. ~(0, -1907) when the real path starts near (-3760, -1920)); it skews the +# rendered start/bounding box and any path-based calibration. We drop points[0] +# only when its step to points[1] is a gross outlier (this multiple of the +# median step of the rest of the path), so a genuine first point is never lost. +# The current position (last point) is unaffected. Trigger and threshold +# reported and verified by @andrewlyeats across independent B01/Q10 captures. +_STRAY_POINT_STEP_RATIO = 20 + + +def is_map_packet(payload: bytes) -> bool: + """Return True if the payload is a Q10 full-map (``01 01``) packet.""" + return payload[:2] == MAP_PACKET_MARKER + + +def is_trace_packet(payload: bytes) -> bool: + """Return True if the payload is a Q10 live trace (``02 01``) packet.""" + return payload[:2] == TRACE_PACKET_MARKER + + +def parse_trace_packet(payload: bytes) -> Q10TracePacket: + """Parse a Q10 ``02 01`` trace packet into path points + robot position.""" + if not is_trace_packet(payload): + raise RoborockException("Payload is not a Q10 trace packet") + if len(payload) < _TRACE_HEADER_LENGTH: + raise RoborockException("Q10 trace packet is shorter than its header") + body = payload[_TRACE_HEADER_LENGTH:] + if len(body) % 4: + raise RoborockException("Q10 trace points are not 4-byte (x, y) pairs") + + points = [ + Q10Point( + x=int.from_bytes(body[offset : offset + 2], "big", signed=True), + y=int.from_bytes(body[offset + 2 : offset + 4], "big", signed=True), + ) + for offset in range(0, len(body), 4) + ] + points = _drop_stray_leading_point(points) + return Q10TracePacket(points=points, sequence=payload[_TRACE_SEQUENCE_OFFSET]) + + +def _drop_stray_leading_point(points: list[Q10Point]) -> list[Q10Point]: + """Drop a spurious leading point that some cleans prepend to the trace. + + Returns ``points`` unchanged unless the very first step is a gross outlier + versus the median of the remaining steps (see ``_STRAY_POINT_STEP_RATIO``), + in which case the first point is dropped. Needs at least three points to have + a stable median to compare against. + """ + if len(points) < 3: + return points + steps = [math.hypot(b.x - a.x, b.y - a.y) for a, b in zip(points, points[1:])] + median_rest = statistics.median(steps[1:]) + if median_rest > 0 and steps[0] > _STRAY_POINT_STEP_RATIO * median_rest: + return points[1:] + return points + + +def lz4_block_decompress(data: bytes) -> bytes: + """Decompress a raw LZ4 *block* (no frame header). + + The Q10 map grid is stored as a single LZ4 block. This implements the + standard LZ4 block format so we don't add a native dependency. + """ + index = 0 + output = bytearray() + + def read_length(value: int) -> int: + nonlocal index + if value != 0x0F: + return value + while True: + if index >= len(data): + raise RoborockException("Truncated LZ4 block while reading length") + part = data[index] + index += 1 + value += part + if part != 0xFF: + return value + + while True: + if index >= len(data): + raise RoborockException("Truncated LZ4 block while reading token") + token = data[index] + index += 1 + + literal_length = read_length((token >> 4) & 0x0F) + end = index + literal_length + if end > len(data): + raise RoborockException("Truncated LZ4 block while reading literals") + output.extend(data[index:end]) + index = end + + if index == len(data): + return bytes(output) + if index + 2 > len(data): + raise RoborockException("Truncated LZ4 block while reading match offset") + + offset = data[index] | (data[index + 1] << 8) + index += 2 + if offset == 0 or offset > len(output): + raise RoborockException("Invalid LZ4 back-reference offset") + + match_length = read_length(token & 0x0F) + 4 + for _ in range(match_length): + output.append(output[-offset]) + + +def _split_with_dims(decoded: bytes, width: int, height: int) -> tuple[bytes, bytes] | None: + """Split the inflated layout into (grid, room_data) using header dimensions. + + Returns ``None`` when ``width * height`` does not leave a well-formed + ``01 `` room-record section, so the caller can fall back to + brute-force inference (e.g. for captures/fixtures without a height field). + """ + area = width * height + if area <= 0 or area > len(decoded): + return None + room_data = decoded[area:] + if len(room_data) < 2 or room_data[0] != 1: + return None + if len(room_data) != 2 + room_data[1] * _ROOM_RECORD_LENGTH: + return None + return decoded[:area], room_data + + +def _infer_layout(decoded: bytes, width: int) -> tuple[int, bytes, bytes]: + """Split the inflated layout into (height, grid, room_data). + + The grid is ``width * height`` cells; the remaining bytes are room records + introduced by an ``01 `` marker. The room count is unknown up + front, so we search for the split that makes the grid rectangular and lines + up with the marker. Used as a fallback when the header carries no usable + height. + """ + for room_count in range(0, _MAX_ROOMS + 1): + room_data_length = 2 + room_count * _ROOM_RECORD_LENGTH + area = len(decoded) - room_data_length + if area <= 0 or area % width: + continue + room_data = decoded[area:] + if room_data[0] == 1 and room_data[1] == room_count: + return area // width, decoded[:area], room_data + raise RoborockException("Could not infer Q10 layout dimensions / room records") + + +def _parse_rooms(room_data: bytes, grid: bytes) -> list[Q10Room]: + rooms: list[Q10Room] = [] + room_count = room_data[1] + for index in range(room_count): + start = 2 + index * _ROOM_RECORD_LENGTH + record = room_data[start : start + _ROOM_RECORD_LENGTH] + room_id = int.from_bytes(record[0:2], "big") + name_length = record[_ROOM_NAME_LENGTH_OFFSET] + raw_name = record[27 : 27 + name_length].decode("utf-8", errors="replace") + pixel_value = (room_id * 4) & 0xFF + rooms.append( + Q10Room( + id=room_id, + raw_name=raw_name, + pixel_value=pixel_value, + pixel_count=grid.count(pixel_value), + ) + ) + return rooms + + +def parse_map_packet(payload: bytes) -> Q10MapPacket: + """Parse a Q10 ``01 01`` map packet into grid + room metadata.""" + if len(payload) < _LAYOUT_COMPRESSED_OFFSET or not is_map_packet(payload): + raise RoborockException("Payload is not a Q10 map packet") + + map_id = int.from_bytes(payload[_MAP_ID_OFFSET : _MAP_ID_OFFSET + 4], "big") + width = int.from_bytes(payload[_WIDTH_OFFSET : _WIDTH_OFFSET + 2], "big") + height = int.from_bytes(payload[_HEIGHT_OFFSET : _HEIGHT_OFFSET + 2], "big") + if width <= 0: + raise RoborockException("Q10 map packet has invalid width") + + compressed_length = int.from_bytes( + payload[_COMPRESSED_LAYOUT_LENGTH_OFFSET : _COMPRESSED_LAYOUT_LENGTH_OFFSET + 2], "big" + ) + layout_end = _LAYOUT_COMPRESSED_OFFSET + compressed_length + if compressed_length <= 0 or layout_end > len(payload): + raise RoborockException("Q10 map packet has invalid layout block length") + + decoded = lz4_block_decompress(payload[_LAYOUT_COMPRESSED_OFFSET:layout_end]) + # Prefer the header height; fall back to inference if it doesn't line up + # (e.g. older captures/fixtures that don't populate the height field). + split = _split_with_dims(decoded, width, height) if height > 0 else None + if split is not None: + grid, room_data = split + else: + height, grid, room_data = _infer_layout(decoded, width) + rooms = _parse_rooms(room_data, grid) + return Q10MapPacket(map_id=map_id, width=width, height=height, grid=grid, rooms=rooms) + + +@dataclass +class B01Q10MapParserConfig: + """Configuration for the Q10 map parser.""" + + map_scale: int = 4 + """Scale factor for the rendered map image.""" + + +class B01Q10MapParser: + """Decoder/renderer for Q10 ``MAP_RESPONSE`` (protocol 301) payloads.""" + + def __init__(self, config: B01Q10MapParserConfig | None = None) -> None: + self._config = config or B01Q10MapParserConfig() + + def parse(self, payload: bytes) -> ParsedMapData: + """Parse a raw Q10 map packet into a rendered PNG + ``MapData``.""" + return self.parse_packet(parse_map_packet(payload)) + + def parse_packet(self, packet: Q10MapPacket) -> ParsedMapData: + """Render an already-parsed Q10 map packet into a PNG + ``MapData``. + + The protocol layer parses the wire bytes into a :class:`Q10MapPacket`; + this renders that packet without re-parsing it. + """ + image = self._render(packet) + + map_data = MapData() + map_data.image = ImageData( + size=packet.width * packet.height, + top=0, + left=0, + height=packet.height, + width=packet.width, + image_config=ImageConfig(scale=self._config.map_scale), + data=image, + img_transformation=lambda p: p, + ) + room_names = {room.id: room.name for room in packet.rooms} + if room_names: + map_data.additional_parameters["room_names"] = room_names + + image_bytes = io.BytesIO() + image.save(image_bytes, format=_MAP_FILE_FORMAT) + return ParsedMapData(image_content=image_bytes.getvalue(), map_data=map_data) + + def _render(self, packet: Q10MapPacket) -> Image.Image: + """Render the Q10 grid: rooms get distinct colors, walls white, rest dark.""" + palette = _build_palette(packet.grid) + rgb = bytearray() + for value in packet.grid: + rgb.extend(palette[value]) + img = Image.frombytes("RGB", (packet.width, packet.height), bytes(rgb)) + img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM) + scale = self._config.map_scale + if scale > 1: + img = img.resize((packet.width * scale, packet.height * scale), resample=Image.Resampling.NEAREST) + return img + + +def _build_palette(grid: bytes) -> list[tuple[int, int, int]]: + """Map each grid value to an RGB color (rooms distinct, walls white).""" + palette: list[tuple[int, int, int]] = [(28, 30, 38)] * 256 # default: unknown/outside + room_values = sorted({v for v in set(grid) if 0 < v < _WALL_THRESHOLD}) + for index, value in enumerate(room_values): + hue = (index * 0.139) % 1.0 + r, g, b = colorsys.hsv_to_rgb(hue, 0.5, 0.95) + palette[value] = (int(r * 255), int(g * 255), int(b * 255)) + for value in range(_WALL_THRESHOLD, 256): + palette[value] = (235, 235, 240) # walls / borders + palette[0] = (28, 30, 38) + return palette diff --git a/roborock/protocols/b01_q10_protocol.py b/roborock/protocols/b01_q10_protocol.py index 94a1e7b3..c0cf6b9b 100644 --- a/roborock/protocols/b01_q10_protocol.py +++ b/roborock/protocols/b01_q10_protocol.py @@ -2,10 +2,19 @@ import json import logging +from dataclasses import dataclass from typing import Any from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import ( + Q10MapPacket, + Q10TracePacket, + is_map_packet, + is_trace_packet, + parse_map_packet, + parse_trace_packet, +) from roborock.roborock_message import ( RoborockMessage, RoborockMessageProtocol, @@ -85,3 +94,37 @@ def decode_rpc_response(message: RoborockMessage) -> dict[B01_Q10_DP, Any]: result.update(common_dps_result) return result + + +@dataclass +class Q10DpsUpdate: + """A decoded Q10 DPS status update pushed by the device.""" + + dps: dict[B01_Q10_DP, Any] + """Data points keyed by ``B01_Q10_DP`` code.""" + + +# A single decoded message from a Q10 device: a DPS status update, a full map +# packet, or a live cleaning-path (trace) packet. Map/trace packets arrive as +# protocol-301 ``MAP_RESPONSE`` pushes; everything else is a DPS update. +Q10Message = Q10DpsUpdate | Q10MapPacket | Q10TracePacket + + +def decode_message(message: RoborockMessage) -> Q10Message | None: + """Decode a pushed Q10 ``RoborockMessage`` into a typed message. + + ``MAP_RESPONSE`` (protocol 301) payloads carry the binary map (``01 01``) or + trace (``02 01``) packets, which are parsed by the map parser; any other + ``MAP_RESPONSE`` marker is unrecognized and yields ``None``. Every other + protocol is treated as a DPS status update. + + Raises ``RoborockException`` if a recognized payload fails to parse. + """ + if message.protocol == RoborockMessageProtocol.MAP_RESPONSE: + payload = message.payload or b"" + if is_map_packet(payload): + return parse_map_packet(payload) + if is_trace_packet(payload): + return parse_trace_packet(payload) + return None + return Q10DpsUpdate(dps=decode_rpc_response(message)) diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py new file mode 100644 index 00000000..470f3f99 --- /dev/null +++ b/tests/devices/traits/b01/q10/test_map.py @@ -0,0 +1,180 @@ +"""Tests for the Q10 B01 map content trait. + +The Q10 map API is push-driven: the device publishes ``MAP_RESPONSE`` messages +and the trait updates its cached state from them via ``update_from_map_response`` +(there is no synchronous get-map request). +""" + +import asyncio +from collections.abc import AsyncGenerator +from pathlib import Path +from typing import cast +from unittest.mock import AsyncMock, Mock + +import pytest + +from roborock.cli import _await_q10_map_push, cli +from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create +from roborock.devices.traits.b01.q10.map import MapContentTrait +from roborock.map.b01_q10_map_parser import Q10Point, parse_map_packet, parse_trace_packet +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol + +FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") +TRACE_FIXTURE = Path("tests/map/testdata/b01_q10_trace.bin") + + +def _map_message( + payload: bytes, protocol: RoborockMessageProtocol = RoborockMessageProtocol.MAP_RESPONSE +) -> RoborockMessage: + return RoborockMessage(protocol=protocol, payload=payload, version=b"B01") + + +def test_update_from_map_packet_populates_image_and_rooms() -> None: + """A parsed 01 01 map packet populates the image, rooms and map data.""" + packet = parse_map_packet(FIXTURE.read_bytes()) + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) + + trait.update_from_map_packet(packet) + + assert trait.image_content is not None + assert trait.image_content[:8] == b"\x89PNG\r\n\x1a\n" + assert {room.id: room.name for room in trait.rooms} == {2: "Living Room", 3: "Bedroom"} + assert trait.map_data is not None + assert len(updates) == 1 + + +def test_update_from_trace_packet_populates_path_and_position() -> None: + """A parsed 02 01 trace packet populates the path and robot position.""" + packet = parse_trace_packet(TRACE_FIXTURE.read_bytes()) + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) + + trait.update_from_trace_packet(packet) + + assert [(p.x, p.y) for p in trait.path] == [(169, 0)] + assert trait.robot_position is not None + assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) + assert len(updates) == 1 + + +def test_q10_position_is_available_as_top_level_cli_command() -> None: + assert "q10-position" in cli.commands + + +# --- CLI push waiting -------------------------------------------------------- + + +class _FakeQ10Properties: + def __init__(self) -> None: + self.map = MapContentTrait() + self.refresh_count = 0 + + async def refresh(self) -> None: + self.refresh_count += 1 + + +class _FakeQ10PropertiesWithTrace(_FakeQ10Properties): + async def refresh(self) -> None: + await super().refresh() + self.map.update_from_trace_packet(parse_trace_packet(TRACE_FIXTURE.read_bytes())) + + +async def test_await_q10_map_push_waits_for_fresh_update() -> None: + """A cached trace alone is not treated as a successful new map push.""" + properties = _FakeQ10Properties() + properties.map.path = [Q10Point(1, 2)] + + got_trace = await _await_q10_map_push( + cast(Q10PropertiesApi, properties), lambda: bool(properties.map.path), timeout=0.01 + ) + + assert got_trace is False + assert properties.refresh_count == 1 + + +async def test_await_q10_map_push_returns_true_after_update() -> None: + properties = _FakeQ10PropertiesWithTrace() + + got_trace = await _await_q10_map_push( + cast(Q10PropertiesApi, properties), lambda: bool(properties.map.path), timeout=0.01 + ) + + assert got_trace is True + assert [(p.x, p.y) for p in properties.map.path] == [(169, 0)] + + +async def test_await_q10_map_push_can_fall_back_to_cached_map_on_timeout() -> None: + properties = _FakeQ10Properties() + properties.map.image_content = b"cached-png" + + got_map = await _await_q10_map_push( + cast(Q10PropertiesApi, properties), + lambda: properties.map.image_content is not None, + timeout=0.01, + allow_cached_on_timeout=True, + ) + + assert got_map is True + assert properties.refresh_count == 1 + + +# --- Integration through the Q10PropertiesApi subscribe loop ----------------- + + +@pytest.fixture +def message_queue() -> asyncio.Queue[RoborockMessage]: + return asyncio.Queue() + + +@pytest.fixture +def mock_channel(message_queue: asyncio.Queue[RoborockMessage]) -> AsyncMock: + async def mock_stream() -> AsyncGenerator[RoborockMessage, None]: + while True: + yield await message_queue.get() + + channel = AsyncMock() + channel.subscribe_stream = Mock(return_value=mock_stream()) + return channel + + +@pytest.fixture +async def q10_api(mock_channel: AsyncMock) -> AsyncGenerator[Q10PropertiesApi, None]: + api = create(mock_channel) + await api.start() + yield api + await api.close() + + +async def _wait_for(predicate, timeout: float = 2.0) -> None: + async with asyncio.timeout(timeout): + while not predicate(): + await asyncio.sleep(0.01) + + +async def test_subscribe_loop_routes_map_push( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """A map pushed onto the stream is routed to the map trait by the loop.""" + assert q10_api.map.image_content is None + + message_queue.put_nowait(_map_message(FIXTURE.read_bytes())) + + await _wait_for(lambda: q10_api.map.image_content is not None) + assert {room.id: room.name for room in q10_api.map.rooms} == {2: "Living Room", 3: "Bedroom"} + + +async def test_subscribe_loop_routes_trace_push( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """A trace pushed onto the stream is routed to the map trait by the loop.""" + assert not q10_api.map.path + + message_queue.put_nowait(_map_message(TRACE_FIXTURE.read_bytes())) + + await _wait_for(lambda: bool(q10_api.map.path)) + assert q10_api.map.robot_position is not None diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py new file mode 100644 index 00000000..53c3c63c --- /dev/null +++ b/tests/map/test_b01_q10_map_parser.py @@ -0,0 +1,274 @@ +"""Tests for the Roborock Q10 (B01/ss07) map parser.""" + +from pathlib import Path + +import pytest + +from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParser, + Q10Room, + is_map_packet, + is_trace_packet, + lz4_block_decompress, + parse_map_packet, + parse_trace_packet, +) + +FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" +TRACE_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace.bin" +TRACE_MULTI_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_multi.bin" +# Real 15-point packet captured from an R1 corridor run (full session path). +TRACE_SESSION_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_session.bin" + + +def _payload() -> bytes: + return FIXTURE.read_bytes() + + +def _literal_lz4_block(data: bytes) -> bytes: + block = bytearray() + literal_length = len(data) + if literal_length < 15: + block.append(literal_length << 4) + else: + block.append(0xF0) + remaining = literal_length - 15 + while remaining >= 0xFF: + block.append(0xFF) + remaining -= 0xFF + block.append(remaining) + block.extend(data) + return bytes(block) + + +def _synthetic_map_payload(width: int, decoded_layout: bytes) -> bytes: + compressed = _literal_lz4_block(decoded_layout) + payload = bytearray(29) + payload[0:2] = b"\x01\x01" + payload[2:6] = (0x01020304).to_bytes(4, "big") + payload[8:10] = width.to_bytes(2, "little") + payload[27:29] = len(compressed).to_bytes(2, "big") + payload.extend(compressed) + return bytes(payload) + + +def _room_record(room_id: int, name: str) -> bytes: + record = bytearray(47) # _ROOM_RECORD_LENGTH + record[0:2] = room_id.to_bytes(2, "big") + encoded = name.encode("utf-8") + record[26] = len(encoded) + record[27 : 27 + len(encoded)] = encoded + return bytes(record) + + +def _full_header_map_payload(width: int, height: int, decoded_layout: bytes) -> bytes: + """Build a map packet that populates the real u16be width and height fields.""" + compressed = _literal_lz4_block(decoded_layout) + payload = bytearray(29) + payload[0:2] = b"\x01\x01" + payload[2:6] = (0x01020304).to_bytes(4, "big") + payload[7:9] = width.to_bytes(2, "big") + payload[9:11] = height.to_bytes(2, "big") + payload[27:29] = len(compressed).to_bytes(2, "big") + payload.extend(compressed) + return bytes(payload) + + +def _trace_payload(points: list[tuple[int, int]], sequence: int = 1) -> bytes: + header = bytearray(10) + header[0:2] = b"\x02\x01" + header[3] = sequence + body = b"".join(x.to_bytes(2, "big", signed=True) + y.to_bytes(2, "big", signed=True) for x, y in points) + return bytes(header) + body + + +def test_lz4_block_roundtrip_all_literals() -> None: + """A simple all-literals block decodes back to the original bytes.""" + original = bytes(range(60)) * 3 + block = bytearray() + block.append(0x0F << 4) + block.append(len(original) - 15) + block += original + assert lz4_block_decompress(bytes(block)) == original + + +def test_lz4_block_back_reference() -> None: + """Back-references expand runs (e.g. RLE-style repeats).""" + # seq1: 1 literal 'A', then match (offset 1, length 4+4=8) -> 'A' x9. + # seq2: final literals-only token (0 literals) ends the block per LZ4 spec. + block = bytes([0x14, ord("A"), 0x01, 0x00, 0x00]) + assert lz4_block_decompress(block) == b"A" * 9 + + +def test_is_map_packet() -> None: + assert is_map_packet(b"\x01\x01rest") + assert not is_map_packet(b"\x02\x01rest") # trace packet + assert not is_map_packet(b"") + + +def test_parse_map_packet() -> None: + packet = parse_map_packet(_payload()) + assert packet.width == 8 + assert packet.height == 6 + assert packet.map_id == 0x01020304 + assert len(packet.grid) == packet.width * packet.height + assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_living_room"), (3, "bedroom")] + + +def test_parse_map_packet_allows_zero_room_metadata() -> None: + """A map can be present before the robot has room segmentation records.""" + grid = bytes([240, 240, 249, 243, 240, 240]) + packet = parse_map_packet(_synthetic_map_payload(width=3, decoded_layout=grid + b"\x01\x00")) + assert packet.width == 3 + assert packet.height == 2 + assert packet.grid == grid + assert packet.rooms == [] + + +def test_parse_map_packet_reads_header_height() -> None: + """Width and height come straight from the u16be header fields.""" + grid = bytes([8]) * 6 + bytes([12]) * 6 # two rooms, 4x3 grid + layout = grid + b"\x01\x02" + _room_record(2, "rr_kitchen") + _room_record(3, "den") + packet = parse_map_packet(_full_header_map_payload(width=4, height=3, decoded_layout=layout)) + assert (packet.width, packet.height) == (4, 3) + assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_kitchen"), (3, "den")] + + +def test_parse_map_packet_dimensions_straddling_256() -> None: + """Regression: a 222x261 map (dimensions in different 256-bands). + + Width and height are consecutive u16be header fields. The earlier u16le read + at offset 8 picked up the height's high byte, decoding this map's width as + 478 (0xDE | 0x01 << 8) and failing to split the layout. Reported, diagnosed + and verified by @andrewlyeats from a real 222x261 capture. + """ + width, height = 222, 261 + grid = bytearray(width * height) + grid[0:100] = bytes([8]) * 100 # room id 2 -> pixel value 8 + grid[100:250] = bytes([12]) * 150 # room id 3 -> pixel value 12 + layout = bytes(grid) + b"\x01\x02" + _room_record(2, "rr_kitchen") + _room_record(3, "den") + payload = _full_header_map_payload(width, height, layout) + # The old u16le @ offset 8 read would have produced 478, not 222. + assert int.from_bytes(payload[8:10], "little") == 478 + packet = parse_map_packet(payload) + assert (packet.width, packet.height) == (222, 261) + assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_kitchen"), (3, "den")] + + +def test_room_name_normalization() -> None: + """Firmware ``rr_`` default names are normalized; custom names are titled.""" + assert Q10Room(id=2, raw_name="rr_living_room", pixel_value=8, pixel_count=9).name == "Living Room" + assert Q10Room(id=3, raw_name="bedroom", pixel_value=12, pixel_count=9).name == "Bedroom" + + +def test_room_pixel_count_matches_grid() -> None: + packet = parse_map_packet(_payload()) + for room in packet.rooms: + assert room.pixel_value == (room.id * 4) & 0xFF + assert room.pixel_count == packet.grid.count(room.pixel_value) + + +def test_parser_renders_png_and_room_names() -> None: + parsed = B01Q10MapParser().parse(_payload()) + assert parsed.image_content is not None + assert parsed.image_content[:8] == b"\x89PNG\r\n\x1a\n" # PNG magic + assert parsed.map_data is not None + assert parsed.map_data.additional_parameters["room_names"] == {2: "Living Room", 3: "Bedroom"} + + +def test_parse_rejects_non_map_packet() -> None: + with pytest.raises(RoborockException, match="not a Q10 map packet"): + parse_map_packet(b"\x02\x01" + b"\x00" * 40) + + +def test_packet_markers_are_distinct() -> None: + map_payload = _payload() + trace_payload = TRACE_FIXTURE.read_bytes() + assert is_map_packet(map_payload) and not is_trace_packet(map_payload) + assert is_trace_packet(trace_payload) and not is_map_packet(trace_payload) + + +def test_parse_trace_packet_real_single_point() -> None: + """A real ss07 packet captured early in a session has a single path point.""" + trace = parse_trace_packet(TRACE_FIXTURE.read_bytes()) + assert trace.sequence == 9 + assert [(p.x, p.y) for p in trace.points] == [(169, 0)] + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (169, 0) + + +def test_parse_trace_packet_real_session_path() -> None: + """A real 15-point packet (corridor run) decodes the full accumulated path. + + Captured live from an R1: the same session emitted packets of 1, then 3, + then 15 points, proving the path accumulates rather than reporting only the + current position. The most recent point is the current robot position. + """ + trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) + points = [(p.x, p.y) for p in trace.points] + assert len(points) == 15 + assert points[0] == (-34, 0) # oldest + assert points[-1] == (276, -1) # most recent == current position + # After the initial repositioning, x marches steadily down the corridor. + tail_x = [p[0] for p in points[2:]] + assert tail_x == sorted(tail_x) + assert points[-1][0] - points[0][0] > 300 # spans the corridor + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (276, -1) + + +def test_parse_trace_packet_multi_point() -> None: + """A multi-point packet decodes all points; position is the most recent.""" + trace = parse_trace_packet(TRACE_MULTI_FIXTURE.read_bytes()) + assert [(p.x, p.y) for p in trace.points] == [(100, 200), (150, 250), (-50, 300)] + # Signed coordinates are supported (negative x). + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (-50, 300) + + +def test_parse_trace_drops_stray_leading_point() -> None: + """A stray first point far outside the path is dropped (calibration hygiene).""" + points = [(0, -1907), (-3760, -1920), (-3758, -1918), (-3756, -1919)] + trace = parse_trace_packet(_trace_payload(points)) + assert [(p.x, p.y) for p in trace.points] == points[1:] + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == points[-1] + + +def test_parse_trace_keeps_genuine_first_point() -> None: + """A normal first step (same scale as the rest) is never dropped.""" + points = [(0, 0), (10, 0), (22, 0), (35, 1)] + trace = parse_trace_packet(_trace_payload(points)) + assert [(p.x, p.y) for p in trace.points] == points + + +def test_parse_trace_session_keeps_initial_reposition() -> None: + """The real corridor capture has a 4.8x first step -- well under the 20x cut.""" + trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) + assert len(trace.points) == 15 + assert (trace.points[0].x, trace.points[0].y) == (-34, 0) + + +def test_parse_trace_empty_path_has_no_position() -> None: + header_only = b"\x02\x01" + b"\x00" * 8 # 10-byte header, no points + trace = parse_trace_packet(header_only) + assert trace.points == [] + assert trace.robot_position is None + + +def test_parse_trace_rejects_non_trace_packet() -> None: + with pytest.raises(RoborockException, match="not a Q10 trace packet"): + parse_trace_packet(_payload()) + + +def test_parse_trace_rejects_misaligned_points() -> None: + with pytest.raises(RoborockException, match="not 4-byte"): + parse_trace_packet(b"\x02\x01" + b"\x00" * 8 + b"\x01\x02\x03") + + +def test_parse_rejects_bad_layout_length() -> None: + payload = bytearray(_payload()) + payload[27:29] = (0xFFFF).to_bytes(2, "big") # compressed length past the buffer + with pytest.raises(RoborockException, match="invalid layout block length"): + parse_map_packet(bytes(payload)) diff --git a/tests/map/testdata/b01_q10_map.bin b/tests/map/testdata/b01_q10_map.bin new file mode 100644 index 00000000..05ca7083 Binary files /dev/null and b/tests/map/testdata/b01_q10_map.bin differ diff --git a/tests/map/testdata/b01_q10_trace.bin b/tests/map/testdata/b01_q10_trace.bin new file mode 100644 index 00000000..ace261d0 Binary files /dev/null and b/tests/map/testdata/b01_q10_trace.bin differ diff --git a/tests/map/testdata/b01_q10_trace_multi.bin b/tests/map/testdata/b01_q10_trace_multi.bin new file mode 100644 index 00000000..8377e6c0 Binary files /dev/null and b/tests/map/testdata/b01_q10_trace_multi.bin differ diff --git a/tests/map/testdata/b01_q10_trace_session.bin b/tests/map/testdata/b01_q10_trace_session.bin new file mode 100644 index 00000000..a4222e81 Binary files /dev/null and b/tests/map/testdata/b01_q10_trace_session.bin differ diff --git a/tests/protocols/test_b01_q10_protocol.py b/tests/protocols/test_b01_q10_protocol.py index 62ee2c27..8e42f07e 100644 --- a/tests/protocols/test_b01_q10_protocol.py +++ b/tests/protocols/test_b01_q10_protocol.py @@ -11,7 +11,10 @@ from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP, YXWaterLevel from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import Q10MapPacket, Q10TracePacket from roborock.protocols.b01_q10_protocol import ( + Q10DpsUpdate, + decode_message, decode_rpc_response, encode_mqtt_payload, ) @@ -21,6 +24,42 @@ TESTDATA_FILES = list(TESTDATA_PATH.glob("*.json")) TESTDATA_IDS = [x.stem for x in TESTDATA_FILES] +MAP_FIXTURE = pathlib.Path("tests/map/testdata/b01_q10_map.bin") +TRACE_FIXTURE = pathlib.Path("tests/map/testdata/b01_q10_trace.bin") + + +def _message(payload: bytes, protocol: RoborockMessageProtocol) -> RoborockMessage: + return RoborockMessage(protocol=protocol, payload=payload, version=b"B01") + + +def test_decode_message_dps_update() -> None: + """A non-MAP_RESPONSE message decodes into a Q10DpsUpdate.""" + message = _message(b'{"dps": {"122": 100}}', RoborockMessageProtocol.RPC_RESPONSE) + decoded = decode_message(message) + assert decoded == Q10DpsUpdate(dps={B01_Q10_DP.BATTERY: 100}) + + +def test_decode_message_map_packet() -> None: + """A MAP_RESPONSE 01 01 payload decodes into a Q10MapPacket.""" + message = _message(MAP_FIXTURE.read_bytes(), RoborockMessageProtocol.MAP_RESPONSE) + decoded = decode_message(message) + assert isinstance(decoded, Q10MapPacket) + assert {room.id: room.name for room in decoded.rooms} == {2: "Living Room", 3: "Bedroom"} + + +def test_decode_message_trace_packet() -> None: + """A MAP_RESPONSE 02 01 payload decodes into a Q10TracePacket.""" + message = _message(TRACE_FIXTURE.read_bytes(), RoborockMessageProtocol.MAP_RESPONSE) + decoded = decode_message(message) + assert isinstance(decoded, Q10TracePacket) + assert [(p.x, p.y) for p in decoded.points] == [(169, 0)] + + +def test_decode_message_unknown_map_marker_returns_none() -> None: + """A MAP_RESPONSE with an unrecognized marker is skipped (returns None).""" + assert decode_message(_message(b"\x09\x09junk", RoborockMessageProtocol.MAP_RESPONSE)) is None + assert decode_message(_message(b"", RoborockMessageProtocol.MAP_RESPONSE)) is None + @pytest.fixture(autouse=True) def fixed_time_fixture() -> Generator[None, None, None]: