diff --git a/cri_lib/cri_controller.py b/cri_lib/cri_controller.py index c6df58c..284d9f2 100644 --- a/cri_lib/cri_controller.py +++ b/cri_lib/cri_controller.py @@ -13,7 +13,7 @@ from .cri_errors import CRICommandError, CRICommandTimeOutError, CRIConnectionError from .cri_protocol_parser import CRIProtocolParser -from .robot_state import KinematicsState, RobotState +from .robot_state import KinematicsState, ReplayMode, RobotState logger = logging.getLogger(__name__) @@ -1355,15 +1355,52 @@ async def load_logic_programm_async(self, program_name: str) -> bool: else: return True - async def start_programm_async(self) -> bool: - """Start currently loaded Program + async def set_replay_mode_async(self, replay_mode: ReplayMode) -> None: + """Set the main program replay mode. + + Parameters + ---------- + replay_mode + Which mode to apply if the ``robot_state`` is not already in it. + + Raises + ------ + CRICommandError + If the ``replay_mode`` could not be applied. + """ + while self.robot_state.main_replay_mode != replay_mode: + msg_id = self.send_command( + f"CMD ProgramReplayMode {replay_mode.value}", True + ) + if (error_msg := await self._wait_for_answer_async(msg_id)) is not None: + raise CRICommandError(f"Could not set replay mode: {error_msg}") + await asyncio.sleep(0.05) + + async def start_programm_async( + self, *, replay_mode: ReplayMode | None = None + ) -> bool: + """Start currently loaded Program. + + Parameters + ---------- + replay_mode + Which replay mode to apply before starting. + If not provided, the currently set mode is kept. Returns ------- bool `True` if request was successful `False` if request was not successful + + Raises + ------ + CRICommandError + If the ``replay_mode`` could not be applied. """ + if replay_mode is not None: + await self.set_replay_mode_async(replay_mode) + command = "CMD StartProgram" msg_id = self._send_command(command, True) @@ -1785,9 +1822,11 @@ def load_logic_programm(self, program_name: str) -> bool: self.load_logic_programm_async(program_name) ) - def start_programm(self) -> bool: + def start_programm(self, *, replay_mode: ReplayMode | None = None) -> bool: """Blocking wrapper around :func:`CRIController.start_programm_async`.""" - return asyncio.get_event_loop().run_until_complete(self.start_programm_async()) + return asyncio.get_event_loop().run_until_complete( + self.start_programm_async(replay_mode=replay_mode) + ) def stop_programm(self) -> bool: """Blocking wrapper around :func:`CRIController.stop_programm_async`."""