Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

ADDED

- Added `durabletask.scheduled`, a recurring schedule feature built on durable
entities. Use `configure_scheduled_tasks(worker)` to enable it on a worker,
then manage schedules from the client via `ScheduledTaskClient` (and the
per-schedule `ScheduleClient`). Supports creating, describing, listing,
updating, pausing, resuming, and deleting schedules with configurable
`interval`, `start_at`, `end_at`, and `start_immediately_if_late` options.
- Added an optional `signal_time` parameter to `EntityContext.signal_entity`
and `DurableEntity.signal_entity`, allowing an entity signal to be scheduled
for future delivery.
- Added an optional `signal_time` parameter to `OrchestrationContext.signal_entity`
and to the client `signal_entity` methods (sync and async), allowing entity
signals to be scheduled for future delivery from orchestrations and clients.
- Added a pluggable `DataConverter` (`durabletask.serialization`) accepted by
`TaskHubGrpcWorker`, `TaskHubGrpcClient`, and `AsyncTaskHubGrpcClient` via a
`data_converter` argument. Every payload boundary (inputs, outputs, events,
Expand Down
12 changes: 8 additions & 4 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,10 @@ def purge_orchestrations_by(self,
def signal_entity(self,
entity_instance_id: EntityInstanceId,
operation_name: str,
input: Any | None = None) -> None:
req = build_signal_entity_req(entity_instance_id, operation_name, input, self._data_converter)
input: Any | None = None,
signal_time: datetime | None = None) -> None:
req = build_signal_entity_req(
entity_instance_id, operation_name, input, signal_time, self._data_converter)
self._logger.info(f"Signaling entity '{entity_instance_id}' operation '{operation_name}'.")
if self._payload_store is not None:
payload_helpers.externalize_payloads(
Expand Down Expand Up @@ -1308,8 +1310,10 @@ async def purge_orchestrations_by(self,
async def signal_entity(self,
entity_instance_id: EntityInstanceId,
operation_name: str,
input: Any | None = None) -> None:
req = build_signal_entity_req(entity_instance_id, operation_name, input, self._data_converter)
input: Any | None = None,
signal_time: datetime | None = None) -> None:
req = build_signal_entity_req(
entity_instance_id, operation_name, input, signal_time, self._data_converter)
self._logger.info(f"Signaling entity '{entity_instance_id}' operation '{operation_name}'.")
if self._payload_store is not None:
await payload_helpers.externalize_payloads_async(
Expand Down
11 changes: 9 additions & 2 deletions durabletask/entities/durable_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the MIT License.

from typing import Any, TypeVar, overload
from datetime import datetime

from durabletask.entities.entity_context import EntityContext
from durabletask.entities.entity_instance_id import EntityInstanceId
Expand Down Expand Up @@ -52,7 +53,9 @@ def set_state(self, state: Any):
"""
self.entity_context.set_state(state)

def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, input: Any | None = None) -> None:
def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str,
input: Any | None = None,
signal_time: datetime | None = None) -> None:
"""Signal another entity to perform an operation.

Parameters
Expand All @@ -63,8 +66,12 @@ def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, in
The operation to perform on the entity.
input : Any, optional
The input to provide to the entity for the operation.
signal_time : datetime, optional
The time at which the signal should be delivered. If None, the signal is
delivered as soon as possible. Use this to schedule a future operation,
for example to have an entity wake itself up at a later time.
"""
self.entity_context.signal_entity(entity_instance_id, operation, input)
self.entity_context.signal_entity(entity_instance_id, operation, input, signal_time)

def schedule_new_orchestration(self, orchestration_name: str, input: Any | None = None, instance_id: str | None = None) -> str:
"""Schedule a new orchestration instance.
Expand Down
16 changes: 14 additions & 2 deletions durabletask/entities/entity_context.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from datetime import datetime
from typing import TYPE_CHECKING, Any, TypeVar, overload
import uuid
from google.protobuf import timestamp_pb2
from durabletask.entities.entity_instance_id import EntityInstanceId
from durabletask.internal import helpers
from durabletask.internal.entity_state_shim import StateShim
Expand Down Expand Up @@ -88,7 +90,9 @@ def set_state(self, new_state: Any) -> None:
"""
self._state.set_state(new_state)

def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, input: Any | None = None) -> None:
def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str,
input: Any | None = None,
signal_time: datetime | None = None) -> None:
"""Signal another entity to perform an operation.

Parameters
Expand All @@ -99,15 +103,23 @@ def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, in
The operation to perform on the entity.
input : Any, optional
The input to provide to the entity for the operation.
signal_time : datetime, optional
The time at which the signal should be delivered. If None, the signal is
delivered as soon as possible. Use this to schedule a future operation,
for example to have an entity wake itself up at a later time.
"""
encoded_input: str | None = self._data_converter.serialize(input)
scheduled_time: timestamp_pb2.Timestamp | None = None
if signal_time is not None:
scheduled_time = timestamp_pb2.Timestamp()
scheduled_time.FromDatetime(signal_time)
self._state.add_operation_action(
pb.OperationAction(
sendSignal=pb.SendSignalAction(
instanceId=str(entity_instance_id),
name=operation,
input=helpers.get_string_value(encoded_input),
scheduledTime=None,
scheduledTime=scheduled_time,
requestTime=None,
parentTraceContext=None,
)
Expand Down
4 changes: 3 additions & 1 deletion durabletask/internal/client_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,16 @@ def build_signal_entity_req(
entity_instance_id: EntityInstanceId,
operation_name: str,
input: Any | None = None,
signal_time: datetime | None = None,
data_converter: DataConverter | None = None) -> pb.SignalEntityRequest:
"""Build a SignalEntityRequest for signaling an entity."""
scheduled_time = helpers.new_timestamp(signal_time) if signal_time is not None else None
return pb.SignalEntityRequest(
instanceId=str(entity_instance_id),
name=operation_name,
input=helpers.get_string_value(_serialize(input, data_converter)),
requestId=str(uuid.uuid4()),
scheduledTime=None,
scheduledTime=scheduled_time,
parentTraceContext=None,
requestTime=helpers.new_timestamp(datetime.now(timezone.utc))
)
23 changes: 20 additions & 3 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Licensed under the MIT License.

import traceback
from datetime import datetime
from datetime import datetime, timezone

from google.protobuf import timestamp_pb2, wrappers_pb2

Expand Down Expand Up @@ -255,11 +255,13 @@ def new_signal_entity_action(id: int,
entity_id: EntityInstanceId,
operation: str,
encoded_input: str | None,
request_id: str) -> pb.OrchestratorAction:
request_id: str,
signal_time: datetime | None = None) -> pb.OrchestratorAction:
scheduled_time = new_timestamp(signal_time) if signal_time is not None else None
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent(
requestId=request_id,
operation=operation,
scheduledTime=None,
scheduledTime=scheduled_time,
input=get_string_value(encoded_input),
targetInstanceId=get_string_value(str(entity_id)),
)))
Expand Down Expand Up @@ -300,6 +302,21 @@ def new_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
return ts


def ensure_aware(value: datetime | None) -> datetime | None:
"""Return ``value`` as a timezone-aware datetime, assuming UTC when naive.

A naive datetime is tagged as UTC; an already-aware datetime is returned
unchanged. Useful before comparing user-supplied datetimes against the
SDK's always-aware-UTC timestamps to avoid "can't compare offset-naive and
offset-aware datetimes".
"""
if value is None:
return None
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value


def new_create_sub_orchestration_action(
id: int,
name: str,
Expand Down
38 changes: 38 additions & 0 deletions durabletask/scheduled/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Scheduled tasks support for the Durable Task SDK.

This package provides a recurring schedule feature built on top of durable
entities and a helper orchestrator. Register the entity and orchestrator with a
worker via :func:`configure_scheduled_tasks`, then manage schedules from the
client via :class:`ScheduledTaskClient`.
"""

from durabletask.scheduled.client import ScheduleClient, ScheduledTaskClient
from durabletask.scheduled.exceptions import (ScheduleClientValidationError,
ScheduleError,
ScheduleInvalidTransitionError,
ScheduleNotFoundError)
from durabletask.scheduled.models import (ScheduleCreationOptions,
ScheduleDescription, ScheduleQuery,
ScheduleUpdateOptions)
from durabletask.scheduled.registration import configure_scheduled_tasks
from durabletask.scheduled.schedule_status import ScheduleStatus

__all__ = [
"ScheduledTaskClient",
"ScheduleClient",
"ScheduleCreationOptions",
"ScheduleUpdateOptions",
"ScheduleDescription",
"ScheduleQuery",
"ScheduleStatus",
"ScheduleError",
"ScheduleNotFoundError",
"ScheduleClientValidationError",
"ScheduleInvalidTransitionError",
"configure_scheduled_tasks",
]

PACKAGE_NAME = "durabletask.scheduled"
154 changes: 154 additions & 0 deletions durabletask/scheduled/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import logging

from durabletask.client import (EntityQuery, OrchestrationStatus,
TaskHubGrpcClient)
from durabletask.entities import EntityInstanceId
from durabletask.internal.helpers import ensure_aware
from durabletask.scheduled import transitions
from durabletask.scheduled.exceptions import ScheduleNotFoundError
from durabletask.scheduled.models import (ScheduleCreationOptions,
ScheduleDescription, ScheduleQuery,
ScheduleState, ScheduleUpdateOptions)
from durabletask.scheduled.orchestrator import (
ScheduleOperationRequest, execute_schedule_operation_orchestrator)
from durabletask.scheduled.schedule_entity import (DELETE_OPERATION,
ENTITY_NAME)

logger = logging.getLogger("durabletask.scheduled")


class ScheduleClient:
"""Client for managing a single schedule instance."""

def __init__(self, client: TaskHubGrpcClient, schedule_id: str,
*, operation_timeout: float = 60):
if not schedule_id:
raise ValueError("schedule_id cannot be empty.")
self._client = client
self._schedule_id = schedule_id
self._entity_id = EntityInstanceId(ENTITY_NAME, schedule_id)
self._operation_timeout = operation_timeout

@property
def schedule_id(self) -> str:
"""Gets the ID of this schedule."""
return self._schedule_id

def _run_operation(self, operation_name: str, input: object | None = None) -> None:
request = ScheduleOperationRequest(
entity_id=str(self._entity_id),
operation_name=operation_name,
input=input,
)
instance_id = self._client.schedule_new_orchestration(
execute_schedule_operation_orchestrator, input=request)
state = self._client.wait_for_orchestration_completion(
instance_id, timeout=self._operation_timeout)
if state is None or state.runtime_status != OrchestrationStatus.COMPLETED:
failure = state.failure_details if state else None
message = failure.message if failure else "unknown error"
raise RuntimeError(
Comment thread
andystaples marked this conversation as resolved.
f"Failed to '{operation_name}' schedule '{self._schedule_id}': {message}")

def create(self, options: ScheduleCreationOptions) -> None:
"""Create or update this schedule with the given configuration."""
self._run_operation(transitions.CREATE_SCHEDULE, options)

def update(self, options: ScheduleUpdateOptions) -> None:
"""Update this schedule's configuration."""
self._run_operation(transitions.UPDATE_SCHEDULE, options)

def pause(self) -> None:
"""Pause this schedule."""
self._run_operation(transitions.PAUSE_SCHEDULE)

def resume(self) -> None:
"""Resume this schedule."""
self._run_operation(transitions.RESUME_SCHEDULE)

def delete(self) -> None:
"""Delete this schedule."""
self._run_operation(DELETE_OPERATION)

def describe(self) -> ScheduleDescription:
"""Retrieve the current details of this schedule."""
metadata = self._client.get_entity(self._entity_id, include_state=True)
if metadata is None:
raise ScheduleNotFoundError(self._schedule_id)
state = metadata.get_typed_state(ScheduleState)
if state is None:
raise ScheduleNotFoundError(self._schedule_id)
return state.to_description()


class ScheduledTaskClient:
"""Client for managing scheduled tasks in a Durable Task application."""

def __init__(self, client: TaskHubGrpcClient, *, operation_timeout: float = 60):
self._client = client
self._operation_timeout = operation_timeout

def get_schedule_client(self, schedule_id: str) -> ScheduleClient:
"""Get a handle to manage a specific schedule."""
return ScheduleClient(self._client, schedule_id,
operation_timeout=self._operation_timeout)

def create_schedule(self, options: ScheduleCreationOptions) -> ScheduleClient:
"""Create a new schedule and return a client for managing it."""
schedule_client = self.get_schedule_client(options.schedule_id)
schedule_client.create(options)
return schedule_client

def get_schedule(self, schedule_id: str) -> ScheduleDescription | None:
"""Get a schedule description by ID, or None if it does not exist."""
try:
return self.get_schedule_client(schedule_id).describe()
except ScheduleNotFoundError:
return None

def list_schedules(self, schedule_query: ScheduleQuery | None = None) -> list[ScheduleDescription]:
"""List schedules matching the given filter criteria.

> [!NOTE]
> The ``status`` and ``created_from``/``created_to`` filters are applied
> client-side after each page of entities is fetched, so an individual
> page may contain fewer than ``page_size`` matches (or none) even when
> more matching schedules exist. This mirrors the .NET implementation.
"""
prefix = schedule_query.schedule_id_prefix if schedule_query and schedule_query.schedule_id_prefix else ""
page_size = (schedule_query.page_size if schedule_query and schedule_query.page_size
else ScheduleQuery.DEFAULT_PAGE_SIZE)
query = EntityQuery(
instance_id_starts_with=f"@{ENTITY_NAME}@{prefix}",
include_state=True,
page_size=page_size,
)
results: list[ScheduleDescription] = []
for metadata in self._client.get_all_entities(query):
Comment thread
andystaples marked this conversation as resolved.
state = metadata.get_typed_state(ScheduleState)
if state is None or state.schedule_configuration is None:
continue
if not self._matches_filter(state, schedule_query):
continue
results.append(state.to_description())
return results

@staticmethod
def _matches_filter(state: ScheduleState, schedule_query: ScheduleQuery | None) -> bool:
if schedule_query is None:
return True
if schedule_query.status is not None and state.status != schedule_query.status:
return False
# ``ScheduleQuery`` normalizes its bounds to aware UTC; defensively
# normalize the stored timestamp too (a payload could in principle carry
# a naive value) so the comparison can never raise on naive-vs-aware.
# Bounds are exclusive, matching the .NET ScheduledTasks implementation.
created_at = ensure_aware(state.schedule_created_at)
if schedule_query.created_from is not None and not (created_at and created_at > schedule_query.created_from):
return False
if schedule_query.created_to is not None and not (created_at and created_at < schedule_query.created_to):
return False
Comment thread
andystaples marked this conversation as resolved.
return True
Loading
Loading