diff --git a/.github/workflows/durabletask-azurefunctions.yml b/.github/workflows/durabletask-azurefunctions.yml new file mode 100644 index 00000000..6df274f1 --- /dev/null +++ b/.github/workflows/durabletask-azurefunctions.yml @@ -0,0 +1,67 @@ +name: Durable Task Scheduler SDK (azure-functions-durable) + +on: + push: + branches: + - "main" + tags: + - "azurefunctions-v*" # Only run for tags starting with "azurefunctions-v" + pull_request: + branches: + - "main" + +permissions: + contents: read + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.14 + uses: actions/setup-python@v5 + with: + python-version: 3.14 + - name: Install dependencies + working-directory: azure-functions-durable + run: | + python -m pip install --upgrade pip + pip install setuptools wheel tox + pip install flake8 + - name: Run flake8 Linter + working-directory: azure-functions-durable + run: flake8 . + - name: Run flake8 Linter + working-directory: tests/azure-functions-durable + run: flake8 . + + run-tests: + strategy: + fail-fast: false + matrix: + python-version: ["3.13", "3.14"] + needs: lint + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install durabletask locally + run: | + python -m pip install --upgrade pip + pip install pytest + pip install . --force-reinstall + + - name: Install azure-functions-durable locally + run: | + pip install ./azure-functions-durable --force-reinstall + + - name: Run unit tests + working-directory: tests/azure-functions-durable + run: | + pytest -m "not dts and not azurite" --verbose diff --git a/.github/workflows/typecheck.yml b/.github/workflows/typecheck.yml index 28dcddfc..99c639ba 100644 --- a/.github/workflows/typecheck.yml +++ b/.github/workflows/typecheck.yml @@ -7,6 +7,7 @@ on: tags: - "v*" - "azuremanaged-v*" + - "azurefunctions-v*" pull_request: branches: - "main" @@ -46,3 +47,25 @@ jobs: - name: Run pyright (strict, Python 3.10) run: pyright + + pyright-azurefunctions: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python 3.13 (lowest supported by azure-functions-durable) + uses: actions/setup-python@v5 + with: + python-version: "3.13" + + - name: Install packages and dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -e ".[azure-blob-payloads,opentelemetry]" + pip install -e ./azure-functions-durable + pip install pyright + + - name: Run pyright (strict, Python 3.13) + run: pyright -p azure-functions-durable/pyrightconfig.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 46df1f48..15346b80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## Unreleased +ADDED + +- Added a `result` property to `Task` as a convenience alias for `get_result()`. + +FIXED + +- `OrchestrationContext.create_timer` now accepts timezone-aware `datetime` + values, normalizing them to UTC instead of raising when compared against the + orchestration's internal clock. + ## v1.7.0 ADDED diff --git a/azure-functions-durable/CHANGELOG.md b/azure-functions-durable/CHANGELOG.md new file mode 100644 index 00000000..84f9aed3 --- /dev/null +++ b/azure-functions-durable/CHANGELOG.md @@ -0,0 +1,127 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## Unreleased + +### Fixed + +- `durable_client_input` now injects a rich `DurableFunctionsClient` into the + decorated function's client parameter (the binding's JSON string is converted + to a client object). Previously the client parameter received the raw string. +- `DurableFunctionsClient` now applies the host-provided + `maxGrpcMessageSizeInBytes` to the gRPC channel's send/receive message limits + (when provided), allowing large orchestration payloads to be retrieved. When + the host does not supply a value, the gRPC library defaults are left in place. +- `DurableOrchestrationContext.current_utc_datetime` is now timezone-aware + (UTC), matching v1, so comparisons against timezone-aware datetimes (e.g. a + parsed scheduled-start time) no longer raise. +- `DurableOrchestrationStatus.to_json()` now emits orchestration payloads + (`output`, `input`, `customStatus`) as their raw JSON representation instead + of reconstructed Python objects, so the result is always JSON-serializable + even when payloads are custom types. +- Restored v1 members that were missing on the compatibility types, avoiding + `AttributeError`/`TypeError` for existing code that used them: + - `create_http_management_payload(...)` now returns a `dict`-based + `HttpManagementPayload`, so `json.dumps(payload)` works directly again. + - `RetryOptions.to_json()` returns the v1 + `firstRetryIntervalInMilliseconds`/`maxNumberOfAttempts` dictionary, and the + `first_retry_interval_in_milliseconds` / `max_number_of_attempts` getters + remain available. + - `DurableOrchestrationStatus.from_json(...)` reconstructs a status from its + `to_json()` representation (or the equivalent v1 JSON schema). + - `PurgeHistoryResult.from_json(...)` reconstructs a result from its v1 JSON + representation. + - `DurableOrchestrationContext.version` returns the orchestration instance + version (or `None`). + +### Added + +- The `orchestration_trigger` decorator now accepts an `input_type` argument + (v1 parity). When set, a v1-style `context.get_input()` decodes the input to + that type; a call-site `expected_type` on `get_input` takes precedence. +- One-argument (Azure Functions / v1-style) entity functions + (``def entity(context):``) are now supported. The worker detects the entity's + shape and, for single-argument functions, delivers a functional + `DurableEntityContext` that wraps the durabletask `EntityContext` and exposes + the v1 entity API: `entity_name`, `entity_key`, `operation_name`, + `get_input()`, `get_state()` (with `initializer`), `set_state()`, + `set_result()`, and `destruct_on_exit()`. The operation result is taken from + `set_result(...)`, falling back to the function's return value. + durabletask-native two-argument entity functions and class-based + (`DurableEntity`) entities continue to work unchanged. +- One-argument (Azure Functions / v1-style) orchestrator functions + (``def orchestrator(context):``) are now supported. The worker detects the + orchestrator's arity and, for single-argument functions, delivers a + functional `DurableOrchestrationContext` that wraps the durabletask + `OrchestrationContext` and exposes the v1 context API: `get_input()`, + `call_activity`/`call_activity_with_retry`, + `call_sub_orchestrator`/`call_sub_orchestrator_with_retry`, `create_timer`, + `wait_for_external_event`, `continue_as_new`, `set_custom_status`, + `task_all`/`task_any`, `call_entity`/`signal_entity`, and `new_uuid`/`new_guid`. + Two-argument (durabletask-native) orchestrators continue to work unchanged. + `DurableOrchestrationContext.call_http` raises `NotImplementedError` pending a + durabletask durable-HTTP implementation. +- `DurableOrchestrationContext` also exposes `custom_status` (reflecting the + value set via `set_custom_status`) and `will_continue_as_new` (True once + `continue_as_new` has been called). `parent_instance_id`, `function_context`, + and `histories` raise `NotImplementedError` because durabletask does not + surface that information on the orchestration context. + +- Backwards-compatible, deprecated aliases on `DurableFunctionsClient` for the + v1 `DurableOrchestrationClient` method names: `start_new`, `get_status`, + `get_status_all`, `get_status_by`, `raise_event`, `terminate`, + `purge_instance_history`, `purge_instance_history_by`, `suspend`, `resume`, + `restart`, `read_entity_state`, `get_client_response_links`, and + `wait_for_completion_or_create_check_status_response`. Each delegates to the + corresponding durabletask method and emits a `DeprecationWarning`; new code + should use the durabletask names (e.g. `schedule_new_orchestration`, + `get_orchestration_state`). +- `DurableFunctionsClient.signal_entity` now also accepts the v1 + `operation_input` keyword (alias for `input`); `task_hub_name` and + `connection_name` are accepted for compatibility and ignored. +- `DurableFunctionsClient.rewind` is present as a deprecated stub that raises + `NotImplementedError`, pending a durabletask rewind implementation. +- Deprecated v1 compatibility aliases are now exported from + `azure.durable_functions`: `DurableOrchestrationClient` (alias for + `DurableFunctionsClient`), `DurableOrchestrationContext`, `DurableEntityContext`, + `EntityId`, `ManagedIdentityTokenSource`, `TokenSource`, `Entity`, and + `OrchestrationRuntimeStatus`. +- v1-compatible return-type wrappers `DurableOrchestrationStatus`, + `PurgeHistoryResult`, and `EntityStateResponse` (exported from + `azure.durable_functions`). The deprecated client methods now return these: + `get_status`/`get_status_all`/`get_status_by` return + `DurableOrchestrationStatus` (wrapping durabletask `OrchestrationState`, with + v1 attributes like `runtime_status`, `output`, `input_`, `custom_status`, and + a falsy value for missing instances); `purge_instance_history`/`_by` return + `PurgeHistoryResult` (with `instances_deleted`); and `read_entity_state` + returns `EntityStateResponse` (with `entity_exists`/`entity_state`). +- `DurableOrchestrationContext.call_http` is present as a stub that raises + `NotImplementedError`, documenting the durable-HTTP gap. `TokenSource` / + `ManagedIdentityTokenSource` remain constructible but only apply to + `call_http`, which is not yet supported. +- `RetryOptions`, a deprecated shim that maps the v1 millisecond-based + constructor onto durabletask `RetryPolicy` (which uses `timedelta`). + `RetryPolicy` is now also exported from `azure.durable_functions`. + +### Changed + +- `DurableFunctionsClient` is now an async client. Its orchestration and entity + management methods (e.g. `schedule_new_orchestration`, `get_orchestration_state`, + `wait_for_orchestration_completion`) are now coroutines and must be awaited. + This aligns the client with the async API surface of the V1 + `DurableOrchestrationClient`. +- `create_http_management_payload` now accepts either the durabletask + `(request, instance_id)` signature or the v1 `(instance_id)` signature for + backwards compatibility. +- `HttpManagementPayload` now subclasses `dict`, so it is directly + JSON-serializable via `json.dumps(payload)` and supports mapping-style access + (`payload["statusQueryGetUri"]`, iteration, `in`, `keys()`/`items()`/`values()`) + so v1 code that treated the payload as a `dict` keeps working. + +## v0.1.0 + +- Initial implementation diff --git a/azure-functions-durable/azure/durable_functions/__init__.py b/azure-functions-durable/azure/durable_functions/__init__.py new file mode 100644 index 00000000..b24c353e --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/__init__.py @@ -0,0 +1,49 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +# This module intentionally re-exports deprecated v1 compatibility aliases. +# pyright: reportDeprecated=false + +from durabletask.task import RetryPolicy + +from .decorators.durable_app import Blueprint, DFApp +from .client import DurableFunctionsClient +from .orchestrator import Orchestrator +from .internal.compat.retry_options import RetryOptions +from .internal.compat.orchestration_runtime_status import OrchestrationRuntimeStatus +from .internal.compat.durable_orchestration_status import DurableOrchestrationStatus +from .internal.compat.purge_history_result import PurgeHistoryResult +from .internal.compat.entity_state_response import EntityStateResponse +from .internal.compat.entity_id import EntityId +from .internal.compat.token_source import ManagedIdentityTokenSource, TokenSource +from .internal.compat.orchestration_context import DurableOrchestrationContext +from .internal.compat.entity_context import DurableEntityContext +from .internal.compat.compat_aliases import ( + DurableOrchestrationClient, + Entity, +) + +# IMPORTANT: DO NOT REMOVE. `azure-functions` relies on the presence and value of this variable +# for version detection +version = "2.x" + +__all__ = [ + "Blueprint", + "DFApp", + "DurableEntityContext", + "DurableFunctionsClient", + "DurableOrchestrationClient", + "DurableOrchestrationContext", + "DurableOrchestrationStatus", + "Entity", + "EntityId", + "EntityStateResponse", + "ManagedIdentityTokenSource", + "Orchestrator", + "OrchestrationRuntimeStatus", + "PurgeHistoryResult", + "RetryOptions", + "RetryPolicy", + "TokenSource", + "version", +] diff --git a/azure-functions-durable/azure/durable_functions/client.py b/azure-functions-durable/azure/durable_functions/client.py new file mode 100644 index 00000000..d4de1ab6 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/client.py @@ -0,0 +1,416 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import json + +from datetime import datetime, timedelta +from typing import Any, Optional, Union +from typing_extensions import deprecated +import azure.functions as func +from urllib.parse import urlparse, quote + +from durabletask.client import ( + AsyncTaskHubGrpcClient, + OrchestrationQuery, + OrchestrationStatus, +) +from durabletask.entities import EntityInstanceId +from durabletask.grpc_options import GrpcChannelOptions +from .internal.azurefunctions_grpc_interceptor import AzureFunctionsAsyncDefaultClientInterceptorImpl +from .internal.serialization import DEFAULT_FUNCTIONS_DATA_CONVERTER +from .http import HttpManagementPayload +from .internal.compat.durable_orchestration_status import DurableOrchestrationStatus +from .internal.compat.entity_state_response import EntityStateResponse +from .internal.compat.orchestration_runtime_status import OrchestrationRuntimeStatus, to_durabletask_statuses +from .internal.compat.purge_history_result import PurgeHistoryResult + + +# Client class used for Durable Functions +class DurableFunctionsClient(AsyncTaskHubGrpcClient): + """A gRPC client passed to Durable Functions durable client bindings. + + Connects to the Durable Functions runtime using async gRPC and provides methods + for creating and managing Durable orchestrations, interacting with Durable entities, + and creating HTTP management payloads and check status responses for use with Durable Functions invocations. + """ + taskHubName: str + connectionName: str + creationUrls: dict[str, str] + managementUrls: dict[str, str] + baseUrl: str + requiredQueryStringParameters: str + rpcBaseUrl: str + httpBaseUrl: str + maxGrpcMessageSizeInBytes: int + grpcHttpClientTimeout: timedelta + + def __init__(self, client_as_string: str): + """Initializes a DurableFunctionsClient instance from a JSON string. + + This string will be provided by the Durable Functions host extension upon invocation of the client trigger. + + Args: + client_as_string (str): A JSON string containing the Durable Functions client configuration. + + Raises: + json.JSONDecodeError: If the provided string is not valid JSON. + """ + self._parse_client_configuration(client_as_string) + + interceptors = [AzureFunctionsAsyncDefaultClientInterceptorImpl(self.taskHubName, self.requiredQueryStringParameters)] + + # Only override the gRPC message size limits when the host explicitly + # provides a value. When unset (0), we leave the gRPC library defaults + # in place rather than applying a large default of our own. + channel_options: GrpcChannelOptions | None = None + if self.maxGrpcMessageSizeInBytes > 0: + channel_options = GrpcChannelOptions( + max_receive_message_length=self.maxGrpcMessageSizeInBytes, + max_send_message_length=self.maxGrpcMessageSizeInBytes) + + # We pass in None for the metadata so we don't construct an additional interceptor in the parent class + # Since the parent class doesn't use anything metadata for anything else, we can set it as None + super().__init__( + host_address=self.rpcBaseUrl, + secure_channel=False, + metadata=None, + interceptors=interceptors, + channel_options=channel_options, + data_converter=DEFAULT_FUNCTIONS_DATA_CONVERTER) + + def _parse_client_configuration(self, client_as_string: str) -> None: + """Parses the client configuration JSON string and sets instance variables. + + Args: + client_as_string (str): A JSON string containing the Durable Functions client configuration. + + Raises: + json.JSONDecodeError: If the provided string is not valid JSON. + """ + client = json.loads(client_as_string) + + self.taskHubName = client.get("taskHubName", "") + self.connectionName = client.get("connectionName", "") + self.creationUrls = client.get("creationUrls", {}) + self.managementUrls = client.get("managementUrls", {}) + self.baseUrl = client.get("baseUrl", "") + self.requiredQueryStringParameters = client.get("requiredQueryStringParameters", "") + self.rpcBaseUrl = client.get("rpcBaseUrl", "") + self.httpBaseUrl = client.get("httpBaseUrl", "") + self.maxGrpcMessageSizeInBytes = client.get("maxGrpcMessageSizeInBytes", 0) + # TODO: convert the string value back to timedelta - annoying regex? + self.grpcHttpClientTimeout = client.get("grpcHttpClientTimeout", timedelta(seconds=30)) + + def create_check_status_response(self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse: + """Creates an HTTP response for checking the status of a Durable Function instance. + + Args: + request (func.HttpRequest): The incoming HTTP request. + instance_id (str): The ID of the Durable Function instance. + """ + location_url = self._get_instance_status_url(request, instance_id) + return func.HttpResponse( + body=str(self._get_client_response_links(request, instance_id)), + status_code=202, + headers={ + 'content-type': 'application/json', + 'Location': location_url, + }, + ) + + def create_http_management_payload( + self, + request: func.HttpRequest | str | None = None, + instance_id: str | None = None) -> HttpManagementPayload: + """Creates an HTTP management payload for a Durable Function instance. + + Two call styles are supported: + + - ``create_http_management_payload(request, instance_id)`` (recommended): + builds the payload URLs relative to the incoming request's origin. + - ``create_http_management_payload(instance_id)`` (deprecated V1 style): + builds the payload URLs from the client binding's base URL when no + request is available. + + Args: + request (func.HttpRequest | str | None): The incoming HTTP request, or, + for backwards compatibility, the instance ID when called with a + single positional argument. + instance_id (str | None): The ID of the Durable Function instance. + """ + # Backwards-compatibility: v1 accepted a single positional ``instance_id``. + if instance_id is None and isinstance(request, str): + instance_id = request + request = None + if instance_id is None: + raise TypeError("instance_id is required") + resolved_request = request if isinstance(request, func.HttpRequest) else None + return self._get_client_response_links(resolved_request, instance_id) + + def _get_client_response_links(self, request: func.HttpRequest | None, instance_id: str) -> HttpManagementPayload: + instance_status_url = self._get_instance_status_url(request, instance_id) + return HttpManagementPayload(instance_id, instance_status_url, self.requiredQueryStringParameters) + + def _get_instance_status_url(self, request: func.HttpRequest | None, instance_id: str) -> str: + encoded_instance_id = quote(instance_id) + if request is not None: + request_url = urlparse(request.url) + location_url = f"{request_url.scheme}://{request_url.netloc}" + location_url = location_url + "/runtime/webhooks/durabletask/instances/" + encoded_instance_id + else: + # No request available (v1-style call): fall back to the base URL + # supplied in the client binding configuration. + base_url = self.baseUrl.rstrip("/") if self.baseUrl else "" + location_url = base_url + "/instances/" + encoded_instance_id + return location_url + + # ------------------------------------------------------------------ + # Backwards-compatibility shims for the v1 azure-functions-durable + # DurableOrchestrationClient API. These delegate to the durabletask + # AsyncTaskHubGrpcClient methods and are deprecated: new code should use + # the durabletask method names directly. + # ------------------------------------------------------------------ + + @deprecated("start_new is deprecated; use schedule_new_orchestration instead.") + async def start_new(self, + orchestration_function_name: str, + instance_id: Optional[str] = None, + client_input: Optional[Any] = None, + version: Optional[str] = None) -> str: + """Deprecated alias for :meth:`schedule_new_orchestration`.""" + return await self.schedule_new_orchestration( + orchestration_function_name, + input=client_input, + instance_id=instance_id, + version=version) + + @deprecated("get_status is deprecated; use get_orchestration_state instead.") + async def get_status( + self, + instance_id: str, + show_history: bool = False, + show_history_output: bool = False, + show_input: bool = False) -> DurableOrchestrationStatus: + """Deprecated alias for :meth:`get_orchestration_state`. + + Returns a :class:`DurableOrchestrationStatus` wrapping the durabletask + ``OrchestrationState`` for v1 back-compat. When the instance does not + exist, a falsy status is returned rather than ``None``. + + The ``show_history`` and ``show_history_output`` flags have no + equivalent in durabletask and are ignored; ``show_input`` maps to + ``fetch_payloads``. + """ + state = await self.get_orchestration_state(instance_id, fetch_payloads=show_input) + return DurableOrchestrationStatus.from_orchestration_state(state) + + @deprecated("get_status_all is deprecated; use get_all_orchestration_states instead.") + async def get_status_all(self) -> list[DurableOrchestrationStatus]: + """Deprecated alias for :meth:`get_all_orchestration_states`.""" + states = await self.get_all_orchestration_states() + return [DurableOrchestrationStatus.from_orchestration_state(state) for state in states] + + @deprecated("raise_event is deprecated; use raise_orchestration_event instead.") + async def raise_event( + self, + instance_id: str, + event_name: str, + event_data: Any = None, + task_hub_name: Optional[str] = None, + connection_name: Optional[str] = None) -> None: + """Deprecated alias for :meth:`raise_orchestration_event`. + + The ``task_hub_name`` and ``connection_name`` arguments have no + equivalent in durabletask and are ignored. + """ + await self.raise_orchestration_event(instance_id, event_name, data=event_data) + + @deprecated("terminate is deprecated; use terminate_orchestration instead.") + async def terminate(self, instance_id: str, reason: Optional[Any] = None) -> None: + """Deprecated alias for :meth:`terminate_orchestration`. + + The v1 ``reason`` maps to the durabletask ``output`` argument. + """ + await self.terminate_orchestration(instance_id, output=reason) + + @deprecated("purge_instance_history is deprecated; use purge_orchestration instead.") + async def purge_instance_history(self, instance_id: str) -> PurgeHistoryResult: + """Deprecated alias for :meth:`purge_orchestration`. + + Returns a :class:`PurgeHistoryResult` wrapping the durabletask + ``PurgeInstancesResult`` for v1 back-compat. + """ + result = await self.purge_orchestration(instance_id) + return PurgeHistoryResult.from_purge_result(result) + + @deprecated("suspend is deprecated; use suspend_orchestration instead.") + async def suspend(self, instance_id: str, reason: Optional[str] = None) -> None: + """Deprecated alias for :meth:`suspend_orchestration`. + + The v1 ``reason`` argument has no equivalent in durabletask and is + ignored. + """ + await self.suspend_orchestration(instance_id) + + @deprecated("resume is deprecated; use resume_orchestration instead.") + async def resume(self, instance_id: str, reason: Optional[str] = None) -> None: + """Deprecated alias for :meth:`resume_orchestration`. + + The v1 ``reason`` argument has no equivalent in durabletask and is + ignored. + """ + await self.resume_orchestration(instance_id) + + @deprecated("restart is deprecated; use restart_orchestration instead.") + async def restart( + self, + instance_id: str, + restart_with_new_instance_id: bool = True) -> str: + """Deprecated alias for :meth:`restart_orchestration`.""" + return await self.restart_orchestration( + instance_id, restart_with_new_instance_id=restart_with_new_instance_id) + + @deprecated("read_entity_state is deprecated; use get_entity instead.") + async def read_entity_state( + self, + entity_instance_id: EntityInstanceId, + task_hub_name: Optional[str] = None, + connection_name: Optional[str] = None) -> EntityStateResponse: + """Deprecated alias for :meth:`get_entity`. + + Returns an :class:`EntityStateResponse` wrapping the durabletask + ``EntityMetadata`` for v1 back-compat. + + The ``task_hub_name`` and ``connection_name`` arguments have no + equivalent in durabletask and are ignored. + """ + metadata = await self.get_entity(entity_instance_id) + return EntityStateResponse.from_entity_metadata(metadata) + + @deprecated("get_status_by is deprecated; use get_all_orchestration_states instead.") + async def get_status_by( + self, + created_time_from: Optional[datetime] = None, + created_time_to: Optional[datetime] = None, + runtime_status: Optional[list[OrchestrationRuntimeStatus]] = None) -> list[DurableOrchestrationStatus]: + """Deprecated alias for :meth:`get_all_orchestration_states`. + + The v1 ``OrchestrationRuntimeStatus`` values are mapped onto the + durabletask ``OrchestrationStatus`` enum, and results are wrapped in + :class:`DurableOrchestrationStatus` for v1 back-compat. + """ + query = OrchestrationQuery( + created_time_from=created_time_from, + created_time_to=created_time_to, + runtime_status=to_durabletask_statuses(runtime_status)) + states = await self.get_all_orchestration_states(query) + return [DurableOrchestrationStatus.from_orchestration_state(state) for state in states] + + @deprecated("purge_instance_history_by is deprecated; use purge_orchestrations_by instead.") + async def purge_instance_history_by( + self, + created_time_from: Optional[datetime] = None, + created_time_to: Optional[datetime] = None, + runtime_status: Optional[list[OrchestrationRuntimeStatus]] = None) -> PurgeHistoryResult: + """Deprecated alias for :meth:`purge_orchestrations_by`. + + The v1 ``OrchestrationRuntimeStatus`` values are mapped onto the + durabletask ``OrchestrationStatus`` enum, and the result is wrapped in + :class:`PurgeHistoryResult` for v1 back-compat. + """ + result = await self.purge_orchestrations_by( + created_time_from=created_time_from, + created_time_to=created_time_to, + runtime_status=to_durabletask_statuses(runtime_status)) + return PurgeHistoryResult.from_purge_result(result) + + async def signal_entity( + self, + entity_instance_id: EntityInstanceId, + operation_name: str, + input: Any = None, + signal_time: Optional[datetime] = None, + *, + operation_input: Any = None, + task_hub_name: Optional[str] = None, + connection_name: Optional[str] = None) -> None: + """Signal an entity to perform an operation. + + Accepts the durabletask ``input`` argument as well as the v1 + ``operation_input`` alias. The ``task_hub_name`` and ``connection_name`` + arguments have no equivalent in durabletask and are ignored. + """ + resolved_input = operation_input if operation_input is not None else input + await super().signal_entity( + entity_instance_id, operation_name, input=resolved_input, signal_time=signal_time) + + @deprecated( + "get_client_response_links is deprecated; use create_http_management_payload instead.") + def get_client_response_links( + self, + request: Optional[func.HttpRequest], + instance_id: str) -> HttpManagementPayload: + """Deprecated alias for :meth:`create_http_management_payload`.""" + return self._get_client_response_links(request, instance_id) + + @deprecated( + "wait_for_completion_or_create_check_status_response is deprecated; use " + "wait_for_orchestration_completion together with create_check_status_response instead.") + async def wait_for_completion_or_create_check_status_response( + self, + request: func.HttpRequest, + instance_id: str, + timeout_in_milliseconds: int = 10000, + retry_interval_in_milliseconds: int = 1000) -> func.HttpResponse: + """Wait for an orchestration to complete, or return a check-status response. + + If the orchestration completes within the timeout, an HTTP response + containing its output (or failure) is returned; otherwise a + check-status response is returned. + + The ``retry_interval_in_milliseconds`` argument has no durabletask + equivalent (durabletask waits server-side) and is ignored. + """ + if retry_interval_in_milliseconds > timeout_in_milliseconds: + raise Exception( + f'Total timeout {timeout_in_milliseconds} (ms) should be bigger than ' + f'retry timeout {retry_interval_in_milliseconds} (ms)') + + try: + state = await self.wait_for_orchestration_completion( + instance_id, timeout=timeout_in_milliseconds / 1000) + except TimeoutError: + return self.create_check_status_response(request, instance_id) + + if state is None: + return self.create_check_status_response(request, instance_id) + + if state.runtime_status == OrchestrationStatus.COMPLETED: + return self._create_http_response(200, state.serialized_output) + if state.runtime_status == OrchestrationStatus.TERMINATED: + return self._create_http_response(200, state.serialized_output) + if state.runtime_status == OrchestrationStatus.FAILED: + return self._create_http_response(500, state.serialized_output) + return self.create_check_status_response(request, instance_id) + + @deprecated( + "rewind is not yet supported in durabletask; this shim raises " + "NotImplementedError.") + async def rewind( + self, + instance_id: str, + reason: str, + task_hub_name: Optional[str] = None, + connection_name: Optional[str] = None) -> None: + """Not implemented: durabletask has no rewind equivalent yet.""" + raise NotImplementedError( + "rewind is not yet supported by durabletask.") + + @staticmethod + def _create_http_response(status_code: int, body: Union[str, Any]) -> func.HttpResponse: + body_as_json = body if isinstance(body, str) else json.dumps(body) + return func.HttpResponse( + status_code=status_code, + body=body_as_json, + mimetype="application/json", + headers={"Content-Type": "application/json"}) diff --git a/azure-functions-durable/azure/durable_functions/constants.py b/azure-functions-durable/azure/durable_functions/constants.py new file mode 100644 index 00000000..fbd268a7 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/constants.py @@ -0,0 +1,8 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Constants used to determine the local running context.""" +ORCHESTRATION_TRIGGER = "orchestrationTrigger" +ACTIVITY_TRIGGER = "activityTrigger" +ENTITY_TRIGGER = "entityTrigger" +DURABLE_CLIENT = "durableClient" diff --git a/azure-functions-durable/azure/durable_functions/decorators/__init__.py b/azure-functions-durable/azure/durable_functions/decorators/__init__.py new file mode 100644 index 00000000..59e481eb --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/decorators/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/azure-functions-durable/azure/durable_functions/decorators/durable_app.py b/azure-functions-durable/azure/durable_functions/decorators/durable_app.py new file mode 100644 index 00000000..2b3473bb --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/decorators/durable_app.py @@ -0,0 +1,308 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from functools import wraps +from typing import Any, Callable, Optional, Union + +import azure.functions as func +from azure.functions import FunctionRegister, TriggerApi, BindingApi, AuthLevel +from azure.functions.decorators.function_app import FunctionBuilder + +from durabletask import task + +from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger, \ + DurableClient +from ..client import DurableFunctionsClient +from ..worker import DurableFunctionsWorker +from ..orchestrator import Orchestrator + + +class Blueprint(TriggerApi, BindingApi): + """Durable Functions (DF) Blueprint container. + + It allows functions to be declared via trigger and binding decorators, + but does not automatically index/register these functions. + + To register these functions, utilize the `register_functions` method from any + :class:`FunctionRegister` subclass, such as `DFApp`. + """ + + def __init__(self, + http_auth_level: Union[AuthLevel, str] = AuthLevel.FUNCTION): + """Instantiate a Durable Functions app with which to register Functions. + + Parameters + ---------- + http_auth_level: Union[AuthLevel, str] + Authorization level required for Function invocation. + Defaults to AuthLevel.Function. + + Returns + ------- + DFApp + New instance of a Durable Functions app + """ + # The next-in-MRO base (``DecoratorApi.__init__``) is declared with + # untyped ``*args``/``**kwargs``, so pyright cannot see this call's type. + super().__init__(auth_level=http_auth_level) # pyright: ignore[reportUnknownMemberType] + + def _configure_orchestrator_callable( + self, + wrap: Callable[[Callable[..., Any]], FunctionBuilder], + input_type: Optional[type] = None + ) -> Callable[[task.Orchestrator[Any, Any]], FunctionBuilder]: + """Obtain decorator to construct an Orchestrator class from a user-defined Function. + + Parameters + ---------- + wrap: Callable + The next decorator to be applied. + input_type: Optional[type] + The expected type for orchestration input, forwarded from the + ``orchestration_trigger`` decorator so a v1-style + ``context.get_input()`` can decode the input to that type. + + Returns + ------- + Callable + The function to construct an Orchestrator class from the user-defined Function, + wrapped by the next decorator in the sequence. + """ + def decorator(orchestrator_func: task.Orchestrator[Any, Any]) -> FunctionBuilder: + # Construct an orchestrator based on the end-user code + + if input_type is not None: + # Stash the decorator-declared input type so the runtime can + # feed it to a v1-style ``context.get_input()``. + orchestrator_func._df_input_type = input_type # type: ignore[attr-defined] # noqa: E501 + + handle = Orchestrator.create(orchestrator_func) + + # invoke next decorator, with the Orchestrator as input + handle.__name__ = orchestrator_func.__name__ + return wrap(handle) + + return decorator + + def _configure_entity_callable( + self, + wrap: Callable[[Callable[..., Any]], FunctionBuilder] + ) -> Callable[[task.Entity[Any, Any]], FunctionBuilder]: + """Obtain decorator to construct an Entity class from a user-defined Function. + + Parameters + ---------- + wrap: Callable + The next decorator to be applied. + + Returns + ------- + Callable + The function to construct an Entity class from the user-defined Function, + wrapped by the next decorator in the sequence. + """ + def decorator(entity_func: task.Entity[Any, Any]) -> FunctionBuilder: + # Construct an orchestrator based on the end-user code + + # TODO: Because this handle method is the one actually exposed to the Functions SDK decorator, + # the parameter name will always be "context" here, even if the user specified a different name. + # We need to find a way to allow custom context names (like "ctx"). + # The generated handle is what the Azure Functions host registers, + # so its ``context`` parameter must be annotated with + # ``azure.functions.EntityContext`` for the host's entityTrigger + # binding converter to accept it; at runtime the host passes that + # transport context (exposing ``.body``). + def handle(context: func.EntityContext) -> str: + return DurableFunctionsWorker().execute_entity_batch_request(entity_func, context) + + handle.entity_function = entity_func # pyright: ignore[reportFunctionMemberAccess] + + # invoke next decorator, with the Entity as input + handle.__name__ = entity_func.__name__ + return wrap(handle) + + return decorator + + def _add_rich_client( + self, + fb: FunctionBuilder, + parameter_name: str, + client_constructor: Callable[[Any], Any] + ) -> None: + # Obtain user-code and force type annotation on the client-binding parameter to be `str`. + # This ensures a passing type-check of that specific parameter, + # circumventing a limitation of the worker in type-checking rich DF Client objects. + # TODO: Once rich-binding type checking is possible, remove the annotation change. + # ``FunctionBuilder._function`` and ``Function._func`` are private to + # azure-functions with no public accessor for mutating the wrapped + # user function. Holding it as ``Any`` keeps the single private-access + # waiver here rather than spreading it across each ``._func`` use. + function_obj: Any = fb._function # pyright: ignore[reportPrivateUsage] + user_code = function_obj._func + user_code.__annotations__[parameter_name] = str + + # `wraps` This ensures we re-export the same method-signature as the decorated method + @wraps(user_code) + async def df_client_middleware(*args: Any, **kwargs: Any) -> Any: + + # Obtain JSON-string currently passed as DF Client, + # construct rich object from it, + # and assign parameter to that rich object + starter = kwargs[parameter_name] + client = client_constructor(starter) + kwargs[parameter_name] = client + + # Invoke user code with rich DF Client binding + return await user_code(*args, **kwargs) + + # TODO: Is there a better way to support retrieving the unwrapped user code? + df_client_middleware.client_function = function_obj._func # pyright: ignore[reportAttributeAccessIssue] + + function_obj._func = df_client_middleware + + def _build_function( + self, + wrap: Callable[[FunctionBuilder], FunctionBuilder] + ) -> Callable[[Callable[..., Any]], FunctionBuilder]: + """Typed equivalent of the base ``_configure_function_builder``. + + The inherited method is untyped, which would otherwise propagate + ``Unknown`` types through every decorator below. This mirrors its + behaviour exactly using the typed protected members it relies on. + """ + def decorator(func: Callable[..., Any]) -> FunctionBuilder: + fb = self._validate_type(func) + self._function_builders.append(fb) + return wrap(fb) + + return decorator + + def orchestration_trigger(self, context_name: str, + orchestration: Optional[str] = None, + input_type: Optional[type] = None + ) -> Callable[[task.Orchestrator[Any, Any]], FunctionBuilder]: + """Register an Orchestrator Function. + + Parameters + ---------- + context_name: str + Parameter name of the DurableOrchestrationContext object. + orchestration: Optional[str] + Name of Orchestrator Function. + The value is None by default, in which case the name of the method is used. + input_type: Optional[type] + The expected type for the orchestration input. When set, a v1-style + ``context.get_input()`` decodes the input payload to this type. A + call-site ``expected_type`` argument on ``get_input`` takes + precedence over this value. + """ + @self._build_function + def wrap(fb: FunctionBuilder) -> FunctionBuilder: + + def decorator() -> FunctionBuilder: + fb.add_trigger( + trigger=OrchestrationTrigger(name=context_name, + orchestration=orchestration)) + return fb + + return decorator() + + return self._configure_orchestrator_callable(wrap, input_type=input_type) + + def activity_trigger(self, input_name: str, + activity: Optional[str] = None + ) -> Callable[[Callable[..., Any]], FunctionBuilder]: + """Register an Activity Function. + + Parameters + ---------- + input_name: str + Parameter name of the Activity input. + activity: Optional[str] + Name of Activity Function. + The value is None by default, in which case the name of the method is used. + """ + @self._build_function + def wrap(fb: FunctionBuilder) -> FunctionBuilder: + def decorator() -> FunctionBuilder: + fb.add_trigger( + trigger=ActivityTrigger(name=input_name, + activity=activity)) + return fb + + return decorator() + + return wrap + + def entity_trigger(self, + context_name: str, + entity_name: Optional[str] = None + ) -> Callable[[task.Entity[Any, Any]], FunctionBuilder]: + """Register an Entity Function. + + Parameters + ---------- + context_name: str + Parameter name of the Entity input. + entity_name: Optional[str] + Name of Entity Function. + The value is None by default, in which case the name of the method is used. + """ + @self._configure_entity_callable + @self._build_function + def wrap(fb: FunctionBuilder) -> FunctionBuilder: + def decorator() -> FunctionBuilder: + fb.add_trigger( + trigger=EntityTrigger(name=context_name, + entity_name=entity_name)) + return fb + + return decorator() + + return wrap + + def durable_client_input(self, + client_name: str, + task_hub: Optional[str] = None, + connection_name: Optional[str] = None + ) -> Callable[[Callable[..., Any]], FunctionBuilder]: + """Register a Durable-client Function. + + Parameters + ---------- + client_name: str + Parameter name of durable client. + task_hub: Optional[str] + Used in scenarios where multiple function apps share the same storage account + but need to be isolated from each other. If not specified, the default value + from host.json is used. + This value must match the value used by the target orchestrator functions. + connection_name: Optional[str] + The name of an app setting that contains a storage account connection string. + The storage account represented by this connection string must be the same one + used by the target orchestrator functions. If not specified, the default storage + account connection string for the function app is used. + """ + + @self._build_function + def wrap(fb: FunctionBuilder) -> FunctionBuilder: + def decorator() -> FunctionBuilder: + self._add_rich_client(fb, client_name, DurableFunctionsClient) + fb.add_binding( + binding=DurableClient(name=client_name, + task_hub=task_hub, + connection_name=connection_name)) + return fb + + return decorator() + + return wrap + + +class DFApp(Blueprint, FunctionRegister): + """Durable Functions (DF) app. + + Exports the decorators required to declare and index DF Function-types. + """ + + pass diff --git a/azure-functions-durable/azure/durable_functions/decorators/metadata.py b/azure-functions-durable/azure/durable_functions/decorators/metadata.py new file mode 100644 index 00000000..efe3983d --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/decorators/metadata.py @@ -0,0 +1,118 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Optional + +from ..constants import ORCHESTRATION_TRIGGER, \ + ACTIVITY_TRIGGER, ENTITY_TRIGGER, DURABLE_CLIENT +from azure.functions.decorators.core import Trigger, InputBinding + + +class OrchestrationTrigger(Trigger): + """OrchestrationTrigger. + + Trigger representing an Orchestration Function. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ORCHESTRATION_TRIGGER + + def __init__(self, + name: str, + orchestration: Optional[str] = None, + durable_requires_grpc: bool = True, + ) -> None: + self.orchestration = orchestration + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) + + +class ActivityTrigger(Trigger): + """ActivityTrigger. + + Trigger representing a Durable Functions Activity. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ACTIVITY_TRIGGER + + def __init__(self, + name: str, + activity: Optional[str] = None, + durable_requires_grpc: bool = True, + ) -> None: + self.activity = activity + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) + + +class EntityTrigger(Trigger): + """EntityTrigger. + + Trigger representing an Entity Function. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ENTITY_TRIGGER + + def __init__(self, + name: str, + entity_name: Optional[str] = None, + durable_requires_grpc: bool = True, + ) -> None: + self.entity_name = entity_name + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) + + +class DurableClient(InputBinding): + """DurableClient. + + Binding representing a Durable-client object. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this Binding, as a string. + + Returns + ------- + str + The string representation of this binding. + """ + return DURABLE_CLIENT + + def __init__(self, + name: str, + task_hub: Optional[str] = None, + connection_name: Optional[str] = None, + durable_requires_grpc: bool = True, + ) -> None: + self.task_hub = task_hub + self.connection_name = connection_name + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) diff --git a/azure-functions-durable/azure/durable_functions/http/__init__.py b/azure-functions-durable/azure/durable_functions/http/__init__.py new file mode 100644 index 00000000..b4d2c355 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/http/__init__.py @@ -0,0 +1,6 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from ..http.http_management_payload import HttpManagementPayload + +__all__ = ["HttpManagementPayload"] diff --git a/azure-functions-durable/azure/durable_functions/http/http_management_payload.py b/azure-functions-durable/azure/durable_functions/http/http_management_payload.py new file mode 100644 index 00000000..8f33ce91 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/http/http_management_payload.py @@ -0,0 +1,50 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import json +from typing import Any + + +class HttpManagementPayload(dict[str, str]): + """A class representing the HTTP management payload for a Durable Function orchestration instance. + + Contains URLs for managing the instance, such as querying status, + sending events, terminating, restarting, etc. + + Subclasses ``dict`` for backwards compatibility with the v1 API, which + returned a plain ``dict``. As a result the payload supports mapping-style + access (``payload["statusQueryGetUri"]``, iteration, ``in``, + ``.keys()``/``.items()``/``.values()``) and is directly JSON-serializable + via ``json.dumps(payload)``. + """ + + def __init__(self, instance_id: str, instance_status_url: str, required_query_string_parameters: str): + """Initializes the HttpManagementPayload with the necessary URLs. + + Args: + instance_id (str): The ID of the Durable Function instance. + instance_status_url (str): The base URL for the instance status. + required_query_string_parameters (str): The required URL parameters provided by the Durable extension. + """ + super().__init__({ + 'id': instance_id, + 'purgeHistoryDeleteUri': instance_status_url + "?" + required_query_string_parameters, + 'restartPostUri': instance_status_url + "/restart?" + required_query_string_parameters, + 'sendEventPostUri': instance_status_url + "/raiseEvent/{eventName}?" + required_query_string_parameters, + 'statusQueryGetUri': instance_status_url + "?" + required_query_string_parameters, + 'terminatePostUri': instance_status_url + "/terminate?reason={text}&" + required_query_string_parameters, + 'resumePostUri': instance_status_url + "/resume?reason={text}&" + required_query_string_parameters, + 'suspendPostUri': instance_status_url + "/suspend?reason={text}&" + required_query_string_parameters + }) + + def __str__(self) -> str: + return json.dumps(self) + + @property + def urls(self) -> dict[str, Any]: + """Return the management URLs as a plain ``dict`` (v1 compatibility).""" + return dict(self) + + def to_json(self) -> dict[str, Any]: + """Return the management URLs as a plain ``dict``.""" + return dict(self) diff --git a/azure-functions-durable/azure/durable_functions/internal/__init__.py b/azure-functions-durable/azure/durable_functions/internal/__init__.py new file mode 100644 index 00000000..59e481eb --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/azure-functions-durable/azure/durable_functions/internal/azurefunctions_grpc_interceptor.py b/azure-functions-durable/azure/durable_functions/internal/azurefunctions_grpc_interceptor.py new file mode 100644 index 00000000..35373b20 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/azurefunctions_grpc_interceptor.py @@ -0,0 +1,48 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from importlib.metadata import version + +from durabletask.internal.grpc_interceptor import ( + DefaultAsyncClientInterceptorImpl, + DefaultClientInterceptorImpl, +) + + +def _build_metadata(taskhub_name: str) -> list[tuple[str, str]]: + """Build the gRPC metadata headers sent on every Durable Functions call.""" + try: + # Get the version of the azurefunctions package + sdk_version = version('azure-functions-durable') + except Exception: + # Fallback if version cannot be determined + sdk_version = "unknown" + user_agent = f"durabletask-python/{sdk_version}" + return [ + ("taskhub", taskhub_name), + ("x-user-agent", user_agent)] # 'user-agent' is a reserved header in grpc, so we use 'x-user-agent' instead + + +class AzureFunctionsDefaultClientInterceptorImpl(DefaultClientInterceptorImpl): + """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, + StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an + interceptor to add additional headers to all calls as needed.""" + required_query_string_parameters: str + + def __init__(self, taskhub_name: str, required_query_string_parameters: str): + self.required_query_string_parameters = required_query_string_parameters + self._metadata = _build_metadata(taskhub_name) + super().__init__(self._metadata) + + +class AzureFunctionsAsyncDefaultClientInterceptorImpl(DefaultAsyncClientInterceptorImpl): + """Async version of AzureFunctionsDefaultClientInterceptorImpl for use with grpc.aio channels. + + This class implements async gRPC interceptors to add Durable Functions headers + (task hub name and user agent) to all async calls.""" + required_query_string_parameters: str + + def __init__(self, taskhub_name: str, required_query_string_parameters: str): + self.required_query_string_parameters = required_query_string_parameters + self._metadata = _build_metadata(taskhub_name) + super().__init__(self._metadata) diff --git a/azure-functions-durable/azure/durable_functions/internal/azurefunctions_null_stub.py b/azure-functions-durable/azure/durable_functions/internal/azurefunctions_null_stub.py new file mode 100644 index 00000000..af8593d1 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/azurefunctions_null_stub.py @@ -0,0 +1,23 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from typing import Any, Callable + + +class AzureFunctionsNullStub: + """A task hub sidecar stub whose every method is a no-op. + + Instances structurally satisfy the methods of + ``ProtoTaskHubSidecarServiceStub`` without inheriting from that + ``Protocol`` (a ``Protocol`` subclass cannot be instantiated). Any + attribute access resolves to a callable that ignores its arguments and + returns ``None``, which is sufficient because the Azure Functions worker + replaces the relevant completion callbacks before invoking the base + worker logic. + """ + + def __getattr__(self, name: str) -> Callable[..., None]: + def _noop(*args: Any, **kwargs: Any) -> None: + return None + + return _noop diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/__init__.py b/azure-functions-durable/azure/durable_functions/internal/compat/__init__.py new file mode 100644 index 00000000..72b690df --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/__init__.py @@ -0,0 +1,8 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Backwards-compatibility shims for the v1 azure-functions-durable API. + +The public names defined here are re-exported from ``azure.durable_functions``; +import them from there rather than from this internal package. +""" diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/compat_aliases.py b/azure-functions-durable/azure/durable_functions/internal/compat/compat_aliases.py new file mode 100644 index 00000000..ddfcf923 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/compat_aliases.py @@ -0,0 +1,31 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from typing import Any + +from typing_extensions import deprecated + +from ...client import DurableFunctionsClient + + +@deprecated( + "DurableOrchestrationClient is deprecated; use DurableFunctionsClient instead.") +class DurableOrchestrationClient(DurableFunctionsClient): + """Deprecated alias for :class:`DurableFunctionsClient`.""" + + +@deprecated( + "The Entity class is deprecated and unsupported in v2; register entities " + "with the entity_trigger decorator instead.") +class Entity: + """Deprecated placeholder for the v1 ``Entity`` executor class. + + Entities in v2 are registered with the ``entity_trigger`` decorator and + executed by the durabletask worker; there is no user-facing ``Entity`` + class. This placeholder is retained only so existing imports do not fail. + """ + + def __init__(self, *args: Any, **kwargs: Any): + raise NotImplementedError( + "The Entity class is not supported in v2. Register entities with " + "the entity_trigger decorator instead.") diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/durable_orchestration_status.py b/azure-functions-durable/azure/durable_functions/internal/compat/durable_orchestration_status.py new file mode 100644 index 00000000..a1e7f9af --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/durable_orchestration_status.py @@ -0,0 +1,180 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import json +from datetime import datetime +from typing import Any, Optional, cast + +from durabletask.client import OrchestrationState, OrchestrationStatus + +from .orchestration_runtime_status import ( + OrchestrationRuntimeStatus, + from_durabletask_status, + to_durabletask_status, +) + + +class DurableOrchestrationStatus: + """Represents the status of a durable orchestration instance. + + Backwards-compatible wrapper around the durabletask + :class:`~durabletask.client.OrchestrationState`. It exposes the v1 + ``DurableOrchestrationStatus`` attribute surface so existing code that reads + ``status.runtime_status``, ``status.output``, ``status.input_``, etc. keeps + working. New code should use ``OrchestrationState`` directly. + + A status wrapping ``None`` (i.e. a non-existent instance) is falsy, matching + the v1 behaviour where ``get_status`` never returned ``None``. + """ + + def __init__(self, state: Optional[OrchestrationState] = None): + self._state = state + + @classmethod + def from_orchestration_state( + cls, state: Optional[OrchestrationState]) -> "DurableOrchestrationStatus": + """Wrap a durabletask ``OrchestrationState`` (or ``None``).""" + return cls(state) + + @classmethod + def from_json(cls, json_obj: Any) -> "DurableOrchestrationStatus": + """Reconstruct a status from its v1 JSON representation. + + Accepts the dictionary produced by :meth:`to_json` (or the equivalent v1 + schema); a JSON string is parsed first. The wrapped + ``OrchestrationState`` is rebuilt so the resulting object exposes the + same attribute surface as one returned by the client. + """ + if isinstance(json_obj, str): + json_obj = json.loads(json_obj) + data = dict(json_obj) + + runtime_status = data.get("runtimeStatus") + dt_status = ( + to_durabletask_status(OrchestrationRuntimeStatus(runtime_status)) + if runtime_status is not None else None) + + def _parse_datetime(value: Any) -> Any: + return datetime.fromisoformat(value) if isinstance(value, str) else value + + def _reserialize(value: Any) -> Optional[str]: + return None if value is None else json.dumps(value) + + state = OrchestrationState( + instance_id=cast(str, data.get("instanceId")), + name=cast(str, data.get("name")), + runtime_status=cast(OrchestrationStatus, dt_status), + created_at=cast(datetime, _parse_datetime(data.get("createdTime"))), + last_updated_at=cast(datetime, _parse_datetime(data.get("lastUpdatedTime"))), + serialized_input=_reserialize(data.get("input")), + serialized_output=_reserialize(data.get("output")), + serialized_custom_status=_reserialize(data.get("customStatus")), + failure_details=None, + ) + return cls(state) + + def __bool__(self) -> bool: + return self._state is not None + + @property + def orchestration_state(self) -> Optional[OrchestrationState]: + """Get the underlying durabletask ``OrchestrationState`` (or ``None``).""" + return self._state + + @property + def name(self) -> Optional[str]: + """Get the orchestrator function name.""" + return self._state.name if self._state is not None else None + + @property + def instance_id(self) -> Optional[str]: + """Get the unique ID of the instance.""" + return self._state.instance_id if self._state is not None else None + + @property + def created_time(self) -> Optional[datetime]: + """Get the time at which the orchestration instance was created.""" + return self._state.created_at if self._state is not None else None + + @property + def last_updated_time(self) -> Optional[datetime]: + """Get the time at which the orchestration instance last updated.""" + return self._state.last_updated_at if self._state is not None else None + + @property + def input_(self) -> Any: + """Get the (deserialized) input of the orchestration instance.""" + return self._state.get_input() if self._state is not None else None + + @property + def output(self) -> Any: + """Get the (deserialized) output of the orchestration instance.""" + return self._state.get_output() if self._state is not None else None + + @property + def runtime_status(self) -> Optional[OrchestrationRuntimeStatus]: + """Get the runtime status as a v1 ``OrchestrationRuntimeStatus``.""" + if self._state is None: + return None + return from_durabletask_status(self._state.runtime_status) + + @property + def custom_status(self) -> Any: + """Get the (deserialized) custom status payload, if any.""" + return self._state.get_custom_status() if self._state is not None else None + + @property + def history(self) -> Optional[list[Any]]: + """Get the execution history. + + History is not available through this compatibility path and is always + ``None``; use ``get_orchestration_history`` on the client instead. + """ + return None + + def to_json(self) -> dict[str, Any]: + """Convert this status into a v1-compatible JSON dictionary. + + Payload fields (``output``, ``input``, ``customStatus``) are emitted as + their raw JSON representation rather than the reconstructed Python + objects, so the result is always JSON-serializable even when the + orchestration payloads are custom types. + """ + result: dict[str, Any] = {} + if self.name is not None: + result["name"] = self.name + if self.instance_id is not None: + result["instanceId"] = self.instance_id + if self.created_time is not None: + result["createdTime"] = self.created_time.isoformat() + if self.last_updated_time is not None: + result["lastUpdatedTime"] = self.last_updated_time.isoformat() + output = self._raw_payload( + self._state.serialized_output if self._state is not None else None) + if output is not None: + result["output"] = output + input_ = self._raw_payload( + self._state.serialized_input if self._state is not None else None) + if input_ is not None: + result["input"] = input_ + if self.runtime_status is not None: + result["runtimeStatus"] = self.runtime_status.name + custom_status = self._raw_payload( + self._state.serialized_custom_status if self._state is not None else None) + if custom_status is not None: + result["customStatus"] = custom_status + return result + + @staticmethod + def _raw_payload(serialized: Optional[str]) -> Any: + """Parse a serialized payload as plain JSON without reconstructing types. + + Returns the parsed JSON value (which is always JSON-serializable), or the + original string if it is not valid JSON, or ``None`` when absent. + """ + if serialized is None: + return None + try: + return json.loads(serialized) + except (TypeError, ValueError): + return serialized diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/entity_context.py b/azure-functions-durable/azure/durable_functions/internal/compat/entity_context.py new file mode 100644 index 00000000..d2043a3f --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/entity_context.py @@ -0,0 +1,116 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from typing import Any, Callable, Optional + +from durabletask.entities import EntityContext + +from .orchestration_context import accepts_two_positional_args + + +class DurableEntityContext: + """Azure Functions-style entity context (v1-compatible). + + Wraps a durabletask :class:`~durabletask.entities.EntityContext` (and the + operation input) and exposes the v1 ``DurableEntityContext`` API. It is + delivered to one-argument entity functions (``def entity(context):``). + durabletask-native two-argument entity functions + (``def entity(ctx, input):``) and class-based entities are used directly. + """ + + def __init__(self, ctx: EntityContext, operation_input: Any = None): + self._ctx = ctx + self._input = operation_input + self._result: Any = None + + # -- identity ------------------------------------------------------------ + @property + def entity_name(self) -> str: + """Get the entity name.""" + return self._ctx.entity_id.entity + + @property + def entity_key(self) -> str: + """Get the entity key.""" + return self._ctx.entity_id.key + + @property + def operation_name(self) -> str: + """Get the current operation name.""" + return self._ctx.operation + + @property + def is_newly_constructed(self) -> bool: + """Whether the entity was newly constructed. + + The v1 semantics of this flag were unspecified; it is always ``False``. + """ + return False + + # -- input / state / result --------------------------------------------- + def get_input(self, expected_type: Optional[type] = None) -> Any: + """Get the input for the current operation. + + ``expected_type`` is accepted for v1 compatibility but the input is + already deserialized by durabletask, so it is returned as-is. + """ + return self._input + + def get_state(self, + initializer: Optional[Callable[[], Any]] = None, + expected_type: Optional[type] = None) -> Any: + """Get the current state of the entity. + + Parameters + ---------- + initializer : Optional[Callable[[], Any]] + A zero-argument callable providing the initial state when no state + exists yet. + expected_type : Optional[type] + Optional type used to reconstruct the state. + """ + default = initializer() if callable(initializer) else None + return self._ctx.get_state(expected_type, default) + + def set_state(self, state: Any) -> None: + """Set the state of the entity.""" + self._ctx.set_state(state) + + def set_result(self, result: Any) -> None: + """Set the result (return value) of the current operation.""" + self._result = result + + def resolve_result(self, fallback: Any) -> Any: + """Return the value set via :meth:`set_result`, or ``fallback`` if unset.""" + return self._result if self._result is not None else fallback + + def destruct_on_exit(self) -> None: + """Delete this entity after the operation completes.""" + self._ctx.set_state(None) + + +def wrap_entity(fn: Callable[..., Any]) -> Callable[..., Any]: + """Adapt a v1-style one-argument entity function to durabletask's ``(ctx, input)``. + + Class-based entities and durabletask-native two-argument entity functions + are returned unchanged. For a wrapped v1 entity, the operation result is + taken from ``context.set_result(...)`` (falling back to the function's + return value). + """ + if isinstance(fn, type): + # Class-based entity: handled natively by durabletask. + return fn + if accepts_two_positional_args(fn): + # durabletask-native (ctx, input) entity function. + return fn + + def _wrapper(ctx: EntityContext, _input: Any = None) -> Any: + adapter = DurableEntityContext(ctx, _input) + returned = fn(adapter) + return adapter.resolve_result(returned) + + _wrapper.__name__ = getattr(fn, "__name__", "entity") + durable_entity_name = getattr(fn, "__durable_entity_name__", None) + if durable_entity_name is not None: + _wrapper.__durable_entity_name__ = durable_entity_name # type: ignore[attr-defined] + return _wrapper diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/entity_id.py b/azure-functions-durable/azure/durable_functions/internal/compat/entity_id.py new file mode 100644 index 00000000..7f296ae1 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/entity_id.py @@ -0,0 +1,40 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from typing_extensions import deprecated + +from durabletask.entities import EntityInstanceId + + +@deprecated( + "EntityId is deprecated; use durabletask.entities.EntityInstanceId instead.") +class EntityId(EntityInstanceId): + """Backwards-compatible shim for the v1 ``EntityId`` class. + + Identifies an entity by its name and key. New code should use + :class:`durabletask.entities.EntityInstanceId`. + """ + + def __init__(self, name: str, key: str): + """Instantiate an EntityId object. + + Args: + name (str): The entity name. + key (str): The entity key. + """ + super().__init__(entity=name, key=key) + + @property + def name(self) -> str: + """Get the entity name (v1 alias for ``entity``).""" + return self.entity + + @staticmethod + def get_scheduler_id(entity_id: EntityInstanceId) -> str: + """Produce a scheduler ID string (``@name@key``) from an entity ID.""" + return str(entity_id) + + @staticmethod + def get_entity_id(scheduler_id: str) -> EntityInstanceId: + """Return an entity ID from a scheduler ID string (``@name@key``).""" + return EntityInstanceId.parse(scheduler_id) diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/entity_state_response.py b/azure-functions-durable/azure/durable_functions/internal/compat/entity_state_response.py new file mode 100644 index 00000000..dd593d39 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/entity_state_response.py @@ -0,0 +1,41 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from typing import Any, Optional + +from durabletask.entities.entity_metadata import EntityMetadata + + +class EntityStateResponse: + """Entity state response object for ``read_entity_state``. + + Backwards-compatible wrapper around the durabletask + :class:`~durabletask.entities.entity_metadata.EntityMetadata`. New code + should use ``get_entity`` and the returned ``EntityMetadata`` directly. + """ + + def __init__(self, entity_exists: bool, entity_state: Any = None): + self._entity_exists = entity_exists + self._entity_state = entity_state + + @classmethod + def from_entity_metadata( + cls, metadata: Optional[EntityMetadata]) -> "EntityStateResponse": + """Build a response from a durabletask ``EntityMetadata`` (or ``None``).""" + if metadata is None: + return cls(False) + state = metadata.get_typed_state() if metadata.includes_state else None + return cls(True, state) + + @property + def entity_exists(self) -> bool: + """Get the bool representing whether the entity exists.""" + return self._entity_exists + + @property + def entity_state(self) -> Any: + """Get the state of the entity. + + When ``entity_exists`` is ``False``, this value is ``None``. + """ + return self._entity_state diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/orchestration_context.py b/azure-functions-durable/azure/durable_functions/internal/compat/orchestration_context.py new file mode 100644 index 00000000..294fe755 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/orchestration_context.py @@ -0,0 +1,271 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import inspect +from datetime import datetime, timezone +from typing import Any, Callable, Generator, Optional, cast +from uuid import UUID + +from durabletask import task +from durabletask.entities import EntityInstanceId +from durabletask.task import OrchestrationContext, RetryPolicy, Task + +from ..serialization import DEFAULT_FUNCTIONS_DATA_CONVERTER +from .token_source import TokenSource + + +class DurableOrchestrationContext: + """Azure Functions-style orchestration context (v1-compatible). + + Wraps a durabletask :class:`~durabletask.task.OrchestrationContext` (and the + orchestration input) and exposes the v1 ``DurableOrchestrationContext`` API. + It is delivered to one-argument orchestrator functions + (``def orchestrator(context):``); durabletask-native two-argument + orchestrators (``def orchestrator(ctx, input):``) receive the durabletask + context directly instead. + """ + + def __init__(self, + ctx: OrchestrationContext, + orchestration_input: Any = None, + input_type: Optional[type] = None): + self._ctx = ctx + self._input = orchestration_input + self._input_type = input_type + self._custom_status: Any = None + self._will_continue_as_new = False + + # -- input --------------------------------------------------------------- + def get_input(self, expected_type: Optional[type] = None) -> Any: + """Get the orchestration input. + + When an ``expected_type`` (or the ``input_type`` declared on the + ``orchestration_trigger`` decorator) is available, the already-decoded + input is coerced to that type; otherwise the raw value is returned. + """ + resolved_type = expected_type or self._input_type + if resolved_type is None: + return self._input + return DEFAULT_FUNCTIONS_DATA_CONVERTER.coerce(self._input, resolved_type) + + # -- properties ---------------------------------------------------------- + @property + def instance_id(self) -> str: + """Get the ID of the current orchestration instance.""" + return self._ctx.instance_id + + @property + def is_replaying(self) -> bool: + """Get whether the orchestrator is currently replaying.""" + return self._ctx.is_replaying + + @property + def current_utc_datetime(self) -> datetime: + """Get the replay-safe current UTC date/time. + + Returned as a timezone-aware (UTC) datetime for v1 compatibility; + durabletask exposes a naive UTC datetime. + """ + value = self._ctx.current_utc_datetime + if value.tzinfo is None: + return value.replace(tzinfo=timezone.utc) + return value + + @property + def custom_status(self) -> Any: + """Get the custom status set during this execution (or ``None``).""" + return self._custom_status + + @property + def will_continue_as_new(self) -> bool: + """Whether :meth:`continue_as_new` has been called in this execution.""" + return self._will_continue_as_new + + @property + def version(self) -> Optional[str]: + """Get the version assigned to the orchestration instance (or ``None``).""" + return self._ctx.version + + @property + def parent_instance_id(self) -> str: + """Get the ID of the parent orchestration. + + Not available: durabletask does not currently surface the parent + instance ID on the orchestration context. + """ + raise NotImplementedError( + "parent_instance_id is not currently exposed by durabletask.") + + @property + def function_context(self) -> Any: + """Get the Azure Functions-level context. + + Not available: durabletask does not provide the v1 ``FunctionContext`` + binding metadata. + """ + raise NotImplementedError( + "function_context is not available in this SDK.") + + @property + def histories(self) -> Any: + """Get the running history of scheduled tasks. + + Not available: durabletask manages orchestration history internally and + does not expose it on the context. + """ + raise NotImplementedError( + "histories is not exposed by durabletask; use the client's " + "get_orchestration_history instead.") + + # -- activities ---------------------------------------------------------- + def call_activity(self, name: Callable[..., Any] | str, input_: Any = None) -> Task[Any]: + """Schedule an activity function for execution.""" + return self._ctx.call_activity(name, input=input_) + + def call_activity_with_retry(self, + name: Callable[..., Any] | str, + retry_options: RetryPolicy, + input_: Any = None) -> Task[Any]: + """Schedule an activity function for execution, with retries.""" + return self._ctx.call_activity(name, input=input_, retry_policy=retry_options) + + # -- sub-orchestrators --------------------------------------------------- + def call_sub_orchestrator(self, + name: Callable[..., Any] | str, + input_: Any = None, + instance_id: Optional[str] = None) -> Task[Any]: + """Schedule a sub-orchestrator function for execution.""" + return self._ctx.call_sub_orchestrator(name, input=input_, instance_id=instance_id) + + def call_sub_orchestrator_with_retry(self, + name: Callable[..., Any] | str, + retry_options: RetryPolicy, + input_: Any = None, + instance_id: Optional[str] = None) -> Task[Any]: + """Schedule a sub-orchestrator function for execution, with retries.""" + return self._ctx.call_sub_orchestrator( + name, input=input_, instance_id=instance_id, retry_policy=retry_options) + + # -- timers and events --------------------------------------------------- + def create_timer(self, fire_at: datetime) -> Task[Any]: + """Create a durable timer that fires at the specified time.""" + return self._ctx.create_timer(fire_at) + + def wait_for_external_event(self, + name: str, + expected_type: Optional[type] = None) -> Task[Any]: + """Wait for an external event with the given name.""" + return self._ctx.wait_for_external_event(name, data_type=expected_type) + + # -- control ------------------------------------------------------------- + def continue_as_new(self, input_: Any) -> None: + """Restart the orchestration with a new input.""" + self._will_continue_as_new = True + self._ctx.continue_as_new(input_) + + def set_custom_status(self, status: Any) -> None: + """Set the orchestration's custom status payload.""" + self._custom_status = status + self._ctx.set_custom_status(status) + + # -- deterministic IDs --------------------------------------------------- + def new_uuid(self) -> str: + """Create a new replay-safe UUID string.""" + return self._ctx.new_uuid() + + def new_guid(self) -> UUID: + """Create a new replay-safe UUID.""" + return UUID(self._ctx.new_uuid()) + + # -- fan-out / fan-in ---------------------------------------------------- + def task_all(self, tasks: list[Task[Any]]) -> Task[Any]: + """Schedule all tasks and complete when all of them complete.""" + return task.when_all(tasks) + + def task_any(self, tasks: list[Task[Any]]) -> Task[Any]: + """Schedule all tasks and complete when the first one completes.""" + return task.when_any(tasks) + + # -- entities ------------------------------------------------------------ + def call_entity(self, + entityId: EntityInstanceId, + operationName: str, + operationInput: Any = None) -> Task[Any]: + """Call an entity operation and get its result.""" + return self._ctx.call_entity(entityId, operationName, operationInput) + + def signal_entity(self, + entityId: EntityInstanceId, + operationName: str, + operationInput: Any = None) -> None: + """Signal an entity operation (fire and forget).""" + self._ctx.signal_entity(entityId, operationName, input=operationInput) + + # -- durable HTTP (not yet supported) ------------------------------------ + def call_http(self, + method: str, + uri: str, + content: Optional[str] = None, + headers: Optional[dict[str, str]] = None, + token_source: Optional[TokenSource] = None, + is_raw_str: bool = False) -> Any: + """Schedule a durable HTTP call (v1 API). + + Not yet supported: durabletask has no durable-HTTP (``call_http``) + equivalent, so this raises ``NotImplementedError``. + """ + raise NotImplementedError( + "call_http is not yet supported by durabletask. The durable-HTTP " + "API (and its TokenSource auth) has no durabletask equivalent yet.") + + +def accepts_two_positional_args(fn: Callable[..., Any]) -> bool: + """Return True if ``fn`` can be called with two positional args ``(ctx, input)``. + + Two-argument functions are treated as durabletask-native orchestrators; + single-argument functions are treated as Azure Functions / v1-style + orchestrators that receive a wrapped :class:`DurableOrchestrationContext`. + """ + try: + sig = inspect.signature(fn) + except (TypeError, ValueError): + # Can't introspect -> assume durabletask-native and pass through. + return True + + positional = 0 + for param in sig.parameters.values(): + if param.kind in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD): + positional += 1 + elif param.kind == param.VAR_POSITIONAL: + return True + return positional >= 2 + + +def wrap_orchestrator(fn: Callable[..., Any]) -> Callable[..., Any]: + """Adapt a v1-style one-argument orchestrator to durabletask's ``(ctx, input)``. + + Two-argument (durabletask-native) orchestrators are returned unchanged. The + returned wrapper deliberately does not set ``__wrapped__`` so durabletask + introspects the wrapper's own ``(context, _input)`` signature (and thus + passes the raw input) rather than the wrapped function's signature. + """ + if accepts_two_positional_args(fn): + return fn + + input_type = getattr(fn, "_df_input_type", None) + name = getattr(fn, "__name__", "orchestrator") + + if inspect.isgeneratorfunction(fn): + def _generator_wrapper(context: OrchestrationContext, _input: Any = None) -> Any: + adapter = DurableOrchestrationContext(context, _input, input_type) + generator = cast("Generator[Any, Any, Any]", fn(adapter)) + result: Any = yield from generator + return result + _generator_wrapper.__name__ = name + return _generator_wrapper + + def _wrapper(context: OrchestrationContext, _input: Any = None) -> Any: + adapter = DurableOrchestrationContext(context, _input, input_type) + return fn(adapter) + _wrapper.__name__ = name + return _wrapper diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/orchestration_runtime_status.py b/azure-functions-durable/azure/durable_functions/internal/compat/orchestration_runtime_status.py new file mode 100644 index 00000000..71b595d4 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/orchestration_runtime_status.py @@ -0,0 +1,101 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from enum import Enum + +from durabletask.client import OrchestrationStatus + + +class OrchestrationRuntimeStatus(Enum): + """The status of an orchestration instance. + + Backwards-compatible enum matching the v1 ``OrchestrationRuntimeStatus`` + values. New code should use :class:`durabletask.client.OrchestrationStatus`. + """ + + Running = 'Running' + """The orchestration instance has started running.""" + + Completed = 'Completed' + """The orchestration instance has completed normally.""" + + ContinuedAsNew = 'ContinuedAsNew' + """The orchestration instance has restarted itself with a new history. + + This is a transient state. + """ + + Failed = 'Failed' + """The orchestration instance failed with an error.""" + + Canceled = 'Canceled' + """The orchestration was canceled gracefully.""" + + Terminated = 'Terminated' + """The orchestration instance was stopped abruptly.""" + + Pending = 'Pending' + """The orchestration instance has been scheduled but has not yet started running.""" + + Suspended = 'Suspended' + """The orchestration instance has been suspended and may go back to running at a later time.""" + + +# Maps the v1 OrchestrationRuntimeStatus members onto the durabletask +# OrchestrationStatus enum. ``Canceled`` has no durabletask equivalent. +_TO_DURABLETASK_STATUS: dict[OrchestrationRuntimeStatus, OrchestrationStatus] = { + OrchestrationRuntimeStatus.Running: OrchestrationStatus.RUNNING, + OrchestrationRuntimeStatus.Completed: OrchestrationStatus.COMPLETED, + OrchestrationRuntimeStatus.ContinuedAsNew: OrchestrationStatus.CONTINUED_AS_NEW, + OrchestrationRuntimeStatus.Failed: OrchestrationStatus.FAILED, + OrchestrationRuntimeStatus.Terminated: OrchestrationStatus.TERMINATED, + OrchestrationRuntimeStatus.Pending: OrchestrationStatus.PENDING, + OrchestrationRuntimeStatus.Suspended: OrchestrationStatus.SUSPENDED, +} + + +def to_durabletask_status(status: "OrchestrationRuntimeStatus") -> OrchestrationStatus: + """Convert a v1 ``OrchestrationRuntimeStatus`` to a durabletask ``OrchestrationStatus``. + + Raises + ------ + ValueError + If the status has no durabletask equivalent (e.g. ``Canceled``). + """ + try: + return _TO_DURABLETASK_STATUS[status] + except KeyError: + raise ValueError( + f"OrchestrationRuntimeStatus.{status.name} has no durabletask " + "OrchestrationStatus equivalent.") + + +def to_durabletask_statuses( + statuses: "list[OrchestrationRuntimeStatus] | None") -> "list[OrchestrationStatus] | None": + """Convert a list of v1 statuses to durabletask statuses, preserving ``None``.""" + if statuses is None: + return None + return [to_durabletask_status(status) for status in statuses] + + +# Reverse mapping: durabletask OrchestrationStatus -> v1 OrchestrationRuntimeStatus. +# Every durabletask status has a v1 equivalent (``Canceled`` is v1-only). +_FROM_DURABLETASK_STATUS: dict[OrchestrationStatus, OrchestrationRuntimeStatus] = { + durabletask_status: v1_status + for v1_status, durabletask_status in _TO_DURABLETASK_STATUS.items() +} + + +def from_durabletask_status(status: OrchestrationStatus) -> "OrchestrationRuntimeStatus": + """Convert a durabletask ``OrchestrationStatus`` to a v1 ``OrchestrationRuntimeStatus``. + + Raises + ------ + ValueError + If the status has no v1 equivalent. + """ + try: + return _FROM_DURABLETASK_STATUS[status] + except KeyError: + raise ValueError( + f"OrchestrationStatus {status} has no v1 OrchestrationRuntimeStatus equivalent.") diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/purge_history_result.py b/azure-functions-durable/azure/durable_functions/internal/compat/purge_history_result.py new file mode 100644 index 00000000..adf3dd06 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/purge_history_result.py @@ -0,0 +1,34 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from typing import Any + +from durabletask.client import PurgeInstancesResult + + +class PurgeHistoryResult: + """Information provided when a request to purge history has been made. + + Backwards-compatible wrapper around the durabletask + :class:`~durabletask.client.PurgeInstancesResult`. New code should use + ``PurgeInstancesResult`` directly (note the attribute is + ``deleted_instance_count`` there). + """ + + def __init__(self, instances_deleted: int): + self._instances_deleted = instances_deleted + + @classmethod + def from_purge_result(cls, result: PurgeInstancesResult) -> "PurgeHistoryResult": + """Wrap a durabletask ``PurgeInstancesResult``.""" + return cls(result.deleted_instance_count) + + @classmethod + def from_json(cls, json_obj: "dict[str, Any]") -> "PurgeHistoryResult": + """Reconstruct a result from its v1 JSON representation.""" + return cls(instances_deleted=json_obj["instancesDeleted"]) + + @property + def instances_deleted(self) -> int: + """Get the number of deleted instances.""" + return self._instances_deleted diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/retry_options.py b/azure-functions-durable/azure/durable_functions/internal/compat/retry_options.py new file mode 100644 index 00000000..0a615513 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/retry_options.py @@ -0,0 +1,53 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from datetime import timedelta + +from typing_extensions import deprecated + +from durabletask.task import RetryPolicy + + +@deprecated( + "RetryOptions is deprecated; use durabletask.task.RetryPolicy with " + "timedelta values instead.") +class RetryOptions(RetryPolicy): + """Backwards-compatible shim for the v1 ``RetryOptions`` class. + + This maps the v1 millisecond-based constructor onto the durabletask + :class:`~durabletask.task.RetryPolicy`, which uses ``timedelta`` values. + New code should use ``RetryPolicy`` directly. + """ + + def __init__( + self, + first_retry_interval_in_milliseconds: int, + max_number_of_attempts: int): + """Create a new RetryOptions instance. + + Args: + first_retry_interval_in_milliseconds (int): The retry interval, in + milliseconds, to use for the first retry attempt. Must be + greater than 0. + max_number_of_attempts (int): The maximum number of retry attempts. + """ + if first_retry_interval_in_milliseconds <= 0: + raise ValueError( + "first_retry_interval_in_milliseconds value must be greater than 0.") + + super().__init__( + first_retry_interval=timedelta( + milliseconds=first_retry_interval_in_milliseconds), + max_number_of_attempts=max_number_of_attempts) + + @property + def first_retry_interval_in_milliseconds(self) -> int: + """Get the first retry interval, in milliseconds.""" + return int(self.first_retry_interval / timedelta(milliseconds=1)) + + def to_json(self) -> dict[str, int]: + """Return the v1 JSON representation of these retry options.""" + return { + "firstRetryIntervalInMilliseconds": self.first_retry_interval_in_milliseconds, + "maxNumberOfAttempts": self.max_number_of_attempts, + } diff --git a/azure-functions-durable/azure/durable_functions/internal/compat/token_source.py b/azure-functions-durable/azure/durable_functions/internal/compat/token_source.py new file mode 100644 index 00000000..5b990f85 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/compat/token_source.py @@ -0,0 +1,51 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from abc import ABC + +from typing_extensions import deprecated + + +class TokenSource(ABC): + """Token source abstract base class. + + Backwards-compatible shim for the v1 ``TokenSource``. Token sources are + consumed only by the orchestrator ``call_http`` API, which has no + durabletask equivalent yet — see + :meth:`DurableOrchestrationContext.call_http`. Constructing a token source + is harmless, but it cannot be used until ``call_http`` is supported. + """ + + def __init__(self): + super().__init__() + + +@deprecated( + "ManagedIdentityTokenSource is deprecated; it is only usable with the " + "orchestrator call_http API, which is not yet available in durabletask.") +class ManagedIdentityTokenSource(TokenSource): + """Returns a ``ManagedIdentityTokenSource`` object. + + Only meaningful when passed to ``call_http`` (not yet supported in + durabletask). Constructing one is allowed for import/config compatibility. + """ + + def __init__(self, resource: str): + """Create a ManagedIdentityTokenSource. + + Args: + resource (str): The Azure Active Directory resource identifier of the + web API being invoked. + """ + super().__init__() + self._resource: str = resource + self._kind: str = "AzureManagedIdentity" + + @property + def resource(self) -> str: + """Get the Azure Active Directory resource identifier of the web API being invoked.""" + return self._resource + + def to_json(self) -> dict[str, str]: + """Convert this object into a JSON-serializable dictionary.""" + return {"resource": self._resource, "kind": self._kind} diff --git a/azure-functions-durable/azure/durable_functions/internal/serialization.py b/azure-functions-durable/azure/durable_functions/internal/serialization.py new file mode 100644 index 00000000..803b26fd --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/serialization.py @@ -0,0 +1,118 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Azure Functions payload serialization for Durable Task. + +Bridges durabletask's pluggable :class:`~durabletask.serialization.DataConverter` +to the azure-functions SDK's centralized ``df_dumps`` / ``df_loads`` serializers +so that payloads round-trip through the **exact** wire format the Durable +Functions host extension (and the SDK's ``ActivityTriggerConverter``) expect: +builtins as plain JSON, custom objects wrapped in the +``{"__class__", "__module__", "__data__"}`` envelope via their ``to_json`` / +``from_json`` hooks. + +When the installed ``azure-functions`` package exposes ``df_dumps`` / ``df_loads`` +(centralized serializers with optional type validation and strict-typing +support), they are used directly. On older releases that lack them we fall back +to the legacy ``_serialize_custom_object`` / ``_deserialize_custom_object`` hooks +-- the same behavior the SDK converter uses in those versions -- keeping both +sides symmetric. The wire format is unchanged either way. +""" + +from __future__ import annotations + +import importlib +import json +import logging +from typing import Any, Callable, Optional, cast + +from durabletask.serialization import JsonDataConverter + +logger = logging.getLogger("azure.functions.DurableFunctions") + +# ``azure.functions`` only exposes its Durable serialization helpers from a +# private, untyped module. Resolve it dynamically and bind the symbols we need +# to locally-typed callables so the rest of the module stays type-checked. +_df_internal = importlib.import_module("azure.functions._durable_functions") +_serialize_custom_object: Callable[[Any], Any] = getattr( + _df_internal, "_serialize_custom_object") +_deserialize_custom_object: Callable[[dict[str, Any]], Any] = getattr( + _df_internal, "_deserialize_custom_object") + +_FALLBACK_MESSAGE = ( + "The installed 'azure-functions' package does not provide the centralized " + "'df_dumps' / 'df_loads' serializers. Durable Functions is falling back to " + "the legacy serialization pipeline; the wire format is unchanged, but " + "payload type validation (the 'expected_type' argument and strict typing " + "mode) is unavailable. Upgrade to azure-functions>=1.26.0b4 to enable " + "type-validated serialization." +) + +_warned = False + + +def _warn_fallback_once() -> None: + # Deferred to first use (debug level) rather than emitted at import time, so + # users who never exercise the fallback path are not spammed. + global _warned + if not _warned: + _warned = True + logger.debug(_FALLBACK_MESSAGE) + + +def _fallback_df_dumps(value: Any) -> str: + """Serialize ``value`` via the legacy custom-object hook.""" + _warn_fallback_once() + return json.dumps(value, default=_serialize_custom_object) + + +def _fallback_df_loads(s: str, expected_type: Optional[type] = None) -> Any: + """Deserialize ``s`` via the legacy custom-object hook. + + ``expected_type`` is accepted for call-site compatibility but ignored on + this fallback path; type validation is only performed by the SDK's + ``df_loads`` when it is available. + """ + _warn_fallback_once() + return json.loads(s, object_hook=_deserialize_custom_object) + + +# Prefer the SDK's centralized serializers; fall back to the legacy hooks when +# they are unavailable (older azure-functions releases). +_sdk_df_dumps = getattr(_df_internal, "df_dumps", None) +_sdk_df_loads = getattr(_df_internal, "df_loads", None) + +df_dumps: Callable[[Any], str] = ( + cast("Callable[[Any], str]", _sdk_df_dumps) + if callable(_sdk_df_dumps) else _fallback_df_dumps) +df_loads: Callable[..., Any] = ( + cast("Callable[..., Any]", _sdk_df_loads) + if callable(_sdk_df_loads) else _fallback_df_loads) + + +class FunctionsDataConverter(JsonDataConverter): + """:class:`DataConverter` that serializes via azure-functions' codec. + + Overrides only the string boundary (:meth:`serialize` / :meth:`deserialize`) + to route through ``df_dumps`` / ``df_loads`` -- producing the + ``{"__class__", "__module__", "__data__"}`` envelope that the Durable + Functions host expects -- while inheriting :class:`JsonDataConverter`'s + value-level :meth:`coerce` and reconstruction policy + (:meth:`can_reconstruct`), which operate on already-parsed values and are + wire-format agnostic. + """ + + def serialize(self, value: Any) -> str | None: + if value is None: + return None + return df_dumps(value) + + def deserialize(self, data: str | None, target_type: type | None = None) -> Any: + if data is None or data == "": + return None + return df_loads(data, target_type) + + +# Shared instance: the converter is stateless, so a single instance is reused +# across the per-invocation worker/client objects. +DEFAULT_FUNCTIONS_DATA_CONVERTER: FunctionsDataConverter = FunctionsDataConverter() diff --git a/azure-functions-durable/azure/durable_functions/orchestrator.py b/azure-functions-durable/azure/durable_functions/orchestrator.py new file mode 100644 index 00000000..e70d033f --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/orchestrator.py @@ -0,0 +1,78 @@ +"""Durable Orchestrator. + +Responsible for orchestrating the execution of the user defined generator +function. +""" +from typing import Any, Callable, Generator + +import azure.functions as func + +from durabletask.task import OrchestrationContext + +from .worker import DurableFunctionsWorker + + +class Orchestrator: + """Durable Orchestration Class. + + Responsible for orchestrating the execution of the user defined generator + function. + """ + + def __init__(self, + activity_func: Callable[[OrchestrationContext, Any], Generator[Any, Any, Any]]): + """Create a new orchestrator for the user defined generator. + + Responsible for orchestrating the execution of the user defined + generator function. + :param activity_func: Generator function to orchestrate. + """ + self.fn: Callable[[OrchestrationContext, Any], Generator[Any, Any, Any]] = activity_func + + def handle(self, context: func.OrchestrationContext) -> str: + """Handle the orchestration of the user defined generator function. + + Parameters + ---------- + context : azure.functions.OrchestrationContext + The Durable Functions orchestration trigger context. This is the + transport wrapper supplied by the host (it exposes ``.body``); the + user's orchestrator function receives a durabletask + ``OrchestrationContext`` during execution inside the worker. + + Returns + ------- + str + The JSON-formatted string representing the user's orchestration + state after this invocation + """ + self.durable_context = context + return DurableFunctionsWorker().execute_orchestration_request(self.fn, context) + + @classmethod + def create(cls, fn: Callable[[OrchestrationContext, Any], Generator[Any, Any, Any]]) \ + -> Callable[[Any], str]: + """Create an instance of the orchestration class. + + Parameters + ---------- + fn: Callable[[DurableOrchestrationContext], Iterator[Any]] + Generator function that needs orchestration + + Returns + ------- + Callable[[Any], str] + Handle function of the newly created orchestration client + """ + + # The generated handle is the function registered with the Azure + # Functions host. Its ``context`` parameter must be annotated with + # ``azure.functions.OrchestrationContext`` so the host's + # orchestrationTrigger binding converter accepts it; at runtime the + # host passes that transport context (exposing ``.body``). + def handle(context: func.OrchestrationContext) -> str: + return Orchestrator(fn).handle(context) + + handle.orchestrator_function = fn # pyright: ignore[reportFunctionMemberAccess] + + return handle diff --git a/azure-functions-durable/azure/durable_functions/worker.py b/azure-functions-durable/azure/durable_functions/worker.py new file mode 100644 index 00000000..72271f03 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/worker.py @@ -0,0 +1,99 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import base64 +from typing import Any, Optional + +from durabletask import task +from durabletask.internal.orchestrator_service_pb2 import ( + EntityBatchRequest, + EntityBatchResult, + HistoryEvent, + OrchestratorRequest, + OrchestratorResponse, +) +from durabletask.worker import TaskHubGrpcWorker +from .internal.azurefunctions_null_stub import AzureFunctionsNullStub +from .internal.compat.entity_context import wrap_entity +from .internal.compat.orchestration_context import wrap_orchestrator +from .internal.serialization import DEFAULT_FUNCTIONS_DATA_CONVERTER + + +# Worker class used for Durable Task Scheduler (DTS) +class DurableFunctionsWorker(TaskHubGrpcWorker): + """A worker that can execute orchestrator and entity functions in the context of Azure Functions. + + Used internally by the Durable Functions Python SDK, and should not be visible to functionapps directly. + See TaskHubGrpcWorker for base class documentation. + """ + + def __init__(self) -> None: + # We never start the worker loop or open a gRPC channel. The base + # constructor only initialises in-memory state (registry, logger, + # concurrency options, payload store, etc.) that the inherited + # ``_execute_*`` methods rely on; work items are delivered directly by + # the methods below rather than streamed from a sidecar. + # + # The Functions converter routes payload serialization through the + # azure-functions codec (df_dumps/df_loads) so user types round-trip in + # the wire format the Durable Functions host extension expects. + super().__init__(data_converter=DEFAULT_FUNCTIONS_DATA_CONVERTER) + + def add_named_orchestrator(self, name: str, func: task.Orchestrator[Any, Any]) -> None: + self._registry.add_named_orchestrator(name, func) + + def execute_orchestration_request(self, func: task.Orchestrator[Any, Any], context: Any) -> str: + context_body = getattr(context, "body", None) + if context_body is None: + context_body = context + orchestration_context = context_body + request = OrchestratorRequest() + request.ParseFromString(base64.b64decode(orchestration_context)) + stub: Any = AzureFunctionsNullStub() + response: Optional[OrchestratorResponse] = None + + def stub_complete(stub_response: OrchestratorResponse) -> None: + nonlocal response + response = stub_response + stub.CompleteOrchestratorTask = stub_complete + execution_started_events: list[HistoryEvent] = [] + for e in request.pastEvents: + if e.HasField("executionStarted"): + execution_started_events.append(e) + for e in request.newEvents: + if e.HasField("executionStarted"): + execution_started_events.append(e) + if len(execution_started_events) == 0: + raise Exception("No ExecutionStarted event found in orchestration request.") + + function_name = execution_started_events[-1].executionStarted.name + self.add_named_orchestrator(function_name, wrap_orchestrator(func)) + super()._execute_orchestrator(request, stub, None) + + if response is None: + raise Exception("Orchestrator execution did not produce a response.") + # The Python worker returns the input as type "json", so double-encoding is necessary + return base64.b64encode(response.SerializeToString()).decode('utf-8') + + def execute_entity_batch_request(self, func: task.Entity[Any, Any], context: Any) -> str: + context_body = getattr(context, "body", None) + if context_body is None: + context_body = context + orchestration_context = context_body + request = EntityBatchRequest() + request.ParseFromString(base64.b64decode(orchestration_context)) + stub: Any = AzureFunctionsNullStub() + response: Optional[EntityBatchResult] = None + + def stub_complete(stub_response: EntityBatchResult) -> None: + nonlocal response + response = stub_response + stub.CompleteEntityTask = stub_complete + + self.add_entity(wrap_entity(func)) + super()._execute_entity_batch(request, stub, None) + + if response is None: + raise Exception("Entity execution did not produce a response.") + # The Python worker returns the input as type "json", so double-encoding is necessary + return base64.b64encode(response.SerializeToString()).decode('utf-8') diff --git a/azure-functions-durable/pyproject.toml b/azure-functions-durable/pyproject.toml new file mode 100644 index 00000000..3c6fc6f6 --- /dev/null +++ b/azure-functions-durable/pyproject.toml @@ -0,0 +1,44 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +# For more information on pyproject.toml, see https://peps.python.org/pep-0621/ + +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "azure-functions-durable" +version = "2.0.0dev0" +description = "Durable Task Python SDK provider implementation for Durable Azure Functions" +keywords = [ + "durable", + "task", + "workflow", + "azure", + "azure functions" +] +classifiers = [ + "Development Status :: 3 - Alpha", + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", +] +requires-python = ">=3.13" +license = {file = "LICENSE"} +readme = "README.md" +dependencies = [ + "durabletask>=1.2.0dev0", + "azure-identity>=1.19.0", + "azure-functions>=2.2.0b6", + "typing-extensions>=4.9.0" +] + +[project.urls] +repository = "https://github.com/microsoft/durabletask-python" +changelog = "https://github.com/microsoft/durabletask-python/blob/main/CHANGELOG.md" + +[tool.setuptools.packages.find] +include = ["azure.durable_functions", "azure.durable_functions.*"] + +[tool.pytest.ini_options] +minversion = "6.0" diff --git a/azure-functions-durable/pyrightconfig.json b/azure-functions-durable/pyrightconfig.json new file mode 100644 index 00000000..fc3affe5 --- /dev/null +++ b/azure-functions-durable/pyrightconfig.json @@ -0,0 +1,16 @@ +{ + "include": [ + "azure" + ], + "extraPaths": [ + ".." + ], + "exclude": [ + "**/__pycache__", + "**/.venv*", + ".venv*", + "build" + ], + "pythonVersion": "3.13", + "typeCheckingMode": "strict" +} diff --git a/durabletask/task.py b/durabletask/task.py index 2085fd9c..564506f4 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -503,6 +503,11 @@ def is_failed(self) -> bool: """Returns True if the task has failed, False otherwise.""" return self._exception is not None + @property + def result(self) -> T: + """Returns the result of the task (alias for :meth:`get_result`).""" + return self.get_result() + def get_result(self) -> T: """Returns the result of the task.""" if not self._is_complete: diff --git a/durabletask/worker.py b/durabletask/worker.py index a53d5e71..10da62cf 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1434,8 +1434,7 @@ def _execute_entity_batch( stub.CompleteEntityTask(batch_result) except Exception as ex: self._logger.exception( - f"Failed to deliver entity response for '{entity_instance_id}' of orchestration ID '{instance_id}' to sidecar: {ex}" - ) + f"Failed to deliver entity response for '{entity_instance_id}' of orchestration ID '{instance_id}' to sidecar: {ex}") # TODO: Reset context @@ -1669,6 +1668,11 @@ def create_timer_internal( else: final_fire_at = fire_at + # Normalize timezone-aware datetimes to naive UTC so they can be safely + # compared against and combined with the orchestration's naive UTC clock. + if final_fire_at.tzinfo is not None: + final_fire_at = final_fire_at.astimezone(timezone.utc).replace(tzinfo=None) + next_fire_at: datetime = final_fire_at if ( @@ -2821,6 +2825,16 @@ def _handle_entity_event_raised(self, # inner value to the expected type via the converter (no redundant # re-serialization round-trip). unwrapped = self._data_converter.deserialize(event.eventRaised.input.value)["result"] + # The result here is double-encoded somewhere, so we need to decode it again. This does not happen + # with entityOperationCompleted, so it's either part of the event entity messaging protocol in Core, + # or something done by the WebJobs extension. + if unwrapped and isinstance(unwrapped, str): + try: + unwrapped = self._data_converter.deserialize(unwrapped) + except Exception as ex: + self._logger.warning(f"{ctx.instance_id}: Could not deserialize entity operation result to object " + f"for entity '{entity_id}', defaulting to encoded string." + f"Decode error: {ex}") result = self._data_converter.coerce( unwrapped, entity_task._expected_type, # pyright: ignore[reportPrivateUsage] diff --git a/eng/ci/release.yml b/eng/ci/release.yml index b471e1d6..f27c1545 100644 --- a/eng/ci/release.yml +++ b/eng/ci/release.yml @@ -98,3 +98,35 @@ extends: serviceendpointurl: "https://api.esrp.microsoft.com" mainpublisher: "durabletask-java" domaintenantid: "33e01921-4d64-4f8c-a055-5bdaffd5e33d" + + - job: azure_functions_durable + displayName: "Release azure-functions-durable" + templateContext: + type: releaseJob + isProduction: true + environment: durabletask-pypi-prod + inputs: + - input: pipelineArtifact + pipeline: DurableTaskPythonBuildPipeline + artifactName: drop + targetPath: $(System.DefaultWorkingDirectory)/drop + + steps: + - task: SFP.release-tasks.custom-build-release-task.EsrpRelease@9 + displayName: "ESRP Release azure-functions-durable" + inputs: + connectedservicename: "dtfx-internal-esrp-prod" + usemanagedidentity: true + keyvaultname: "durable-esrp-akv" + signcertname: "dts-esrp-cert" + clientid: "0b3ed1a4-0727-4a50-b82a-02c2bd9dec89" + intent: "PackageDistribution" + contenttype: "PyPi" + contentsource: "Folder" + folderlocation: "$(System.DefaultWorkingDirectory)/drop/buildoutputs/azure-functions-durable" + waitforreleasecompletion: true + owners: $(Build.RequestedForEmail) + approvers: $(Build.RequestedForEmail) + serviceendpointurl: "https://api.esrp.microsoft.com" + mainpublisher: "durabletask-java" + domaintenantid: "33e01921-4d64-4f8c-a055-5bdaffd5e33d" diff --git a/eng/templates/build.yml b/eng/templates/build.yml index 498ba942..c2294b04 100644 --- a/eng/templates/build.yml +++ b/eng/templates/build.yml @@ -13,9 +13,9 @@ jobs: - checkout: self - task: UsePythonVersion@0 - displayName: "Use Python 3.12" + displayName: "Use Python 3.13" inputs: - versionSpec: "3.12" + versionSpec: "3.13" addToPath: true # The 1ES pool is network-isolated, so direct pypi.org access is blocked. @@ -45,6 +45,11 @@ jobs: displayName: "flake8: durabletask-azuremanaged" workingDirectory: durabletask-azuremanaged + # Lint azurefunctions provider + - script: flake8 . + displayName: "flake8: azure-functions-durable" + workingDirectory: azure-functions-durable + # Build sdist + wheel for durabletask (core SDK) - script: | python -m build --sdist --wheel --outdir $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask . @@ -55,10 +60,16 @@ jobs: python -m build --sdist --wheel --outdir $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask-azuremanaged ./durabletask-azuremanaged displayName: "Build durabletask-azuremanaged (sdist + wheel)" + # Build sdist + wheel for azure-functions-durable + - script: | + python -m build --sdist --wheel --outdir $(Build.ArtifactStagingDirectory)/buildoutputs/azure-functions-durable ./azure-functions-durable + displayName: "Build azure-functions-durable (sdist + wheel)" + # List staged outputs for visibility in logs - script: | ls -la $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask ls -la $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask-azuremanaged + ls -la $(Build.ArtifactStagingDirectory)/buildoutputs/azure-functions-durable displayName: "List build outputs" # Install the built wheels with all declared optional extras and let @@ -89,8 +100,10 @@ jobs: # append the extras correctly. DT_WHEEL=$(ls $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask/*.whl) DT_AM_WHEEL=$(ls $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask-azuremanaged/*.whl) + AF_WHEEL=$(ls $(Build.ArtifactStagingDirectory)/buildoutputs/azure-functions-durable/*.whl) python -m pip install "${DT_WHEEL}[opentelemetry,azure-blob-payloads]" python -m pip install "${DT_AM_WHEEL}[azure-blob-payloads]" + python -m pip install "${AF_WHEEL}" displayName: "Install built wheels" - script: pytest -m "not dts and not azurite" --verbose @@ -104,3 +117,12 @@ jobs: set -e python -P -c "import durabletask.azuremanaged; from durabletask.azuremanaged.client import DurableTaskSchedulerClient; from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker; print('durabletask.azuremanaged smoke import OK')" displayName: "smoke import: durabletask-azuremanaged" + + # azure-functions-durable unit tests run here. Integration tests that + # require Azurite or the Azure Functions host emulator are marked + # (azurite / dts) and excluded since those external services aren't + # provisioned in this network-isolated pool. The full suite runs in + # GitHub Actions on PRs to main and main itself. + - script: pytest -m "not dts and not azurite" --verbose + displayName: "pytest: azure-functions-durable (unit tests, no emulators)" + workingDirectory: tests/azure-functions-durable diff --git a/requirements.txt b/requirements.txt index ee1cad9c..61b4ca6b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ pytest pytest-asyncio pytest-cov azure-identity +azure-functions asyncio packaging opentelemetry-api diff --git a/tests/azure-functions-durable/__init__.py b/tests/azure-functions-durable/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/azure-functions-durable/test_client_compat.py b/tests/azure-functions-durable/test_client_compat.py new file mode 100644 index 00000000..263ab872 --- /dev/null +++ b/tests/azure-functions-durable/test_client_compat.py @@ -0,0 +1,569 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import json +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch + +import azure.functions as func +import pytest + +import azure.durable_functions as df +from azure.durable_functions import RetryOptions +from azure.durable_functions.internal.compat.orchestration_runtime_status import ( + from_durabletask_status, + to_durabletask_status, + to_durabletask_statuses, +) +from durabletask.client import AsyncTaskHubGrpcClient, OrchestrationStatus +from durabletask.entities import EntityInstanceId +from durabletask.task import RetryPolicy + + +_CLIENT_CONFIG = json.dumps({ + "taskHubName": "TestHub", + "requiredQueryStringParameters": "code=xyz", + "baseUrl": "http://localhost:7071/runtime/webhooks/durabletask", + "rpcBaseUrl": "http://localhost:8080/", + "managementUrls": {"id": "INSTANCEID"}, +}) + + +def _make_client() -> df.DurableFunctionsClient: + return df.DurableFunctionsClient(_CLIENT_CONFIG) + + +# --------------------------------------------------------------------------- +# RetryOptions shim +# --------------------------------------------------------------------------- + +def test_retry_options_is_retry_policy_subclass(): + assert issubclass(RetryOptions, RetryPolicy) + + +def test_retry_options_maps_milliseconds_to_timedelta(): + with pytest.warns(DeprecationWarning): + options = RetryOptions( + first_retry_interval_in_milliseconds=1500, + max_number_of_attempts=3) + assert options.first_retry_interval == timedelta(milliseconds=1500) + assert options.max_number_of_attempts == 3 + assert options.first_retry_interval_in_milliseconds == 1500 + + +def test_retry_options_rejects_non_positive_interval(): + with pytest.warns(DeprecationWarning): + with pytest.raises(ValueError): + RetryOptions( + first_retry_interval_in_milliseconds=0, + max_number_of_attempts=3) + + +def test_retry_policy_is_exported(): + assert df.RetryPolicy is RetryPolicy + + +# --------------------------------------------------------------------------- +# create_http_management_payload signature compatibility +# --------------------------------------------------------------------------- + +async def test_create_http_management_payload_v1_signature(): + client = _make_client() + try: + payload = client.create_http_management_payload("inst1") + assert payload.urls["id"] == "inst1" + assert payload.urls["statusQueryGetUri"] == ( + "http://localhost:7071/runtime/webhooks/durabletask/instances/inst1?code=xyz") + finally: + await client.close() + + +async def test_create_http_management_payload_v2_signature(): + client = _make_client() + try: + request = func.HttpRequest( + method="POST", url="http://localhost:7071/api/start", body=b"") + payload = client.create_http_management_payload(request, "inst2") + assert payload.urls["id"] == "inst2" + assert payload.urls["statusQueryGetUri"] == ( + "http://localhost:7071/runtime/webhooks/durabletask/instances/inst2?code=xyz") + finally: + await client.close() + + +async def test_create_http_management_payload_requires_instance_id(): + client = _make_client() + try: + with pytest.raises(TypeError): + client.create_http_management_payload() + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# Deprecated client method aliases +# --------------------------------------------------------------------------- + +async def test_start_new_delegates_to_schedule_new_orchestration(): + client = _make_client() + try: + with patch.object(client, "schedule_new_orchestration", + new=AsyncMock(return_value="new-id")) as mock: + with pytest.warns(DeprecationWarning): + result = await client.start_new( + "MyOrchestrator", instance_id="abc", client_input={"x": 1}) + assert result == "new-id" + mock.assert_awaited_once_with( + "MyOrchestrator", input={"x": 1}, instance_id="abc", version=None) + finally: + await client.close() + + +async def test_get_status_delegates_to_get_orchestration_state(): + client = _make_client() + try: + with patch.object(client, "get_orchestration_state", + new=AsyncMock(return_value=None)) as mock: + with pytest.warns(DeprecationWarning): + await client.get_status("abc", show_input=True) + mock.assert_awaited_once_with("abc", fetch_payloads=True) + finally: + await client.close() + + +async def test_get_status_all_delegates(): + client = _make_client() + try: + with patch.object(client, "get_all_orchestration_states", + new=AsyncMock(return_value=[])) as mock: + with pytest.warns(DeprecationWarning): + await client.get_status_all() + mock.assert_awaited_once_with() + finally: + await client.close() + + +async def test_raise_event_delegates(): + client = _make_client() + try: + with patch.object(client, "raise_orchestration_event", + new=AsyncMock()) as mock: + with pytest.warns(DeprecationWarning): + await client.raise_event("abc", "evt", event_data={"k": "v"}) + mock.assert_awaited_once_with("abc", "evt", data={"k": "v"}) + finally: + await client.close() + + +async def test_terminate_delegates(): + client = _make_client() + try: + with patch.object(client, "terminate_orchestration", + new=AsyncMock()) as mock: + with pytest.warns(DeprecationWarning): + await client.terminate("abc", "because") + mock.assert_awaited_once_with("abc", output="because") + finally: + await client.close() + + +async def test_purge_instance_history_delegates(): + client = _make_client() + try: + with patch.object(client, "purge_orchestration", + new=AsyncMock()) as mock: + with pytest.warns(DeprecationWarning): + await client.purge_instance_history("abc") + mock.assert_awaited_once_with("abc") + finally: + await client.close() + + +async def test_suspend_resume_delegate(): + client = _make_client() + try: + with patch.object(client, "suspend_orchestration", + new=AsyncMock()) as suspend_mock: + with pytest.warns(DeprecationWarning): + await client.suspend("abc", "reason") + suspend_mock.assert_awaited_once_with("abc") + + with patch.object(client, "resume_orchestration", + new=AsyncMock()) as resume_mock: + with pytest.warns(DeprecationWarning): + await client.resume("abc", "reason") + resume_mock.assert_awaited_once_with("abc") + finally: + await client.close() + + +async def test_restart_delegates(): + client = _make_client() + try: + with patch.object(client, "restart_orchestration", + new=AsyncMock(return_value="abc")) as mock: + with pytest.warns(DeprecationWarning): + await client.restart("abc") + mock.assert_awaited_once_with("abc", restart_with_new_instance_id=True) + finally: + await client.close() + + +async def test_read_entity_state_delegates_to_get_entity(): + client = _make_client() + try: + with patch.object(client, "get_entity", + new=AsyncMock(return_value=None)) as mock: + with pytest.warns(DeprecationWarning): + await client.read_entity_state("@counter@one") + mock.assert_awaited_once_with("@counter@one") + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# OrchestrationRuntimeStatus mapping +# --------------------------------------------------------------------------- + +def test_orchestration_runtime_status_is_exported(): + assert df.OrchestrationRuntimeStatus.Running.value == "Running" + + +def test_to_durabletask_status_maps_known_values(): + assert to_durabletask_status( + df.OrchestrationRuntimeStatus.Running) == OrchestrationStatus.RUNNING + assert to_durabletask_status( + df.OrchestrationRuntimeStatus.ContinuedAsNew) == OrchestrationStatus.CONTINUED_AS_NEW + + +def test_to_durabletask_status_rejects_canceled(): + with pytest.raises(ValueError): + to_durabletask_status(df.OrchestrationRuntimeStatus.Canceled) + + +def test_to_durabletask_statuses_preserves_none(): + assert to_durabletask_statuses(None) is None + assert to_durabletask_statuses( + [df.OrchestrationRuntimeStatus.Failed]) == [OrchestrationStatus.FAILED] + + +async def test_get_status_by_maps_statuses(): + client = _make_client() + try: + with patch.object(client, "get_all_orchestration_states", + new=AsyncMock(return_value=[])) as mock: + with pytest.warns(DeprecationWarning): + await client.get_status_by( + runtime_status=[df.OrchestrationRuntimeStatus.Running]) + query = mock.await_args.args[0] + assert query.runtime_status == [OrchestrationStatus.RUNNING] + finally: + await client.close() + + +async def test_purge_instance_history_by_maps_statuses(): + client = _make_client() + try: + with patch.object(client, "purge_orchestrations_by", + new=AsyncMock()) as mock: + with pytest.warns(DeprecationWarning): + await client.purge_instance_history_by( + runtime_status=[df.OrchestrationRuntimeStatus.Completed]) + assert mock.await_args.kwargs["runtime_status"] == [OrchestrationStatus.COMPLETED] + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# signal_entity v1 keyword compatibility +# --------------------------------------------------------------------------- + +async def test_signal_entity_accepts_operation_input(): + client = _make_client() + try: + with patch.object(AsyncTaskHubGrpcClient, "signal_entity", + new=AsyncMock()) as mock: + await client.signal_entity( + "@counter@one", "add", operation_input=5, task_hub_name="hub") + mock.assert_awaited_once_with( + "@counter@one", "add", input=5, signal_time=None) + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# wait_for_completion_or_create_check_status_response +# --------------------------------------------------------------------------- + +def _make_request() -> func.HttpRequest: + return func.HttpRequest( + method="GET", url="http://localhost:7071/api/status", body=b"") + + +async def test_wait_for_completion_returns_output_when_completed(): + client = _make_client() + try: + state = SimpleNamespace( + runtime_status=OrchestrationStatus.COMPLETED, + serialized_output='"done"') + with patch.object(client, "wait_for_orchestration_completion", + new=AsyncMock(return_value=state)): + with pytest.warns(DeprecationWarning): + response = await client.wait_for_completion_or_create_check_status_response( + _make_request(), "abc") + assert response.status_code == 200 + assert response.get_body() == b'"done"' + finally: + await client.close() + + +async def test_wait_for_completion_returns_check_status_on_timeout(): + client = _make_client() + try: + with patch.object(client, "wait_for_orchestration_completion", + new=AsyncMock(side_effect=TimeoutError)): + with pytest.warns(DeprecationWarning): + response = await client.wait_for_completion_or_create_check_status_response( + _make_request(), "abc") + assert response.status_code == 202 + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# rewind (not implemented) +# --------------------------------------------------------------------------- + +async def test_rewind_raises_not_implemented(): + client = _make_client() + try: + with pytest.warns(DeprecationWarning): + with pytest.raises(NotImplementedError): + await client.rewind("abc", "reason") + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# get_client_response_links +# --------------------------------------------------------------------------- + +async def test_get_client_response_links_delegates(): + client = _make_client() + try: + with pytest.warns(DeprecationWarning): + payload = client.get_client_response_links(None, "abc") + assert payload.urls["id"] == "abc" + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# Exported class aliases +# --------------------------------------------------------------------------- + +def test_durable_orchestration_client_is_subclass(): + assert issubclass(df.DurableOrchestrationClient, df.DurableFunctionsClient) + + +def test_entity_id_maps_to_entity_instance_id(): + with pytest.warns(DeprecationWarning): + entity_id = df.EntityId("Counter", "one") + assert isinstance(entity_id, EntityInstanceId) + assert entity_id.name == "counter" + assert str(entity_id) == "@counter@one" + + +def test_managed_identity_token_source_shim(): + with pytest.warns(DeprecationWarning): + source = df.ManagedIdentityTokenSource("https://management.core.windows.net") + assert source.resource == "https://management.core.windows.net" + assert source.to_json()["kind"] == "AzureManagedIdentity" + + +def test_entity_class_raises_not_implemented(): + with pytest.warns(DeprecationWarning): + with pytest.raises(NotImplementedError): + df.Entity(lambda ctx: None) + + +# --------------------------------------------------------------------------- +# Return-type shims: DurableOrchestrationStatus +# --------------------------------------------------------------------------- + +def _fake_state(): + return SimpleNamespace( + name="orch", + instance_id="abc", + created_at=datetime(2026, 1, 1, tzinfo=timezone.utc), + last_updated_at=datetime(2026, 1, 2, tzinfo=timezone.utc), + runtime_status=OrchestrationStatus.RUNNING, + serialized_input='{"in": 1}', + serialized_output='{"out": 2}', + serialized_custom_status='"cs"', + get_input=lambda: {"in": 1}, + get_output=lambda: {"out": 2}, + get_custom_status=lambda: "cs", + ) + + +def test_from_durabletask_status_reverse_mapping(): + assert from_durabletask_status(OrchestrationStatus.RUNNING) == df.OrchestrationRuntimeStatus.Running + assert from_durabletask_status( + OrchestrationStatus.CONTINUED_AS_NEW) == df.OrchestrationRuntimeStatus.ContinuedAsNew + + +async def test_get_status_returns_wrapped_status(): + client = _make_client() + try: + with patch.object(client, "get_orchestration_state", + new=AsyncMock(return_value=_fake_state())): + with pytest.warns(DeprecationWarning): + status = await client.get_status("abc") + assert bool(status) is True + assert status.name == "orch" + assert status.instance_id == "abc" + assert status.runtime_status == df.OrchestrationRuntimeStatus.Running + assert status.input_ == {"in": 1} + assert status.output == {"out": 2} + assert status.custom_status == "cs" + assert status.to_json()["runtimeStatus"] == "Running" + finally: + await client.close() + + +async def test_get_status_missing_instance_is_falsy(): + client = _make_client() + try: + with patch.object(client, "get_orchestration_state", + new=AsyncMock(return_value=None)): + with pytest.warns(DeprecationWarning): + status = await client.get_status("missing") + assert bool(status) is False + assert status.runtime_status is None + assert status.output is None + finally: + await client.close() + + +async def test_get_status_all_returns_wrapped_list(): + client = _make_client() + try: + with patch.object(client, "get_all_orchestration_states", + new=AsyncMock(return_value=[_fake_state()])): + with pytest.warns(DeprecationWarning): + statuses = await client.get_status_all() + assert len(statuses) == 1 + assert statuses[0].runtime_status == df.OrchestrationRuntimeStatus.Running + finally: + await client.close() + + +async def test_get_status_by_returns_wrapped_list(): + client = _make_client() + try: + with patch.object(client, "get_all_orchestration_states", + new=AsyncMock(return_value=[_fake_state()])): + with pytest.warns(DeprecationWarning): + statuses = await client.get_status_by( + runtime_status=[df.OrchestrationRuntimeStatus.Running]) + assert statuses[0].instance_id == "abc" + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# Return-type shims: PurgeHistoryResult +# --------------------------------------------------------------------------- + +async def test_purge_instance_history_returns_purge_history_result(): + client = _make_client() + try: + result = SimpleNamespace(deleted_instance_count=3, is_complete=True) + with patch.object(client, "purge_orchestration", + new=AsyncMock(return_value=result)): + with pytest.warns(DeprecationWarning): + purge = await client.purge_instance_history("abc") + assert purge.instances_deleted == 3 + finally: + await client.close() + + +async def test_purge_instance_history_by_returns_purge_history_result(): + client = _make_client() + try: + result = SimpleNamespace(deleted_instance_count=5, is_complete=True) + with patch.object(client, "purge_orchestrations_by", + new=AsyncMock(return_value=result)): + with pytest.warns(DeprecationWarning): + purge = await client.purge_instance_history_by( + runtime_status=[df.OrchestrationRuntimeStatus.Completed]) + assert purge.instances_deleted == 5 + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# Return-type shims: EntityStateResponse +# --------------------------------------------------------------------------- + +async def test_read_entity_state_wraps_metadata_when_present(): + client = _make_client() + try: + metadata = SimpleNamespace( + includes_state=True, get_typed_state=lambda: {"count": 5}) + with patch.object(client, "get_entity", + new=AsyncMock(return_value=metadata)): + with pytest.warns(DeprecationWarning): + response = await client.read_entity_state("@counter@one") + assert response.entity_exists is True + assert response.entity_state == {"count": 5} + finally: + await client.close() + + +async def test_read_entity_state_when_missing(): + client = _make_client() + try: + with patch.object(client, "get_entity", + new=AsyncMock(return_value=None)): + with pytest.warns(DeprecationWarning): + response = await client.read_entity_state("@counter@one") + assert response.entity_exists is False + assert response.entity_state is None + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# HttpManagementPayload dict-like access +# --------------------------------------------------------------------------- + +async def test_http_management_payload_is_mapping_like(): + client = _make_client() + try: + payload = client.create_http_management_payload("inst1") + assert payload["id"] == "inst1" + assert "statusQueryGetUri" in payload + assert "id" in list(payload.keys()) + assert dict(payload.items())["id"] == "inst1" + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# call_http not implemented +# --------------------------------------------------------------------------- + +def test_call_http_raises_not_implemented(): + # call_http ignores self, so invoke via the class to avoid instantiating + # the abstract context. + with pytest.raises(NotImplementedError): + df.DurableOrchestrationContext.call_http(None, "GET", "http://example.com") + + +def test_token_source_is_still_constructible(): + with pytest.warns(DeprecationWarning): + source = df.ManagedIdentityTokenSource("https://graph.microsoft.com") + assert source.resource == "https://graph.microsoft.com" diff --git a/tests/azure-functions-durable/test_decorator_compat.py b/tests/azure-functions-durable/test_decorator_compat.py new file mode 100644 index 00000000..2ffe2ed9 --- /dev/null +++ b/tests/azure-functions-durable/test_decorator_compat.py @@ -0,0 +1,148 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import json + +import azure.durable_functions as df +from azure.durable_functions import DurableFunctionsClient +from azure.durable_functions.constants import ( + ACTIVITY_TRIGGER, + DURABLE_CLIENT, + ENTITY_TRIGGER, + ORCHESTRATION_TRIGGER, +) + + +_CLIENT_CONFIG = json.dumps({ + "taskHubName": "TestHub", + "requiredQueryStringParameters": "code=xyz", + "baseUrl": "http://localhost:7071/runtime/webhooks/durabletask", + "rpcBaseUrl": "http://localhost:8080/", +}) + + +def _trigger(fb): + return fb._function.get_trigger() + + +# --------------------------------------------------------------------------- +# orchestration_trigger +# --------------------------------------------------------------------------- + +def test_orchestration_trigger_v1_signature(): + app = df.DFApp() + + def my_orchestrator(context): + return 1 + + fb = app.orchestration_trigger( + context_name="context", orchestration="MyOrchestrator")(my_orchestrator) + trigger = _trigger(fb) + assert trigger.get_binding_name() == ORCHESTRATION_TRIGGER + assert trigger.name == "context" + assert trigger.orchestration == "MyOrchestrator" + + +def test_orchestration_trigger_accepts_input_type(): + app = df.DFApp() + + def my_orchestrator(context): + return 1 + + # v1 parity: the input_type keyword must be accepted and stashed. + fb = app.orchestration_trigger( + context_name="context", input_type=dict)(my_orchestrator) + assert fb is not None + assert my_orchestrator._df_input_type is dict + + +# --------------------------------------------------------------------------- +# activity_trigger +# --------------------------------------------------------------------------- + +def test_activity_trigger_v1_signature(): + app = df.DFApp() + + def my_activity(myinput): + return myinput + + fb = app.activity_trigger( + input_name="myinput", activity="MyActivity")(my_activity) + trigger = _trigger(fb) + assert trigger.get_binding_name() == ACTIVITY_TRIGGER + assert trigger.name == "myinput" + assert trigger.activity == "MyActivity" + + +# --------------------------------------------------------------------------- +# entity_trigger +# --------------------------------------------------------------------------- + +def test_entity_trigger_v1_signature(): + app = df.DFApp() + + def my_entity(context): + return None + + fb = app.entity_trigger( + context_name="context", entity_name="MyEntity")(my_entity) + trigger = _trigger(fb) + assert trigger.get_binding_name() == ENTITY_TRIGGER + assert trigger.name == "context" + assert trigger.entity_name == "MyEntity" + + +# --------------------------------------------------------------------------- +# durable_client_input +# --------------------------------------------------------------------------- + +def test_durable_client_input_v1_signature_registers_binding(): + app = df.DFApp() + + async def starter(client): + return None + + fb = app.durable_client_input( + client_name="client", task_hub="hub", connection_name="conn")(starter) + bindings = fb._function.get_bindings() + client_bindings = [b for b in bindings if b.get_binding_name() == DURABLE_CLIENT] + assert len(client_bindings) == 1 + binding = client_bindings[0] + assert binding.name == "client" + assert binding.task_hub == "hub" + assert binding.connection_name == "conn" + + +async def test_durable_client_input_injects_rich_client(): + app = df.DFApp() + received = {} + + async def starter(client): + received["client"] = client + + fb = app.durable_client_input(client_name="client")(starter) + # _add_rich_client replaces the user function with middleware that builds + # a DurableFunctionsClient from the binding's JSON string. + middleware = fb._function._func + await middleware(client=_CLIENT_CONFIG) + + client = received["client"] + assert isinstance(client, DurableFunctionsClient) + try: + assert client.taskHubName == "TestHub" + finally: + await client.close() + + +# --------------------------------------------------------------------------- +# All decorators register a function builder +# --------------------------------------------------------------------------- + +def test_decorators_register_function_builders(): + app = df.DFApp() + + def orch(context): + return 1 + + app.orchestration_trigger(context_name="context")(orch) + assert len(app._function_builders) == 1 diff --git a/tests/azure-functions-durable/test_entity_context_compat.py b/tests/azure-functions-durable/test_entity_context_compat.py new file mode 100644 index 00000000..807846cd --- /dev/null +++ b/tests/azure-functions-durable/test_entity_context_compat.py @@ -0,0 +1,128 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from unittest.mock import MagicMock + +from durabletask.entities import DurableEntity + +from azure.durable_functions.internal.compat.entity_context import ( + DurableEntityContext, + wrap_entity, +) + + +def _adapter(operation_input=None): + fake_ctx = MagicMock() + fake_ctx.entity_id.entity = "counter" + fake_ctx.entity_id.key = "k1" + fake_ctx.operation = "add" + return DurableEntityContext(fake_ctx, operation_input), fake_ctx + + +# --------------------------------------------------------------------------- +# Adapter delegation +# --------------------------------------------------------------------------- + +def test_identity_properties(): + adapter, _ = _adapter() + assert adapter.entity_name == "counter" + assert adapter.entity_key == "k1" + assert adapter.operation_name == "add" + assert adapter.is_newly_constructed is False + + +def test_get_input_returns_stored_input(): + adapter, _ = _adapter(5) + assert adapter.get_input() == 5 + + +def test_get_state_maps_initializer_to_default(): + adapter, fake = _adapter() + fake.get_state.return_value = 0 + result = adapter.get_state(initializer=lambda: 0, expected_type=int) + assert result == 0 + fake.get_state.assert_called_once_with(int, 0) + + +def test_get_state_without_initializer(): + adapter, fake = _adapter() + adapter.get_state() + fake.get_state.assert_called_once_with(None, None) + + +def test_set_state_delegates(): + adapter, fake = _adapter() + adapter.set_state({"count": 3}) + fake.set_state.assert_called_once_with({"count": 3}) + + +def test_destruct_on_exit_clears_state(): + adapter, fake = _adapter() + adapter.destruct_on_exit() + fake.set_state.assert_called_once_with(None) + + +# --------------------------------------------------------------------------- +# wrap_entity +# --------------------------------------------------------------------------- + +def test_wrap_passes_through_two_arg_entity(): + def entity(ctx, inp): + return None + assert wrap_entity(entity) is entity + + +def test_wrap_passes_through_class_based_entity(): + class Counter(DurableEntity): + def add(self, amount): + return amount + assert wrap_entity(Counter) is Counter + + +def test_wrap_adapts_one_arg_entity_with_set_result(): + seen = {} + + def counter_entity(context): + seen["op"] = context.operation_name + seen["input"] = context.get_input() + current = context.get_state(initializer=lambda: 0) + context.set_state(current + context.get_input()) + context.set_result(current + context.get_input()) + + wrapped = wrap_entity(counter_entity) + assert wrapped is not counter_entity + + fake_ctx = MagicMock() + fake_ctx.entity_id.entity = "counter" + fake_ctx.entity_id.key = "k1" + fake_ctx.operation = "add" + fake_ctx.get_state.return_value = 10 + + result = wrapped(fake_ctx, 5) + assert result == 15 + assert seen["op"] == "add" + assert seen["input"] == 5 + fake_ctx.set_state.assert_called_once_with(15) + + +def test_wrap_adapts_one_arg_entity_falls_back_to_return_value(): + def entity(context): + return "returned" + + wrapped = wrap_entity(entity) + fake_ctx = MagicMock() + assert wrapped(fake_ctx, None) == "returned" + + +def test_wrap_preserves_entity_name(): + def my_entity(context): + return None + assert wrap_entity(my_entity).__name__ == "my_entity" + + +def test_wrap_preserves_durable_entity_name(): + def entity_fn(context): + return None + entity_fn.__durable_entity_name__ = "CustomName" + wrapped = wrap_entity(entity_fn) + assert wrapped.__durable_entity_name__ == "CustomName" diff --git a/tests/azure-functions-durable/test_orchestration_context_compat.py b/tests/azure-functions-durable/test_orchestration_context_compat.py new file mode 100644 index 00000000..fecd7f93 --- /dev/null +++ b/tests/azure-functions-durable/test_orchestration_context_compat.py @@ -0,0 +1,218 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from unittest.mock import MagicMock, patch +from uuid import UUID + +import pytest + +from azure.durable_functions.internal.compat.orchestration_context import ( + DurableOrchestrationContext, + accepts_two_positional_args, + wrap_orchestrator, +) + + +# --------------------------------------------------------------------------- +# Adapter delegation +# --------------------------------------------------------------------------- + +def _adapter(orchestration_input=None): + fake_ctx = MagicMock() + fake_ctx.instance_id = "iid" + fake_ctx.is_replaying = True + return DurableOrchestrationContext(fake_ctx, orchestration_input), fake_ctx + + +def test_get_input_returns_stored_input(): + adapter, _ = _adapter({"x": 1}) + assert adapter.get_input() == {"x": 1} + + +def test_property_delegation(): + adapter, fake = _adapter() + assert adapter.instance_id == "iid" + assert adapter.is_replaying is True + assert adapter.current_utc_datetime is fake.current_utc_datetime + + +def test_call_activity_delegates(): + adapter, fake = _adapter() + adapter.call_activity("A", input_=3) + fake.call_activity.assert_called_once_with("A", input=3) + + +def test_call_activity_with_retry_delegates(): + adapter, fake = _adapter() + retry = object() + adapter.call_activity_with_retry("A", retry, input_=4) + fake.call_activity.assert_called_once_with("A", input=4, retry_policy=retry) + + +def test_call_sub_orchestrator_delegates(): + adapter, fake = _adapter() + adapter.call_sub_orchestrator("Sub", input_=1, instance_id="sid") + fake.call_sub_orchestrator.assert_called_once_with("Sub", input=1, instance_id="sid") + + +def test_call_sub_orchestrator_with_retry_delegates(): + adapter, fake = _adapter() + retry = object() + adapter.call_sub_orchestrator_with_retry("Sub", retry, input_=1, instance_id="sid") + fake.call_sub_orchestrator.assert_called_once_with( + "Sub", input=1, instance_id="sid", retry_policy=retry) + + +def test_wait_for_external_event_maps_expected_type(): + adapter, fake = _adapter() + adapter.wait_for_external_event("evt", expected_type=str) + fake.wait_for_external_event.assert_called_once_with("evt", data_type=str) + + +def test_create_timer_delegates(): + adapter, fake = _adapter() + adapter.create_timer("fire_at") + fake.create_timer.assert_called_once_with("fire_at") + + +def test_continue_as_new_and_set_custom_status_delegate(): + adapter, fake = _adapter() + adapter.continue_as_new({"n": 1}) + fake.continue_as_new.assert_called_once_with({"n": 1}) + adapter.set_custom_status("status") + fake.set_custom_status.assert_called_once_with("status") + + +def test_entity_operations_delegate(): + adapter, fake = _adapter() + adapter.call_entity("@e@k", "op", 1) + fake.call_entity.assert_called_once_with("@e@k", "op", 1) + adapter.signal_entity("@e@k", "op", 2) + fake.signal_entity.assert_called_once_with("@e@k", "op", input=2) + + +def test_new_uuid_and_new_guid(): + adapter, fake = _adapter() + fake.new_uuid.return_value = "12345678-1234-5678-1234-567812345678" + assert adapter.new_uuid() == "12345678-1234-5678-1234-567812345678" + guid = adapter.new_guid() + assert isinstance(guid, UUID) + assert str(guid) == "12345678-1234-5678-1234-567812345678" + + +def test_task_all_and_task_any_use_when_helpers(): + adapter, _ = _adapter() + with patch("durabletask.task.when_all", return_value="ALL") as when_all, \ + patch("durabletask.task.when_any", return_value="ANY") as when_any: + assert adapter.task_all(["t1", "t2"]) == "ALL" + assert adapter.task_any(["t1", "t2"]) == "ANY" + when_all.assert_called_once_with(["t1", "t2"]) + when_any.assert_called_once_with(["t1", "t2"]) + + +def test_call_http_raises_not_implemented(): + adapter, _ = _adapter() + with pytest.raises(NotImplementedError): + adapter.call_http("GET", "http://example.com") + + +# --------------------------------------------------------------------------- +# Additional context members +# --------------------------------------------------------------------------- + +def test_custom_status_tracks_set_custom_status(): + adapter, fake = _adapter() + assert adapter.custom_status is None + adapter.set_custom_status({"progress": 50}) + assert adapter.custom_status == {"progress": 50} + fake.set_custom_status.assert_called_once_with({"progress": 50}) + + +def test_will_continue_as_new_tracks_continue_as_new(): + adapter, fake = _adapter() + assert adapter.will_continue_as_new is False + adapter.continue_as_new({"next": 1}) + assert adapter.will_continue_as_new is True + fake.continue_as_new.assert_called_once_with({"next": 1}) + + +def test_parent_instance_id_raises_not_implemented(): + adapter, _ = _adapter() + with pytest.raises(NotImplementedError): + _ = adapter.parent_instance_id + + +def test_function_context_raises_not_implemented(): + adapter, _ = _adapter() + with pytest.raises(NotImplementedError): + _ = adapter.function_context + + +def test_histories_raises_not_implemented(): + adapter, _ = _adapter() + with pytest.raises(NotImplementedError): + _ = adapter.histories + + +# --------------------------------------------------------------------------- +# Arity detection and wrapping +# --------------------------------------------------------------------------- + +def test_accepts_two_positional_args(): + assert accepts_two_positional_args(lambda ctx, inp: None) is True + assert accepts_two_positional_args(lambda ctx: None) is False + assert accepts_two_positional_args(lambda *args: None) is True + + +def test_wrap_passes_through_two_arg_orchestrator(): + def orch(ctx, inp): + return None + assert wrap_orchestrator(orch) is orch + + +def test_wrap_adapts_one_arg_non_generator(): + seen = {} + + def orch(context): + seen["input"] = context.get_input() + return "done" + + wrapped = wrap_orchestrator(orch) + assert wrapped is not orch + fake_ctx = MagicMock() + result = wrapped(fake_ctx, 42) + assert result == "done" + assert seen["input"] == 42 + + +def test_wrap_adapts_one_arg_generator_end_to_end(): + seen = {} + + def orch(context): + seen["input"] = context.get_input() + activity_result = yield context.call_activity("A", input_=5) + seen["activity_result"] = activity_result + return activity_result * 2 + + wrapped = wrap_orchestrator(orch) + fake_ctx = MagicMock() + fake_ctx.call_activity.return_value = "SCHEDULED_TASK" + + gen = wrapped(fake_ctx, 7) + # First advance schedules the activity and yields the durabletask task. + yielded = next(gen) + assert yielded == "SCHEDULED_TASK" + fake_ctx.call_activity.assert_called_once_with("A", input=5) + assert seen["input"] == 7 + + # Feeding the activity result resumes the orchestrator to completion. + with pytest.raises(StopIteration) as stop: + gen.send(10) + assert stop.value.value == 20 + assert seen["activity_result"] == 10 + + +def test_wrap_preserves_orchestrator_name(): + def my_orchestrator(context): + return None + assert wrap_orchestrator(my_orchestrator).__name__ == "my_orchestrator" diff --git a/tests/azure-functions-durable/test_serialization_compat.py b/tests/azure-functions-durable/test_serialization_compat.py new file mode 100644 index 00000000..39c98dcc --- /dev/null +++ b/tests/azure-functions-durable/test_serialization_compat.py @@ -0,0 +1,71 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import pytest + +from azure.durable_functions.internal.serialization import ( + DEFAULT_FUNCTIONS_DATA_CONVERTER, +) + + +class Point: + """Sample custom type using the v1 to_json / from_json convention.""" + + def __init__(self, x: int, y: int): + self.x = x + self.y = y + + def to_json(self): + return {"x": self.x, "y": self.y} + + @classmethod + def from_json(cls, data): + return cls(data["x"], data["y"]) + + def __eq__(self, other): + return isinstance(other, Point) and self.x == other.x and self.y == other.y + + +def test_custom_object_round_trips(): + point = Point(3, 4) + serialized = DEFAULT_FUNCTIONS_DATA_CONVERTER.serialize(point) + assert isinstance(serialized, str) + + restored = DEFAULT_FUNCTIONS_DATA_CONVERTER.deserialize(serialized) + assert isinstance(restored, Point) + assert restored == point + + +def test_nested_custom_object_round_trips(): + payload = {"points": [Point(1, 1), Point(2, 2)], "label": "path"} + serialized = DEFAULT_FUNCTIONS_DATA_CONVERTER.serialize(payload) + restored = DEFAULT_FUNCTIONS_DATA_CONVERTER.deserialize(serialized) + assert restored["label"] == "path" + assert restored["points"] == [Point(1, 1), Point(2, 2)] + + +@pytest.mark.parametrize("value", [ + {"a": 1, "b": [1, 2, 3]}, + [1, 2, 3], + "hello", + 42, + 3.14, + True, +]) +def test_builtin_values_round_trip(value): + serialized = DEFAULT_FUNCTIONS_DATA_CONVERTER.serialize(value) + assert isinstance(serialized, str) + restored = DEFAULT_FUNCTIONS_DATA_CONVERTER.deserialize(serialized) + assert restored == value + + +def test_none_round_trips(): + assert DEFAULT_FUNCTIONS_DATA_CONVERTER.serialize(None) is None + assert DEFAULT_FUNCTIONS_DATA_CONVERTER.deserialize(None) is None + + +def test_coerce_plain_dict_to_type(): + # get_input(expected_type=...) relies on the converter coercing a plain + # (already-deserialized) dict into the declared type. + coerced = DEFAULT_FUNCTIONS_DATA_CONVERTER.coerce({"x": 5, "y": 6}, Point) + assert coerced == Point(5, 6) diff --git a/tests/azure-functions-durable/test_smoke.py b/tests/azure-functions-durable/test_smoke.py new file mode 100644 index 00000000..1046663e --- /dev/null +++ b/tests/azure-functions-durable/test_smoke.py @@ -0,0 +1,20 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import azure.durable_functions as df + + +def test_public_api_is_importable(): + """Smoke test: the package imports and exposes its public API. + + This is a no-op placeholder establishing the unit-test structure for the + azure-functions-durable module. Real unit tests should be added alongside + it; integration tests that require Azurite or the Azure Functions host + emulator should be marked (e.g. ``azurite``) so they can be excluded on + the network-isolated ADO build pool. + """ + assert df.version + assert df.DFApp is not None + assert df.Blueprint is not None + assert df.DurableFunctionsClient is not None + assert df.Orchestrator is not None