Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -63,6 +64,13 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {

private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
private final boolean isExecutorServiceManaged;

// Per-stream cache of each instance's committed history for the stateful-history optimization,
// or null when disabled. Reset on every reconnect and swept for idle entries by the janitor.
private final WorkflowHistoryCache historyCache;
private static final Duration HISTORY_SWEEP_INTERVAL = Duration.ofMinutes(1);
private volatile ScheduledExecutorService historyJanitor;

private volatile boolean isNormalShutdown = false;
private volatile Thread workerThread;

Expand Down Expand Up @@ -104,7 +112,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {

this.isExecutorServiceManaged = builder.executorService == null;


this.historyCache = builder.disableStatefulHistory ? null : new WorkflowHistoryCache(
builder.historyCacheTtl, builder.historyCacheMaxInstances, builder.historyCacheMaxBytes);
}

/**
Expand Down Expand Up @@ -137,6 +146,7 @@ public void close() {
this.workerThread.interrupt();
}
this.isNormalShutdown = true;
this.shutDownHistoryJanitor();
this.shutDownWorkerPool();
this.closeSideCarChannel();
}
Expand Down Expand Up @@ -172,10 +182,24 @@ public void startAndBlock() {
this.dataConverter,
logger);

this.startHistoryJanitor();

while (!this.isNormalShutdown && !Thread.currentThread().isInterrupted()) {
try {
OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest
.newBuilder().build();
// Each iteration establishes a fresh work-item stream. Start it cold: the sidecar drops
// the previous stream's warm set, so any histories cached from it are no longer in sync.
if (this.historyCache != null) {
this.historyCache.reset();
}

// Advertise the stateful-history capability so the sidecar can send deltas instead of the
// full history on each turn. Absent it, the sidecar always sends the full history.
OrchestratorService.GetWorkItemsRequest.Builder requestBuilder = OrchestratorService.GetWorkItemsRequest
.newBuilder();
if (this.historyCache != null) {
requestBuilder.addCapabilities(OrchestratorService.WorkerCapability.WORKER_CAPABILITY_STATEFUL_HISTORY);
}
OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = requestBuilder.build();
Iterator<OrchestratorService.WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
if (this.isNormalShutdown || Thread.currentThread().isInterrupted()) {
Expand All @@ -190,7 +214,8 @@ public void startAndBlock() {
String.format("Processing orchestrator request for instance: {0}",
orchestratorRequest.getInstanceId()));

this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer));
this.workerPool.submit(
new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer, historyCache));
} else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) {
OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest();

Expand Down Expand Up @@ -264,6 +289,28 @@ private void closeSideCarChannel() {
}
}

/**
 * Starts the background janitor that periodically evicts idle entries from the history cache.
 *
 * <p>Does nothing when the stateful-history cache is disabled ({@code historyCache == null}),
 * when a janitor is already running, or when the worker has already been shut down. The janitor
 * runs on a single daemon thread and calls {@link WorkflowHistoryCache#sweepExpired()} once per
 * {@code HISTORY_SWEEP_INTERVAL}.</p>
 */
private void startHistoryJanitor() {
  // Never start a janitor for a worker that was already closed: close() has already run
  // shutDownHistoryJanitor(), so nothing would ever stop the new executor.
  if (this.historyCache == null || this.historyJanitor != null || this.isNormalShutdown) {
    return;
  }
  ScheduledExecutorService janitor = Executors.newSingleThreadScheduledExecutor(runnable -> {
    Thread thread = new Thread(runnable, "dapr-workflow-history-janitor");
    thread.setDaemon(true);
    return thread;
  });
  long sweepSeconds = HISTORY_SWEEP_INTERVAL.getSeconds();
  janitor.scheduleWithFixedDelay(
      this.historyCache::sweepExpired, sweepSeconds, sweepSeconds, TimeUnit.SECONDS);
  this.historyJanitor = janitor;

  // close() writes isNormalShutdown before it reads historyJanitor, and this method writes
  // historyJanitor before it re-reads isNormalShutdown. Both fields are volatile, so at least
  // one side observes the other's write and the janitor is always stopped on a concurrent
  // close(). shutdownNow() is idempotent, so both sides stopping it is harmless.
  if (this.isNormalShutdown) {
    this.shutDownHistoryJanitor();
  }
}

/**
 * Stops the history janitor, if one is running, and clears the field so a later start can
 * create a fresh one. Safe to call when no janitor was ever started.
 */
private void shutDownHistoryJanitor() {
  // Read the volatile field once so the null check and the shutdown act on the same executor.
  final ScheduledExecutorService running = this.historyJanitor;
  if (running == null) {
    return;
  }
  running.shutdownNow();
  this.historyJanitor = null;
}

private void shutDownWorkerPool() {
if (this.isExecutorServiceManaged) {
if (!this.isNormalShutdown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public final class DurableTaskGrpcWorkerBuilder {
Duration maximumTimerInterval;
ExecutorService executorService;
String appId; // App ID for cross-app routing
boolean disableStatefulHistory;
Duration historyCacheTtl;
int historyCacheMaxInstances;
long historyCacheMaxBytes;

/**
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
Expand Down Expand Up @@ -145,6 +149,62 @@ public DurableTaskGrpcWorkerBuilder appId(String appId) {
return this;
}

/**
 * Disables (or re-enables) the stateful-history optimization. It is enabled by default.
 *
 * <p>By default the worker advertises {@code WORKER_CAPABILITY_STATEFUL_HISTORY} and caches each
 * instance's committed history per work-item stream, so the sidecar can send only the new events
 * (the delta) each turn instead of the full history. A cache miss is always recovered safely via
 * the GetInstanceHistory RPC, so disabling this only affects per-turn bandwidth, never correctness.
 * When disabled, the worker creates no history cache, does not advertise the capability, and
 * always receives the full history; the {@code historyCache*} settings are then ignored.</p>
 *
 * @param disableStatefulHistory {@code true} to disable the stateful-history optimization,
 *                               {@code false} (the default) to keep it enabled
 * @return this builder object
 */
public DurableTaskGrpcWorkerBuilder disableStatefulHistory(boolean disableStatefulHistory) {
this.disableStatefulHistory = disableStatefulHistory;
return this;
}

/**
 * Sets the sliding time-to-live for cached instance histories. An instance's entry is reclaimed
 * once it has gone idle (no turn) for longer than this. Expired entries are removed by a
 * background sweep that runs once a minute, so an entry may outlive its TTL by up to that long.
 *
 * <p>If not specified, or if the value is {@code null}, zero, or negative, a default of one hour
 * is used. Ignored when the stateful-history optimization is disabled.</p>
 *
 * @param historyCacheTtl the sliding time-to-live for a cached instance history; {@code null},
 *                        zero, or negative selects the one-hour default
 * @return this builder object
 */
public DurableTaskGrpcWorkerBuilder historyCacheTtl(Duration historyCacheTtl) {
this.historyCacheTtl = historyCacheTtl;
return this;
}

/**
 * Sets the maximum number of per-instance histories retained on a single work-item stream;
 * least-recently-used entries are evicted beyond it. A non-positive value (including the unset
 * default of {@code 0}) uses the built-in default of 100,000 instances. Ignored when the
 * stateful-history optimization is disabled.
 *
 * @param historyCacheMaxInstances the instance-count cap for the history cache; {@code <= 0}
 *                                 selects the built-in default
 * @return this builder object
 */
public DurableTaskGrpcWorkerBuilder historyCacheMaxInstances(int historyCacheMaxInstances) {
this.historyCacheMaxInstances = historyCacheMaxInstances;
return this;
}

/**
 * Sets the byte budget for cached histories on a single work-item stream; least-recently-used
 * entries are evicted beyond it. An entry's size is the sum of the serialized sizes of its
 * history events. A non-positive value (including the unset default of {@code 0}) means
 * unlimited (bounded only by the instance-count cap and the TTL). The budget is soft: the most
 * recently stored entry is never evicted, so a single history larger than the budget is still
 * cached. Ignored when the stateful-history optimization is disabled.
 *
 * @param historyCacheMaxBytes the byte budget for the history cache; {@code <= 0} for unlimited
 * @return this builder object
 */
public DurableTaskGrpcWorkerBuilder historyCacheMaxBytes(long historyCacheMaxBytes) {
this.historyCacheMaxBytes = historyCacheMaxBytes;
return this;
}

/**
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright 2026 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.durabletask;

import io.dapr.durabletask.implementation.protobuf.HistoryEvents;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * Per-stream cache of each workflow instance's committed history, enabling "stateful history"
 * delta work items: a worker that advertises {@code WORKER_CAPABILITY_STATEFUL_HISTORY} retains
 * the history it has already replayed so the sidecar can send only the new events (the delta).
 * Entries are reclaimed by a sliding TTL, an instance-count cap, and an optional byte budget
 * (LRU eviction). Eviction is always safe: a miss is recovered via the GetInstanceHistory RPC.
 *
 * <p>Recency for LRU eviction is the order of {@link #get} and {@link #put} calls; TTL expiry
 * is measured with the configured nanosecond clock.</p>
 *
 * <p>Thread-safe; work items are processed concurrently on the worker pool. All state is guarded
 * by a single internal lock.</p>
 */
public final class WorkflowHistoryCache {

  static final Duration DEFAULT_TTL = Duration.ofHours(1);
  static final int DEFAULT_MAX_INSTANCES = 100_000;

  /** One cached history plus the bookkeeping needed for byte accounting and TTL expiry. */
  private static final class Entry {
    final List<HistoryEvents.HistoryEvent> events;
    final long bytes;
    long lastAccess;

    Entry(List<HistoryEvents.HistoryEvent> events, long bytes, long lastAccess) {
      this.events = events;
      this.bytes = bytes;
      this.lastAccess = lastAccess;
    }
  }

  private final Object lock = new Object();
  // Access-ordered: get() and put() move the touched key to the tail, so iteration runs from the
  // least- to the most-recently used entry and the LRU victim is found without a full scan.
  private final Map<String, Entry> entries = new LinkedHashMap<>(16, 0.75f, true);
  private final long ttlNanos;
  private final int maxInstances;
  private final long maxBytes;
  private final LongSupplier clockNanos;
  private long totalBytes;

  /**
   * Constructs a cache with the default monotonic clock. Non-positive ttl/maxInstances use the
   * package defaults; a non-positive maxBytes means unlimited (bounded by ttl and maxInstances).
   *
   * @param ttl          sliding time-to-live for an idle instance's entry
   * @param maxInstances instance-count cap
   * @param maxBytes     byte budget, or {@code <= 0} for unlimited
   */
  public WorkflowHistoryCache(Duration ttl, int maxInstances, long maxBytes) {
    this(ttl, maxInstances, maxBytes, System::nanoTime);
  }

  /**
   * Constructs a cache with an injectable clock, for deterministic tests.
   *
   * @param ttl          sliding time-to-live; {@code null}, zero, or negative uses
   *                     {@link #DEFAULT_TTL}
   * @param maxInstances instance-count cap; {@code <= 0} uses {@link #DEFAULT_MAX_INSTANCES}
   * @param maxBytes     byte budget, or {@code <= 0} for unlimited
   * @param clockNanos   supplier of a monotonic nanosecond timestamp (e.g.
   *                     {@code System::nanoTime})
   */
  WorkflowHistoryCache(Duration ttl, int maxInstances, long maxBytes, LongSupplier clockNanos) {
    Duration effectiveTtl = ttl != null && !ttl.isZero() && !ttl.isNegative() ? ttl : DEFAULT_TTL;
    this.ttlNanos = saturatedNanos(effectiveTtl);
    this.maxInstances = maxInstances > 0 ? maxInstances : DEFAULT_MAX_INSTANCES;
    this.maxBytes = maxBytes > 0 ? maxBytes : 0;
    this.clockNanos = clockNanos;
  }

  /**
   * Converts a duration to nanoseconds, clamping to {@code Long.MAX_VALUE} instead of throwing
   * when the duration is too large for a {@code long} (an "effectively forever" TTL).
   */
  private static long saturatedNanos(Duration duration) {
    try {
      return duration.toNanos();
    } catch (ArithmeticException overflow) {
      return Long.MAX_VALUE;
    }
  }

  /**
   * Returns the cached committed history for an instance, refreshing its TTL and LRU position,
   * or {@code null} on a miss.
   *
   * <p>The returned list is the cache's own snapshot, not a copy; callers must treat it as
   * read-only.</p>
   *
   * @param instanceId the workflow instance ID
   * @return the cached committed history, or {@code null} if the instance is not cached
   */
  public List<HistoryEvents.HistoryEvent> get(String instanceId) {
    synchronized (this.lock) {
      // On an access-ordered map this lookup also moves the entry to the most-recent position.
      Entry entry = this.entries.get(instanceId);
      if (entry == null) {
        return null;
      }
      entry.lastAccess = this.clockNanos.getAsLong();
      return entry.events;
    }
  }

  /**
   * Caches an instance's committed history, evicting least-recently-used entries to stay within
   * the configured bounds. The list is copied, so later changes to {@code events} do not affect
   * the cache.
   *
   * @param instanceId the workflow instance ID
   * @param events     the committed history to cache for the instance
   */
  public void put(String instanceId, List<HistoryEvents.HistoryEvent> events) {
    // Copy and measure outside the lock; only the map update needs to be serialized.
    List<HistoryEvents.HistoryEvent> snapshot = new ArrayList<>(events);
    long bytes = 0;
    for (HistoryEvents.HistoryEvent event : snapshot) {
      bytes += event.getSerializedSize();
    }

    synchronized (this.lock) {
      Entry existing = this.entries.get(instanceId);
      if (existing != null) {
        this.totalBytes -= existing.bytes;
      }
      this.entries.put(instanceId, new Entry(snapshot, bytes, this.clockNanos.getAsLong()));
      this.totalBytes += bytes;
      this.evictToFit(instanceId);
    }
  }

  /**
   * Drops an instance's cached history (e.g. once it completes).
   *
   * @param instanceId the workflow instance ID
   */
  public void remove(String instanceId) {
    synchronized (this.lock) {
      this.removeLocked(instanceId);
    }
  }

  /** Clears the cache; used when the stream reconnects (and starts cold). */
  public void reset() {
    synchronized (this.lock) {
      this.entries.clear();
      this.totalBytes = 0;
    }
  }

  /** Evicts entries whose last turn was longer ago than the TTL. */
  public void sweepExpired() {
    long now = this.clockNanos.getAsLong();
    synchronized (this.lock) {
      // Collect first, then remove, so the map is not modified while it is being iterated.
      List<String> expired = new ArrayList<>();
      for (Map.Entry<String, Entry> entry : this.entries.entrySet()) {
        if (now - entry.getValue().lastAccess > this.ttlNanos) {
          expired.add(entry.getKey());
        }
      }
      for (String instanceId : expired) {
        this.removeLocked(instanceId);
      }
    }
  }

  int size() {
    synchronized (this.lock) {
      return this.entries.size();
    }
  }

  long totalBytes() {
    synchronized (this.lock) {
      return this.totalBytes;
    }
  }

  /** Removes an entry and keeps the byte total in step. Caller must hold {@code lock}. */
  private void removeLocked(String instanceId) {
    Entry removed = this.entries.remove(instanceId);
    if (removed != null) {
      this.totalBytes -= removed.bytes;
    }
  }

  /**
   * Evicts least-recently-used entries until within the count and byte bounds, always keeping the
   * just-touched entry so the active working set is never evicted. A lone entry over the byte
   * budget is kept (a soft overage) rather than thrashing. Caller must hold {@code lock}.
   */
  private void evictToFit(String keep) {
    while (this.entries.size() > 1) {
      boolean overCount = this.entries.size() > this.maxInstances;
      boolean overBytes = this.maxBytes > 0 && this.totalBytes > this.maxBytes;
      if (!overCount && !overBytes) {
        return;
      }
      String victim = this.leastRecentlyUsedExcept(keep);
      if (victim == null) {
        return;
      }
      this.removeLocked(victim);
    }
  }

  /**
   * Returns the least-recently-used key other than {@code keep}, or {@code null} if there is
   * none. The map iterates from least- to most-recently used, so this is the first key that is
   * not {@code keep}; iterating the key set does not itself count as an access.
   */
  private String leastRecentlyUsedExcept(String keep) {
    for (String candidate : this.entries.keySet()) {
      if (!candidate.equals(keep)) {
        return candidate;
      }
    }
    return null;
  }
}
Loading
Loading