From df65b2326d5edfa28d3978b66169805b0687f0f1 Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Sat, 27 Jun 2026 23:36:42 +0300 Subject: [PATCH] feat: support multiple resume triggers per interrupt --- pyproject.toml | 2 +- src/uipath/runtime/resumable/protocols.py | 18 ++++ src/uipath/runtime/resumable/runtime.py | 16 ++-- tests/test_resumable.py | 111 ++++++++++++++++++++++ uv.lock | 2 +- 5 files changed, 141 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9146787..c8c9aa4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-runtime" -version = "0.11.5" +version = "0.12.0" description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath/runtime/resumable/protocols.py b/src/uipath/runtime/resumable/protocols.py index 7ce77c8..447913f 100644 --- a/src/uipath/runtime/resumable/protocols.py +++ b/src/uipath/runtime/resumable/protocols.py @@ -68,6 +68,24 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: """ ... + async def create_triggers(self, suspend_value: Any) -> list[UiPathResumeTrigger]: + """Create resume triggers from a suspend value. + + Most suspend values produce one trigger. Values with alternative resume + conditions, such as operation timeouts, may produce multiple triggers for + the same interrupt. + + Args: + suspend_value: The value that caused the suspension. + + Returns: + UiPathResumeTrigger objects ready to be persisted. + + Raises: + UiPathRuntimeError: If trigger creation fails + """ + ... + class UiPathResumeTriggerReaderProtocol(Protocol): """Protocol for reading resume triggers and converting them to runtime input.""" diff --git a/src/uipath/runtime/resumable/runtime.py b/src/uipath/runtime/resumable/runtime.py index 8cf2ed5..a496226 100644 --- a/src/uipath/runtime/resumable/runtime.py +++ b/src/uipath/runtime/resumable/runtime.py @@ -225,11 +225,14 @@ async def _build_resume_map( """ resume_map: dict[str, Any] = {} for trigger in triggers: + assert trigger.interrupt_id is not None, ( + "Trigger interrupt_id cannot be None" + ) + if trigger.interrupt_id in resume_map: + continue + try: data = await self.trigger_manager.read_trigger(trigger) - assert trigger.interrupt_id is not None, ( - "Trigger interrupt_id cannot be None" - ) resume_map[trigger.interrupt_id] = data await self.storage.delete_trigger(self.runtime_id, trigger) except UiPathPendingTriggerError: @@ -273,11 +276,12 @@ async def _handle_suspension( # Create triggers only for new interrupts for interrupt_id in new_ids: - trigger = await self.trigger_manager.create_trigger( + triggers = await self.trigger_manager.create_triggers( current_interrupts[interrupt_id] ) - trigger.interrupt_id = interrupt_id - suspended_result.triggers.append(trigger) + for trigger in triggers: + trigger.interrupt_id = interrupt_id + suspended_result.triggers.append(trigger) if suspended_result.triggers: await self.storage.save_triggers(self.runtime_id, suspended_result.triggers) diff --git a/tests/test_resumable.py b/tests/test_resumable.py index 16d0c41..1358cdc 100644 --- a/tests/test_resumable.py +++ b/tests/test_resumable.py @@ -89,6 +89,48 @@ async def get_schema(self) -> UiPathRuntimeSchema: raise NotImplementedError() +class TimeoutRaceMockRuntime: + """Mock runtime that suspends on a timeout-raced interrupt, then suspends again.""" + + def __init__(self) -> None: + self.execution_count = 0 + + async def dispose(self) -> None: + pass + + async def execute( + self, + input: dict[str, Any] | None = None, + options: UiPathExecuteOptions | None = None, + ) -> UiPathRuntimeResult: + self.execution_count += 1 + is_resume = options and options.resume + + if self.execution_count == 1: + return UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + output={"child-1": {"process": "first-child"}}, + ) + + assert is_resume + assert input == {"child-1": {"completed": True}} + return UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + output={"child-2": {"process": "second-child"}}, + ) + + async def stream( + self, + input: dict[str, Any] | None = None, + options: UiPathStreamOptions | None = None, + ) -> AsyncGenerator[UiPathRuntimeEvent, None]: + result = await self.execute(input, options) + yield result + + async def get_schema(self) -> UiPathRuntimeSchema: + raise NotImplementedError() + + class StatefulStorageMock: """Stateful storage mock that tracks triggers.""" @@ -134,6 +176,11 @@ async def read_trigger_default(trigger: UiPathResumeTrigger) -> dict[str, Any]: raise UiPathPendingTriggerError(ErrorCategory.USER, "Trigger not fired yet") manager.create_trigger = AsyncMock(side_effect=create_trigger_impl) + + async def create_triggers_impl(data: dict[str, Any]) -> list[UiPathResumeTrigger]: + return [await manager.create_trigger(data)] + + manager.create_triggers = AsyncMock(side_effect=create_triggers_impl) manager.read_trigger = AsyncMock(side_effect=read_trigger_default) return cast(UiPathResumeTriggerProtocol, manager) @@ -640,3 +687,67 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: # Delegate should have been executed twice (initial + auto-resume for TASK trigger) assert runtime_impl.execution_count == 2 + + @pytest.mark.asyncio + async def test_resumable_removes_timeout_sibling_when_normal_trigger_fires( + self, + ) -> None: + """When a trigger wins a timeout race, its timer sibling must not leak forward.""" + + runtime_impl = TimeoutRaceMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + async def create_timeout_race_triggers( + data: dict[str, Any], + ) -> list[UiPathResumeTrigger]: + return [ + UiPathResumeTrigger( + interrupt_id="", # Will be set by resumable runtime + trigger_type=UiPathResumeTriggerType.JOB, + payload={"source": data["process"]}, + ), + UiPathResumeTrigger( + interrupt_id="", # Will be set by resumable runtime + trigger_type=UiPathResumeTriggerType.TIMER, + payload={"kind": "timeout", "source": data["process"]}, + ), + ] + + create_triggers_mock = AsyncMock(side_effect=create_timeout_race_triggers) + cast(Any, trigger_manager).create_triggers = create_triggers_mock + + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + if ( + trigger.interrupt_id == "child-1" + and trigger.trigger_type == UiPathResumeTriggerType.JOB + ): + return {"completed": True} + raise UiPathPendingTriggerError(ErrorCategory.USER, "still pending") + + read_trigger_mock = AsyncMock(side_effect=read_trigger_impl) + cast(Any, trigger_manager).read_trigger = read_trigger_mock + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + result = await resumable.execute({}) + + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + assert {t.interrupt_id for t in result.triggers} == {"child-2"} + assert len(result.triggers) == 2 + assert {t.trigger_type for t in result.triggers} == { + UiPathResumeTriggerType.JOB, + UiPathResumeTriggerType.TIMER, + } + assert runtime_impl.execution_count == 2 + assert create_triggers_mock.await_count == 2 + assert read_trigger_mock.await_count == 2 + + saved_triggers = await storage.get_triggers("runtime-1") + assert {t.interrupt_id for t in saved_triggers} == {"child-2"} diff --git a/uv.lock b/uv.lock index e533134..1e9d968 100644 --- a/uv.lock +++ b/uv.lock @@ -1012,7 +1012,7 @@ wheels = [ [[package]] name = "uipath-runtime" -version = "0.11.5" +version = "0.12.0" source = { editable = "." } dependencies = [ { name = "uipath-core" },