From be842b660e1c7847b12f8119633b527a52e8512b Mon Sep 17 00:00:00 2001 From: Michael Osthege Date: Thu, 2 Jul 2026 00:07:08 +0200 Subject: [PATCH 1/2] Add `set_replay_mode_async` method --- cri_lib/cri_controller.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/cri_lib/cri_controller.py b/cri_lib/cri_controller.py index c6df58c..b0dee27 100644 --- a/cri_lib/cri_controller.py +++ b/cri_lib/cri_controller.py @@ -13,7 +13,7 @@ from .cri_errors import CRICommandError, CRICommandTimeOutError, CRIConnectionError from .cri_protocol_parser import CRIProtocolParser -from .robot_state import KinematicsState, RobotState +from .robot_state import KinematicsState, ReplayMode, RobotState logger = logging.getLogger(__name__) @@ -1355,6 +1355,27 @@ async def load_logic_programm_async(self, program_name: str) -> bool: else: return True + async def set_replay_mode_async(self, replay_mode: ReplayMode) -> None: + """Set the main program replay mode. + + Parameters + ---------- + replay_mode + Which mode to apply if the ``robot_state`` is not already in it. + + Raises + ------ + CRICommandError + If the ``replay_mode`` could not be applied. + """ + while self.robot_state.main_replay_mode != replay_mode: + msg_id = self.send_command( + f"CMD ProgramReplayMode {replay_mode.value}", True + ) + if (error_msg := await self._wait_for_answer_async(msg_id)) is not None: + raise CRICommandError(f"Could not set replay mode: {error_msg}") + await asyncio.sleep(0.1) + async def start_programm_async(self) -> bool: """Start currently loaded Program From 959caba469bef84c0f9cade2e0f23f25abc7c51f Mon Sep 17 00:00:00 2001 From: Michael Osthege Date: Thu, 2 Jul 2026 00:12:04 +0200 Subject: [PATCH 2/2] Add `start_programm_async(..., replay_mode)` kwarg Makes it easy to set the replay mode before starting a program. --- cri_lib/cri_controller.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/cri_lib/cri_controller.py b/cri_lib/cri_controller.py index b0dee27..284d9f2 100644 --- a/cri_lib/cri_controller.py +++ b/cri_lib/cri_controller.py @@ -1374,17 +1374,33 @@ async def set_replay_mode_async(self, replay_mode: ReplayMode) -> None: ) if (error_msg := await self._wait_for_answer_async(msg_id)) is not None: raise CRICommandError(f"Could not set replay mode: {error_msg}") - await asyncio.sleep(0.1) + await asyncio.sleep(0.05) - async def start_programm_async(self) -> bool: - """Start currently loaded Program + async def start_programm_async( + self, *, replay_mode: ReplayMode | None = None + ) -> bool: + """Start currently loaded Program. + + Parameters + ---------- + replay_mode + Which replay mode to apply before starting. + If not provided, the currently set mode is kept. Returns ------- bool `True` if request was successful `False` if request was not successful + + Raises + ------ + CRICommandError + If the ``replay_mode`` could not be applied. """ + if replay_mode is not None: + await self.set_replay_mode_async(replay_mode) + command = "CMD StartProgram" msg_id = self._send_command(command, True) @@ -1806,9 +1822,11 @@ def load_logic_programm(self, program_name: str) -> bool: self.load_logic_programm_async(program_name) ) - def start_programm(self) -> bool: + def start_programm(self, *, replay_mode: ReplayMode | None = None) -> bool: """Blocking wrapper around :func:`CRIController.start_programm_async`.""" - return asyncio.get_event_loop().run_until_complete(self.start_programm_async()) + return asyncio.get_event_loop().run_until_complete( + self.start_programm_async(replay_mode=replay_mode) + ) def stop_programm(self) -> bool: """Blocking wrapper around :func:`CRIController.stop_programm_async`."""