Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/uipath/runtime/resumable/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger:
"""
...

async def create_triggers(self, suspend_value: Any) -> list[UiPathResumeTrigger]:
"""Create resume triggers from a suspend value.

Most suspend values produce one trigger. Values with alternative resume
conditions, such as operation timeouts, may produce multiple triggers for
the same interrupt.

Args:
suspend_value: The value that caused the suspension.

Returns:
UiPathResumeTrigger objects ready to be persisted.

Raises:
UiPathRuntimeError: If trigger creation fails
"""
...


class UiPathResumeTriggerReaderProtocol(Protocol):
"""Protocol for reading resume triggers and converting them to runtime input."""
Expand Down
27 changes: 18 additions & 9 deletions src/uipath/runtime/resumable/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ async def stream(
async def _get_fired_triggers(self) -> dict[str, Any] | None:
"""Check stored triggers for any that have already fired.

Skips async-external triggers (API, Inbox) whose payloads only arrive
asynchronously and cannot be polled at suspend time.
Skips external triggers (API, Inbox, Timer) whose payloads only arrive
asynchronously or through Orchestrator resume and cannot be polled at
suspend time.

Returns:
A resume map of {interrupt_id: resume_data} for fired triggers, or None.
Expand All @@ -161,7 +162,11 @@ async def _get_fired_triggers(self) -> dict[str, Any] | None:
t
for t in triggers
if t.trigger_type
not in (UiPathResumeTriggerType.API, UiPathResumeTriggerType.INBOX)
not in (
UiPathResumeTriggerType.API,
UiPathResumeTriggerType.INBOX,
UiPathResumeTriggerType.TIMER,
)
Comment on lines 150 to +169
]
return await self._build_resume_map(pollable_triggers)

Expand Down Expand Up @@ -220,11 +225,14 @@ async def _build_resume_map(
"""
resume_map: dict[str, Any] = {}
for trigger in triggers:
assert trigger.interrupt_id is not None, (
"Trigger interrupt_id cannot be None"
)
if trigger.interrupt_id in resume_map:
continue

try:
data = await self.trigger_manager.read_trigger(trigger)
assert trigger.interrupt_id is not None, (
"Trigger interrupt_id cannot be None"
)
resume_map[trigger.interrupt_id] = data
await self.storage.delete_trigger(self.runtime_id, trigger)
except UiPathPendingTriggerError:
Expand Down Expand Up @@ -268,11 +276,12 @@ async def _handle_suspension(

# Create triggers only for new interrupts
for interrupt_id in new_ids:
trigger = await self.trigger_manager.create_trigger(
triggers = await self.trigger_manager.create_triggers(
current_interrupts[interrupt_id]
)
trigger.interrupt_id = interrupt_id
suspended_result.triggers.append(trigger)
for trigger in triggers:
trigger.interrupt_id = interrupt_id
suspended_result.triggers.append(trigger)

if suspended_result.triggers:
await self.storage.save_triggers(self.runtime_id, suspended_result.triggers)
Expand Down
5 changes: 5 additions & 0 deletions tests/test_resumable.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ async def read_trigger_default(trigger: UiPathResumeTrigger) -> dict[str, Any]:
raise UiPathPendingTriggerError(ErrorCategory.USER, "Trigger not fired yet")

manager.create_trigger = AsyncMock(side_effect=create_trigger_impl)

async def create_triggers_impl(data: dict[str, Any]) -> list[UiPathResumeTrigger]:
return [await manager.create_trigger(data)]

manager.create_triggers = AsyncMock(side_effect=create_triggers_impl)
manager.read_trigger = AsyncMock(side_effect=read_trigger_default)

return cast(UiPathResumeTriggerProtocol, manager)
Expand Down
Loading