Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ CHANGED

- Changed the default large-payload externalization threshold (`LargePayloadStorageOptions.threshold_bytes`) from 900,000 bytes to 262,144 bytes (256 KiB), matching the .NET SDK default. Behavioral change (not source/binary breaking): payloads larger than 256 KiB are now externalized by default.

FIXED

- Fixed `OrchestrationContext.call_entity` not propagating entity operation failures when running under the legacy entity protocol (used by the Azure Functions Durable extension). A failed entity operation now raises `TaskFailedError` in the calling orchestration instead of silently completing, matching the behavior of the current entity protocol and the .NET SDK.
- Fixed `OrchestrationContext.call_entity` returning a double-encoded result under the legacy entity protocol. The entity's return value was left as a raw serialized JSON string (for example, a returned string arrived with extra quotes and dicts/lists arrived as strings); it is now fully deserialized and coerced to the requested `return_type`, matching the current entity protocol.
- Fixed unbounded growth of the internal entity request/lock tracking maps when using entities over the legacy entity protocol. Entries are now released as each response is handled, reducing memory use in long-running orchestrations that call or lock entities.

## v1.7.0

ADDED
Expand Down
32 changes: 32 additions & 0 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import traceback
from datetime import datetime, timezone
from typing import Any, cast

from google.protobuf import timestamp_pb2, wrappers_pb2

Expand Down Expand Up @@ -136,6 +137,37 @@ def new_failure_details(ex: Exception, _visited: set[int] | None = None) -> pb.T
)


def _failure_details_from_core_dict(fd: dict[str, Any]) -> pb.TaskFailureDetails:
"""Convert a serialized DurableTask.Core ``FailureDetails`` dict to protobuf."""
inner = fd.get("InnerFailure")
stack_trace = fd.get("StackTrace")
return pb.TaskFailureDetails(
errorType=str(fd.get("ErrorType") or ""),
errorMessage=str(fd.get("ErrorMessage") or ""),
stackTrace=get_string_value(str(stack_trace) if stack_trace is not None else None),
innerFailure=_failure_details_from_core_dict(cast(dict[str, Any], inner)) if isinstance(inner, dict) else None,
isNonRetriable=bool(fd.get("IsNonRetriable", False)),
)


def entity_response_failure_details(response: dict[str, Any]) -> pb.TaskFailureDetails | None:
"""Extract failure details from a legacy-protocol entity ``ResponseMessage``.

The legacy entity protocol (used when the Durable WebJobs extension
translates entity operations) delivers operation results as a
``ResponseMessage`` whose ``exceptionType`` (error message) or
``failureDetails`` field is populated when the operation failed. Returns
``None`` when the response represents a successful operation.
"""
failure_details = response.get("failureDetails")
if isinstance(failure_details, dict):
return _failure_details_from_core_dict(cast(dict[str, Any], failure_details))
error_message = response.get("exceptionType")
if error_message is not None:
return pb.TaskFailureDetails(errorType="", errorMessage=str(error_message))
return None


def new_event_sent_event(event_id: int, instance_id: str, input: str):
return pb.HistoryEvent(
eventId=event_id,
Expand Down
43 changes: 30 additions & 13 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2573,10 +2573,10 @@ def _cancel_timer() -> None:
raise TypeError("Unexpected sub-orchestration task type")
elif event.HasField("eventRaised"):
if event.eventRaised.name in ctx._entity_task_id_map: # pyright: ignore[reportPrivateUsage]
entity_id, operation, task_id = ctx._entity_task_id_map.get(event.eventRaised.name, (None, None, None)) # pyright: ignore[reportPrivateUsage]
self._handle_entity_event_raised(ctx, event, entity_id, task_id, False)
entity_id, operation, task_id = ctx._entity_task_id_map.pop(event.eventRaised.name, (None, None, None)) # pyright: ignore[reportPrivateUsage]
self._handle_entity_event_raised(ctx, event, entity_id, task_id, False, operation)
Comment thread
andystaples marked this conversation as resolved.
elif event.eventRaised.name in ctx._entity_lock_task_id_map: # pyright: ignore[reportPrivateUsage]
entity_id, task_id = ctx._entity_lock_task_id_map.get(event.eventRaised.name, (None, None)) # pyright: ignore[reportPrivateUsage]
entity_id, task_id = ctx._entity_lock_task_id_map.pop(event.eventRaised.name, (None, None)) # pyright: ignore[reportPrivateUsage]
self._handle_entity_event_raised(ctx, event, entity_id, task_id, True)
else:
# event names are case-insensitive
Expand Down Expand Up @@ -2803,7 +2803,8 @@ def _handle_entity_event_raised(self,
event: pb.HistoryEvent,
entity_id: EntityInstanceId | None,
task_id: int | None,
is_lock_event: bool):
is_lock_event: bool,
operation: str | None = None):
# This eventRaised represents the result of an entity operation after being translated to the old
# entity protocol by the Durable WebJobs extension
if entity_id is None:
Expand All @@ -2813,16 +2814,32 @@ def _handle_entity_event_raised(self,
entity_task = ctx._pending_tasks.pop(task_id, None) # pyright: ignore[reportPrivateUsage]
if not entity_task:
raise RuntimeError(f"Could not retrieve entity task for entity-related eventRaised with ID '{event.eventId}'")
result = None
response: dict[str, Any] | None = None
if not ph.is_empty(event.eventRaised.input):
# TODO: Investigate why the event result is wrapped in a dict with "result" key
# The expected type applies to the unwrapped result value, not the
# transport wrapper. Unwrap first, then coerce the already-parsed
# inner value to the expected type via the converter (no redundant
# re-serialization round-trip).
unwrapped = self._data_converter.deserialize(event.eventRaised.input.value)["result"]
result = self._data_converter.coerce(
unwrapped,
response = self._data_converter.deserialize(event.eventRaised.input.value)

# For entity operation calls (lock acquisitions never fail this way), the legacy-protocol
# ResponseMessage signals a failed operation via its "exceptionType" (error message) or
# "failureDetails" field. Propagate that as a task failure so an awaiting call_entity raises,
# matching the new entity protocol and the .NET SDK.
if not is_lock_event and isinstance(response, dict):
failure_details = ph.entity_response_failure_details(response)
if failure_details is not None:
failure = EntityOperationFailedException(entity_id, operation or "", failure_details)
ctx._entity_context.recover_lock_after_call(entity_id) # pyright: ignore[reportPrivateUsage]
entity_task.fail(str(failure), failure)
ctx.resume()
return

result = None
if response is not None:
# The legacy protocol wraps the result as {"result": <serialized>},
# where the value is a serialized JSON string (like the new protocol's
# entityOperationCompleted.output). Deserialize it -- not coerce -- so
# the value is fully parsed and the expected type applied; coercing
# would skip JSON parsing and leave it double-encoded (e.g. '"done"').
result = self._data_converter.deserialize(
response["result"],
entity_task._expected_type, # pyright: ignore[reportPrivateUsage]
)
if is_lock_event:
Expand Down
161 changes: 161 additions & 0 deletions tests/durabletask/test_orchestration_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2095,6 +2095,167 @@ def orchestrator(ctx: task.OrchestrationContext, _):
assert actions[0].sendEntityMessage.entityOperationCalled.targetInstanceId.value == str(test_entity_id)


def test_entity_call_failure_propagated_over_old_protocol():
"""A failed entity operation delivered via the legacy protocol (an eventRaised
ResponseMessage with failureDetails) must surface as a TaskFailedError to the caller."""
test_entity_id = entities.EntityInstanceId("Counter", "myCounter")

def orchestrator(ctx: task.OrchestrationContext, _):
try:
yield ctx.call_entity(test_entity_id, "set", 1)
except task.TaskFailedError as e:
return e.details.message
return "no error"

registry = worker._Registry()
name = registry.add_orchestrator(orchestrator)

started_events = [
helpers.new_orchestrator_started_event(),
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, None),
]

executor = worker._OrchestrationExecutor(registry, TEST_LOGGER, JsonDataConverter())
result1 = executor.execute(TEST_INSTANCE_ID, [], started_events)
actions = result1.actions
assert len(actions) == 1
assert actions[0].sendEntityMessage.HasField("entityOperationCalled")
request_id = actions[0].sendEntityMessage.entityOperationCalled.requestId

# The Durable WebJobs extension translates a failed entity operation into a
# legacy-protocol ResponseMessage carrying a serialized FailureDetails.
response_message = {
"result": None,
"failureDetails": {
"ErrorType": "ValueError",
"ErrorMessage": "Something went wrong!",
"StackTrace": None,
"InnerFailure": None,
"IsNonRetriable": False,
},
}
new_events = [
helpers.new_event_sent_event(1, str(test_entity_id), json.dumps({"id": request_id})),
helpers.new_event_raised_event(request_id, json.dumps(response_message)),
]
result2 = executor.execute(TEST_INSTANCE_ID, started_events, new_events)
complete_action = get_and_validate_complete_orchestration_action_list(1, result2.actions)
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
output = json.loads(complete_action.result.value)
assert output == (
"Operation 'set' on entity '@counter@myCounter' failed with error: Something went wrong!"
)


def test_entity_call_failure_propagated_over_old_protocol_exception_type():
"""A failed entity operation whose legacy ResponseMessage only carries the
``exceptionType`` (error message) field must still surface as a TaskFailedError."""
test_entity_id = entities.EntityInstanceId("Counter", "myCounter")

def orchestrator(ctx: task.OrchestrationContext, _):
try:
yield ctx.call_entity(test_entity_id, "set", 1)
except task.TaskFailedError as e:
return e.details.message
return "no error"

registry = worker._Registry()
name = registry.add_orchestrator(orchestrator)

started_events = [
helpers.new_orchestrator_started_event(),
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, None),
]

executor = worker._OrchestrationExecutor(registry, TEST_LOGGER, JsonDataConverter())
result1 = executor.execute(TEST_INSTANCE_ID, [], started_events)
actions = result1.actions
assert len(actions) == 1
request_id = actions[0].sendEntityMessage.entityOperationCalled.requestId

response_message = {"result": None, "exceptionType": "Something went wrong!"}
new_events = [
helpers.new_event_sent_event(1, str(test_entity_id), json.dumps({"id": request_id})),
helpers.new_event_raised_event(request_id, json.dumps(response_message)),
]
result2 = executor.execute(TEST_INSTANCE_ID, started_events, new_events)
complete_action = get_and_validate_complete_orchestration_action_list(1, result2.actions)
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
output = json.loads(complete_action.result.value)
assert output == (
"Operation 'set' on entity '@counter@myCounter' failed with error: Something went wrong!"
)


def test_entity_call_success_over_old_protocol():
"""A successful entity operation delivered via the legacy protocol must
complete the call_entity task with the unwrapped result."""
test_entity_id = entities.EntityInstanceId("Counter", "myCounter")

def orchestrator(ctx: task.OrchestrationContext, _):
return (yield ctx.call_entity(test_entity_id, "get", return_type=int))

registry = worker._Registry()
name = registry.add_orchestrator(orchestrator)

started_events = [
helpers.new_orchestrator_started_event(),
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, None),
]

executor = worker._OrchestrationExecutor(registry, TEST_LOGGER, JsonDataConverter())
result1 = executor.execute(TEST_INSTANCE_ID, [], started_events)
actions = result1.actions
assert len(actions) == 1
request_id = actions[0].sendEntityMessage.entityOperationCalled.requestId

response_message = {"result": json.dumps(42)}
new_events = [
helpers.new_event_sent_event(1, str(test_entity_id), json.dumps({"id": request_id})),
helpers.new_event_raised_event(request_id, json.dumps(response_message)),
]
result2 = executor.execute(TEST_INSTANCE_ID, started_events, new_events)
complete_action = get_and_validate_complete_orchestration_action_list(1, result2.actions)
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
assert json.loads(complete_action.result.value) == 42


@pytest.mark.parametrize("entity_result", ["done", 42, 3.5, True, {"a": 1}, [1, 2, 3]])
def test_entity_call_success_over_old_protocol_round_trips_result(entity_result):
"""The legacy-protocol ``result`` field holds a *serialized* JSON string, so
the caller must receive the fully deserialized value regardless of type and
without needing an explicit ``return_type`` (i.e. no double-encoding)."""
test_entity_id = entities.EntityInstanceId("Counter", "myCounter")

def orchestrator(ctx: task.OrchestrationContext, _):
return (yield ctx.call_entity(test_entity_id, "get"))

registry = worker._Registry()
name = registry.add_orchestrator(orchestrator)

started_events = [
helpers.new_orchestrator_started_event(),
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, None),
]

executor = worker._OrchestrationExecutor(registry, TEST_LOGGER, JsonDataConverter())
result1 = executor.execute(TEST_INSTANCE_ID, [], started_events)
actions = result1.actions
assert len(actions) == 1
request_id = actions[0].sendEntityMessage.entityOperationCalled.requestId

# The entity worker places the *serialized* return value into the "result" field.
response_message = {"result": json.dumps(entity_result)}
new_events = [
helpers.new_event_sent_event(1, str(test_entity_id), json.dumps({"id": request_id})),
helpers.new_event_raised_event(request_id, json.dumps(response_message)),
]
result2 = executor.execute(TEST_INSTANCE_ID, started_events, new_events)
complete_action = get_and_validate_complete_orchestration_action_list(1, result2.actions)
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
assert json.loads(complete_action.result.value) == entity_result


def get_and_validate_complete_orchestration_action_list(expected_action_count: int, actions: list[pb.OrchestratorAction]) -> pb.CompleteOrchestrationAction:
assert len(actions) == expected_action_count
assert type(actions[-1]) is pb.OrchestratorAction
Expand Down
Loading