Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 28 additions & 32 deletions cri_lib/cri_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,48 +441,52 @@ async def wait_for_kinematics_ready_async(self, timeout: float = 30) -> bool:
async def get_board_temperatures_async(
self,
timeout: float | None = DEFAULT, # type: ignore
) -> bool:
) -> list[float]:
"""Receive motor controller PCB temperatures and save in robot state

Parameters
----------
timeout: float | None
timeout for waiting in seconds or None for infinite waiting

Returns
-------
list of motor controller PCB temperatures
"""
self._send_command("SYSTEM GetBoardTemp", True, "info_boardtemp")
if (
error_msg := await self._wait_for_answer_async(
"info_boardtemp", timeout=timeout
)
) is not None:
logger.debug("Error in GetBoardTemp command: %s", error_msg)
return False
else:
return True
raise CRICommandError(f"Error in GetBoardTemp command: {error_msg}")
return list(self.robot_state.board_temps)

async def get_motor_temperatures_async(
self,
timeout: float | None = DEFAULT, # type: ignore
) -> bool:
) -> list[float]:
"""Receive motor temperatures and save in robot state

Parameters
----------
timeout: float | None
timeout for waiting in seconds or None for infinite waiting

Returns
-------
list of motor temperatures
"""
self._send_command("SYSTEM GetMotorTemp", True, "info_motortemp")
if (
error_msg := await self._wait_for_answer_async(
"info_motortemp", timeout=timeout
)
) is not None:
logger.debug("Error in GetMotorTemp command: %s", error_msg)
return False
else:
return True
raise CRICommandError(f"Error in GetMotorTemp command: {error_msg}")
return list(self.robot_state.motor_temps)

async def list_files_async(self, target_directory: str = "Programs") -> bool:
async def list_files_async(self, target_directory: str = "Programs") -> list[str]:
"""Request a list of all files in the directory, which is relative to the /Data/ directory.

Parameters
Expand All @@ -492,28 +496,20 @@ async def list_files_async(self, target_directory: str = "Programs") -> bool:

Returns
-------
a list of files
a list of file paths relative to `/Data`
"""

command = f"CMD ListFiles {target_directory}"

self._send_command(
command=command, register_answer=True, fixed_answer_name="info_filelist"
)
if (
self._send_command(
command=command, register_answer=True, fixed_answer_name="info_filelist"
)
is not None
):
if (
error_msg := await self._wait_for_answer_async(
"info_filelist", timeout=self.DEFAULT_ANSWER_TIMEOUT
)
) is not None:
logger.debug("Error in ListFiles command: %s", error_msg)
return False
else:
return True
else:
return False
error_msg := await self._wait_for_answer_async("info_filelist")
) is not None:
raise CRICommandError(f"Error in ListFiles command: {error_msg}")
# the obtained file list is cached in the parser
with self.parser.file_list_lock:
return list(self.parser.file_list)

def wait_for_status_update(self, timeout: float | None = None) -> None:
"""Blocking wrapper around :func:`CRIClient.wait_for_status_update_async`."""
Expand All @@ -530,7 +526,7 @@ def wait_for_kinematics_ready(self, timeout: float = 30) -> bool:
def get_board_temperatures(
self,
timeout: float | None = DEFAULT, # type: ignore
) -> bool:
) -> list[float]:
"""Blocking wrapper around :func:`CRIClient.get_board_temperatures_async`."""
return asyncio.get_event_loop().run_until_complete(
self.get_board_temperatures_async(timeout=timeout)
Expand All @@ -539,13 +535,13 @@ def get_board_temperatures(
def get_motor_temperatures(
self,
timeout: float | None = DEFAULT, # type: ignore
) -> bool:
) -> list[float]:
"""Blocking wrapper around :func:`CRIClient.get_motor_temperatures_async`."""
return asyncio.get_event_loop().run_until_complete(
self.get_motor_temperatures_async(timeout=timeout)
)

def list_files(self) -> bool:
def list_files(self) -> list[str]:
"""Blocking wrapper around :func:`CRIClient.list_files_async`."""
return asyncio.get_event_loop().run_until_complete(self.list_files_async())

Expand Down
Loading