From 94ec7a80b9b20a008ab26aebe38bd3843dfece5f Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 2 Jul 2026 13:32:27 -0600 Subject: [PATCH 1/4] Propagate entity operation failures over legacy entity protocol call_entity failures delivered via the legacy entity protocol (used by the Azure Functions Durable extension) were silently completing instead of raising. _handle_entity_event_raised now detects failed ResponseMessage payloads (exceptionType/failureDetails) and fails the task with an EntityOperationFailedException, matching the current entity protocol and the .NET SDK. --- CHANGELOG.md | 4 + durabletask/internal/helpers.py | 32 +++++ durabletask/worker.py | 26 +++- .../test_orchestration_executor.py | 125 ++++++++++++++++++ 4 files changed, 183 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c32c7572..00e564ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ CHANGED - Changed the default large-payload externalization threshold (`LargePayloadStorageOptions.threshold_bytes`) from 900,000 bytes to 262,144 bytes (256 KiB), matching the .NET SDK default. Behavioral change (not source/binary breaking): payloads larger than 256 KiB are now externalized by default. +FIXED + +- Fixed `OrchestrationContext.call_entity` not propagating entity operation failures when running under the legacy entity protocol (used by the Azure Functions Durable extension). A failed entity operation now raises `TaskFailedError` in the calling orchestration instead of silently completing, matching the behavior of the current entity protocol and the .NET SDK. + ## v1.7.0 ADDED diff --git a/durabletask/internal/helpers.py b/durabletask/internal/helpers.py index dd8940c5..1262eaa6 100644 --- a/durabletask/internal/helpers.py +++ b/durabletask/internal/helpers.py @@ -3,6 +3,7 @@ import traceback from datetime import datetime, timezone +from typing import Any, cast from google.protobuf import timestamp_pb2, wrappers_pb2 @@ -136,6 +137,37 @@ def new_failure_details(ex: Exception, _visited: set[int] | None = None) -> pb.T ) +def _failure_details_from_core_dict(fd: dict[str, Any]) -> pb.TaskFailureDetails: + """Convert a serialized DurableTask.Core ``FailureDetails`` dict to protobuf.""" + inner = fd.get("InnerFailure") + stack_trace = fd.get("StackTrace") + return pb.TaskFailureDetails( + errorType=str(fd.get("ErrorType") or ""), + errorMessage=str(fd.get("ErrorMessage") or ""), + stackTrace=get_string_value(str(stack_trace) if stack_trace is not None else None), + innerFailure=_failure_details_from_core_dict(cast(dict[str, Any], inner)) if isinstance(inner, dict) else None, + isNonRetriable=bool(fd.get("IsNonRetriable", False)), + ) + + +def entity_response_failure_details(response: dict[str, Any]) -> pb.TaskFailureDetails | None: + """Extract failure details from a legacy-protocol entity ``ResponseMessage``. + + The legacy entity protocol (used when the Durable WebJobs extension + translates entity operations) delivers operation results as a + ``ResponseMessage`` whose ``exceptionType`` (error message) or + ``failureDetails`` field is populated when the operation failed. Returns + ``None`` when the response represents a successful operation. + """ + failure_details = response.get("failureDetails") + if isinstance(failure_details, dict): + return _failure_details_from_core_dict(cast(dict[str, Any], failure_details)) + error_message = response.get("exceptionType") + if error_message is not None: + return pb.TaskFailureDetails(errorType="", errorMessage=str(error_message)) + return None + + def new_event_sent_event(event_id: int, instance_id: str, input: str): return pb.HistoryEvent( eventId=event_id, diff --git a/durabletask/worker.py b/durabletask/worker.py index a53d5e71..e90c668a 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -2574,7 +2574,7 @@ def _cancel_timer() -> None: elif event.HasField("eventRaised"): if event.eventRaised.name in ctx._entity_task_id_map: # pyright: ignore[reportPrivateUsage] entity_id, operation, task_id = ctx._entity_task_id_map.get(event.eventRaised.name, (None, None, None)) # pyright: ignore[reportPrivateUsage] - self._handle_entity_event_raised(ctx, event, entity_id, task_id, False) + self._handle_entity_event_raised(ctx, event, entity_id, task_id, False, operation) elif event.eventRaised.name in ctx._entity_lock_task_id_map: # pyright: ignore[reportPrivateUsage] entity_id, task_id = ctx._entity_lock_task_id_map.get(event.eventRaised.name, (None, None)) # pyright: ignore[reportPrivateUsage] self._handle_entity_event_raised(ctx, event, entity_id, task_id, True) @@ -2803,7 +2803,8 @@ def _handle_entity_event_raised(self, event: pb.HistoryEvent, entity_id: EntityInstanceId | None, task_id: int | None, - is_lock_event: bool): + is_lock_event: bool, + operation: str | None = None): # This eventRaised represents the result of an entity operation after being translated to the old # entity protocol by the Durable WebJobs extension if entity_id is None: @@ -2813,14 +2814,31 @@ def _handle_entity_event_raised(self, entity_task = ctx._pending_tasks.pop(task_id, None) # pyright: ignore[reportPrivateUsage] if not entity_task: raise RuntimeError(f"Could not retrieve entity task for entity-related eventRaised with ID '{event.eventId}'") - result = None + response: Any | None = None if not ph.is_empty(event.eventRaised.input): + response = self._data_converter.deserialize(event.eventRaised.input.value) + + # For entity operation calls (lock acquisitions never fail this way), the legacy-protocol + # ResponseMessage signals a failed operation via its "exceptionType" (error message) or + # "failureDetails" field. Propagate that as a task failure so an awaiting call_entity raises, + # matching the new entity protocol and the .NET SDK. + if not is_lock_event and isinstance(response, dict): + failure_details = ph.entity_response_failure_details(cast(dict[str, Any], response)) + if failure_details is not None: + failure = EntityOperationFailedException(entity_id, operation or "", failure_details) + ctx._entity_context.recover_lock_after_call(entity_id) # pyright: ignore[reportPrivateUsage] + entity_task.fail(str(failure), failure) + ctx.resume() + return + + result = None + if response is not None: # TODO: Investigate why the event result is wrapped in a dict with "result" key # The expected type applies to the unwrapped result value, not the # transport wrapper. Unwrap first, then coerce the already-parsed # inner value to the expected type via the converter (no redundant # re-serialization round-trip). - unwrapped = self._data_converter.deserialize(event.eventRaised.input.value)["result"] + unwrapped: Any = cast(Any, response)["result"] result = self._data_converter.coerce( unwrapped, entity_task._expected_type, # pyright: ignore[reportPrivateUsage] diff --git a/tests/durabletask/test_orchestration_executor.py b/tests/durabletask/test_orchestration_executor.py index e95c4613..c8fe77d3 100644 --- a/tests/durabletask/test_orchestration_executor.py +++ b/tests/durabletask/test_orchestration_executor.py @@ -2095,6 +2095,131 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert actions[0].sendEntityMessage.entityOperationCalled.targetInstanceId.value == str(test_entity_id) +def test_entity_call_failure_propagated_over_old_protocol(): + """A failed entity operation delivered via the legacy protocol (an eventRaised + ResponseMessage with failureDetails) must surface as a TaskFailedError to the caller.""" + test_entity_id = entities.EntityInstanceId("Counter", "myCounter") + + def orchestrator(ctx: task.OrchestrationContext, _): + try: + yield ctx.call_entity(test_entity_id, "set", 1) + except task.TaskFailedError as e: + return e.details.message + return "no error" + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + started_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, None), + ] + + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER, JsonDataConverter()) + result1 = executor.execute(TEST_INSTANCE_ID, [], started_events) + actions = result1.actions + assert len(actions) == 1 + assert actions[0].sendEntityMessage.HasField("entityOperationCalled") + request_id = actions[0].sendEntityMessage.entityOperationCalled.requestId + + # The Durable WebJobs extension translates a failed entity operation into a + # legacy-protocol ResponseMessage carrying a serialized FailureDetails. + response_message = { + "result": None, + "failureDetails": { + "ErrorType": "ValueError", + "ErrorMessage": "Something went wrong!", + "StackTrace": None, + "InnerFailure": None, + "IsNonRetriable": False, + }, + } + new_events = [ + helpers.new_event_sent_event(1, str(test_entity_id), json.dumps({"id": request_id})), + helpers.new_event_raised_event(request_id, json.dumps(response_message)), + ] + result2 = executor.execute(TEST_INSTANCE_ID, started_events, new_events) + complete_action = get_and_validate_complete_orchestration_action_list(1, result2.actions) + assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + output = json.loads(complete_action.result.value) + assert output == ( + "Operation 'set' on entity '@counter@myCounter' failed with error: Something went wrong!" + ) + + +def test_entity_call_failure_propagated_over_old_protocol_exception_type(): + """A failed entity operation whose legacy ResponseMessage only carries the + ``exceptionType`` (error message) field must still surface as a TaskFailedError.""" + test_entity_id = entities.EntityInstanceId("Counter", "myCounter") + + def orchestrator(ctx: task.OrchestrationContext, _): + try: + yield ctx.call_entity(test_entity_id, "set", 1) + except task.TaskFailedError as e: + return e.details.message + return "no error" + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + started_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, None), + ] + + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER, JsonDataConverter()) + result1 = executor.execute(TEST_INSTANCE_ID, [], started_events) + actions = result1.actions + assert len(actions) == 1 + request_id = actions[0].sendEntityMessage.entityOperationCalled.requestId + + response_message = {"result": None, "exceptionType": "Something went wrong!"} + new_events = [ + helpers.new_event_sent_event(1, str(test_entity_id), json.dumps({"id": request_id})), + helpers.new_event_raised_event(request_id, json.dumps(response_message)), + ] + result2 = executor.execute(TEST_INSTANCE_ID, started_events, new_events) + complete_action = get_and_validate_complete_orchestration_action_list(1, result2.actions) + assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + output = json.loads(complete_action.result.value) + assert output == ( + "Operation 'set' on entity '@counter@myCounter' failed with error: Something went wrong!" + ) + + +def test_entity_call_success_over_old_protocol(): + """A successful entity operation delivered via the legacy protocol must + complete the call_entity task with the unwrapped result.""" + test_entity_id = entities.EntityInstanceId("Counter", "myCounter") + + def orchestrator(ctx: task.OrchestrationContext, _): + return (yield ctx.call_entity(test_entity_id, "get", return_type=int)) + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + started_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, None), + ] + + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER, JsonDataConverter()) + result1 = executor.execute(TEST_INSTANCE_ID, [], started_events) + actions = result1.actions + assert len(actions) == 1 + request_id = actions[0].sendEntityMessage.entityOperationCalled.requestId + + response_message = {"result": json.dumps(42)} + new_events = [ + helpers.new_event_sent_event(1, str(test_entity_id), json.dumps({"id": request_id})), + helpers.new_event_raised_event(request_id, json.dumps(response_message)), + ] + result2 = executor.execute(TEST_INSTANCE_ID, started_events, new_events) + complete_action = get_and_validate_complete_orchestration_action_list(1, result2.actions) + assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert json.loads(complete_action.result.value) == 42 + + def get_and_validate_complete_orchestration_action_list(expected_action_count: int, actions: list[pb.OrchestratorAction]) -> pb.CompleteOrchestrationAction: assert len(actions) == expected_action_count assert type(actions[-1]) is pb.OrchestratorAction From 0f35c3eff18de106dfd86a8315bf3bb8ec6ea66e Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 2 Jul 2026 13:48:28 -0600 Subject: [PATCH 2/4] Fix double-encoded call_entity result over legacy entity protocol The legacy entity protocol delivers the operation result as a serialized JSON string inside ResponseMessage.result. The success path used coerce, which does not parse JSON strings, so string/dict/list results arrived double-encoded and only numeric results with an explicit return_type worked. Deserialize the result string (type-directed) to match the current entity protocol. Adds parametrized legacy-protocol success round-trip tests. --- CHANGELOG.md | 1 + durabletask/worker.py | 19 +++++----- .../test_orchestration_executor.py | 36 +++++++++++++++++++ 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00e564ff..bffeb435 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ CHANGED FIXED - Fixed `OrchestrationContext.call_entity` not propagating entity operation failures when running under the legacy entity protocol (used by the Azure Functions Durable extension). A failed entity operation now raises `TaskFailedError` in the calling orchestration instead of silently completing, matching the behavior of the current entity protocol and the .NET SDK. +- Fixed `OrchestrationContext.call_entity` returning a double-encoded result under the legacy entity protocol. The entity's return value was left as a raw serialized JSON string (for example, a returned string arrived with extra quotes and dicts/lists arrived as strings); it is now fully deserialized and coerced to the requested `return_type`, matching the current entity protocol. ## v1.7.0 diff --git a/durabletask/worker.py b/durabletask/worker.py index e90c668a..2137d01a 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -2814,7 +2814,7 @@ def _handle_entity_event_raised(self, entity_task = ctx._pending_tasks.pop(task_id, None) # pyright: ignore[reportPrivateUsage] if not entity_task: raise RuntimeError(f"Could not retrieve entity task for entity-related eventRaised with ID '{event.eventId}'") - response: Any | None = None + response: dict[str, Any] | None = None if not ph.is_empty(event.eventRaised.input): response = self._data_converter.deserialize(event.eventRaised.input.value) @@ -2823,7 +2823,7 @@ def _handle_entity_event_raised(self, # "failureDetails" field. Propagate that as a task failure so an awaiting call_entity raises, # matching the new entity protocol and the .NET SDK. if not is_lock_event and isinstance(response, dict): - failure_details = ph.entity_response_failure_details(cast(dict[str, Any], response)) + failure_details = ph.entity_response_failure_details(response) if failure_details is not None: failure = EntityOperationFailedException(entity_id, operation or "", failure_details) ctx._entity_context.recover_lock_after_call(entity_id) # pyright: ignore[reportPrivateUsage] @@ -2833,14 +2833,13 @@ def _handle_entity_event_raised(self, result = None if response is not None: - # TODO: Investigate why the event result is wrapped in a dict with "result" key - # The expected type applies to the unwrapped result value, not the - # transport wrapper. Unwrap first, then coerce the already-parsed - # inner value to the expected type via the converter (no redundant - # re-serialization round-trip). - unwrapped: Any = cast(Any, response)["result"] - result = self._data_converter.coerce( - unwrapped, + # The legacy protocol wraps the result as {"result": }, + # where the value is a serialized JSON string (like the new protocol's + # entityOperationCompleted.output). Deserialize it -- not coerce -- so + # the value is fully parsed and the expected type applied; coercing + # would skip JSON parsing and leave it double-encoded (e.g. '"done"'). + result = self._data_converter.deserialize( + response["result"], entity_task._expected_type, # pyright: ignore[reportPrivateUsage] ) if is_lock_event: diff --git a/tests/durabletask/test_orchestration_executor.py b/tests/durabletask/test_orchestration_executor.py index c8fe77d3..e8f0f178 100644 --- a/tests/durabletask/test_orchestration_executor.py +++ b/tests/durabletask/test_orchestration_executor.py @@ -2220,6 +2220,42 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert json.loads(complete_action.result.value) == 42 +@pytest.mark.parametrize("entity_result", ["done", 42, 3.5, True, {"a": 1}, [1, 2, 3]]) +def test_entity_call_success_over_old_protocol_round_trips_result(entity_result): + """The legacy-protocol ``result`` field holds a *serialized* JSON string, so + the caller must receive the fully deserialized value regardless of type and + without needing an explicit ``return_type`` (i.e. no double-encoding).""" + test_entity_id = entities.EntityInstanceId("Counter", "myCounter") + + def orchestrator(ctx: task.OrchestrationContext, _): + return (yield ctx.call_entity(test_entity_id, "get")) + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + started_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, None), + ] + + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER, JsonDataConverter()) + result1 = executor.execute(TEST_INSTANCE_ID, [], started_events) + actions = result1.actions + assert len(actions) == 1 + request_id = actions[0].sendEntityMessage.entityOperationCalled.requestId + + # The entity worker places the *serialized* return value into the "result" field. + response_message = {"result": json.dumps(entity_result)} + new_events = [ + helpers.new_event_sent_event(1, str(test_entity_id), json.dumps({"id": request_id})), + helpers.new_event_raised_event(request_id, json.dumps(response_message)), + ] + result2 = executor.execute(TEST_INSTANCE_ID, started_events, new_events) + complete_action = get_and_validate_complete_orchestration_action_list(1, result2.actions) + assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert json.loads(complete_action.result.value) == entity_result + + def get_and_validate_complete_orchestration_action_list(expected_action_count: int, actions: list[pb.OrchestratorAction]) -> pb.CompleteOrchestrationAction: assert len(actions) == expected_action_count assert type(actions[-1]) is pb.OrchestratorAction From 4f54158181395787180431218f86e23434e5dc41 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 2 Jul 2026 13:50:08 -0600 Subject: [PATCH 3/4] Pop legacy entity map entries after handling response Addresses PR review: _entity_task_id_map and _entity_lock_task_id_map entries were looked up with get() and never removed, so the maps could grow unbounded in long-running orchestrations. Use pop() to release each entry once its eventRaised is handled, matching the new-protocol handlers. --- durabletask/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 2137d01a..88a1e0e6 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -2573,10 +2573,10 @@ def _cancel_timer() -> None: raise TypeError("Unexpected sub-orchestration task type") elif event.HasField("eventRaised"): if event.eventRaised.name in ctx._entity_task_id_map: # pyright: ignore[reportPrivateUsage] - entity_id, operation, task_id = ctx._entity_task_id_map.get(event.eventRaised.name, (None, None, None)) # pyright: ignore[reportPrivateUsage] + entity_id, operation, task_id = ctx._entity_task_id_map.pop(event.eventRaised.name, (None, None, None)) # pyright: ignore[reportPrivateUsage] self._handle_entity_event_raised(ctx, event, entity_id, task_id, False, operation) elif event.eventRaised.name in ctx._entity_lock_task_id_map: # pyright: ignore[reportPrivateUsage] - entity_id, task_id = ctx._entity_lock_task_id_map.get(event.eventRaised.name, (None, None)) # pyright: ignore[reportPrivateUsage] + entity_id, task_id = ctx._entity_lock_task_id_map.pop(event.eventRaised.name, (None, None)) # pyright: ignore[reportPrivateUsage] self._handle_entity_event_raised(ctx, event, entity_id, task_id, True) else: # event names are case-insensitive From a13812e945f8771c105269a9aacd5cfa78359bdf Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 2 Jul 2026 13:51:58 -0600 Subject: [PATCH 4/4] Add changelog entry for legacy entity map cleanup --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bffeb435..d42fd68a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ FIXED - Fixed `OrchestrationContext.call_entity` not propagating entity operation failures when running under the legacy entity protocol (used by the Azure Functions Durable extension). A failed entity operation now raises `TaskFailedError` in the calling orchestration instead of silently completing, matching the behavior of the current entity protocol and the .NET SDK. - Fixed `OrchestrationContext.call_entity` returning a double-encoded result under the legacy entity protocol. The entity's return value was left as a raw serialized JSON string (for example, a returned string arrived with extra quotes and dicts/lists arrived as strings); it is now fully deserialized and coerced to the requested `return_type`, matching the current entity protocol. +- Fixed unbounded growth of the internal entity request/lock tracking maps when using entities over the legacy entity protocol. Entries are now released as each response is handled, reducing memory use in long-running orchestrations that call or lock entities. ## v1.7.0