From bd4b3f98583ffcb4e36a4ab71b94551f30a24246 Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sun, 14 Jun 2026 11:26:41 +0400 Subject: [PATCH 1/9] feat: add Q10 (B01/ss07) map support with rooms and rendered image Brings Q10 maps toward parity with V1 devices. Verified end-to-end against two physical Q10 (roborock.vacuum.ss07) robots. Protocol (reverse-engineered from live captures): - Requesting device state (dpRequestDps) makes the robot push its current map as a protocol-301 MAP_RESPONSE a few seconds later (firmware throttles to ~once per minute). - The "01 01" map packet carries a u32be map id, u16le grid width, and an LZ4-block-compressed occupancy grid followed by 47-byte room records (id + ascii name); room cells use value room_id*4. The payload is unencrypted, unlike the Q7 SCMap protobuf format. Changes: - roborock/map/b01_q10_map_parser.py: clean LZ4 block decoder + packet parser + renderer producing a PNG and MapData with room names. - roborock/devices/rpc/b01_q10_channel.py: request_map() triggers and awaits the MAP_RESPONSE push. - roborock/devices/traits/b01/q10/map.py: MapContentTrait (refresh/parse/image/ rooms), wired into Q10PropertiesApi. - cli: `map-image` and `rooms` now work for Q10 devices. - Tests + a synthetic (no-PII) map fixture. Map packet format documentation credit: the roborock-qseries-map-bridge project (GPL-3.0): https://github.com/v1b3c0d3x3r/roborock-qseries-map-bridge --- roborock/cli.py | 23 +- roborock/devices/rpc/b01_q10_channel.py | 36 +++ roborock/devices/traits/b01/q10/__init__.py | 6 + roborock/devices/traits/b01/q10/map.py | 87 +++++++ roborock/map/b01_q10_map_parser.py | 257 ++++++++++++++++++++ tests/devices/traits/b01/q10/test_map.py | 67 +++++ tests/map/test_b01_q10_map_parser.py | 86 +++++++ tests/map/testdata/b01_q10_map.bin | Bin 0 -> 175 bytes 8 files changed, 558 insertions(+), 4 deletions(-) create mode 100644 roborock/devices/traits/b01/q10/map.py create mode 100644 roborock/map/b01_q10_map_parser.py create mode 100644 tests/devices/traits/b01/q10/test_map.py create mode 100644 tests/map/test_b01_q10_map_parser.py create mode 100644 tests/map/testdata/b01_q10_map.bin diff --git a/roborock/cli.py b/roborock/cli.py index b36b11ce..1ca8ab11 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -529,10 +529,18 @@ async def maps(ctx, device_id: str): async def map_image(ctx, device_id: str, output_file: str): """Get device map image and save it to a file.""" context: RoborockContext = ctx.obj - trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) - if trait.image_content: + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is not None: + map_trait = device.b01_q10_properties.map + await map_trait.refresh() + image_content = map_trait.image_content + else: + v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) + image_content = v1_trait.image_content + if image_content: with open(output_file, "wb") as f: - f.write(trait.image_content) + f.write(image_content) click.echo(f"Map image saved to {output_file}") else: click.echo("No map image content available.") @@ -705,7 +713,14 @@ async def set_child_lock(ctx, device_id: str, enabled: bool): async def rooms(ctx, device_id: str): """Get device room mapping info.""" context: RoborockContext = ctx.obj - await _display_v1_trait(context, device_id, lambda v1: v1.rooms) + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is not None: + map_trait = device.b01_q10_properties.map + await map_trait.refresh() + click.echo(dump_json({room.id: room.name for room in map_trait.rooms})) + else: + await _display_v1_trait(context, device_id, lambda v1: v1.rooms) @session.command() diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index 1e0510ba..79cb5dde 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -1,5 +1,6 @@ """Thin wrapper around the MQTT channel for Roborock B01 Q10 devices.""" +import asyncio import logging from collections.abc import AsyncGenerator from typing import Any @@ -12,9 +13,15 @@ decode_rpc_response, encode_mqtt_payload, ) +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol _LOGGER = logging.getLogger(__name__) +# Requesting the device state (dpRequestDps) also makes the robot push its current +# map as a separate MAP_RESPONSE message a few seconds later. Q10 firmware throttles +# these pushes (~60-70s between maps), so callers should not poll tightly. +_MAP_TIMEOUT = 20.0 + async def stream_decoded_responses( mqtt_channel: MqttChannel, @@ -34,6 +41,35 @@ async def stream_decoded_responses( yield decoded_dps +async def request_map(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: + """Request the current map and return the raw ``MAP_RESPONSE`` payload. + + The Q10 does not have a dedicated "get map" command. Instead, requesting the + device state (``dpRequestDps``) triggers the robot to push its current map as + a ``MAP_RESPONSE`` (protocol 301) message shortly afterwards. This subscribes + for that push, sends the trigger, and resolves on the first map message. + """ + if timeout is None: + timeout = _MAP_TIMEOUT + loop = asyncio.get_running_loop() + future: asyncio.Future[bytes] = loop.create_future() + + def on_message(message: RoborockMessage) -> None: + if future.done(): + return + if message.protocol == RoborockMessageProtocol.MAP_RESPONSE and message.payload: + future.set_result(message.payload) + + unsub = await mqtt_channel.subscribe(on_message) + try: + await send_command(mqtt_channel, B01_Q10_DP.REQUEST_DPS, {}) + return await asyncio.wait_for(future, timeout=timeout) + except TimeoutError as ex: + raise RoborockException(f"Timed out waiting for Q10 map after {timeout}s") from ex + finally: + unsub() + + async def send_command( mqtt_channel: MqttChannel, command: B01_Q10_DP, diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 184de2d2..27b2af04 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -9,12 +9,14 @@ from roborock.devices.transport.mqtt_channel import MqttChannel from .command import CommandTrait +from .map import MapContentTrait from .remote import RemoteTrait from .status import StatusTrait from .vacuum import VacuumTrait __all__ = [ "Q10PropertiesApi", + "MapContentTrait", ] _LOGGER = logging.getLogger(__name__) @@ -35,6 +37,9 @@ class Q10PropertiesApi(Trait): remote: RemoteTrait """Trait for sending remote control related commands to Q10 devices.""" + map: MapContentTrait + """Trait for fetching the current parsed map (image + rooms).""" + def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" self._channel = channel @@ -42,6 +47,7 @@ def __init__(self, channel: MqttChannel) -> None: self.vacuum = VacuumTrait(self.command) self.remote = RemoteTrait(self.command) self.status = StatusTrait() + self.map = MapContentTrait(channel) self._subscribe_task: asyncio.Task[None] | None = None async def start(self) -> None: diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py new file mode 100644 index 00000000..fdd8fcca --- /dev/null +++ b/roborock/devices/traits/b01/q10/map.py @@ -0,0 +1,87 @@ +"""Map content trait for B01 Q10 devices. + +This mirrors the v1 / Q7 ``MapContentTrait`` contract: +- ``refresh()`` performs I/O and populates cached fields. +- ``parse_map_content()`` reparses cached raw bytes without I/O. +- ``image_content``, ``map_data``, ``rooms`` and ``raw_api_response`` are readable. + +Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required. +The raw payload is retrieved by :func:`request_map`, which triggers the device +to push its current map. +""" + +from dataclasses import dataclass, field + +from vacuum_map_parser_base.map_data import MapData + +from roborock.data import RoborockBase +from roborock.devices.rpc.b01_q10_channel import request_map +from roborock.devices.traits import Trait +from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import B01Q10MapParser, B01Q10MapParserConfig, Q10Room, parse_map_packet + +_TRUNCATE_LENGTH = 20 + + +@dataclass +class MapContent(RoborockBase): + """Dataclass representing Q10 map content.""" + + image_content: bytes | None = None + """The rendered image of the map in PNG format.""" + + map_data: MapData | None = None + """Parsed map data (image metadata + room names).""" + + rooms: list[Q10Room] = field(default_factory=list) + """Rooms (segments) reported by the device, with ids and names.""" + + raw_api_response: bytes | None = None + """Raw bytes of the map payload from the device (opaque blob for re-parsing).""" + + def __repr__(self) -> str: + img = self.image_content + if img and len(img) > _TRUNCATE_LENGTH: + img = img[: _TRUNCATE_LENGTH - 3] + b"..." + return f"MapContent(image_content={img!r}, rooms={self.rooms!r})" + + +class MapContentTrait(MapContent, Trait): + """Trait for fetching parsed map content for Q10 devices.""" + + def __init__( + self, + channel: MqttChannel, + *, + map_parser_config: B01Q10MapParserConfig | None = None, + ) -> None: + super().__init__() + self._channel = channel + self._map_parser = B01Q10MapParser(map_parser_config) + + async def refresh(self) -> None: + """Fetch, decode, and parse the current map payload.""" + raw_payload = await request_map(self._channel) + self.raw_api_response = raw_payload + self.parse_map_content() + + def parse_map_content(self) -> None: + """Reparse the cached raw map payload without performing any I/O.""" + if self.raw_api_response is None: + raise RoborockException("No map payload available; call refresh() first") + + try: + parsed = self._map_parser.parse(self.raw_api_response) + packet = parse_map_packet(self.raw_api_response) + except RoborockException: + raise + except Exception as ex: + raise RoborockException("Failed to parse Q10 map data") from ex + + if parsed.image_content is None: + raise RoborockException("Failed to render Q10 map image") + + self.image_content = parsed.image_content + self.map_data = parsed.map_data + self.rooms = packet.rooms diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py new file mode 100644 index 00000000..bb747959 --- /dev/null +++ b/roborock/map/b01_q10_map_parser.py @@ -0,0 +1,257 @@ +"""Parser for Roborock Q10 (B01/ss07) map packets. + +Q10 devices deliver map data as a protocol-301 ``MAP_RESPONSE`` message (pushed a +few seconds after a ``dpRequestDps`` request). Unlike the Q7 ``SCMap`` protobuf +format, the Q10 uses a custom, unencrypted binary packet: + +- ``01 01`` marker, then a ``u32be`` map id and a ``u16le`` grid width. +- A header field at offset 27 (``u16be``) giving the compressed layout length. +- An LZ4-block-compressed occupancy grid starting at offset 29. Once inflated it + is ``width * height`` cells of grid data followed by room metadata records. +- Room metadata begins with ``01 `` followed by fixed 47-byte + records (id, hints, ascii name). Each room paints cells with value + ``room_id * 4`` in the grid. + +The packet layout was confirmed against live Q10 captures. The format +documentation that informed this clean-room implementation comes from the +``roborock-qseries-map-bridge`` project (GPL-3.0-or-later): +https://github.com/v1b3c0d3x3r/roborock-qseries-map-bridge +""" + +import colorsys +import io +from dataclasses import dataclass, field + +from PIL import Image +from vacuum_map_parser_base.config.image_config import ImageConfig +from vacuum_map_parser_base.map_data import ImageData, MapData + +from roborock.exceptions import RoborockException + +from .map_parser import ParsedMapData + +_MAP_FILE_FORMAT = "PNG" + +MAP_PACKET_MARKER = b"\x01\x01" +TRACE_PACKET_MARKER = b"\x02\x01" + +_MAP_ID_OFFSET = 2 +_WIDTH_OFFSET = 8 +_COMPRESSED_LAYOUT_LENGTH_OFFSET = 27 +_LAYOUT_COMPRESSED_OFFSET = 29 +_ROOM_RECORD_LENGTH = 47 +_ROOM_NAME_LENGTH_OFFSET = 26 +_MAX_ROOMS = 32 + +# Grid cell values >= this are walls / borders rather than room segments. +_WALL_THRESHOLD = 240 + + +@dataclass +class Q10Room: + """A room (segment) described in a Q10 map packet.""" + + id: int + raw_name: str + pixel_value: int + pixel_count: int + + @property + def name(self) -> str: + """User friendly room name (firmware ``rr_`` defaults are normalized).""" + return self.raw_name.removeprefix("rr_").replace("_", " ").strip().title() + + +@dataclass +class Q10MapPacket: + """Decoded contents of a Q10 ``01 01`` map packet.""" + + map_id: int + width: int + height: int + grid: bytes + rooms: list[Q10Room] = field(default_factory=list) + + +def is_map_packet(payload: bytes) -> bool: + """Return True if the payload is a Q10 full-map (``01 01``) packet.""" + return payload[:2] == MAP_PACKET_MARKER + + +def lz4_block_decompress(data: bytes) -> bytes: + """Decompress a raw LZ4 *block* (no frame header). + + The Q10 map grid is stored as a single LZ4 block. This implements the + standard LZ4 block format so we don't add a native dependency. + """ + index = 0 + output = bytearray() + + def read_length(value: int) -> int: + nonlocal index + if value != 0x0F: + return value + while True: + if index >= len(data): + raise RoborockException("Truncated LZ4 block while reading length") + part = data[index] + index += 1 + value += part + if part != 0xFF: + return value + + while True: + if index >= len(data): + raise RoborockException("Truncated LZ4 block while reading token") + token = data[index] + index += 1 + + literal_length = read_length((token >> 4) & 0x0F) + end = index + literal_length + if end > len(data): + raise RoborockException("Truncated LZ4 block while reading literals") + output.extend(data[index:end]) + index = end + + if index == len(data): + return bytes(output) + if index + 2 > len(data): + raise RoborockException("Truncated LZ4 block while reading match offset") + + offset = data[index] | (data[index + 1] << 8) + index += 2 + if offset == 0 or offset > len(output): + raise RoborockException("Invalid LZ4 back-reference offset") + + match_length = read_length(token & 0x0F) + 4 + for _ in range(match_length): + output.append(output[-offset]) + + +def _infer_layout(decoded: bytes, width: int) -> tuple[int, bytes, bytes]: + """Split the inflated layout into (height, grid, room_data). + + The grid is ``width * height`` cells; the remaining bytes are room records + introduced by an ``01 `` marker. The room count is unknown up + front, so we search for the split that makes the grid rectangular and lines + up with the marker. + """ + for room_count in range(1, _MAX_ROOMS): + room_data_length = 2 + room_count * _ROOM_RECORD_LENGTH + area = len(decoded) - room_data_length + if area <= 0 or area % width: + continue + room_data = decoded[area:] + if room_data[0] == 1 and room_data[1] == room_count: + return area // width, decoded[:area], room_data + raise RoborockException("Could not infer Q10 layout dimensions / room records") + + +def _parse_rooms(room_data: bytes, grid: bytes) -> list[Q10Room]: + rooms: list[Q10Room] = [] + room_count = room_data[1] + for index in range(room_count): + start = 2 + index * _ROOM_RECORD_LENGTH + record = room_data[start : start + _ROOM_RECORD_LENGTH] + room_id = int.from_bytes(record[0:2], "big") + name_length = record[_ROOM_NAME_LENGTH_OFFSET] + raw_name = record[27 : 27 + name_length].decode("utf-8", errors="replace") + pixel_value = (room_id * 4) & 0xFF + rooms.append( + Q10Room( + id=room_id, + raw_name=raw_name, + pixel_value=pixel_value, + pixel_count=grid.count(pixel_value), + ) + ) + return rooms + + +def parse_map_packet(payload: bytes) -> Q10MapPacket: + """Parse a Q10 ``01 01`` map packet into grid + room metadata.""" + if len(payload) < _LAYOUT_COMPRESSED_OFFSET or not is_map_packet(payload): + raise RoborockException("Payload is not a Q10 map packet") + + map_id = int.from_bytes(payload[_MAP_ID_OFFSET : _MAP_ID_OFFSET + 4], "big") + width = int.from_bytes(payload[_WIDTH_OFFSET : _WIDTH_OFFSET + 2], "little") + if width <= 0: + raise RoborockException("Q10 map packet has invalid width") + + compressed_length = int.from_bytes( + payload[_COMPRESSED_LAYOUT_LENGTH_OFFSET : _COMPRESSED_LAYOUT_LENGTH_OFFSET + 2], "big" + ) + layout_end = _LAYOUT_COMPRESSED_OFFSET + compressed_length + if compressed_length <= 0 or layout_end > len(payload): + raise RoborockException("Q10 map packet has invalid layout block length") + + decoded = lz4_block_decompress(payload[_LAYOUT_COMPRESSED_OFFSET:layout_end]) + height, grid, room_data = _infer_layout(decoded, width) + rooms = _parse_rooms(room_data, grid) + return Q10MapPacket(map_id=map_id, width=width, height=height, grid=grid, rooms=rooms) + + +@dataclass +class B01Q10MapParserConfig: + """Configuration for the Q10 map parser.""" + + map_scale: int = 4 + """Scale factor for the rendered map image.""" + + +class B01Q10MapParser: + """Decoder/renderer for Q10 ``MAP_RESPONSE`` (protocol 301) payloads.""" + + def __init__(self, config: B01Q10MapParserConfig | None = None) -> None: + self._config = config or B01Q10MapParserConfig() + + def parse(self, payload: bytes) -> ParsedMapData: + """Parse a raw Q10 map packet into a rendered PNG + ``MapData``.""" + packet = parse_map_packet(payload) + image = self._render(packet) + + map_data = MapData() + map_data.image = ImageData( + size=packet.width * packet.height, + top=0, + left=0, + height=packet.height, + width=packet.width, + image_config=ImageConfig(scale=self._config.map_scale), + data=image, + img_transformation=lambda p: p, + ) + room_names = {room.id: room.name for room in packet.rooms} + if room_names: + map_data.additional_parameters["room_names"] = room_names + + image_bytes = io.BytesIO() + image.save(image_bytes, format=_MAP_FILE_FORMAT) + return ParsedMapData(image_content=image_bytes.getvalue(), map_data=map_data) + + def _render(self, packet: Q10MapPacket) -> Image.Image: + """Render the Q10 grid: rooms get distinct colors, walls white, rest dark.""" + palette = _build_palette(packet.grid) + rgb = bytearray() + for value in packet.grid: + rgb.extend(palette[value]) + img = Image.frombytes("RGB", (packet.width, packet.height), bytes(rgb)) + img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM) + scale = self._config.map_scale + if scale > 1: + img = img.resize((packet.width * scale, packet.height * scale), resample=Image.Resampling.NEAREST) + return img + + +def _build_palette(grid: bytes) -> list[tuple[int, int, int]]: + """Map each grid value to an RGB color (rooms distinct, walls white).""" + palette: list[tuple[int, int, int]] = [(28, 30, 38)] * 256 # default: unknown/outside + room_values = sorted({v for v in set(grid) if 0 < v < _WALL_THRESHOLD}) + for index, value in enumerate(room_values): + hue = (index * 0.139) % 1.0 + r, g, b = colorsys.hsv_to_rgb(hue, 0.5, 0.95) + palette[value] = (int(r * 255), int(g * 255), int(b * 255)) + for value in range(_WALL_THRESHOLD, 256): + palette[value] = (235, 235, 240) # walls / borders + palette[0] = (28, 30, 38) + return palette diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py new file mode 100644 index 00000000..28c0bbea --- /dev/null +++ b/tests/devices/traits/b01/q10/test_map.py @@ -0,0 +1,67 @@ +"""Tests for the Q10 B01 map content trait.""" + +from pathlib import Path +from typing import cast + +import pytest + +from roborock.devices.traits.b01.q10.map import MapContentTrait +from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.exceptions import RoborockException +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol +from tests.fixtures.channel_fixtures import FakeChannel + +FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") + + +@pytest.fixture +def fake_channel() -> FakeChannel: + return FakeChannel() + + +def _trait(channel: FakeChannel) -> MapContentTrait: + return MapContentTrait(cast(MqttChannel, channel)) + + +def _map_message(payload: bytes) -> RoborockMessage: + return RoborockMessage( + protocol=RoborockMessageProtocol.MAP_RESPONSE, + payload=payload, + version=b"B01", + ) + + +async def test_map_refresh_populates_image_and_rooms(fake_channel: FakeChannel) -> None: + """refresh() triggers the device push, then parses the map payload.""" + payload = FIXTURE.read_bytes() + fake_channel.response_queue.append(_map_message(payload)) + + trait = _trait(fake_channel) + await trait.refresh() + + assert trait.raw_api_response == payload + assert trait.image_content is not None + assert trait.image_content[:8] == b"\x89PNG\r\n\x1a\n" + assert {room.id: room.name for room in trait.rooms} == {2: "Living Room", 3: "Bedroom"} + assert trait.map_data is not None + + # The refresh trigger is a dpRequestDps (code 102) request. + assert len(fake_channel.published_messages) == 1 + trigger = fake_channel.published_messages[0].payload + assert trigger is not None and b'"102"' in trigger + + +async def test_map_refresh_times_out_without_response( + fake_channel: FakeChannel, monkeypatch: pytest.MonkeyPatch +) -> None: + """If the device never pushes a map, refresh raises a clear error.""" + monkeypatch.setattr("roborock.devices.rpc.b01_q10_channel._MAP_TIMEOUT", 0.05) + trait = _trait(fake_channel) # no queued response -> times out + with pytest.raises(RoborockException, match="Timed out waiting for Q10 map"): + await trait.refresh() + + +def test_parse_without_refresh_raises(fake_channel: FakeChannel) -> None: + trait = _trait(fake_channel) + with pytest.raises(RoborockException, match="No map payload available"): + trait.parse_map_content() diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py new file mode 100644 index 00000000..a3ab89b1 --- /dev/null +++ b/tests/map/test_b01_q10_map_parser.py @@ -0,0 +1,86 @@ +"""Tests for the Roborock Q10 (B01/ss07) map parser.""" + +from pathlib import Path + +import pytest + +from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParser, + Q10Room, + is_map_packet, + lz4_block_decompress, + parse_map_packet, +) + +FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" + + +def _payload() -> bytes: + return FIXTURE.read_bytes() + + +def test_lz4_block_roundtrip_all_literals() -> None: + """A simple all-literals block decodes back to the original bytes.""" + original = bytes(range(60)) * 3 + block = bytearray() + block.append(0x0F << 4) + block.append(len(original) - 15) + block += original + assert lz4_block_decompress(bytes(block)) == original + + +def test_lz4_block_back_reference() -> None: + """Back-references expand runs (e.g. RLE-style repeats).""" + # seq1: 1 literal 'A', then match (offset 1, length 4+4=8) -> 'A' x9. + # seq2: final literals-only token (0 literals) ends the block per LZ4 spec. + block = bytes([0x14, ord("A"), 0x01, 0x00, 0x00]) + assert lz4_block_decompress(block) == b"A" * 9 + + +def test_is_map_packet() -> None: + assert is_map_packet(b"\x01\x01rest") + assert not is_map_packet(b"\x02\x01rest") # trace packet + assert not is_map_packet(b"") + + +def test_parse_map_packet() -> None: + packet = parse_map_packet(_payload()) + assert packet.width == 8 + assert packet.height == 6 + assert packet.map_id == 0x01020304 + assert len(packet.grid) == packet.width * packet.height + assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_living_room"), (3, "bedroom")] + + +def test_room_name_normalization() -> None: + """Firmware ``rr_`` default names are normalized; custom names are titled.""" + assert Q10Room(id=2, raw_name="rr_living_room", pixel_value=8, pixel_count=9).name == "Living Room" + assert Q10Room(id=3, raw_name="bedroom", pixel_value=12, pixel_count=9).name == "Bedroom" + + +def test_room_pixel_count_matches_grid() -> None: + packet = parse_map_packet(_payload()) + for room in packet.rooms: + assert room.pixel_value == (room.id * 4) & 0xFF + assert room.pixel_count == packet.grid.count(room.pixel_value) + + +def test_parser_renders_png_and_room_names() -> None: + parsed = B01Q10MapParser().parse(_payload()) + assert parsed.image_content is not None + assert parsed.image_content[:8] == b"\x89PNG\r\n\x1a\n" # PNG magic + assert parsed.map_data is not None + assert parsed.map_data.additional_parameters["room_names"] == {2: "Living Room", 3: "Bedroom"} + + +def test_parse_rejects_non_map_packet() -> None: + with pytest.raises(RoborockException, match="not a Q10 map packet"): + parse_map_packet(b"\x02\x01" + b"\x00" * 40) + + +def test_parse_rejects_bad_layout_length() -> None: + payload = bytearray(_payload()) + payload[27:29] = (0xFFFF).to_bytes(2, "big") # compressed length past the buffer + with pytest.raises(RoborockException, match="invalid layout block length"): + parse_map_packet(bytes(payload)) diff --git a/tests/map/testdata/b01_q10_map.bin b/tests/map/testdata/b01_q10_map.bin new file mode 100644 index 0000000000000000000000000000000000000000..05ca70835a07073c39d107a0def01ed65a32f47b GIT binary patch literal 175 zcmZQ%WMpDyVPN23zyy;%G=6{r1`ZAm9v&W`Iy4$h7b6n`6BC*kjLTP46rYn>mYJ6x YUzDGp3l(N&hRLJR>`AF9Na`T607st_J^%m! literal 0 HcmV?d00001 From 1c4353efe87e0070d978454ed1df63860da4b27f Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sun, 14 Jun 2026 11:42:21 +0400 Subject: [PATCH 2/9] feat: add Q10 live position parsing from 02 01 packets Adds parsing for the Q10 "02 01" live position packet (delivered on the same protocol-301 channel as the map, only while the robot is moving). The packet format was reverse-engineered and validated against live ss07 captures (the 18-byte-header layout documented elsewhere did NOT match this firmware): - 10-byte header (sequence counter at byte 3, then a constant type/flag). - big-endian int16 (x, y) point pairs; this firmware sends the current position as a single point per packet rather than an accumulated path. - Confirmed live: as R1 traversed the corridor, the decoded x moved from -163 to +169 with y ~0. The full saved map packet (01 01) was checked too and does NOT carry the live path (identical across captures during a clean), so position comes from 02 01. - b01_q10_map_parser: parse_trace_packet() + Q10TracePacket/Q10Point. - b01_q10_channel: request_trace() (marker-filtered). - MapContentTrait.refresh_trace() exposes path + robot_position. - cli: `q10-position` (reports gracefully when the robot is idle). - Tests use a real captured position packet + a synthetic multi-point packet. --- roborock/cli.py | 33 +++++++++++ roborock/devices/rpc/b01_q10_channel.py | 41 +++++++++++--- roborock/devices/traits/b01/q10/map.py | 28 +++++++++- roborock/map/b01_q10_map_parser.py | 62 +++++++++++++++++++++ tests/devices/traits/b01/q10/test_map.py | 22 ++++++++ tests/map/test_b01_q10_map_parser.py | 46 +++++++++++++++ tests/map/testdata/b01_q10_trace.bin | Bin 0 -> 14 bytes tests/map/testdata/b01_q10_trace_multi.bin | Bin 0 -> 22 bytes 8 files changed, 221 insertions(+), 11 deletions(-) create mode 100644 tests/map/testdata/b01_q10_trace.bin create mode 100644 tests/map/testdata/b01_q10_trace_multi.bin diff --git a/roborock/cli.py b/roborock/cli.py index 1ca8ab11..2a59e1c6 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -572,6 +572,39 @@ async def map_data(ctx, device_id: str, include_path: bool): click.echo(dump_json(data_summary)) +@session.command() +@click.option("--device_id", required=True) +@click.option("--include_path", is_flag=True, default=False, help="Include all path points in the output.") +@click.pass_context +@async_command +async def q10_position(ctx, device_id: str, include_path: bool): + """Get the current Q10 robot position and live cleaning path. + + The Q10 only streams its position/path while it is actively cleaning, so this + will report that no live trace is available for an idle/docked robot. + """ + context: RoborockContext = ctx.obj + device_manager = await context.get_device_manager() + device = await device_manager.get_device(device_id) + if device.b01_q10_properties is None: + click.echo("Feature not supported by device") + return + map_trait = device.b01_q10_properties.map + try: + await map_trait.refresh_trace() + except RoborockException: + click.echo("No live trace available (the robot only reports position while cleaning).") + return + position = map_trait.robot_position + summary: dict[str, Any] = { + "robot_position": {"x": position.x, "y": position.y} if position else None, + "path_points": len(map_trait.path), + } + if include_path: + summary["path"] = [[p.x, p.y] for p in map_trait.path] + click.echo(dump_json(summary)) + + @session.command() @click.option("--device_id", required=True) @click.pass_context diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index 79cb5dde..a2ddd212 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -41,14 +41,14 @@ async def stream_decoded_responses( yield decoded_dps -async def request_map(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: - """Request the current map and return the raw ``MAP_RESPONSE`` payload. +# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the +# packet kind: a full map (``01 01``) or a live trace/path (``02 01``). +_MAP_PACKET_MARKER = b"\x01\x01" +_TRACE_PACKET_MARKER = b"\x02\x01" - The Q10 does not have a dedicated "get map" command. Instead, requesting the - device state (``dpRequestDps``) triggers the robot to push its current map as - a ``MAP_RESPONSE`` (protocol 301) message shortly afterwards. This subscribes - for that push, sends the trigger, and resolves on the first map message. - """ + +async def _request_map_response(mqtt_channel: MqttChannel, marker: bytes, what: str, timeout: float | None) -> bytes: + """Trigger a map push and resolve on the first ``MAP_RESPONSE`` with ``marker``.""" if timeout is None: timeout = _MAP_TIMEOUT loop = asyncio.get_running_loop() @@ -57,7 +57,11 @@ async def request_map(mqtt_channel: MqttChannel, *, timeout: float | None = None def on_message(message: RoborockMessage) -> None: if future.done(): return - if message.protocol == RoborockMessageProtocol.MAP_RESPONSE and message.payload: + if ( + message.protocol == RoborockMessageProtocol.MAP_RESPONSE + and message.payload + and message.payload[:2] == marker + ): future.set_result(message.payload) unsub = await mqtt_channel.subscribe(on_message) @@ -65,11 +69,30 @@ def on_message(message: RoborockMessage) -> None: await send_command(mqtt_channel, B01_Q10_DP.REQUEST_DPS, {}) return await asyncio.wait_for(future, timeout=timeout) except TimeoutError as ex: - raise RoborockException(f"Timed out waiting for Q10 map after {timeout}s") from ex + raise RoborockException(f"Timed out waiting for Q10 {what} after {timeout}s") from ex finally: unsub() +async def request_map(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: + """Request the current map and return the raw ``01 01`` ``MAP_RESPONSE`` payload. + + The Q10 does not have a dedicated "get map" command. Instead, requesting the + device state (``dpRequestDps``) triggers the robot to push its current map as + a ``MAP_RESPONSE`` (protocol 301) message shortly afterwards. + """ + return await _request_map_response(mqtt_channel, _MAP_PACKET_MARKER, "map", timeout) + + +async def request_trace(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: + """Request the live trace/path and return the raw ``02 01`` ``MAP_RESPONSE`` payload. + + The robot only emits trace packets while it is actively moving (cleaning), so + this will time out for an idle/docked robot. + """ + return await _request_map_response(mqtt_channel, _TRACE_PACKET_MARKER, "trace", timeout) + + async def send_command( mqtt_channel: MqttChannel, command: B01_Q10_DP, diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py index fdd8fcca..6daf2ab3 100644 --- a/roborock/devices/traits/b01/q10/map.py +++ b/roborock/devices/traits/b01/q10/map.py @@ -15,11 +15,18 @@ from vacuum_map_parser_base.map_data import MapData from roborock.data import RoborockBase -from roborock.devices.rpc.b01_q10_channel import request_map +from roborock.devices.rpc.b01_q10_channel import request_map, request_trace from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException -from roborock.map.b01_q10_map_parser import B01Q10MapParser, B01Q10MapParserConfig, Q10Room, parse_map_packet +from roborock.map.b01_q10_map_parser import ( + B01Q10MapParser, + B01Q10MapParserConfig, + Q10Point, + Q10Room, + parse_map_packet, + parse_trace_packet, +) _TRUNCATE_LENGTH = 20 @@ -37,6 +44,12 @@ class MapContent(RoborockBase): rooms: list[Q10Room] = field(default_factory=list) """Rooms (segments) reported by the device, with ids and names.""" + path: list[Q10Point] = field(default_factory=list) + """Latest live cleaning path points (only available while the robot moves).""" + + robot_position: Q10Point | None = None + """Current robot position (the most recent trace point), if known.""" + raw_api_response: bytes | None = None """Raw bytes of the map payload from the device (opaque blob for re-parsing).""" @@ -85,3 +98,14 @@ def parse_map_content(self) -> None: self.image_content = parsed.image_content self.map_data = parsed.map_data self.rooms = packet.rooms + + async def refresh_trace(self) -> None: + """Fetch the live cleaning path and current robot position. + + The robot only emits trace packets while it is actively moving, so this + raises :class:`RoborockException` (timeout) for an idle/docked robot. + """ + raw_payload = await request_trace(self._channel) + trace = parse_trace_packet(raw_payload) + self.path = trace.points + self.robot_position = trace.robot_position diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py index bb747959..b2f68b70 100644 --- a/roborock/map/b01_q10_map_parser.py +++ b/roborock/map/b01_q10_map_parser.py @@ -73,11 +73,73 @@ class Q10MapPacket: rooms: list[Q10Room] = field(default_factory=list) +@dataclass +class Q10Point: + """A single point in Q10 map/trace coordinate space.""" + + x: int + y: int + + +@dataclass +class Q10TracePacket: + """Decoded contents of a Q10 ``02 01`` live position packet. + + The robot only emits these while it is actively moving (cleaning), so an + idle/docked robot will not produce them. Observed firmware sends the current + position as a single point per packet rather than an accumulated path, so + ``points`` typically holds one point. + """ + + points: list[Q10Point] = field(default_factory=list) + sequence: int = 0 + """Header sequence counter; increments as new position packets are sent.""" + + @property + def robot_position(self) -> Q10Point | None: + """The current robot position (the most recent point).""" + return self.points[-1] if self.points else None + + +# Trace packet (``02 01``): a 10-byte header followed by big-endian int16 (x, y) +# point pairs. Header layout was confirmed against live ss07 captures (the +# sequence counter is at byte 3; bytes 4-9 are a constant type/flag + padding). +# NOTE: the format documented by roborock-qseries-map-bridge (18-byte header) +# did not match this firmware -- this 10-byte layout is what the device sent. +_TRACE_HEADER_LENGTH = 10 +_TRACE_SEQUENCE_OFFSET = 3 + + def is_map_packet(payload: bytes) -> bool: """Return True if the payload is a Q10 full-map (``01 01``) packet.""" return payload[:2] == MAP_PACKET_MARKER +def is_trace_packet(payload: bytes) -> bool: + """Return True if the payload is a Q10 live trace (``02 01``) packet.""" + return payload[:2] == TRACE_PACKET_MARKER + + +def parse_trace_packet(payload: bytes) -> Q10TracePacket: + """Parse a Q10 ``02 01`` trace packet into path points + robot position.""" + if not is_trace_packet(payload): + raise RoborockException("Payload is not a Q10 trace packet") + if len(payload) < _TRACE_HEADER_LENGTH: + raise RoborockException("Q10 trace packet is shorter than its header") + body = payload[_TRACE_HEADER_LENGTH:] + if len(body) % 4: + raise RoborockException("Q10 trace points are not 4-byte (x, y) pairs") + + points = [ + Q10Point( + x=int.from_bytes(body[offset : offset + 2], "big", signed=True), + y=int.from_bytes(body[offset + 2 : offset + 4], "big", signed=True), + ) + for offset in range(0, len(body), 4) + ] + return Q10TracePacket(points=points, sequence=payload[_TRACE_SEQUENCE_OFFSET]) + + def lz4_block_decompress(data: bytes) -> bytes: """Decompress a raw LZ4 *block* (no frame header). diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py index 28c0bbea..e9d47fdf 100644 --- a/tests/devices/traits/b01/q10/test_map.py +++ b/tests/devices/traits/b01/q10/test_map.py @@ -12,6 +12,7 @@ from tests.fixtures.channel_fixtures import FakeChannel FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") +TRACE_FIXTURE = Path("tests/map/testdata/b01_q10_trace.bin") @pytest.fixture @@ -65,3 +66,24 @@ def test_parse_without_refresh_raises(fake_channel: FakeChannel) -> None: trait = _trait(fake_channel) with pytest.raises(RoborockException, match="No map payload available"): trait.parse_map_content() + + +async def test_refresh_trace_populates_path_and_position(fake_channel: FakeChannel) -> None: + """refresh_trace() parses the live position from a real ss07 trace packet.""" + fake_channel.response_queue.append(_map_message(TRACE_FIXTURE.read_bytes())) + + trait = _trait(fake_channel) + await trait.refresh_trace() + + assert [(p.x, p.y) for p in trait.path] == [(169, 0)] + assert trait.robot_position is not None + assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) + + +async def test_refresh_trace_ignores_map_packets(fake_channel: FakeChannel, monkeypatch: pytest.MonkeyPatch) -> None: + """A map (01 01) push must not satisfy a trace request.""" + monkeypatch.setattr("roborock.devices.rpc.b01_q10_channel._MAP_TIMEOUT", 0.05) + fake_channel.response_queue.append(_map_message(FIXTURE.read_bytes())) # map, not trace + trait = _trait(fake_channel) + with pytest.raises(RoborockException, match="Timed out waiting for Q10 trace"): + await trait.refresh_trace() diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py index a3ab89b1..1c4e8a61 100644 --- a/tests/map/test_b01_q10_map_parser.py +++ b/tests/map/test_b01_q10_map_parser.py @@ -9,11 +9,15 @@ B01Q10MapParser, Q10Room, is_map_packet, + is_trace_packet, lz4_block_decompress, parse_map_packet, + parse_trace_packet, ) FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" +TRACE_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace.bin" +TRACE_MULTI_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_multi.bin" def _payload() -> bytes: @@ -79,6 +83,48 @@ def test_parse_rejects_non_map_packet() -> None: parse_map_packet(b"\x02\x01" + b"\x00" * 40) +def test_packet_markers_are_distinct() -> None: + map_payload = _payload() + trace_payload = TRACE_FIXTURE.read_bytes() + assert is_map_packet(map_payload) and not is_trace_packet(map_payload) + assert is_trace_packet(trace_payload) and not is_map_packet(trace_payload) + + +def test_parse_trace_packet_real_single_point() -> None: + """A real ss07 position packet decodes to a single current-position point.""" + trace = parse_trace_packet(TRACE_FIXTURE.read_bytes()) + assert trace.sequence == 9 + assert [(p.x, p.y) for p in trace.points] == [(169, 0)] + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (169, 0) + + +def test_parse_trace_packet_multi_point() -> None: + """A multi-point packet decodes all points; position is the most recent.""" + trace = parse_trace_packet(TRACE_MULTI_FIXTURE.read_bytes()) + assert [(p.x, p.y) for p in trace.points] == [(100, 200), (150, 250), (-50, 300)] + # Signed coordinates are supported (negative x). + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (-50, 300) + + +def test_parse_trace_empty_path_has_no_position() -> None: + header_only = b"\x02\x01" + b"\x00" * 8 # 10-byte header, no points + trace = parse_trace_packet(header_only) + assert trace.points == [] + assert trace.robot_position is None + + +def test_parse_trace_rejects_non_trace_packet() -> None: + with pytest.raises(RoborockException, match="not a Q10 trace packet"): + parse_trace_packet(_payload()) + + +def test_parse_trace_rejects_misaligned_points() -> None: + with pytest.raises(RoborockException, match="not 4-byte"): + parse_trace_packet(b"\x02\x01" + b"\x00" * 8 + b"\x01\x02\x03") + + def test_parse_rejects_bad_layout_length() -> None: payload = bytearray(_payload()) payload[27:29] = (0xFFFF).to_bytes(2, "big") # compressed length past the buffer diff --git a/tests/map/testdata/b01_q10_trace.bin b/tests/map/testdata/b01_q10_trace.bin new file mode 100644 index 0000000000000000000000000000000000000000..ace261d0907cb56b8c991d10c1f2d36eb6faf38b GIT binary patch literal 14 ScmZQ#WZ-0AVgP}a3=9AQrvSJB literal 0 HcmV?d00001 diff --git a/tests/map/testdata/b01_q10_trace_multi.bin b/tests/map/testdata/b01_q10_trace_multi.bin new file mode 100644 index 0000000000000000000000000000000000000000..8377e6c0d2b46c02bf0934e59c4c997027d424b3 GIT binary patch literal 22 bcmZQ#WME}rVgP{@h7%0a7=Haf$EX7U8@vR; literal 0 HcmV?d00001 From 57f263918e13eb194592c2434907f28e329046e0 Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sun, 14 Jun 2026 12:51:28 +0400 Subject: [PATCH 3/9] fix: frame Q10 02 01 trace as full cleaning-session path Live capture (R1 corridor run) disproved the earlier 'single current point per packet' assumption: the same session emitted packets of 1, then 3, then 15 points, each a strict superset. The robot accumulates the full session path server-side and returns it whole, so a client connecting mid-session still gets the complete trail (matching the app showing it after a cold launch). The parser already read all points; this corrects the docs and adds a real 15-point fixture + test, and clarifies that byte 3 is a session counter (tracks the device clean count) not a per-packet sequence. --- roborock/devices/traits/b01/q10/map.py | 20 +++++++++---- roborock/map/b01_q10_map_parser.py | 28 +++++++++++++------ tests/map/test_b01_q10_map_parser.py | 24 +++++++++++++++- tests/map/testdata/b01_q10_trace_session.bin | Bin 0 -> 70 bytes 4 files changed, 56 insertions(+), 16 deletions(-) create mode 100644 tests/map/testdata/b01_q10_trace_session.bin diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py index 6daf2ab3..07f5c24b 100644 --- a/roborock/devices/traits/b01/q10/map.py +++ b/roborock/devices/traits/b01/q10/map.py @@ -45,10 +45,14 @@ class MapContent(RoborockBase): """Rooms (segments) reported by the device, with ids and names.""" path: list[Q10Point] = field(default_factory=list) - """Latest live cleaning path points (only available while the robot moves).""" + """Full path of the current cleaning session (oldest point first). + + The robot accumulates this server-side and serves the whole trajectory so + far in one packet, so it is complete even if we connect mid-session. Only + populated while a cleaning session is active.""" robot_position: Q10Point | None = None - """Current robot position (the most recent trace point), if known.""" + """Current robot position (the most recent path point), if known.""" raw_api_response: bytes | None = None """Raw bytes of the map payload from the device (opaque blob for re-parsing).""" @@ -100,10 +104,14 @@ def parse_map_content(self) -> None: self.rooms = packet.rooms async def refresh_trace(self) -> None: - """Fetch the live cleaning path and current robot position. - - The robot only emits trace packets while it is actively moving, so this - raises :class:`RoborockException` (timeout) for an idle/docked robot. + """Fetch the current session's cleaning path and robot position. + + Populates :attr:`path` with the full trajectory of the active cleaning + session (the robot accumulates it, so the whole path is returned even + when we connect mid-session) and :attr:`robot_position` with the most + recent point. The robot only emits trace packets while a session is + active, so this raises :class:`RoborockException` (timeout) for an + idle/docked robot. """ raw_payload = await request_trace(self._channel) trace = parse_trace_packet(raw_payload) diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py index b2f68b70..346c53d2 100644 --- a/roborock/map/b01_q10_map_parser.py +++ b/roborock/map/b01_q10_map_parser.py @@ -83,17 +83,24 @@ class Q10Point: @dataclass class Q10TracePacket: - """Decoded contents of a Q10 ``02 01`` live position packet. - - The robot only emits these while it is actively moving (cleaning), so an - idle/docked robot will not produce them. Observed firmware sends the current - position as a single point per packet rather than an accumulated path, so - ``points`` typically holds one point. + """Decoded contents of a Q10 ``02 01`` cleaning-path packet. + + The robot accumulates the **full path of the current cleaning session** and + serves it in a single packet: ``points`` holds the whole trajectory so far + (oldest first), growing as the robot cleans. This was confirmed live -- a + corridor run produced packets of 1, then 3, then 15 points, each a strict + superset describing the path travelled. Because the robot keeps the path + server-side, a client that connects mid-session still receives the complete + path (this is how the app shows the trail even after a cold launch). + + The robot only emits these while a session is active, so an idle/docked robot + will not produce them. The most recent point is the current robot position. """ points: list[Q10Point] = field(default_factory=list) sequence: int = 0 - """Header sequence counter; increments as new position packets are sent.""" + """Session counter (byte 3); increments per cleaning session, tracking the + device clean count. Not a per-packet sequence.""" @property def robot_position(self) -> Q10Point | None: @@ -102,8 +109,11 @@ def robot_position(self) -> Q10Point | None: # Trace packet (``02 01``): a 10-byte header followed by big-endian int16 (x, y) -# point pairs. Header layout was confirmed against live ss07 captures (the -# sequence counter is at byte 3; bytes 4-9 are a constant type/flag + padding). +# point pairs forming the accumulated session path. Header layout confirmed +# against live ss07 captures: byte 3 is a session counter (tracks the device +# clean count); bytes 8-9 are a u16be point count minus one (verified: a 15-point +# packet carried 0x000e == 14). The parser reads all 4-byte pairs in the body +# rather than trusting the count field, so a truncated tail can't desync it. # NOTE: the format documented by roborock-qseries-map-bridge (18-byte header) # did not match this firmware -- this 10-byte layout is what the device sent. _TRACE_HEADER_LENGTH = 10 diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py index 1c4e8a61..d40a158c 100644 --- a/tests/map/test_b01_q10_map_parser.py +++ b/tests/map/test_b01_q10_map_parser.py @@ -18,6 +18,8 @@ FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_map.bin" TRACE_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace.bin" TRACE_MULTI_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_multi.bin" +# Real 15-point packet captured from an R1 corridor run (full session path). +TRACE_SESSION_FIXTURE = Path(__file__).resolve().parent / "testdata" / "b01_q10_trace_session.bin" def _payload() -> bytes: @@ -91,7 +93,7 @@ def test_packet_markers_are_distinct() -> None: def test_parse_trace_packet_real_single_point() -> None: - """A real ss07 position packet decodes to a single current-position point.""" + """A real ss07 packet captured early in a session has a single path point.""" trace = parse_trace_packet(TRACE_FIXTURE.read_bytes()) assert trace.sequence == 9 assert [(p.x, p.y) for p in trace.points] == [(169, 0)] @@ -99,6 +101,26 @@ def test_parse_trace_packet_real_single_point() -> None: assert (trace.robot_position.x, trace.robot_position.y) == (169, 0) +def test_parse_trace_packet_real_session_path() -> None: + """A real 15-point packet (corridor run) decodes the full accumulated path. + + Captured live from an R1: the same session emitted packets of 1, then 3, + then 15 points, proving the path accumulates rather than reporting only the + current position. The most recent point is the current robot position. + """ + trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) + points = [(p.x, p.y) for p in trace.points] + assert len(points) == 15 + assert points[0] == (-34, 0) # oldest + assert points[-1] == (276, -1) # most recent == current position + # After the initial repositioning, x marches steadily down the corridor. + tail_x = [p[0] for p in points[2:]] + assert tail_x == sorted(tail_x) + assert points[-1][0] - points[0][0] > 300 # spans the corridor + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == (276, -1) + + def test_parse_trace_packet_multi_point() -> None: """A multi-point packet decodes all points; position is the most recent.""" trace = parse_trace_packet(TRACE_MULTI_FIXTURE.read_bytes()) diff --git a/tests/map/testdata/b01_q10_trace_session.bin b/tests/map/testdata/b01_q10_trace_session.bin new file mode 100644 index 0000000000000000000000000000000000000000..a4222e8170b649884537705401fc896ff9289d0d GIT binary patch literal 70 zcmZQ#WZ+_8Vqjq4`+pBeYBD%5h%zWJXfP-PS;`C%4Dt*)4AKk@4B`wkfOrjqFvAfB Ueuf)B{DFa+k%57kQRM%B069hr761SM literal 0 HcmV?d00001 From d45d488bb264c214dfbe02a863a34a555151863a Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sun, 14 Jun 2026 17:11:34 +0400 Subject: [PATCH 4/9] fix: allow Q10 maps without room records --- roborock/map/b01_q10_map_parser.py | 2 +- tests/map/test_b01_q10_map_parser.py | 37 ++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py index 346c53d2..a56e9024 100644 --- a/roborock/map/b01_q10_map_parser.py +++ b/roborock/map/b01_q10_map_parser.py @@ -208,7 +208,7 @@ def _infer_layout(decoded: bytes, width: int) -> tuple[int, bytes, bytes]: front, so we search for the split that makes the grid rectangular and lines up with the marker. """ - for room_count in range(1, _MAX_ROOMS): + for room_count in range(0, _MAX_ROOMS + 1): room_data_length = 2 + room_count * _ROOM_RECORD_LENGTH area = len(decoded) - room_data_length if area <= 0 or area % width: diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py index d40a158c..4dbcc9bf 100644 --- a/tests/map/test_b01_q10_map_parser.py +++ b/tests/map/test_b01_q10_map_parser.py @@ -26,6 +26,33 @@ def _payload() -> bytes: return FIXTURE.read_bytes() +def _literal_lz4_block(data: bytes) -> bytes: + block = bytearray() + literal_length = len(data) + if literal_length < 15: + block.append(literal_length << 4) + else: + block.append(0xF0) + remaining = literal_length - 15 + while remaining >= 0xFF: + block.append(0xFF) + remaining -= 0xFF + block.append(remaining) + block.extend(data) + return bytes(block) + + +def _synthetic_map_payload(width: int, decoded_layout: bytes) -> bytes: + compressed = _literal_lz4_block(decoded_layout) + payload = bytearray(29) + payload[0:2] = b"\x01\x01" + payload[2:6] = (0x01020304).to_bytes(4, "big") + payload[8:10] = width.to_bytes(2, "little") + payload[27:29] = len(compressed).to_bytes(2, "big") + payload.extend(compressed) + return bytes(payload) + + def test_lz4_block_roundtrip_all_literals() -> None: """A simple all-literals block decodes back to the original bytes.""" original = bytes(range(60)) * 3 @@ -59,6 +86,16 @@ def test_parse_map_packet() -> None: assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_living_room"), (3, "bedroom")] +def test_parse_map_packet_allows_zero_room_metadata() -> None: + """A map can be present before the robot has room segmentation records.""" + grid = bytes([240, 240, 249, 243, 240, 240]) + packet = parse_map_packet(_synthetic_map_payload(width=3, decoded_layout=grid + b"\x01\x00")) + assert packet.width == 3 + assert packet.height == 2 + assert packet.grid == grid + assert packet.rooms == [] + + def test_room_name_normalization() -> None: """Firmware ``rr_`` default names are normalized; custom names are titled.""" assert Q10Room(id=2, raw_name="rr_living_room", pixel_value=8, pixel_count=9).name == "Living Room" From e6feafc3d345f9fe3f737e0dbbcafd578d04df93 Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Mon, 15 Jun 2026 10:49:18 +0400 Subject: [PATCH 5/9] refactor: make Q10 map support fully push-driven The Q10 has no synchronous get-map command. The previous MapContentTrait faked one: refresh()/refresh_trace() sent a dpRequestDps and blocked awaiting the next MAP_RESPONSE push with a timeout. That has no request/response correlation and fights the firmware's ~60-70s push throttle. Mirror the existing Q10 StatusTrait model instead: - MapContentTrait is now a push-only TraitUpdateListener. The Q10PropertiesApi subscribe loop routes protocol-301 MAP_RESPONSE packets to update_from_map_response(), which parses the payload, updates the cached fields and notifies listeners. - Drop request_map()/request_trace() and the trait's refresh()/refresh_trace(). - CLI map-image/rooms/q10-position now nudge the device with refresh() and wait on a map-trait update listener for the pushed data. --- roborock/cli.py | 63 ++++++-- roborock/devices/rpc/b01_q10_channel.py | 66 +------- roborock/devices/traits/b01/q10/__init__.py | 40 +++-- roborock/devices/traits/b01/q10/map.py | 97 +++++++----- tests/devices/traits/b01/q10/test_map.py | 157 +++++++++++++------- 5 files changed, 256 insertions(+), 167 deletions(-) diff --git a/roborock/cli.py b/roborock/cli.py index 2a59e1c6..013719da 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -50,6 +50,7 @@ from roborock.devices.device import RoborockDevice from roborock.devices.device_manager import DeviceManager, UserParams, create_device_manager from roborock.devices.traits import Trait +from roborock.devices.traits.b01.q10 import Q10PropertiesApi from roborock.devices.traits.b01.q10.vacuum import VacuumTrait from roborock.devices.traits.v1 import V1TraitMixin from roborock.devices.traits.v1.consumeable import ConsumableAttribute @@ -521,6 +522,46 @@ async def maps(ctx, device_id: str): await _display_v1_trait(context, device_id, lambda v1: v1.maps) +# The Q10 pushes its map ~9s after a dpRequestDps; firmware throttles pushes to +# ~once per 60-70s, so a single request is answered quickly but rapid re-requests +# may not be. This bounds how long a one-shot CLI command waits for that push. +_Q10_MAP_PUSH_TIMEOUT = 30.0 + + +async def _await_q10_map_push( + properties: Q10PropertiesApi, + predicate: Callable[[], bool], + *, + timeout: float = _Q10_MAP_PUSH_TIMEOUT, +) -> bool: + """Nudge a Q10 to push its map/trace and wait until ``predicate`` holds. + + The Q10 map API is entirely push-driven: there is no synchronous get-map + request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``, + which the device's subscribe loop feeds into the map trait. Here we register + an update listener, send the request, and wait for the pushed data to satisfy + ``predicate``. Returns whether it did within ``timeout``. + """ + if predicate(): + return True + loop = asyncio.get_running_loop() + updated: asyncio.Future[None] = loop.create_future() + + def on_update() -> None: + if predicate() and not updated.done(): + updated.set_result(None) + + unsub = properties.map.add_update_listener(on_update) + try: + await properties.refresh() + await asyncio.wait_for(updated, timeout=timeout) + return True + except TimeoutError: + return False + finally: + unsub() + + @session.command() @click.option("--device_id", required=True) @click.option("--output-file", required=True, help="Path to save the map image.") @@ -532,9 +573,9 @@ async def map_image(ctx, device_id: str, output_file: str): device_manager = await context.get_device_manager() device = await device_manager.get_device(device_id) if device.b01_q10_properties is not None: - map_trait = device.b01_q10_properties.map - await map_trait.refresh() - image_content = map_trait.image_content + properties = device.b01_q10_properties + await _await_q10_map_push(properties, lambda: properties.map.image_content is not None) + image_content = properties.map.image_content else: v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) image_content = v1_trait.image_content @@ -589,12 +630,12 @@ async def q10_position(ctx, device_id: str, include_path: bool): if device.b01_q10_properties is None: click.echo("Feature not supported by device") return - map_trait = device.b01_q10_properties.map - try: - await map_trait.refresh_trace() - except RoborockException: + properties = device.b01_q10_properties + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path)) + if not got_trace: click.echo("No live trace available (the robot only reports position while cleaning).") return + map_trait = properties.map position = map_trait.robot_position summary: dict[str, Any] = { "robot_position": {"x": position.x, "y": position.y} if position else None, @@ -749,9 +790,11 @@ async def rooms(ctx, device_id: str): device_manager = await context.get_device_manager() device = await device_manager.get_device(device_id) if device.b01_q10_properties is not None: - map_trait = device.b01_q10_properties.map - await map_trait.refresh() - click.echo(dump_json({room.id: room.name for room in map_trait.rooms})) + properties = device.b01_q10_properties + # A valid map may have no room records, so wait on the map arriving + # (image_content) rather than on rooms being non-empty. + await _await_q10_map_push(properties, lambda: properties.map.image_content is not None) + click.echo(dump_json({room.id: room.name for room in properties.map.rooms})) else: await _display_v1_trait(context, device_id, lambda v1: v1.rooms) diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index a2ddd212..50ec8e5a 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -1,6 +1,5 @@ """Thin wrapper around the MQTT channel for Roborock B01 Q10 devices.""" -import asyncio import logging from collections.abc import AsyncGenerator from typing import Any @@ -13,20 +12,19 @@ decode_rpc_response, encode_mqtt_payload, ) -from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol _LOGGER = logging.getLogger(__name__) -# Requesting the device state (dpRequestDps) also makes the robot push its current -# map as a separate MAP_RESPONSE message a few seconds later. Q10 firmware throttles -# these pushes (~60-70s between maps), so callers should not poll tightly. -_MAP_TIMEOUT = 20.0 - async def stream_decoded_responses( mqtt_channel: MqttChannel, ) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]: - """Stream decoded DPS messages received via MQTT.""" + """Stream decoded DPS messages received via MQTT. + + Messages that are not decodable DPS responses (e.g. protocol-301 + ``MAP_RESPONSE`` map pushes) are skipped; callers that need the raw + messages should subscribe to :meth:`MqttChannel.subscribe_stream` directly. + """ async for response_message in mqtt_channel.subscribe_stream(): try: @@ -41,58 +39,6 @@ async def stream_decoded_responses( yield decoded_dps -# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the -# packet kind: a full map (``01 01``) or a live trace/path (``02 01``). -_MAP_PACKET_MARKER = b"\x01\x01" -_TRACE_PACKET_MARKER = b"\x02\x01" - - -async def _request_map_response(mqtt_channel: MqttChannel, marker: bytes, what: str, timeout: float | None) -> bytes: - """Trigger a map push and resolve on the first ``MAP_RESPONSE`` with ``marker``.""" - if timeout is None: - timeout = _MAP_TIMEOUT - loop = asyncio.get_running_loop() - future: asyncio.Future[bytes] = loop.create_future() - - def on_message(message: RoborockMessage) -> None: - if future.done(): - return - if ( - message.protocol == RoborockMessageProtocol.MAP_RESPONSE - and message.payload - and message.payload[:2] == marker - ): - future.set_result(message.payload) - - unsub = await mqtt_channel.subscribe(on_message) - try: - await send_command(mqtt_channel, B01_Q10_DP.REQUEST_DPS, {}) - return await asyncio.wait_for(future, timeout=timeout) - except TimeoutError as ex: - raise RoborockException(f"Timed out waiting for Q10 {what} after {timeout}s") from ex - finally: - unsub() - - -async def request_map(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: - """Request the current map and return the raw ``01 01`` ``MAP_RESPONSE`` payload. - - The Q10 does not have a dedicated "get map" command. Instead, requesting the - device state (``dpRequestDps``) triggers the robot to push its current map as - a ``MAP_RESPONSE`` (protocol 301) message shortly afterwards. - """ - return await _request_map_response(mqtt_channel, _MAP_PACKET_MARKER, "map", timeout) - - -async def request_trace(mqtt_channel: MqttChannel, *, timeout: float | None = None) -> bytes: - """Request the live trace/path and return the raw ``02 01`` ``MAP_RESPONSE`` payload. - - The robot only emits trace packets while it is actively moving (cleaning), so - this will time out for an idle/docked robot. - """ - return await _request_map_response(mqtt_channel, _TRACE_PACKET_MARKER, "trace", timeout) - - async def send_command( mqtt_channel: MqttChannel, command: B01_Q10_DP, diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 27b2af04..791cdd0b 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -4,9 +4,11 @@ import logging from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP -from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.exceptions import RoborockException +from roborock.protocols.b01_q10_protocol import decode_rpc_response +from roborock.roborock_message import RoborockMessage from .command import CommandTrait from .map import MapContentTrait @@ -47,7 +49,7 @@ def __init__(self, channel: MqttChannel) -> None: self.vacuum = VacuumTrait(self.command) self.remote = RemoteTrait(self.command) self.status = StatusTrait() - self.map = MapContentTrait(channel) + self.map = MapContentTrait() self._subscribe_task: asyncio.Task[None] | None = None async def start(self) -> None: @@ -71,14 +73,32 @@ async def refresh(self) -> None: await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) async def _subscribe_loop(self) -> None: - """Persistent loop to listen for status updates.""" - async for decoded_dps in stream_decoded_responses(self._channel): - _LOGGER.debug("Received Q10 status update: %s", decoded_dps) - - # Notify all traits about a new message and each trait will - # only update what fields that it is responsible for. - # More traits can be added here below. - self.status.update_from_dps(decoded_dps) + """Persistent loop dispatching pushed messages to the read-model traits.""" + async for message in self._channel.subscribe_stream(): + self._handle_message(message) + + def _handle_message(self, message: RoborockMessage) -> None: + """Route a single pushed message to the trait responsible for it. + + Map/trace pushes arrive as protocol-301 ``MAP_RESPONSE`` messages (not + DPS), so they are handled separately from the status DPS stream. The Q10 + is entirely push-driven: there is no synchronous get-map request, the + device just publishes its current map (a ``dpRequestDps`` nudges it to). + """ + if self.map.update_from_map_response(message): + return + + try: + decoded_dps = decode_rpc_response(message) + except RoborockException as ex: + _LOGGER.debug("Failed to decode Q10 RPC response: %s: %s", message, ex) + return + + _LOGGER.debug("Received Q10 status update: %s", decoded_dps) + # Notify all traits about a new message and each trait will + # only update what fields that it is responsible for. + # More traits can be added here below. + self.status.update_from_dps(decoded_dps) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py index 07f5c24b..8cbb5d9e 100644 --- a/roborock/devices/traits/b01/q10/map.py +++ b/roborock/devices/traits/b01/q10/map.py @@ -1,23 +1,27 @@ """Map content trait for B01 Q10 devices. -This mirrors the v1 / Q7 ``MapContentTrait`` contract: -- ``refresh()`` performs I/O and populates cached fields. -- ``parse_map_content()`` reparses cached raw bytes without I/O. -- ``image_content``, ``map_data``, ``rooms`` and ``raw_api_response`` are readable. +Unlike the v1 / Q7 maps, the Q10 has no synchronous "get map" command, so this +trait is purely push-driven and mirrors the Q10 ``StatusTrait`` contract: + +- The device pushes its current map/path as protocol-301 ``MAP_RESPONSE`` + messages (a ``dpRequestDps`` nudges it to do so). The ``Q10PropertiesApi`` + subscribe loop routes those messages to :meth:`MapContentTrait.update_from_map_response`. +- ``update_from_map_response`` parses the payload, updates the cached fields and + notifies update listeners (register via :meth:`add_update_listener`). +- ``parse_map_content()`` reparses the cached raw bytes without I/O. +- ``image_content``, ``map_data``, ``rooms``, ``path``, ``robot_position`` and + ``raw_api_response`` are readable and reflect the most recently pushed map. Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required. -The raw payload is retrieved by :func:`request_map`, which triggers the device -to push its current map. """ +import logging from dataclasses import dataclass, field from vacuum_map_parser_base.map_data import MapData from roborock.data import RoborockBase -from roborock.devices.rpc.b01_q10_channel import request_map, request_trace -from roborock.devices.traits import Trait -from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.devices.traits.common import TraitUpdateListener from roborock.exceptions import RoborockException from roborock.map.b01_q10_map_parser import ( B01Q10MapParser, @@ -27,9 +31,17 @@ parse_map_packet, parse_trace_packet, ) +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol + +_LOGGER = logging.getLogger(__name__) _TRUNCATE_LENGTH = 20 +# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the +# packet kind: a full map (``01 01``) or a live trace/path (``02 01``). +_MAP_PACKET_MARKER = b"\x01\x01" +_TRACE_PACKET_MARKER = b"\x02\x01" + @dataclass class MapContent(RoborockBase): @@ -64,29 +76,61 @@ def __repr__(self) -> str: return f"MapContent(image_content={img!r}, rooms={self.rooms!r})" -class MapContentTrait(MapContent, Trait): - """Trait for fetching parsed map content for Q10 devices.""" +class MapContentTrait(MapContent, TraitUpdateListener): + """Trait holding the most recently pushed parsed map content for Q10 devices. + + The Q10 has no synchronous get-map request; the device pushes map and trace + packets, which the ``Q10PropertiesApi`` subscribe loop feeds into + :meth:`update_from_map_response`. Consumers read the cached fields and/or + register a callback with :meth:`add_update_listener` to be notified when new + map content arrives. + """ def __init__( self, - channel: MqttChannel, *, map_parser_config: B01Q10MapParserConfig | None = None, ) -> None: super().__init__() - self._channel = channel + TraitUpdateListener.__init__(self, logger=_LOGGER) self._map_parser = B01Q10MapParser(map_parser_config) - async def refresh(self) -> None: - """Fetch, decode, and parse the current map payload.""" - raw_payload = await request_map(self._channel) - self.raw_api_response = raw_payload - self.parse_map_content() + def update_from_map_response(self, message: RoborockMessage) -> bool: + """Update cached map/trace state from a pushed ``MAP_RESPONSE`` message. + + Returns ``True`` if the message was a recognized Q10 map (``01 01``) or + trace (``02 01``) packet (so the caller can stop processing it), and + ``False`` otherwise. Update listeners are notified only when a packet is + parsed successfully. + """ + if message.protocol != RoborockMessageProtocol.MAP_RESPONSE or not message.payload: + return False + marker = message.payload[:2] + if marker == _MAP_PACKET_MARKER: + self.raw_api_response = message.payload + try: + self.parse_map_content() + except RoborockException as ex: + _LOGGER.debug("Failed to parse Q10 map packet: %s", ex) + return True + self._notify_update() + return True + if marker == _TRACE_PACKET_MARKER: + try: + trace = parse_trace_packet(message.payload) + except RoborockException as ex: + _LOGGER.debug("Failed to parse Q10 trace packet: %s", ex) + return True + self.path = trace.points + self.robot_position = trace.robot_position + self._notify_update() + return True + return False def parse_map_content(self) -> None: """Reparse the cached raw map payload without performing any I/O.""" if self.raw_api_response is None: - raise RoborockException("No map payload available; call refresh() first") + raise RoborockException("No map payload available; no map has been pushed yet") try: parsed = self._map_parser.parse(self.raw_api_response) @@ -102,18 +146,3 @@ def parse_map_content(self) -> None: self.image_content = parsed.image_content self.map_data = parsed.map_data self.rooms = packet.rooms - - async def refresh_trace(self) -> None: - """Fetch the current session's cleaning path and robot position. - - Populates :attr:`path` with the full trajectory of the active cleaning - session (the robot accumulates it, so the whole path is returned even - when we connect mid-session) and :attr:`robot_position` with the most - recent point. The robot only emits trace packets while a session is - active, so this raises :class:`RoborockException` (timeout) for an - idle/docked robot. - """ - raw_payload = await request_trace(self._channel) - trace = parse_trace_packet(raw_payload) - self.path = trace.points - self.robot_position = trace.robot_position diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py index e9d47fdf..702d5693 100644 --- a/tests/devices/traits/b01/q10/test_map.py +++ b/tests/devices/traits/b01/q10/test_map.py @@ -1,89 +1,140 @@ -"""Tests for the Q10 B01 map content trait.""" +"""Tests for the Q10 B01 map content trait. +The Q10 map API is push-driven: the device publishes ``MAP_RESPONSE`` messages +and the trait updates its cached state from them via ``update_from_map_response`` +(there is no synchronous get-map request). +""" + +import asyncio +from collections.abc import AsyncGenerator from pathlib import Path -from typing import cast +from unittest.mock import AsyncMock, Mock import pytest +from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create from roborock.devices.traits.b01.q10.map import MapContentTrait -from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol -from tests.fixtures.channel_fixtures import FakeChannel FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") TRACE_FIXTURE = Path("tests/map/testdata/b01_q10_trace.bin") -@pytest.fixture -def fake_channel() -> FakeChannel: - return FakeChannel() - - -def _trait(channel: FakeChannel) -> MapContentTrait: - return MapContentTrait(cast(MqttChannel, channel)) - +def _map_message( + payload: bytes, protocol: RoborockMessageProtocol = RoborockMessageProtocol.MAP_RESPONSE +) -> RoborockMessage: + return RoborockMessage(protocol=protocol, payload=payload, version=b"B01") -def _map_message(payload: bytes) -> RoborockMessage: - return RoborockMessage( - protocol=RoborockMessageProtocol.MAP_RESPONSE, - payload=payload, - version=b"B01", - ) - -async def test_map_refresh_populates_image_and_rooms(fake_channel: FakeChannel) -> None: - """refresh() triggers the device push, then parses the map payload.""" +def test_update_from_map_response_populates_image_and_rooms() -> None: + """A pushed 01 01 map packet populates the image, rooms and map data.""" payload = FIXTURE.read_bytes() - fake_channel.response_queue.append(_map_message(payload)) + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) - trait = _trait(fake_channel) - await trait.refresh() + assert trait.update_from_map_response(_map_message(payload)) is True assert trait.raw_api_response == payload assert trait.image_content is not None assert trait.image_content[:8] == b"\x89PNG\r\n\x1a\n" assert {room.id: room.name for room in trait.rooms} == {2: "Living Room", 3: "Bedroom"} assert trait.map_data is not None + assert len(updates) == 1 - # The refresh trigger is a dpRequestDps (code 102) request. - assert len(fake_channel.published_messages) == 1 - trigger = fake_channel.published_messages[0].payload - assert trigger is not None and b'"102"' in trigger +def test_update_from_map_response_populates_path_and_position() -> None: + """A pushed 02 01 trace packet populates the path and robot position.""" + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) -async def test_map_refresh_times_out_without_response( - fake_channel: FakeChannel, monkeypatch: pytest.MonkeyPatch -) -> None: - """If the device never pushes a map, refresh raises a clear error.""" - monkeypatch.setattr("roborock.devices.rpc.b01_q10_channel._MAP_TIMEOUT", 0.05) - trait = _trait(fake_channel) # no queued response -> times out - with pytest.raises(RoborockException, match="Timed out waiting for Q10 map"): - await trait.refresh() + assert trait.update_from_map_response(_map_message(TRACE_FIXTURE.read_bytes())) is True + assert [(p.x, p.y) for p in trait.path] == [(169, 0)] + assert trait.robot_position is not None + assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) + assert len(updates) == 1 -def test_parse_without_refresh_raises(fake_channel: FakeChannel) -> None: - trait = _trait(fake_channel) + +def test_update_from_map_response_ignores_non_map_messages() -> None: + """Non-MAP_RESPONSE messages are left for the status path to handle.""" + trait = MapContentTrait() + updates: list[None] = [] + trait.add_update_listener(lambda: updates.append(None)) + + rpc = _map_message(b"\x01\x01whatever", protocol=RoborockMessageProtocol.RPC_RESPONSE) + assert trait.update_from_map_response(rpc) is False + + # An unrecognized MAP_RESPONSE marker is also not consumed. + assert trait.update_from_map_response(_map_message(b"\x09\x09junk")) is False + + assert trait.image_content is None + assert not trait.path + assert not updates + + +def test_parse_without_data_raises() -> None: + trait = MapContentTrait() with pytest.raises(RoborockException, match="No map payload available"): trait.parse_map_content() -async def test_refresh_trace_populates_path_and_position(fake_channel: FakeChannel) -> None: - """refresh_trace() parses the live position from a real ss07 trace packet.""" - fake_channel.response_queue.append(_map_message(TRACE_FIXTURE.read_bytes())) +# --- Integration through the Q10PropertiesApi subscribe loop ----------------- - trait = _trait(fake_channel) - await trait.refresh_trace() - assert [(p.x, p.y) for p in trait.path] == [(169, 0)] - assert trait.robot_position is not None - assert (trait.robot_position.x, trait.robot_position.y) == (169, 0) +@pytest.fixture +def message_queue() -> asyncio.Queue[RoborockMessage]: + return asyncio.Queue() + + +@pytest.fixture +def mock_channel(message_queue: asyncio.Queue[RoborockMessage]) -> AsyncMock: + async def mock_stream() -> AsyncGenerator[RoborockMessage, None]: + while True: + yield await message_queue.get() + + channel = AsyncMock() + channel.subscribe_stream = Mock(return_value=mock_stream()) + return channel + + +@pytest.fixture +async def q10_api(mock_channel: AsyncMock) -> AsyncGenerator[Q10PropertiesApi, None]: + api = create(mock_channel) + await api.start() + yield api + await api.close() + + +async def _wait_for(predicate, timeout: float = 2.0) -> None: + async with asyncio.timeout(timeout): + while not predicate(): + await asyncio.sleep(0.01) + + +async def test_subscribe_loop_routes_map_push( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """A map pushed onto the stream is routed to the map trait by the loop.""" + assert q10_api.map.image_content is None + + message_queue.put_nowait(_map_message(FIXTURE.read_bytes())) + + await _wait_for(lambda: q10_api.map.image_content is not None) + assert {room.id: room.name for room in q10_api.map.rooms} == {2: "Living Room", 3: "Bedroom"} + + +async def test_subscribe_loop_routes_trace_push( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """A trace pushed onto the stream is routed to the map trait by the loop.""" + assert not q10_api.map.path + message_queue.put_nowait(_map_message(TRACE_FIXTURE.read_bytes())) -async def test_refresh_trace_ignores_map_packets(fake_channel: FakeChannel, monkeypatch: pytest.MonkeyPatch) -> None: - """A map (01 01) push must not satisfy a trace request.""" - monkeypatch.setattr("roborock.devices.rpc.b01_q10_channel._MAP_TIMEOUT", 0.05) - fake_channel.response_queue.append(_map_message(FIXTURE.read_bytes())) # map, not trace - trait = _trait(fake_channel) - with pytest.raises(RoborockException, match="Timed out waiting for Q10 trace"): - await trait.refresh_trace() + await _wait_for(lambda: bool(q10_api.map.path)) + assert q10_api.map.robot_position is not None From 8797ede570262cd944e3927facedbeac05efadec Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Mon, 15 Jun 2026 13:02:09 +0400 Subject: [PATCH 6/9] fix: tighten Q10 map CLI push handling --- roborock/cli.py | 24 ++++++---- tests/devices/traits/b01/q10/test_map.py | 59 ++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/roborock/cli.py b/roborock/cli.py index 013719da..fa4ff2aa 100644 --- a/roborock/cli.py +++ b/roborock/cli.py @@ -533,17 +533,16 @@ async def _await_q10_map_push( predicate: Callable[[], bool], *, timeout: float = _Q10_MAP_PUSH_TIMEOUT, + allow_cached_on_timeout: bool = False, ) -> bool: - """Nudge a Q10 to push its map/trace and wait until ``predicate`` holds. + """Nudge a Q10 to push its map/trace and wait for a fresh update. The Q10 map API is entirely push-driven: there is no synchronous get-map request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``, which the device's subscribe loop feeds into the map trait. Here we register - an update listener, send the request, and wait for the pushed data to satisfy - ``predicate``. Returns whether it did within ``timeout``. + an update listener, send the request, and wait for a newly pushed update to + satisfy ``predicate``. Returns whether it did within ``timeout``. """ - if predicate(): - return True loop = asyncio.get_running_loop() updated: asyncio.Future[None] = loop.create_future() @@ -557,7 +556,7 @@ def on_update() -> None: await asyncio.wait_for(updated, timeout=timeout) return True except TimeoutError: - return False + return allow_cached_on_timeout and predicate() finally: unsub() @@ -574,7 +573,11 @@ async def map_image(ctx, device_id: str, output_file: str): device = await device_manager.get_device(device_id) if device.b01_q10_properties is not None: properties = device.b01_q10_properties - await _await_q10_map_push(properties, lambda: properties.map.image_content is not None) + await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + allow_cached_on_timeout=True, + ) image_content = properties.map.image_content else: v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content) @@ -793,7 +796,11 @@ async def rooms(ctx, device_id: str): properties = device.b01_q10_properties # A valid map may have no room records, so wait on the map arriving # (image_content) rather than on rooms being non-empty. - await _await_q10_map_push(properties, lambda: properties.map.image_content is not None) + await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + allow_cached_on_timeout=True, + ) click.echo(dump_json({room.id: room.name for room in properties.map.rooms})) else: await _display_v1_trait(context, device_id, lambda v1: v1.rooms) @@ -1285,6 +1292,7 @@ def write_markdown_table(product_features: dict[str, dict[str, any]], all_featur cli.add_command(maps) cli.add_command(map_image) cli.add_command(map_data) +cli.add_command(q10_position) cli.add_command(consumables) cli.add_command(reset_consumable) cli.add_command(rooms) diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py index 702d5693..8b3be279 100644 --- a/tests/devices/traits/b01/q10/test_map.py +++ b/tests/devices/traits/b01/q10/test_map.py @@ -12,9 +12,11 @@ import pytest +from roborock.cli import _await_q10_map_push, cli from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create from roborock.devices.traits.b01.q10.map import MapContentTrait from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import Q10Point from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") @@ -81,6 +83,63 @@ def test_parse_without_data_raises() -> None: trait.parse_map_content() +def test_q10_position_is_available_as_top_level_cli_command() -> None: + assert "q10-position" in cli.commands + + +# --- CLI push waiting -------------------------------------------------------- + + +class _FakeQ10Properties: + def __init__(self) -> None: + self.map = MapContentTrait() + self.refresh_count = 0 + + async def refresh(self) -> None: + self.refresh_count += 1 + + +class _FakeQ10PropertiesWithTrace(_FakeQ10Properties): + async def refresh(self) -> None: + await super().refresh() + self.map.update_from_map_response(_map_message(TRACE_FIXTURE.read_bytes())) + + +async def test_await_q10_map_push_waits_for_fresh_update() -> None: + """A cached trace alone is not treated as a successful new map push.""" + properties = _FakeQ10Properties() + properties.map.path = [Q10Point(1, 2)] + + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path), timeout=0.01) + + assert got_trace is False + assert properties.refresh_count == 1 + + +async def test_await_q10_map_push_returns_true_after_update() -> None: + properties = _FakeQ10PropertiesWithTrace() + + got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path), timeout=0.01) + + assert got_trace is True + assert [(p.x, p.y) for p in properties.map.path] == [(169, 0)] + + +async def test_await_q10_map_push_can_fall_back_to_cached_map_on_timeout() -> None: + properties = _FakeQ10Properties() + properties.map.image_content = b"cached-png" + + got_map = await _await_q10_map_push( + properties, + lambda: properties.map.image_content is not None, + timeout=0.01, + allow_cached_on_timeout=True, + ) + + assert got_map is True + assert properties.refresh_count == 1 + + # --- Integration through the Q10PropertiesApi subscribe loop ----------------- From 629887a9e40081c4274d8f609809d2ec5ddc0082 Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sat, 20 Jun 2026 12:22:52 +0400 Subject: [PATCH 7/9] fix: Q10 map header is u16be width+height; drop stray trace point The 01 01 map header carries two consecutive u16be fields: grid width at bytes 7-8 and grid height at bytes 9-10. The previous u16le read at offset 8 picked up the height's high byte, so it only matched the true width when width and height fell in the same 256-band -- a 222x261 map decoded its width as 478 and failed to split the layout. Read both dimensions as u16be straight from the header (falling back to brute-force inference when the height field is absent, e.g. older fixtures). Also drop a spurious leading trace point that some cleans prepend far outside the map; it skews the rendered start and path-based calibration. points[0] is dropped only when its step to points[1] is a gross outlier (>20x the median step), so a genuine first point is never lost. Both issues were reported, diagnosed and verified by @andrewlyeats from an independent B01/Q10 decoder; the u16be dimension read is corroborated by the ioBroker roborock adapter. --- roborock/map/b01_q10_map_parser.py | 73 ++++++++++++++++++++++-- tests/map/test_b01_q10_map_parser.py | 83 ++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 5 deletions(-) diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py index a56e9024..89025c43 100644 --- a/roborock/map/b01_q10_map_parser.py +++ b/roborock/map/b01_q10_map_parser.py @@ -4,7 +4,8 @@ few seconds after a ``dpRequestDps`` request). Unlike the Q7 ``SCMap`` protobuf format, the Q10 uses a custom, unencrypted binary packet: -- ``01 01`` marker, then a ``u32be`` map id and a ``u16le`` grid width. +- ``01 01`` marker, then a ``u32be`` map id (bytes 2-5) and two consecutive + ``u16be`` dimensions: grid width (bytes 7-8) and grid height (bytes 9-10). - A header field at offset 27 (``u16be``) giving the compressed layout length. - An LZ4-block-compressed occupancy grid starting at offset 29. Once inflated it is ``width * height`` cells of grid data followed by room metadata records. @@ -20,6 +21,8 @@ import colorsys import io +import math +import statistics from dataclasses import dataclass, field from PIL import Image @@ -36,7 +39,14 @@ TRACE_PACKET_MARKER = b"\x02\x01" _MAP_ID_OFFSET = 2 -_WIDTH_OFFSET = 8 +# Width and height are two consecutive u16be fields. An earlier revision read the +# width as u16le at offset 8; that high byte is actually the height's high byte, +# so it only matched the true width when width and height fell in the same +# 256-band -- e.g. a 222x261 map decoded its width as 478 and failed to split. +# Reported and diagnosed by @andrewlyeats (independent B01/Q10 decoder), and +# corroborated by the ioBroker roborock adapter (both read these as u16be). +_WIDTH_OFFSET = 7 +_HEIGHT_OFFSET = 9 _COMPRESSED_LAYOUT_LENGTH_OFFSET = 27 _LAYOUT_COMPRESSED_OFFSET = 29 _ROOM_RECORD_LENGTH = 47 @@ -119,6 +129,15 @@ def robot_position(self) -> Q10Point | None: _TRACE_HEADER_LENGTH = 10 _TRACE_SEQUENCE_OFFSET = 3 +# Some cleans prepend a single stray point to the path, far outside the map +# (e.g. ~(0, -1907) when the real path starts near (-3760, -1920)); it skews the +# rendered start/bounding box and any path-based calibration. We drop points[0] +# only when its step to points[1] is a gross outlier (this multiple of the +# median step of the rest of the path), so a genuine first point is never lost. +# The current position (last point) is unaffected. Trigger and threshold +# reported and verified by @andrewlyeats across independent B01/Q10 captures. +_STRAY_POINT_STEP_RATIO = 20 + def is_map_packet(payload: bytes) -> bool: """Return True if the payload is a Q10 full-map (``01 01``) packet.""" @@ -147,9 +166,27 @@ def parse_trace_packet(payload: bytes) -> Q10TracePacket: ) for offset in range(0, len(body), 4) ] + points = _drop_stray_leading_point(points) return Q10TracePacket(points=points, sequence=payload[_TRACE_SEQUENCE_OFFSET]) +def _drop_stray_leading_point(points: list[Q10Point]) -> list[Q10Point]: + """Drop a spurious leading point that some cleans prepend to the trace. + + Returns ``points`` unchanged unless the very first step is a gross outlier + versus the median of the remaining steps (see ``_STRAY_POINT_STEP_RATIO``), + in which case the first point is dropped. Needs at least three points to have + a stable median to compare against. + """ + if len(points) < 3: + return points + steps = [math.hypot(b.x - a.x, b.y - a.y) for a, b in zip(points, points[1:])] + median_rest = statistics.median(steps[1:]) + if median_rest > 0 and steps[0] > _STRAY_POINT_STEP_RATIO * median_rest: + return points[1:] + return points + + def lz4_block_decompress(data: bytes) -> bytes: """Decompress a raw LZ4 *block* (no frame header). @@ -200,13 +237,32 @@ def read_length(value: int) -> int: output.append(output[-offset]) +def _split_with_dims(decoded: bytes, width: int, height: int) -> tuple[bytes, bytes] | None: + """Split the inflated layout into (grid, room_data) using header dimensions. + + Returns ``None`` when ``width * height`` does not leave a well-formed + ``01 `` room-record section, so the caller can fall back to + brute-force inference (e.g. for captures/fixtures without a height field). + """ + area = width * height + if area <= 0 or area > len(decoded): + return None + room_data = decoded[area:] + if len(room_data) < 2 or room_data[0] != 1: + return None + if len(room_data) != 2 + room_data[1] * _ROOM_RECORD_LENGTH: + return None + return decoded[:area], room_data + + def _infer_layout(decoded: bytes, width: int) -> tuple[int, bytes, bytes]: """Split the inflated layout into (height, grid, room_data). The grid is ``width * height`` cells; the remaining bytes are room records introduced by an ``01 `` marker. The room count is unknown up front, so we search for the split that makes the grid rectangular and lines - up with the marker. + up with the marker. Used as a fallback when the header carries no usable + height. """ for room_count in range(0, _MAX_ROOMS + 1): room_data_length = 2 + room_count * _ROOM_RECORD_LENGTH @@ -246,7 +302,8 @@ def parse_map_packet(payload: bytes) -> Q10MapPacket: raise RoborockException("Payload is not a Q10 map packet") map_id = int.from_bytes(payload[_MAP_ID_OFFSET : _MAP_ID_OFFSET + 4], "big") - width = int.from_bytes(payload[_WIDTH_OFFSET : _WIDTH_OFFSET + 2], "little") + width = int.from_bytes(payload[_WIDTH_OFFSET : _WIDTH_OFFSET + 2], "big") + height = int.from_bytes(payload[_HEIGHT_OFFSET : _HEIGHT_OFFSET + 2], "big") if width <= 0: raise RoborockException("Q10 map packet has invalid width") @@ -258,7 +315,13 @@ def parse_map_packet(payload: bytes) -> Q10MapPacket: raise RoborockException("Q10 map packet has invalid layout block length") decoded = lz4_block_decompress(payload[_LAYOUT_COMPRESSED_OFFSET:layout_end]) - height, grid, room_data = _infer_layout(decoded, width) + # Prefer the header height; fall back to inference if it doesn't line up + # (e.g. older captures/fixtures that don't populate the height field). + split = _split_with_dims(decoded, width, height) if height > 0 else None + if split is not None: + grid, room_data = split + else: + height, grid, room_data = _infer_layout(decoded, width) rooms = _parse_rooms(room_data, grid) return Q10MapPacket(map_id=map_id, width=width, height=height, grid=grid, rooms=rooms) diff --git a/tests/map/test_b01_q10_map_parser.py b/tests/map/test_b01_q10_map_parser.py index 4dbcc9bf..53c3c63c 100644 --- a/tests/map/test_b01_q10_map_parser.py +++ b/tests/map/test_b01_q10_map_parser.py @@ -53,6 +53,36 @@ def _synthetic_map_payload(width: int, decoded_layout: bytes) -> bytes: return bytes(payload) +def _room_record(room_id: int, name: str) -> bytes: + record = bytearray(47) # _ROOM_RECORD_LENGTH + record[0:2] = room_id.to_bytes(2, "big") + encoded = name.encode("utf-8") + record[26] = len(encoded) + record[27 : 27 + len(encoded)] = encoded + return bytes(record) + + +def _full_header_map_payload(width: int, height: int, decoded_layout: bytes) -> bytes: + """Build a map packet that populates the real u16be width and height fields.""" + compressed = _literal_lz4_block(decoded_layout) + payload = bytearray(29) + payload[0:2] = b"\x01\x01" + payload[2:6] = (0x01020304).to_bytes(4, "big") + payload[7:9] = width.to_bytes(2, "big") + payload[9:11] = height.to_bytes(2, "big") + payload[27:29] = len(compressed).to_bytes(2, "big") + payload.extend(compressed) + return bytes(payload) + + +def _trace_payload(points: list[tuple[int, int]], sequence: int = 1) -> bytes: + header = bytearray(10) + header[0:2] = b"\x02\x01" + header[3] = sequence + body = b"".join(x.to_bytes(2, "big", signed=True) + y.to_bytes(2, "big", signed=True) for x, y in points) + return bytes(header) + body + + def test_lz4_block_roundtrip_all_literals() -> None: """A simple all-literals block decodes back to the original bytes.""" original = bytes(range(60)) * 3 @@ -96,6 +126,36 @@ def test_parse_map_packet_allows_zero_room_metadata() -> None: assert packet.rooms == [] +def test_parse_map_packet_reads_header_height() -> None: + """Width and height come straight from the u16be header fields.""" + grid = bytes([8]) * 6 + bytes([12]) * 6 # two rooms, 4x3 grid + layout = grid + b"\x01\x02" + _room_record(2, "rr_kitchen") + _room_record(3, "den") + packet = parse_map_packet(_full_header_map_payload(width=4, height=3, decoded_layout=layout)) + assert (packet.width, packet.height) == (4, 3) + assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_kitchen"), (3, "den")] + + +def test_parse_map_packet_dimensions_straddling_256() -> None: + """Regression: a 222x261 map (dimensions in different 256-bands). + + Width and height are consecutive u16be header fields. The earlier u16le read + at offset 8 picked up the height's high byte, decoding this map's width as + 478 (0xDE | 0x01 << 8) and failing to split the layout. Reported, diagnosed + and verified by @andrewlyeats from a real 222x261 capture. + """ + width, height = 222, 261 + grid = bytearray(width * height) + grid[0:100] = bytes([8]) * 100 # room id 2 -> pixel value 8 + grid[100:250] = bytes([12]) * 150 # room id 3 -> pixel value 12 + layout = bytes(grid) + b"\x01\x02" + _room_record(2, "rr_kitchen") + _room_record(3, "den") + payload = _full_header_map_payload(width, height, layout) + # The old u16le @ offset 8 read would have produced 478, not 222. + assert int.from_bytes(payload[8:10], "little") == 478 + packet = parse_map_packet(payload) + assert (packet.width, packet.height) == (222, 261) + assert [(r.id, r.raw_name) for r in packet.rooms] == [(2, "rr_kitchen"), (3, "den")] + + def test_room_name_normalization() -> None: """Firmware ``rr_`` default names are normalized; custom names are titled.""" assert Q10Room(id=2, raw_name="rr_living_room", pixel_value=8, pixel_count=9).name == "Living Room" @@ -167,6 +227,29 @@ def test_parse_trace_packet_multi_point() -> None: assert (trace.robot_position.x, trace.robot_position.y) == (-50, 300) +def test_parse_trace_drops_stray_leading_point() -> None: + """A stray first point far outside the path is dropped (calibration hygiene).""" + points = [(0, -1907), (-3760, -1920), (-3758, -1918), (-3756, -1919)] + trace = parse_trace_packet(_trace_payload(points)) + assert [(p.x, p.y) for p in trace.points] == points[1:] + assert trace.robot_position is not None + assert (trace.robot_position.x, trace.robot_position.y) == points[-1] + + +def test_parse_trace_keeps_genuine_first_point() -> None: + """A normal first step (same scale as the rest) is never dropped.""" + points = [(0, 0), (10, 0), (22, 0), (35, 1)] + trace = parse_trace_packet(_trace_payload(points)) + assert [(p.x, p.y) for p in trace.points] == points + + +def test_parse_trace_session_keeps_initial_reposition() -> None: + """The real corridor capture has a 4.8x first step -- well under the 20x cut.""" + trace = parse_trace_packet(TRACE_SESSION_FIXTURE.read_bytes()) + assert len(trace.points) == 15 + assert (trace.points[0].x, trace.points[0].y) == (-34, 0) + + def test_parse_trace_empty_path_has_no_position() -> None: header_only = b"\x02\x01" + b"\x00" * 8 # 10-byte header, no points trace = parse_trace_packet(header_only) From acf4a71b383572cf4429b3655ab3253aa0df014d Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sun, 21 Jun 2026 08:25:10 +0400 Subject: [PATCH 8/9] fix: unblock lint when tests import roborock.cli test_map.py imports roborock.cli (for _await_q10_map_push), which drags cli.py into mypy via follow_imports. cli.py is intentionally not mypy-clean -- the pre-commit hook excludes it -- but an exclude can't stop transitive imports, so its ~79 latent errors surfaced in CI. Mirror the exclude's intent with [mypy-roborock.cli] ignore_errors, which suppresses cli.py errors however it's reached, and cast the minimal Q10 test doubles to Q10PropertiesApi at the _await_q10_map_push call sites. --- mypy.ini | 3 +++ tests/devices/traits/b01/q10/test_map.py | 11 ++++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/mypy.ini b/mypy.ini index f6f59369..ebf1c85a 100644 --- a/mypy.ini +++ b/mypy.ini @@ -3,3 +3,6 @@ check_untyped_defs = True [mypy-construct] ignore_missing_imports = True + +[mypy-roborock.cli] +ignore_errors = True diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py index 8b3be279..bb445ad9 100644 --- a/tests/devices/traits/b01/q10/test_map.py +++ b/tests/devices/traits/b01/q10/test_map.py @@ -8,6 +8,7 @@ import asyncio from collections.abc import AsyncGenerator from pathlib import Path +from typing import cast from unittest.mock import AsyncMock, Mock import pytest @@ -110,7 +111,9 @@ async def test_await_q10_map_push_waits_for_fresh_update() -> None: properties = _FakeQ10Properties() properties.map.path = [Q10Point(1, 2)] - got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path), timeout=0.01) + got_trace = await _await_q10_map_push( + cast(Q10PropertiesApi, properties), lambda: bool(properties.map.path), timeout=0.01 + ) assert got_trace is False assert properties.refresh_count == 1 @@ -119,7 +122,9 @@ async def test_await_q10_map_push_waits_for_fresh_update() -> None: async def test_await_q10_map_push_returns_true_after_update() -> None: properties = _FakeQ10PropertiesWithTrace() - got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path), timeout=0.01) + got_trace = await _await_q10_map_push( + cast(Q10PropertiesApi, properties), lambda: bool(properties.map.path), timeout=0.01 + ) assert got_trace is True assert [(p.x, p.y) for p in properties.map.path] == [(169, 0)] @@ -130,7 +135,7 @@ async def test_await_q10_map_push_can_fall_back_to_cached_map_on_timeout() -> No properties.map.image_content = b"cached-png" got_map = await _await_q10_map_push( - properties, + cast(Q10PropertiesApi, properties), lambda: properties.map.image_content is not None, timeout=0.01, allow_cached_on_timeout=True, From a73bfb88a60ca48495408f99240b0f36d085f9c2 Mon Sep 17 00:00:00 2001 From: Vincent <2070309+tubededentifrice@users.noreply.github.com> Date: Sun, 21 Jun 2026 20:14:04 +0400 Subject: [PATCH 9/9] refactor: parse Q10 map/trace packets in the protocol layer Move RoborockMessage decoding out of the map trait and into the Q10 protocol/rpc layer: decode_message() now returns a typed Q10Message (Q10DpsUpdate | Q10MapPacket | Q10TracePacket) and stream_decoded_messages() yields it. The subscribe loop dispatches by type to the read-model traits. - Trait deals with parsed packets, not raw messages: update_from_map_packet / update_from_trace_packet replace update_from_map_response. - Map packet is parsed once: B01Q10MapParser gains parse_packet(packet) so the trait renders the already-parsed packet instead of re-parsing the bytes. - Drop the broad except in the trait and the raw_api_response state; packet parsing only raises RoborockException, funneled through the stream. - Packet-kind markers live solely in the map parser (is_map_packet / is_trace_packet); the duplicated trait constants are gone. --- roborock/devices/rpc/b01_q10_channel.py | 27 +++--- roborock/devices/traits/b01/q10/__init__.py | 43 ++++------ roborock/devices/traits/b01/q10/map.py | 94 ++++++--------------- roborock/map/b01_q10_map_parser.py | 9 +- roborock/protocols/b01_q10_protocol.py | 43 ++++++++++ tests/devices/traits/b01/q10/test_map.py | 44 +++------- tests/protocols/test_b01_q10_protocol.py | 39 +++++++++ 7 files changed, 160 insertions(+), 139 deletions(-) diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index 50ec8e5a..49758326 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -2,41 +2,42 @@ import logging from collections.abc import AsyncGenerator -from typing import Any from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException from roborock.protocols.b01_q10_protocol import ( ParamsType, - decode_rpc_response, + Q10Message, + decode_message, encode_mqtt_payload, ) _LOGGER = logging.getLogger(__name__) -async def stream_decoded_responses( +async def stream_decoded_messages( mqtt_channel: MqttChannel, -) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]: - """Stream decoded DPS messages received via MQTT. +) -> AsyncGenerator[Q10Message, None]: + """Stream decoded Q10 messages received via MQTT. - Messages that are not decodable DPS responses (e.g. protocol-301 - ``MAP_RESPONSE`` map pushes) are skipped; callers that need the raw - messages should subscribe to :meth:`MqttChannel.subscribe_stream` directly. + Each pushed ``RoborockMessage`` is decoded into a typed :data:`Q10Message` + (a DPS status update, a map packet, or a trace packet). Messages that fail + to decode or carry an unrecognized payload are skipped. """ - async for response_message in mqtt_channel.subscribe_stream(): + async for message in mqtt_channel.subscribe_stream(): try: - decoded_dps = decode_rpc_response(response_message) + decoded = decode_message(message) except RoborockException as ex: _LOGGER.debug( - "Failed to decode B01 Q10 RPC response: %s: %s", - response_message, + "Failed to decode B01 Q10 message: %s: %s", + message, ex, ) continue - yield decoded_dps + if decoded is not None: + yield decoded async def send_command( diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 791cdd0b..799151a6 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -4,11 +4,11 @@ import logging from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP +from roborock.devices.rpc.b01_q10_channel import stream_decoded_messages from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel -from roborock.exceptions import RoborockException -from roborock.protocols.b01_q10_protocol import decode_rpc_response -from roborock.roborock_message import RoborockMessage +from roborock.map.b01_q10_map_parser import Q10MapPacket, Q10TracePacket +from roborock.protocols.b01_q10_protocol import Q10DpsUpdate, Q10Message from .command import CommandTrait from .map import MapContentTrait @@ -73,32 +73,25 @@ async def refresh(self) -> None: await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) async def _subscribe_loop(self) -> None: - """Persistent loop dispatching pushed messages to the read-model traits.""" - async for message in self._channel.subscribe_stream(): + """Persistent loop dispatching decoded messages to the read-model traits.""" + async for message in stream_decoded_messages(self._channel): self._handle_message(message) - def _handle_message(self, message: RoborockMessage) -> None: - """Route a single pushed message to the trait responsible for it. + def _handle_message(self, message: Q10Message) -> None: + """Route a single decoded message to the trait responsible for it. - Map/trace pushes arrive as protocol-301 ``MAP_RESPONSE`` messages (not - DPS), so they are handled separately from the status DPS stream. The Q10 - is entirely push-driven: there is no synchronous get-map request, the - device just publishes its current map (a ``dpRequestDps`` nudges it to). + Map and trace packets arrive as protocol-301 ``MAP_RESPONSE`` pushes (the + Q10 is entirely push-driven: there is no synchronous get-map request, a + ``dpRequestDps`` just nudges the device to publish its current map). DPS + updates feed the status trait. More traits can be dispatched here below. """ - if self.map.update_from_map_response(message): - return - - try: - decoded_dps = decode_rpc_response(message) - except RoborockException as ex: - _LOGGER.debug("Failed to decode Q10 RPC response: %s: %s", message, ex) - return - - _LOGGER.debug("Received Q10 status update: %s", decoded_dps) - # Notify all traits about a new message and each trait will - # only update what fields that it is responsible for. - # More traits can be added here below. - self.status.update_from_dps(decoded_dps) + if isinstance(message, Q10MapPacket): + self.map.update_from_map_packet(message) + elif isinstance(message, Q10TracePacket): + self.map.update_from_trace_packet(message) + elif isinstance(message, Q10DpsUpdate): + _LOGGER.debug("Received Q10 status update: %s", message.dps) + self.status.update_from_dps(message.dps) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/map.py b/roborock/devices/traits/b01/q10/map.py index 8cbb5d9e..c132def4 100644 --- a/roborock/devices/traits/b01/q10/map.py +++ b/roborock/devices/traits/b01/q10/map.py @@ -4,13 +4,15 @@ trait is purely push-driven and mirrors the Q10 ``StatusTrait`` contract: - The device pushes its current map/path as protocol-301 ``MAP_RESPONSE`` - messages (a ``dpRequestDps`` nudges it to do so). The ``Q10PropertiesApi`` - subscribe loop routes those messages to :meth:`MapContentTrait.update_from_map_response`. -- ``update_from_map_response`` parses the payload, updates the cached fields and - notifies update listeners (register via :meth:`add_update_listener`). -- ``parse_map_content()`` reparses the cached raw bytes without I/O. -- ``image_content``, ``map_data``, ``rooms``, ``path``, ``robot_position`` and - ``raw_api_response`` are readable and reflect the most recently pushed map. + messages (a ``dpRequestDps`` nudges it to do so). The protocol layer decodes + those into :class:`Q10MapPacket` / :class:`Q10TracePacket` objects and the + ``Q10PropertiesApi`` subscribe loop routes them to + :meth:`MapContentTrait.update_from_map_packet` / + :meth:`MapContentTrait.update_from_trace_packet`. +- Those methods render/cache the content and notify update listeners (register + via :meth:`add_update_listener`). +- ``image_content``, ``map_data``, ``rooms``, ``path`` and ``robot_position`` + are readable and reflect the most recently pushed map. Unlike the Q7, the Q10 map payload is unencrypted, so no map key is required. """ @@ -22,26 +24,19 @@ from roborock.data import RoborockBase from roborock.devices.traits.common import TraitUpdateListener -from roborock.exceptions import RoborockException from roborock.map.b01_q10_map_parser import ( B01Q10MapParser, B01Q10MapParserConfig, + Q10MapPacket, Q10Point, Q10Room, - parse_map_packet, - parse_trace_packet, + Q10TracePacket, ) -from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol _LOGGER = logging.getLogger(__name__) _TRUNCATE_LENGTH = 20 -# MAP_RESPONSE (protocol 301) payloads start with a 2-byte marker identifying the -# packet kind: a full map (``01 01``) or a live trace/path (``02 01``). -_MAP_PACKET_MARKER = b"\x01\x01" -_TRACE_PACKET_MARKER = b"\x02\x01" - @dataclass class MapContent(RoborockBase): @@ -66,9 +61,6 @@ class MapContent(RoborockBase): robot_position: Q10Point | None = None """Current robot position (the most recent path point), if known.""" - raw_api_response: bytes | None = None - """Raw bytes of the map payload from the device (opaque blob for re-parsing).""" - def __repr__(self) -> str: img = self.image_content if img and len(img) > _TRUNCATE_LENGTH: @@ -80,8 +72,9 @@ class MapContentTrait(MapContent, TraitUpdateListener): """Trait holding the most recently pushed parsed map content for Q10 devices. The Q10 has no synchronous get-map request; the device pushes map and trace - packets, which the ``Q10PropertiesApi`` subscribe loop feeds into - :meth:`update_from_map_response`. Consumers read the cached fields and/or + packets, which the protocol layer decodes and the ``Q10PropertiesApi`` + subscribe loop feeds into :meth:`update_from_map_packet` / + :meth:`update_from_trace_packet`. Consumers read the cached fields and/or register a callback with :meth:`add_update_listener` to be notified when new map content arrives. """ @@ -95,54 +88,23 @@ def __init__( TraitUpdateListener.__init__(self, logger=_LOGGER) self._map_parser = B01Q10MapParser(map_parser_config) - def update_from_map_response(self, message: RoborockMessage) -> bool: - """Update cached map/trace state from a pushed ``MAP_RESPONSE`` message. + def update_from_map_packet(self, packet: Q10MapPacket) -> None: + """Render a pushed full-map packet into the cached image/rooms. - Returns ``True`` if the message was a recognized Q10 map (``01 01``) or - trace (``02 01``) packet (so the caller can stop processing it), and - ``False`` otherwise. Update listeners are notified only when a packet is - parsed successfully. + Rendering failures are logged and skipped (listeners are not notified) so + a single bad push cannot tear down the subscribe loop. """ - if message.protocol != RoborockMessageProtocol.MAP_RESPONSE or not message.payload: - return False - marker = message.payload[:2] - if marker == _MAP_PACKET_MARKER: - self.raw_api_response = message.payload - try: - self.parse_map_content() - except RoborockException as ex: - _LOGGER.debug("Failed to parse Q10 map packet: %s", ex) - return True - self._notify_update() - return True - if marker == _TRACE_PACKET_MARKER: - try: - trace = parse_trace_packet(message.payload) - except RoborockException as ex: - _LOGGER.debug("Failed to parse Q10 trace packet: %s", ex) - return True - self.path = trace.points - self.robot_position = trace.robot_position - self._notify_update() - return True - return False - - def parse_map_content(self) -> None: - """Reparse the cached raw map payload without performing any I/O.""" - if self.raw_api_response is None: - raise RoborockException("No map payload available; no map has been pushed yet") - - try: - parsed = self._map_parser.parse(self.raw_api_response) - packet = parse_map_packet(self.raw_api_response) - except RoborockException: - raise - except Exception as ex: - raise RoborockException("Failed to parse Q10 map data") from ex - + parsed = self._map_parser.parse_packet(packet) if parsed.image_content is None: - raise RoborockException("Failed to render Q10 map image") - + _LOGGER.debug("Failed to render Q10 map image") + return self.image_content = parsed.image_content self.map_data = parsed.map_data self.rooms = packet.rooms + self._notify_update() + + def update_from_trace_packet(self, packet: Q10TracePacket) -> None: + """Cache the path/robot position from a pushed trace packet.""" + self.path = packet.points + self.robot_position = packet.robot_position + self._notify_update() diff --git a/roborock/map/b01_q10_map_parser.py b/roborock/map/b01_q10_map_parser.py index 89025c43..89dec083 100644 --- a/roborock/map/b01_q10_map_parser.py +++ b/roborock/map/b01_q10_map_parser.py @@ -342,7 +342,14 @@ def __init__(self, config: B01Q10MapParserConfig | None = None) -> None: def parse(self, payload: bytes) -> ParsedMapData: """Parse a raw Q10 map packet into a rendered PNG + ``MapData``.""" - packet = parse_map_packet(payload) + return self.parse_packet(parse_map_packet(payload)) + + def parse_packet(self, packet: Q10MapPacket) -> ParsedMapData: + """Render an already-parsed Q10 map packet into a PNG + ``MapData``. + + The protocol layer parses the wire bytes into a :class:`Q10MapPacket`; + this renders that packet without re-parsing it. + """ image = self._render(packet) map_data = MapData() diff --git a/roborock/protocols/b01_q10_protocol.py b/roborock/protocols/b01_q10_protocol.py index 94a1e7b3..c0cf6b9b 100644 --- a/roborock/protocols/b01_q10_protocol.py +++ b/roborock/protocols/b01_q10_protocol.py @@ -2,10 +2,19 @@ import json import logging +from dataclasses import dataclass from typing import Any from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import ( + Q10MapPacket, + Q10TracePacket, + is_map_packet, + is_trace_packet, + parse_map_packet, + parse_trace_packet, +) from roborock.roborock_message import ( RoborockMessage, RoborockMessageProtocol, @@ -85,3 +94,37 @@ def decode_rpc_response(message: RoborockMessage) -> dict[B01_Q10_DP, Any]: result.update(common_dps_result) return result + + +@dataclass +class Q10DpsUpdate: + """A decoded Q10 DPS status update pushed by the device.""" + + dps: dict[B01_Q10_DP, Any] + """Data points keyed by ``B01_Q10_DP`` code.""" + + +# A single decoded message from a Q10 device: a DPS status update, a full map +# packet, or a live cleaning-path (trace) packet. Map/trace packets arrive as +# protocol-301 ``MAP_RESPONSE`` pushes; everything else is a DPS update. +Q10Message = Q10DpsUpdate | Q10MapPacket | Q10TracePacket + + +def decode_message(message: RoborockMessage) -> Q10Message | None: + """Decode a pushed Q10 ``RoborockMessage`` into a typed message. + + ``MAP_RESPONSE`` (protocol 301) payloads carry the binary map (``01 01``) or + trace (``02 01``) packets, which are parsed by the map parser; any other + ``MAP_RESPONSE`` marker is unrecognized and yields ``None``. Every other + protocol is treated as a DPS status update. + + Raises ``RoborockException`` if a recognized payload fails to parse. + """ + if message.protocol == RoborockMessageProtocol.MAP_RESPONSE: + payload = message.payload or b"" + if is_map_packet(payload): + return parse_map_packet(payload) + if is_trace_packet(payload): + return parse_trace_packet(payload) + return None + return Q10DpsUpdate(dps=decode_rpc_response(message)) diff --git a/tests/devices/traits/b01/q10/test_map.py b/tests/devices/traits/b01/q10/test_map.py index bb445ad9..470f3f99 100644 --- a/tests/devices/traits/b01/q10/test_map.py +++ b/tests/devices/traits/b01/q10/test_map.py @@ -16,8 +16,7 @@ from roborock.cli import _await_q10_map_push, cli from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create from roborock.devices.traits.b01.q10.map import MapContentTrait -from roborock.exceptions import RoborockException -from roborock.map.b01_q10_map_parser import Q10Point +from roborock.map.b01_q10_map_parser import Q10Point, parse_map_packet, parse_trace_packet from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol FIXTURE = Path("tests/map/testdata/b01_q10_map.bin") @@ -30,16 +29,15 @@ def _map_message( return RoborockMessage(protocol=protocol, payload=payload, version=b"B01") -def test_update_from_map_response_populates_image_and_rooms() -> None: - """A pushed 01 01 map packet populates the image, rooms and map data.""" - payload = FIXTURE.read_bytes() +def test_update_from_map_packet_populates_image_and_rooms() -> None: + """A parsed 01 01 map packet populates the image, rooms and map data.""" + packet = parse_map_packet(FIXTURE.read_bytes()) trait = MapContentTrait() updates: list[None] = [] trait.add_update_listener(lambda: updates.append(None)) - assert trait.update_from_map_response(_map_message(payload)) is True + trait.update_from_map_packet(packet) - assert trait.raw_api_response == payload assert trait.image_content is not None assert trait.image_content[:8] == b"\x89PNG\r\n\x1a\n" assert {room.id: room.name for room in trait.rooms} == {2: "Living Room", 3: "Bedroom"} @@ -47,13 +45,14 @@ def test_update_from_map_response_populates_image_and_rooms() -> None: assert len(updates) == 1 -def test_update_from_map_response_populates_path_and_position() -> None: - """A pushed 02 01 trace packet populates the path and robot position.""" +def test_update_from_trace_packet_populates_path_and_position() -> None: + """A parsed 02 01 trace packet populates the path and robot position.""" + packet = parse_trace_packet(TRACE_FIXTURE.read_bytes()) trait = MapContentTrait() updates: list[None] = [] trait.add_update_listener(lambda: updates.append(None)) - assert trait.update_from_map_response(_map_message(TRACE_FIXTURE.read_bytes())) is True + trait.update_from_trace_packet(packet) assert [(p.x, p.y) for p in trait.path] == [(169, 0)] assert trait.robot_position is not None @@ -61,29 +60,6 @@ def test_update_from_map_response_populates_path_and_position() -> None: assert len(updates) == 1 -def test_update_from_map_response_ignores_non_map_messages() -> None: - """Non-MAP_RESPONSE messages are left for the status path to handle.""" - trait = MapContentTrait() - updates: list[None] = [] - trait.add_update_listener(lambda: updates.append(None)) - - rpc = _map_message(b"\x01\x01whatever", protocol=RoborockMessageProtocol.RPC_RESPONSE) - assert trait.update_from_map_response(rpc) is False - - # An unrecognized MAP_RESPONSE marker is also not consumed. - assert trait.update_from_map_response(_map_message(b"\x09\x09junk")) is False - - assert trait.image_content is None - assert not trait.path - assert not updates - - -def test_parse_without_data_raises() -> None: - trait = MapContentTrait() - with pytest.raises(RoborockException, match="No map payload available"): - trait.parse_map_content() - - def test_q10_position_is_available_as_top_level_cli_command() -> None: assert "q10-position" in cli.commands @@ -103,7 +79,7 @@ async def refresh(self) -> None: class _FakeQ10PropertiesWithTrace(_FakeQ10Properties): async def refresh(self) -> None: await super().refresh() - self.map.update_from_map_response(_map_message(TRACE_FIXTURE.read_bytes())) + self.map.update_from_trace_packet(parse_trace_packet(TRACE_FIXTURE.read_bytes())) async def test_await_q10_map_push_waits_for_fresh_update() -> None: diff --git a/tests/protocols/test_b01_q10_protocol.py b/tests/protocols/test_b01_q10_protocol.py index 62ee2c27..8e42f07e 100644 --- a/tests/protocols/test_b01_q10_protocol.py +++ b/tests/protocols/test_b01_q10_protocol.py @@ -11,7 +11,10 @@ from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP, YXWaterLevel from roborock.exceptions import RoborockException +from roborock.map.b01_q10_map_parser import Q10MapPacket, Q10TracePacket from roborock.protocols.b01_q10_protocol import ( + Q10DpsUpdate, + decode_message, decode_rpc_response, encode_mqtt_payload, ) @@ -21,6 +24,42 @@ TESTDATA_FILES = list(TESTDATA_PATH.glob("*.json")) TESTDATA_IDS = [x.stem for x in TESTDATA_FILES] +MAP_FIXTURE = pathlib.Path("tests/map/testdata/b01_q10_map.bin") +TRACE_FIXTURE = pathlib.Path("tests/map/testdata/b01_q10_trace.bin") + + +def _message(payload: bytes, protocol: RoborockMessageProtocol) -> RoborockMessage: + return RoborockMessage(protocol=protocol, payload=payload, version=b"B01") + + +def test_decode_message_dps_update() -> None: + """A non-MAP_RESPONSE message decodes into a Q10DpsUpdate.""" + message = _message(b'{"dps": {"122": 100}}', RoborockMessageProtocol.RPC_RESPONSE) + decoded = decode_message(message) + assert decoded == Q10DpsUpdate(dps={B01_Q10_DP.BATTERY: 100}) + + +def test_decode_message_map_packet() -> None: + """A MAP_RESPONSE 01 01 payload decodes into a Q10MapPacket.""" + message = _message(MAP_FIXTURE.read_bytes(), RoborockMessageProtocol.MAP_RESPONSE) + decoded = decode_message(message) + assert isinstance(decoded, Q10MapPacket) + assert {room.id: room.name for room in decoded.rooms} == {2: "Living Room", 3: "Bedroom"} + + +def test_decode_message_trace_packet() -> None: + """A MAP_RESPONSE 02 01 payload decodes into a Q10TracePacket.""" + message = _message(TRACE_FIXTURE.read_bytes(), RoborockMessageProtocol.MAP_RESPONSE) + decoded = decode_message(message) + assert isinstance(decoded, Q10TracePacket) + assert [(p.x, p.y) for p in decoded.points] == [(169, 0)] + + +def test_decode_message_unknown_map_marker_returns_none() -> None: + """A MAP_RESPONSE with an unrecognized marker is skipped (returns None).""" + assert decode_message(_message(b"\x09\x09junk", RoborockMessageProtocol.MAP_RESPONSE)) is None + assert decode_message(_message(b"", RoborockMessageProtocol.MAP_RESPONSE)) is None + @pytest.fixture(autouse=True) def fixed_time_fixture() -> Generator[None, None, None]: