Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-runtime"
version = "0.11.5"
version = "0.12.0"
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
18 changes: 18 additions & 0 deletions src/uipath/runtime/resumable/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger:
"""
...

async def create_triggers(self, suspend_value: Any) -> list[UiPathResumeTrigger]:
"""Create resume triggers from a suspend value.

Most suspend values produce one trigger. Values with alternative resume
conditions, such as operation timeouts, may produce multiple triggers for
the same interrupt.

Args:
suspend_value: The value that caused the suspension.

Returns:
UiPathResumeTrigger objects ready to be persisted.

Raises:
UiPathRuntimeError: If trigger creation fails
"""
...


class UiPathResumeTriggerReaderProtocol(Protocol):
"""Protocol for reading resume triggers and converting them to runtime input."""
Expand Down
16 changes: 10 additions & 6 deletions src/uipath/runtime/resumable/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,14 @@ async def _build_resume_map(
"""
resume_map: dict[str, Any] = {}
for trigger in triggers:
assert trigger.interrupt_id is not None, (
"Trigger interrupt_id cannot be None"
)
if trigger.interrupt_id in resume_map:
continue

try:
data = await self.trigger_manager.read_trigger(trigger)
assert trigger.interrupt_id is not None, (
"Trigger interrupt_id cannot be None"
)
resume_map[trigger.interrupt_id] = data
await self.storage.delete_trigger(self.runtime_id, trigger)
except UiPathPendingTriggerError:
Expand Down Expand Up @@ -273,11 +276,12 @@ async def _handle_suspension(

# Create triggers only for new interrupts
for interrupt_id in new_ids:
trigger = await self.trigger_manager.create_trigger(
triggers = await self.trigger_manager.create_triggers(
current_interrupts[interrupt_id]
)
trigger.interrupt_id = interrupt_id
suspended_result.triggers.append(trigger)
for trigger in triggers:
trigger.interrupt_id = interrupt_id
suspended_result.triggers.append(trigger)

if suspended_result.triggers:
await self.storage.save_triggers(self.runtime_id, suspended_result.triggers)
Expand Down
111 changes: 111 additions & 0 deletions tests/test_resumable.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,48 @@ async def get_schema(self) -> UiPathRuntimeSchema:
raise NotImplementedError()


class TimeoutRaceMockRuntime:
"""Mock runtime that suspends on a timeout-raced interrupt, then suspends again."""

def __init__(self) -> None:
self.execution_count = 0

async def dispose(self) -> None:
pass

async def execute(
self,
input: dict[str, Any] | None = None,
options: UiPathExecuteOptions | None = None,
) -> UiPathRuntimeResult:
self.execution_count += 1
is_resume = options and options.resume

if self.execution_count == 1:
return UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUSPENDED,
output={"child-1": {"process": "first-child"}},
)

assert is_resume
assert input == {"child-1": {"completed": True}}
return UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUSPENDED,
output={"child-2": {"process": "second-child"}},
)

async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
result = await self.execute(input, options)
yield result

async def get_schema(self) -> UiPathRuntimeSchema:
raise NotImplementedError()


class StatefulStorageMock:
"""Stateful storage mock that tracks triggers."""

Expand Down Expand Up @@ -134,6 +176,11 @@ async def read_trigger_default(trigger: UiPathResumeTrigger) -> dict[str, Any]:
raise UiPathPendingTriggerError(ErrorCategory.USER, "Trigger not fired yet")

manager.create_trigger = AsyncMock(side_effect=create_trigger_impl)

async def create_triggers_impl(data: dict[str, Any]) -> list[UiPathResumeTrigger]:
return [await manager.create_trigger(data)]

manager.create_triggers = AsyncMock(side_effect=create_triggers_impl)
manager.read_trigger = AsyncMock(side_effect=read_trigger_default)

return cast(UiPathResumeTriggerProtocol, manager)
Expand Down Expand Up @@ -640,3 +687,67 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:

# Delegate should have been executed twice (initial + auto-resume for TASK trigger)
assert runtime_impl.execution_count == 2

@pytest.mark.asyncio
async def test_resumable_removes_timeout_sibling_when_normal_trigger_fires(
self,
) -> None:
"""When a trigger wins a timeout race, its timer sibling must not leak forward."""

runtime_impl = TimeoutRaceMockRuntime()
storage = StatefulStorageMock()
trigger_manager = make_trigger_manager_mock()

async def create_timeout_race_triggers(
data: dict[str, Any],
) -> list[UiPathResumeTrigger]:
return [
UiPathResumeTrigger(
interrupt_id="", # Will be set by resumable runtime
trigger_type=UiPathResumeTriggerType.JOB,
payload={"source": data["process"]},
),
UiPathResumeTrigger(
interrupt_id="", # Will be set by resumable runtime
trigger_type=UiPathResumeTriggerType.TIMER,
payload={"kind": "timeout", "source": data["process"]},
),
]

create_triggers_mock = AsyncMock(side_effect=create_timeout_race_triggers)
cast(Any, trigger_manager).create_triggers = create_triggers_mock

async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
if (
trigger.interrupt_id == "child-1"
and trigger.trigger_type == UiPathResumeTriggerType.JOB
):
return {"completed": True}
raise UiPathPendingTriggerError(ErrorCategory.USER, "still pending")

read_trigger_mock = AsyncMock(side_effect=read_trigger_impl)
cast(Any, trigger_manager).read_trigger = read_trigger_mock

resumable = UiPathResumableRuntime(
delegate=runtime_impl,
storage=storage,
trigger_manager=trigger_manager,
runtime_id="runtime-1",
)

result = await resumable.execute({})

assert result.status == UiPathRuntimeStatus.SUSPENDED
assert result.triggers is not None
assert {t.interrupt_id for t in result.triggers} == {"child-2"}
assert len(result.triggers) == 2
assert {t.trigger_type for t in result.triggers} == {
UiPathResumeTriggerType.JOB,
UiPathResumeTriggerType.TIMER,
}
assert runtime_impl.execution_count == 2
assert create_triggers_mock.await_count == 2
assert read_trigger_mock.await_count == 2

saved_triggers = await storage.get_triggers("runtime-1")
assert {t.interrupt_id for t in saved_triggers} == {"child-2"}
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading