Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

ADDED

- Added `OrchestrationContext.parent_instance_id`, which returns the instance
ID of the parent orchestration for a sub-orchestration, or `None` for a
top-level orchestration.

CHANGED

- Changed the default large-payload externalization threshold (`LargePayloadStorageOptions.threshold_bytes`) from 900,000 bytes to 262,144 bytes (256 KiB), matching the .NET SDK default. Behavioral change (not source/binary breaking): payloads larger than 256 KiB are now externalized by default.
Expand Down
16 changes: 16 additions & 0 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ def instance_id(self) -> str:
"""
pass

@property
@abstractmethod
def parent_instance_id(self) -> str | None:
"""Get the ID of the parent orchestration instance.

For a sub-orchestration, this is the instance ID of the orchestration
that scheduled it. For a top-level orchestration, this is ``None``.

Returns
-------
str | None
The parent orchestration instance ID, or ``None`` if this
orchestration was not scheduled by a parent orchestration.
"""
pass

@property
@abstractmethod
def version(self) -> str | None:
Expand Down
15 changes: 15 additions & 0 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1483,6 +1483,7 @@ def __init__(self,
self._registry = registry
self._entity_context = OrchestrationEntityContext(instance_id)
self._version: str | None = None
self._parent_instance_id: str | None = None
self._completion_status: pb.OrchestrationStatus | None = None
self._received_events: dict[str, list[str | None]] = {}
self._pending_events: dict[str, list[task.CancellableTask[Any]]] = {}
Expand Down Expand Up @@ -1638,6 +1639,10 @@ def instance_id(self) -> str:
def version(self) -> str | None:
return self._version

@property
def parent_instance_id(self) -> str | None:
return self._parent_instance_id

@property
def current_utc_datetime(self) -> datetime:
return self._current_utc_datetime
Expand Down Expand Up @@ -2222,6 +2227,16 @@ def process_event(
if event.executionStarted.version:
ctx._version = event.executionStarted.version.value # pyright: ignore[reportPrivateUsage]

# Store the parent orchestration instance ID (set for
# sub-orchestrations; absent for top-level orchestrations)
if (
event.executionStarted.HasField("parentInstance")
and event.executionStarted.parentInstance.HasField("orchestrationInstance")
):
ctx._parent_instance_id = ( # pyright: ignore[reportPrivateUsage]
event.executionStarted.parentInstance.orchestrationInstance.instanceId
)

# Store the parent trace context for propagation to child tasks
if event.executionStarted.HasField("parentTraceContext"):
ctx._parent_trace_context = event.executionStarted.parentTraceContext # pyright: ignore[reportPrivateUsage]
Expand Down
44 changes: 44 additions & 0 deletions tests/durabletask/test_orchestration_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,50 @@ def orchestrator(ctx: task.OrchestrationContext, my_input: int):
assert complete_action.result.value == json.dumps(expected_output)


def test_orchestrator_parent_instance_id():
"""A sub-orchestration exposes its parent's instance ID on the context."""

parent_id = "parent-instance-42"
observed: dict[str, str | None] = {}

def orchestrator(ctx: task.OrchestrationContext, _):
observed["parent"] = ctx.parent_instance_id
return "done"

registry = worker._Registry()
name = registry.add_orchestrator(orchestrator)

started = helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None)
started.executionStarted.parentInstance.CopyFrom(
pb.ParentInstanceInfo(
taskScheduledId=1,
orchestrationInstance=pb.OrchestrationInstance(instanceId=parent_id)))

executor = worker._OrchestrationExecutor(registry, TEST_LOGGER, JsonDataConverter())
executor.execute(TEST_INSTANCE_ID, [], [started])

assert observed["parent"] == parent_id


def test_orchestrator_parent_instance_id_none_for_top_level():
"""A top-level orchestration has no parent instance ID."""

observed: dict[str, str | None] = {}

def orchestrator(ctx: task.OrchestrationContext, _):
observed["parent"] = ctx.parent_instance_id
return "done"

registry = worker._Registry()
name = registry.add_orchestrator(orchestrator)

new_events = [helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None)]
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER, JsonDataConverter())
executor.execute(TEST_INSTANCE_ID, [], new_events)

assert observed["parent"] is None


def test_complete_orchestration_actions():
"""Tests the actions output for a completed orchestration"""

Expand Down
Loading