diff --git a/src/uipath/runtime/resumable/protocols.py b/src/uipath/runtime/resumable/protocols.py index 7ce77c8..447913f 100644 --- a/src/uipath/runtime/resumable/protocols.py +++ b/src/uipath/runtime/resumable/protocols.py @@ -68,6 +68,24 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: """ ... + async def create_triggers(self, suspend_value: Any) -> list[UiPathResumeTrigger]: + """Create resume triggers from a suspend value. + + Most suspend values produce one trigger. Values with alternative resume + conditions, such as operation timeouts, may produce multiple triggers for + the same interrupt. + + Args: + suspend_value: The value that caused the suspension. + + Returns: + UiPathResumeTrigger objects ready to be persisted. + + Raises: + UiPathRuntimeError: If trigger creation fails + """ + ... + class UiPathResumeTriggerReaderProtocol(Protocol): """Protocol for reading resume triggers and converting them to runtime input.""" diff --git a/src/uipath/runtime/resumable/runtime.py b/src/uipath/runtime/resumable/runtime.py index 93bc2e5..a496226 100644 --- a/src/uipath/runtime/resumable/runtime.py +++ b/src/uipath/runtime/resumable/runtime.py @@ -147,8 +147,9 @@ async def stream( async def _get_fired_triggers(self) -> dict[str, Any] | None: """Check stored triggers for any that have already fired. - Skips async-external triggers (API, Inbox) whose payloads only arrive - asynchronously and cannot be polled at suspend time. + Skips external triggers (API, Inbox, Timer) whose payloads only arrive + asynchronously or through Orchestrator resume and cannot be polled at + suspend time. Returns: A resume map of {interrupt_id: resume_data} for fired triggers, or None. @@ -161,7 +162,11 @@ async def _get_fired_triggers(self) -> dict[str, Any] | None: t for t in triggers if t.trigger_type - not in (UiPathResumeTriggerType.API, UiPathResumeTriggerType.INBOX) + not in ( + UiPathResumeTriggerType.API, + UiPathResumeTriggerType.INBOX, + UiPathResumeTriggerType.TIMER, + ) ] return await self._build_resume_map(pollable_triggers) @@ -220,11 +225,14 @@ async def _build_resume_map( """ resume_map: dict[str, Any] = {} for trigger in triggers: + assert trigger.interrupt_id is not None, ( + "Trigger interrupt_id cannot be None" + ) + if trigger.interrupt_id in resume_map: + continue + try: data = await self.trigger_manager.read_trigger(trigger) - assert trigger.interrupt_id is not None, ( - "Trigger interrupt_id cannot be None" - ) resume_map[trigger.interrupt_id] = data await self.storage.delete_trigger(self.runtime_id, trigger) except UiPathPendingTriggerError: @@ -268,11 +276,12 @@ async def _handle_suspension( # Create triggers only for new interrupts for interrupt_id in new_ids: - trigger = await self.trigger_manager.create_trigger( + triggers = await self.trigger_manager.create_triggers( current_interrupts[interrupt_id] ) - trigger.interrupt_id = interrupt_id - suspended_result.triggers.append(trigger) + for trigger in triggers: + trigger.interrupt_id = interrupt_id + suspended_result.triggers.append(trigger) if suspended_result.triggers: await self.storage.save_triggers(self.runtime_id, suspended_result.triggers) diff --git a/tests/test_resumable.py b/tests/test_resumable.py index 59655f3..70e45d6 100644 --- a/tests/test_resumable.py +++ b/tests/test_resumable.py @@ -134,6 +134,11 @@ async def read_trigger_default(trigger: UiPathResumeTrigger) -> dict[str, Any]: raise UiPathPendingTriggerError(ErrorCategory.USER, "Trigger not fired yet") manager.create_trigger = AsyncMock(side_effect=create_trigger_impl) + + async def create_triggers_impl(data: dict[str, Any]) -> list[UiPathResumeTrigger]: + return [await manager.create_trigger(data)] + + manager.create_triggers = AsyncMock(side_effect=create_triggers_impl) manager.read_trigger = AsyncMock(side_effect=read_trigger_default) return cast(UiPathResumeTriggerProtocol, manager)