diff --git a/CHANGELOG.md b/CHANGELOG.md index 6376c6c0..1564cbce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ADDED - Added `durabletask.scheduled`, a recurring schedule feature built on durable - entities. Use `configure_scheduled_tasks(worker)` to enable it on a worker, + entities. Use `worker.configure_scheduled_tasks()` to enable it on a worker, then manage schedules from the client via `ScheduledTaskClient` (and the per-schedule `ScheduleClient`). Supports creating, describing, listing, updating, pausing, resuming, and deleting schedules with configurable diff --git a/durabletask/scheduled/__init__.py b/durabletask/scheduled/__init__.py index 36d215a7..57d295ba 100644 --- a/durabletask/scheduled/__init__.py +++ b/durabletask/scheduled/__init__.py @@ -4,9 +4,9 @@ """Scheduled tasks support for the Durable Task SDK. This package provides a recurring schedule feature built on top of durable -entities and a helper orchestrator. Register the entity and orchestrator with a -worker via :func:`configure_scheduled_tasks`, then manage schedules from the -client via :class:`ScheduledTaskClient`. +entities and a helper orchestrator. Enable it on a worker via +:meth:`durabletask.worker.TaskHubGrpcWorker.configure_scheduled_tasks`, then +manage schedules from the client via :class:`ScheduledTaskClient`. """ from durabletask.scheduled.client import ScheduleClient, ScheduledTaskClient @@ -17,7 +17,6 @@ from durabletask.scheduled.models import (ScheduleCreationOptions, ScheduleDescription, ScheduleQuery, ScheduleUpdateOptions) -from durabletask.scheduled.registration import configure_scheduled_tasks from durabletask.scheduled.schedule_status import ScheduleStatus __all__ = [ @@ -32,7 +31,6 @@ "ScheduleNotFoundError", "ScheduleClientValidationError", "ScheduleInvalidTransitionError", - "configure_scheduled_tasks", ] PACKAGE_NAME = "durabletask.scheduled" diff --git a/durabletask/scheduled/registration.py b/durabletask/scheduled/registration.py deleted file mode 100644 index 2a835094..00000000 --- a/durabletask/scheduled/registration.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -import durabletask.internal.orchestrator_service_pb2 as pb -from durabletask.worker import TaskHubGrpcWorker -from durabletask.scheduled.orchestrator import execute_schedule_operation_orchestrator -from durabletask.scheduled.schedule_entity import ENTITY_NAME, Schedule - - -def configure_scheduled_tasks(worker: TaskHubGrpcWorker) -> None: - """Register the scheduled tasks entity and orchestrator with a worker. - - Call this before starting the worker to enable scheduled tasks support. - - Parameters - ---------- - worker : TaskHubGrpcWorker - The worker to register the schedule entity and operation orchestrator with. - """ - worker.add_entity(Schedule, ENTITY_NAME) - worker.add_orchestrator(execute_schedule_operation_orchestrator) - worker.add_capability(pb.WORKER_CAPABILITY_SCHEDULED_TASKS) diff --git a/durabletask/worker.py b/durabletask/worker.py index ea42411e..a53d5e71 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -579,8 +579,8 @@ def __init__( self._runLoop: Thread | None = None # Extra worker capabilities advertised to the backend in # GetWorkItemsRequest (in addition to ones derived from worker state such - # as LARGE_PAYLOADS). Feature-enablement helpers like - # durabletask.scheduled.configure_scheduled_tasks register theirs here. + # as LARGE_PAYLOADS). A feature-enablement helper like + # TaskHubGrpcWorker.configure_scheduled_tasks registers its own here. self._capabilities: set[int] = set() @property @@ -645,9 +645,8 @@ def add_capability(self, capability: int) -> None: """Advertise a worker capability to the backend in ``GetWorkItemsRequest``. Most users do not call this directly; feature-enablement helpers such as - :func:`durabletask.scheduled.configure_scheduled_tasks` use it to - advertise the capabilities (``pb.WORKER_CAPABILITY_*``) their feature - relies on. + :meth:`TaskHubGrpcWorker.configure_scheduled_tasks` use it to advertise + the capabilities (``pb.WORKER_CAPABILITY_*``) their feature relies on. """ if self._is_running: raise RuntimeError( @@ -655,6 +654,29 @@ def add_capability(self, capability: int) -> None: ) self._capabilities.add(capability) + def configure_scheduled_tasks(self) -> None: + """Enable scheduled tasks support on this worker. + + Registers the schedule entity and operation orchestrator and advertises + the scheduled-tasks capability to the backend. Call this before starting + the worker. Schedules are then managed from the client via + :class:`durabletask.scheduled.ScheduledTaskClient`. + """ + if self._is_running: + raise RuntimeError( + "Scheduled tasks cannot be configured while the worker is running." + ) + # Imported lazily to avoid a circular import: durabletask.scheduled + # imports from durabletask.worker. + from durabletask.scheduled.orchestrator import ( + execute_schedule_operation_orchestrator, + ) + from durabletask.scheduled.schedule_entity import ENTITY_NAME, Schedule + + self.add_entity(Schedule, ENTITY_NAME) + self.add_orchestrator(execute_schedule_operation_orchestrator) + self.add_capability(pb.WORKER_CAPABILITY_SCHEDULED_TASKS) + def use_versioning(self, version: VersioningOptions) -> None: """Initializes versioning options for sub-orchestrators and activities.""" if self._is_running: diff --git a/examples/scheduled_tasks.py b/examples/scheduled_tasks.py index 841d3969..1490889e 100644 --- a/examples/scheduled_tasks.py +++ b/examples/scheduled_tasks.py @@ -17,8 +17,7 @@ from durabletask import task from durabletask.azuremanaged.client import DurableTaskSchedulerClient from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker -from durabletask.scheduled import (ScheduledTaskClient, ScheduleCreationOptions, - configure_scheduled_tasks) +from durabletask.scheduled import ScheduledTaskClient, ScheduleCreationOptions def greet_orchestrator(ctx: task.OrchestrationContext, name: str) -> Generator[task.Task[Any], Any, Any]: @@ -41,7 +40,7 @@ def greet_orchestrator(ctx: task.OrchestrationContext, name: str) -> Generator[t taskhub=taskhub_name, token_credential=credential) as worker: worker.add_orchestrator(greet_orchestrator) # Register the schedule entity and operation orchestrator. - configure_scheduled_tasks(worker) + worker.configure_scheduled_tasks() worker.start() client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel, diff --git a/tests/durabletask-azuremanaged/scheduled/test_dts_scheduled_e2e.py b/tests/durabletask-azuremanaged/scheduled/test_dts_scheduled_e2e.py index 06d3d76a..05f7e7e3 100644 --- a/tests/durabletask-azuremanaged/scheduled/test_dts_scheduled_e2e.py +++ b/tests/durabletask-azuremanaged/scheduled/test_dts_scheduled_e2e.py @@ -19,8 +19,7 @@ from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker from durabletask.scheduled import (ScheduledTaskClient, ScheduleCreationOptions, ScheduleQuery, ScheduleStatus, - ScheduleUpdateOptions, - configure_scheduled_tasks) + ScheduleUpdateOptions) import os @@ -61,7 +60,7 @@ def _make_worker() -> DurableTaskSchedulerWorker: w = DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=None) w.add_orchestrator(target_orchestrator) - configure_scheduled_tasks(w) + w.configure_scheduled_tasks() return w diff --git a/tests/durabletask/scheduled/test_client_filter_and_capability.py b/tests/durabletask/scheduled/test_client_filter_and_capability.py index 133de4eb..66cfc318 100644 --- a/tests/durabletask/scheduled/test_client_filter_and_capability.py +++ b/tests/durabletask/scheduled/test_client_filter_and_capability.py @@ -6,7 +6,6 @@ from datetime import datetime, timedelta, timezone import durabletask.internal.orchestrator_service_pb2 as pb -from durabletask.scheduled import configure_scheduled_tasks from durabletask.scheduled.client import ScheduledTaskClient from durabletask.scheduled.models import (ScheduleConfiguration, ScheduleCreationOptions, ScheduleQuery, @@ -69,7 +68,7 @@ def test_status_filter(self): class TestScheduledTasksCapability: def test_configure_advertises_scheduled_tasks_capability(self): worker = TaskHubGrpcWorker() - configure_scheduled_tasks(worker) + worker.configure_scheduled_tasks() assert pb.WORKER_CAPABILITY_SCHEDULED_TASKS in worker._capabilities # pyright: ignore[reportPrivateUsage] def test_capability_absent_by_default(self): diff --git a/tests/durabletask/scheduled/test_scheduled_e2e.py b/tests/durabletask/scheduled/test_scheduled_e2e.py index 911cfa64..4a158333 100644 --- a/tests/durabletask/scheduled/test_scheduled_e2e.py +++ b/tests/durabletask/scheduled/test_scheduled_e2e.py @@ -12,8 +12,7 @@ from durabletask import client, task, worker from durabletask.scheduled import (ScheduledTaskClient, ScheduleCreationOptions, ScheduleQuery, ScheduleStatus, - ScheduleUpdateOptions, - configure_scheduled_tasks) + ScheduleUpdateOptions) from durabletask.testing import create_test_backend from tests.durabletask._port_utils import find_free_port @@ -63,7 +62,7 @@ def target_orchestrator(ctx: task.OrchestrationContext, value): def _make_worker() -> worker.TaskHubGrpcWorker: w = worker.TaskHubGrpcWorker(host_address=HOST) w.add_orchestrator(target_orchestrator) - configure_scheduled_tasks(w) + w.configure_scheduled_tasks() return w