-
Notifications
You must be signed in to change notification settings - Fork 30
Expand file tree
/
Copy pathactivity_sequence.py
More file actions
65 lines (50 loc) · 2.79 KB
/
Copy pathactivity_sequence.py
File metadata and controls
65 lines (50 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import logging
import os
from collections.abc import Generator
from typing import Any
from azure.identity import DefaultAzureCredential
from durabletask import client, task
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
logger = logging.getLogger("activity_sequence")
def hello(ctx: task.ActivityContext, name: str) -> str:
"""Activity function that returns a greeting"""
return f'Hello {name}!'
def sequence(ctx: task.OrchestrationContext, _: Any) -> Generator[task.Task[Any], Any, list[str]]:
"""Orchestrator function that calls the 'hello' activity function in a sequence"""
# Create a replay-safe logger to avoid duplicate log messages during replay
replay_logger = ctx.create_replay_safe_logger(logger)
replay_logger.info("Starting activity sequence for instance %s", ctx.instance_id)
# call "hello" activity function in a sequence
result1 = yield ctx.call_activity(hello, input='Tokyo')
result2 = yield ctx.call_activity(hello, input='Seattle')
result3 = yield ctx.call_activity(hello, input='London')
replay_logger.info("All activities completed")
# return an array of results
return [result1, result2, result3]
# Use environment variables if provided, otherwise use default emulator values
taskhub_name = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
print(f"Using taskhub: {taskhub_name}")
print(f"Using endpoint: {endpoint}")
# Set credential to None for emulator, or DefaultAzureCredential for Azure
secure_channel = endpoint.startswith("https://")
credential = DefaultAzureCredential() if secure_channel else None
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=credential) as w:
w.add_orchestrator(sequence)
w.add_activity(hello)
w.start()
# Construct the client and run the orchestrations
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=credential)
instance_id = c.schedule_new_orchestration(sequence)
state = c.wait_for_orchestration_completion(instance_id, timeout=60)
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
print(f'Orchestration completed! Result: {state.serialized_output}')
elif state:
print(f'Orchestration failed: {state.failure_details}')