diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 8fae93cf93..6a8eff1c95 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -63,6 +64,13 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; private final boolean isExecutorServiceManaged; + + // Per-stream cache of each instance's committed history for the stateful-history optimization, + // or null when disabled. Reset on every reconnect and swept for idle entries by the janitor. + private final WorkflowHistoryCache historyCache; + private static final Duration HISTORY_SWEEP_INTERVAL = Duration.ofMinutes(1); + private volatile ScheduledExecutorService historyJanitor; + private volatile boolean isNormalShutdown = false; private volatile Thread workerThread; @@ -104,7 +112,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.isExecutorServiceManaged = builder.executorService == null; - + this.historyCache = builder.disableStatefulHistory ? null : new WorkflowHistoryCache( + builder.historyCacheTtl, builder.historyCacheMaxInstances, builder.historyCacheMaxBytes); } /** @@ -137,6 +146,7 @@ public void close() { this.workerThread.interrupt(); } this.isNormalShutdown = true; + this.shutDownHistoryJanitor(); this.shutDownWorkerPool(); this.closeSideCarChannel(); } @@ -172,10 +182,24 @@ public void startAndBlock() { this.dataConverter, logger); + this.startHistoryJanitor(); + while (!this.isNormalShutdown && !Thread.currentThread().isInterrupted()) { try { - OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest - .newBuilder().build(); + // Each iteration establishes a fresh work-item stream. Start it cold: the sidecar drops + // the previous stream's warm set, so any histories cached from it are no longer in sync. + if (this.historyCache != null) { + this.historyCache.reset(); + } + + // Advertise the stateful-history capability so the sidecar can send deltas instead of the + // full history on each turn. Absent it, the sidecar always sends the full history. + OrchestratorService.GetWorkItemsRequest.Builder requestBuilder = OrchestratorService.GetWorkItemsRequest + .newBuilder(); + if (this.historyCache != null) { + requestBuilder.addCapabilities(OrchestratorService.WorkerCapability.WORKER_CAPABILITY_STATEFUL_HISTORY); + } + OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = requestBuilder.build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); while (workItemStream.hasNext()) { if (this.isNormalShutdown || Thread.currentThread().isInterrupted()) { @@ -190,7 +214,8 @@ public void startAndBlock() { String.format("Processing orchestrator request for instance: {0}", orchestratorRequest.getInstanceId())); - this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer)); + this.workerPool.submit( + new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer, historyCache)); } else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) { OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest(); @@ -264,6 +289,28 @@ private void closeSideCarChannel() { } } + private void startHistoryJanitor() { + if (this.historyCache == null || this.historyJanitor != null) { + return; + } + ScheduledExecutorService janitor = Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread thread = new Thread(runnable, "dapr-workflow-history-janitor"); + thread.setDaemon(true); + return thread; + }); + long sweepSeconds = HISTORY_SWEEP_INTERVAL.getSeconds(); + janitor.scheduleWithFixedDelay(this.historyCache::sweepExpired, sweepSeconds, sweepSeconds, TimeUnit.SECONDS); + this.historyJanitor = janitor; + } + + private void shutDownHistoryJanitor() { + ScheduledExecutorService janitor = this.historyJanitor; + if (janitor != null) { + janitor.shutdownNow(); + this.historyJanitor = null; + } + } + private void shutDownWorkerPool() { if (this.isExecutorServiceManaged) { if (!this.isNormalShutdown) { diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java index ad60577256..b7135643eb 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -34,6 +34,10 @@ public final class DurableTaskGrpcWorkerBuilder { Duration maximumTimerInterval; ExecutorService executorService; String appId; // App ID for cross-app routing + boolean disableStatefulHistory; + Duration historyCacheTtl; + int historyCacheMaxInstances; + long historyCacheMaxBytes; /** * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. @@ -145,6 +149,62 @@ public DurableTaskGrpcWorkerBuilder appId(String appId) { return this; } + /** + * Disables the stateful-history optimization. + * + *

By default the worker advertises {@code WORKER_CAPABILITY_STATEFUL_HISTORY} and caches each + * instance's committed history per work-item stream, so the sidecar can send only the new events + * (the delta) each turn instead of the full history. A cache miss is always recovered safely via + * the GetInstanceHistory RPC, so disabling this only affects per-turn bandwidth, never correctness. + * When disabled, the worker always receives the full history.

+ * + * @param disableStatefulHistory whether to disable the stateful-history optimization + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder disableStatefulHistory(boolean disableStatefulHistory) { + this.disableStatefulHistory = disableStatefulHistory; + return this; + } + + /** + * Sets the sliding time-to-live for cached instance histories. An instance's entry is reclaimed + * once it has gone idle (no turn) for longer than this. If not specified, a default of one hour is + * used. Ignored when the stateful-history optimization is disabled. + * + * @param historyCacheTtl the sliding time-to-live for a cached instance history + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder historyCacheTtl(Duration historyCacheTtl) { + this.historyCacheTtl = historyCacheTtl; + return this; + } + + /** + * Sets the maximum number of per-instance histories retained on a single work-item stream; + * least-recently-used entries are evicted beyond it. A non-positive value uses the built-in + * default. Ignored when the stateful-history optimization is disabled. + * + * @param historyCacheMaxInstances the instance-count cap for the history cache + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder historyCacheMaxInstances(int historyCacheMaxInstances) { + this.historyCacheMaxInstances = historyCacheMaxInstances; + return this; + } + + /** + * Sets the byte budget for cached histories on a single work-item stream; least-recently-used + * entries are evicted beyond it. A non-positive value means unlimited (bounded only by the + * instance-count cap and the TTL). Ignored when the stateful-history optimization is disabled. + * + * @param historyCacheMaxBytes the byte budget for the history cache + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder historyCacheMaxBytes(long historyCacheMaxBytes) { + this.historyCacheMaxBytes = historyCacheMaxBytes; + return this; + } + /** * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. * diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/WorkflowHistoryCache.java b/durabletask-client/src/main/java/io/dapr/durabletask/WorkflowHistoryCache.java new file mode 100644 index 0000000000..0df02e5ec6 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/WorkflowHistoryCache.java @@ -0,0 +1,215 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask; + +import io.dapr.durabletask.implementation.protobuf.HistoryEvents; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.LongSupplier; + +/** + * Per-stream cache of each workflow instance's committed history, enabling "stateful history" + * delta work items: a worker that advertises {@code WORKER_CAPABILITY_STATEFUL_HISTORY} retains + * the history it has already replayed so the sidecar can send only the new events (the delta). + * Entries are reclaimed by a sliding TTL, an instance-count cap, and an optional byte budget + * (LRU eviction). Eviction is always safe: a miss is recovered via the GetInstanceHistory RPC. + * + *

Thread-safe; work items are processed concurrently on the worker pool.

+ */ +public final class WorkflowHistoryCache { + + static final Duration DEFAULT_TTL = Duration.ofHours(1); + static final int DEFAULT_MAX_INSTANCES = 100_000; + + private static final class Entry { + final List events; + final long bytes; + long lastAccess; + + Entry(List events, long bytes, long lastAccess) { + this.events = events; + this.bytes = bytes; + this.lastAccess = lastAccess; + } + } + + private final Object lock = new Object(); + private final Map entries = new HashMap<>(); + private final long ttlNanos; + private final int maxInstances; + private final long maxBytes; + private final LongSupplier clockNanos; + private long totalBytes; + + /** + * Constructs a cache with the default monotonic clock. Non-positive ttl/maxInstances use the + * package defaults; a non-positive maxBytes means unlimited (bounded by ttl and maxInstances). + * + * @param ttl sliding time-to-live for an idle instance's entry + * @param maxInstances instance-count cap + * @param maxBytes byte budget, or {@code <= 0} for unlimited + */ + public WorkflowHistoryCache(Duration ttl, int maxInstances, long maxBytes) { + this(ttl, maxInstances, maxBytes, System::nanoTime); + } + + /** + * Constructs a cache with an injectable clock, for deterministic tests. + * + * @param clockNanos supplier of a monotonic nanosecond timestamp (e.g. {@code System::nanoTime}) + */ + WorkflowHistoryCache(Duration ttl, int maxInstances, long maxBytes, LongSupplier clockNanos) { + Duration effectiveTtl = ttl != null && !ttl.isZero() && !ttl.isNegative() ? ttl : DEFAULT_TTL; + this.ttlNanos = effectiveTtl.toNanos(); + this.maxInstances = maxInstances > 0 ? maxInstances : DEFAULT_MAX_INSTANCES; + this.maxBytes = maxBytes > 0 ? maxBytes : 0; + this.clockNanos = clockNanos; + } + + /** + * Returns the cached committed history for an instance, refreshing its TTL, or {@code null} on a + * miss. + * + * @param instanceId the workflow instance ID + * @return the cached committed history, or {@code null} if the instance is not cached + */ + public List get(String instanceId) { + synchronized (this.lock) { + Entry entry = this.entries.get(instanceId); + if (entry == null) { + return null; + } + entry.lastAccess = this.clockNanos.getAsLong(); + return entry.events; + } + } + + /** + * Caches an instance's committed history, evicting least-recently-used entries to stay within + * the configured bounds. + * + * @param instanceId the workflow instance ID + * @param events the committed history to cache for the instance + */ + public void put(String instanceId, List events) { + List snapshot = new ArrayList<>(events); + long bytes = 0; + for (HistoryEvents.HistoryEvent event : snapshot) { + bytes += event.getSerializedSize(); + } + + synchronized (this.lock) { + Entry existing = this.entries.get(instanceId); + if (existing != null) { + this.totalBytes -= existing.bytes; + } + this.entries.put(instanceId, new Entry(snapshot, bytes, this.clockNanos.getAsLong())); + this.totalBytes += bytes; + this.evictToFit(instanceId); + } + } + + /** + * Drops an instance's cached history (e.g. once it completes). + * + * @param instanceId the workflow instance ID + */ + public void remove(String instanceId) { + synchronized (this.lock) { + this.removeLocked(instanceId); + } + } + + /** Clears the cache; used when the stream reconnects (and starts cold). */ + public void reset() { + synchronized (this.lock) { + this.entries.clear(); + this.totalBytes = 0; + } + } + + /** Evicts entries whose last turn was longer ago than the TTL. */ + public void sweepExpired() { + long now = this.clockNanos.getAsLong(); + synchronized (this.lock) { + List expired = new ArrayList<>(); + for (Map.Entry entry : this.entries.entrySet()) { + if (now - entry.getValue().lastAccess > this.ttlNanos) { + expired.add(entry.getKey()); + } + } + for (String instanceId : expired) { + this.removeLocked(instanceId); + } + } + } + + int size() { + synchronized (this.lock) { + return this.entries.size(); + } + } + + long totalBytes() { + synchronized (this.lock) { + return this.totalBytes; + } + } + + private void removeLocked(String instanceId) { + Entry removed = this.entries.remove(instanceId); + if (removed != null) { + this.totalBytes -= removed.bytes; + } + } + + /** + * Evicts least-recently-used entries until within the count and byte bounds, always keeping the + * just-touched entry so the active working set is never evicted. A lone entry over the byte + * budget is kept (a soft overage) rather than thrashing. + */ + private void evictToFit(String keep) { + while (this.entries.size() > 1) { + boolean overCount = this.entries.size() > this.maxInstances; + boolean overBytes = this.maxBytes > 0 && this.totalBytes > this.maxBytes; + if (!overCount && !overBytes) { + return; + } + String victim = this.leastRecentlyUsedExcept(keep); + if (victim == null) { + return; + } + this.removeLocked(victim); + } + } + + private String leastRecentlyUsedExcept(String keep) { + String oldest = null; + long oldestAccess = Long.MAX_VALUE; + for (Map.Entry entry : this.entries.entrySet()) { + if (entry.getKey().equals(keep)) { + continue; + } + if (oldest == null || entry.getValue().lastAccess < oldestAccess) { + oldest = entry.getKey(); + oldestAccess = entry.getValue().lastAccess; + } + } + return oldest; + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java index e46cbe978b..4caa38527e 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java @@ -16,7 +16,10 @@ import com.google.protobuf.StringValue; import io.dapr.durabletask.TaskOrchestrationExecutor; import io.dapr.durabletask.TaskOrchestratorResult; +import io.dapr.durabletask.WorkflowHistoryCache; +import io.dapr.durabletask.implementation.protobuf.HistoryEvents; import io.dapr.durabletask.implementation.protobuf.Orchestration; +import io.dapr.durabletask.implementation.protobuf.OrchestratorActions; import io.dapr.durabletask.implementation.protobuf.OrchestratorService; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import io.grpc.StatusRuntimeException; @@ -25,6 +28,8 @@ import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -33,6 +38,8 @@ public class OrchestratorRunner extends DurableRunner { private final OrchestratorService.WorkflowRequest orchestratorRequest; private final TaskOrchestrationExecutor taskOrchestrationExecutor; + @Nullable + private final WorkflowHistoryCache historyCache; /** * Constructs a new instance of the OrchestratorRunner class. @@ -41,26 +48,45 @@ public class OrchestratorRunner extends DurableRunner { * @param taskOrchestrationExecutor The executor responsible for running task orchestration logic. * @param sidecarClient The gRPC stub for communication with the Task Hub sidecar service. * @param tracer An optional tracer used for distributed tracing, can be null. + * @param historyCache The per-stream committed-history cache for stateful-history + * delta work items, or null when the optimization is disabled. */ public OrchestratorRunner( OrchestratorService.WorkItem workItem, TaskOrchestrationExecutor taskOrchestrationExecutor, TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient, - @Nullable Tracer tracer) { + @Nullable Tracer tracer, + @Nullable WorkflowHistoryCache historyCache) { super(workItem, sidecarClient, tracer); this.orchestratorRequest = workItem.getWorkflowRequest(); this.taskOrchestrationExecutor = taskOrchestrationExecutor; + this.historyCache = historyCache; } @Override public void run() { + String instanceId = orchestratorRequest.getInstanceId(); + + List pastEvents; + try { + pastEvents = resolvePastEvents(instanceId); + } catch (StatusRuntimeException e) { + // The cache-miss fallback fetch failed and there is no per-item NACK. Abandon this + // work item so the backend redelivers it (as a full-history send on a future stream) + // rather than completing the turn with an incomplete history. + logException(e); + return; + } + TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( - orchestratorRequest.getPastEventsList(), + pastEvents, orchestratorRequest.getNewEventsList(), orchestratorRequest.hasPropagatedHistory() ? orchestratorRequest.getPropagatedHistory() : null); + updateHistoryCache(instanceId, pastEvents, taskOrchestratorResult); + var versionBuilder = Orchestration.WorkflowVersion.newBuilder(); if (StringUtils.isNotEmpty(taskOrchestratorResult.getVersion())) { @@ -88,4 +114,55 @@ public void run() { this.logException(e); } } + + /** + * Reconstructs the full committed history to replay. For a full send it is simply the request's + * pastEvents; for a delta send (cachedHistory) it is the cached prefix plus the delta, falling + * back to a GetInstanceHistory fetch on any cache miss. + */ + List resolvePastEvents(String instanceId) { + if (this.historyCache == null || !orchestratorRequest.hasCachedHistory()) { + return orchestratorRequest.getPastEventsList(); + } + + List cached = this.historyCache.get(instanceId); + int expected = orchestratorRequest.getCachedHistory().getEventCount(); + if (cached != null && cached.size() == expected) { + List full = + new ArrayList<>(cached.size() + orchestratorRequest.getPastEventsCount()); + full.addAll(cached); + full.addAll(orchestratorRequest.getPastEventsList()); + return full; + } + + // Cache miss: recover the full committed history from the sidecar. NewEvents is applied on + // top of this by the executor, so only the committed past is needed here. + OrchestratorService.GetInstanceHistoryResponse historyResponse = this.sidecarClient.getInstanceHistory( + OrchestratorService.GetInstanceHistoryRequest.newBuilder().setInstanceId(instanceId).build()); + return historyResponse.getEventsList(); + } + + /** + * Refreshes the per-stream cache after a turn so the next turn can be served as a delta. Caches + * only the committed history just replayed (never the not-yet-committed NewEvents), and drops the + * entry once the instance ends. A CompleteWorkflow action covers completed/failed/terminated/ + * continued-as-new; a TerminateWorkflow action targets a different instance and is deliberately + * not treated as a reset. + */ + void updateHistoryCache( + String instanceId, + List pastEvents, + TaskOrchestratorResult result) { + if (this.historyCache == null) { + return; + } + + boolean ended = result.getActions().stream() + .anyMatch(OrchestratorActions.WorkflowAction::hasCompleteWorkflow); + if (ended) { + this.historyCache.remove(instanceId); + } else { + this.historyCache.put(instanceId, pastEvents); + } + } } diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerStatefulHistoryTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerStatefulHistoryTest.java new file mode 100644 index 0000000000..c1e1d3fe6c --- /dev/null +++ b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerStatefulHistoryTest.java @@ -0,0 +1,151 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.durabletask; + +import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Worker-level tests for the stateful-history protocol driven through the real + * {@link DurableTaskGrpcWorker} against an in-process fake sidecar: the capability is advertised by + * default and suppressed when disabled, and a delta work item against a cold cache falls back to + * the GetInstanceHistory RPC. + */ +class DurableTaskGrpcWorkerStatefulHistoryTest { + + private DurableTaskGrpcWorker worker; + private Server server; + private ManagedChannel channel; + + @AfterEach + void tearDown() throws Exception { + if (worker != null) { + worker.close(); + } + if (channel != null) { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + if (server != null) { + server.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } + + private void startWorker(TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase service, boolean disableStatefulHistory) + throws Exception { + String serverName = InProcessServerBuilder.generateName(); + server = InProcessServerBuilder.forName(serverName).directExecutor().addService(service).build().start(); + channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + worker = new DurableTaskGrpcWorkerBuilder() + .grpcChannel(channel) + .disableStatefulHistory(disableStatefulHistory) + .build(); + worker.start(); + } + + @Test + void advertisesStatefulHistoryCapabilityByDefault() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference> captured = new AtomicReference<>(); + + startWorker(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() { + @Override + public void getWorkItems(OrchestratorService.GetWorkItemsRequest request, + StreamObserver responseObserver) { + captured.compareAndSet(null, request.getCapabilitiesList()); + latch.countDown(); + // Keep the stream open so the worker does not reconnect in a tight loop. + } + }, false); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "worker should have called getWorkItems"); + assertNotNull(captured.get()); + assertTrue(captured.get().contains(OrchestratorService.WorkerCapability.WORKER_CAPABILITY_STATEFUL_HISTORY), + "the worker must advertise WORKER_CAPABILITY_STATEFUL_HISTORY by default"); + } + + @Test + void doesNotAdvertiseCapabilityWhenDisabled() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference> captured = new AtomicReference<>(); + + startWorker(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() { + @Override + public void getWorkItems(OrchestratorService.GetWorkItemsRequest request, + StreamObserver responseObserver) { + captured.compareAndSet(null, request.getCapabilitiesList()); + latch.countDown(); + } + }, true); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "worker should have called getWorkItems"); + assertNotNull(captured.get()); + assertFalse(captured.get().contains(OrchestratorService.WorkerCapability.WORKER_CAPABILITY_STATEFUL_HISTORY), + "a disabled worker must not advertise the capability"); + assertTrue(captured.get().isEmpty()); + } + + @Test + void deltaWorkItemWithColdCacheFallsBackToGetInstanceHistory() throws Exception { + CountDownLatch fetchLatch = new CountDownLatch(1); + AtomicInteger getHistoryCalls = new AtomicInteger(0); + + startWorker(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() { + @Override + public void getWorkItems(OrchestratorService.GetWorkItemsRequest request, + StreamObserver responseObserver) { + // A delta work item (cachedHistory set) for an instance the freshly connected worker holds + // nothing for: its cache is cold, so it must fetch the full history. + OrchestratorService.WorkflowRequest workflowRequest = OrchestratorService.WorkflowRequest.newBuilder() + .setInstanceId("inst-miss") + .setCachedHistory(OrchestratorService.CachedHistory.newBuilder().setEventCount(5)) + .build(); + responseObserver.onNext(OrchestratorService.WorkItem.newBuilder() + .setWorkflowRequest(workflowRequest) + .build()); + // Keep the stream open. + } + + @Override + public void getInstanceHistory(OrchestratorService.GetInstanceHistoryRequest request, + StreamObserver responseObserver) { + getHistoryCalls.incrementAndGet(); + fetchLatch.countDown(); + responseObserver.onNext(OrchestratorService.GetInstanceHistoryResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + }, false); + + assertTrue(fetchLatch.await(10, TimeUnit.SECONDS), + "a delta work item against a cold cache must trigger a GetInstanceHistory fetch"); + assertEquals(1, getHistoryCalls.get()); + } +} diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/WorkflowHistoryCacheTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/WorkflowHistoryCacheTest.java new file mode 100644 index 0000000000..197f320acf --- /dev/null +++ b/durabletask-client/src/test/java/io/dapr/durabletask/WorkflowHistoryCacheTest.java @@ -0,0 +1,156 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.durabletask; + +import io.dapr.durabletask.implementation.protobuf.HistoryEvents; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for the worker's stateful-history cache bounds. These mirror the Go reference + * (durabletask-go client/worker_history_test.go) and the Python/.NET SDKs: a sliding TTL, an + * instance-count cap, and a byte budget, all with least-recently-used eviction. + */ +class WorkflowHistoryCacheTest { + + /** Events with non-zero serialized size (eventId 0 is the proto default, which is 0 bytes). */ + private static List events(int count) { + List list = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + list.add(HistoryEvents.HistoryEvent.newBuilder().setEventId(i + 1).build()); + } + return list; + } + + private static long bytesOf(int count) { + long total = 0; + for (HistoryEvents.HistoryEvent event : events(count)) { + total += event.getSerializedSize(); + } + return total; + } + + @Test + void getPutRemoveReset() { + WorkflowHistoryCache cache = new WorkflowHistoryCache(null, 0, 0); + + assertNull(cache.get("a")); + + cache.put("a", events(3)); + assertNotNull(cache.get("a")); + assertEquals(3, cache.get("a").size()); + + cache.remove("a"); + assertNull(cache.get("a")); + + cache.put("b", events(1)); + cache.reset(); + assertNull(cache.get("b")); + } + + @Test + void countCapEvictsLeastRecentlyUsed() { + AtomicLong clock = new AtomicLong(0); + WorkflowHistoryCache cache = new WorkflowHistoryCache(null, 2, 0, clock::get); + + cache.put("a", events(1)); + clock.incrementAndGet(); + cache.put("b", events(1)); + clock.incrementAndGet(); + cache.put("c", events(1)); // over the cap, evicts the LRU entry ("a") + + assertNull(cache.get("a")); + assertNotNull(cache.get("b")); + assertNotNull(cache.get("c")); + } + + @Test + void byteCapEvictsLeastRecentlyUsed() { + long entryBytes = bytesOf(4); + assertTrue(entryBytes > 0); + AtomicLong clock = new AtomicLong(0); + WorkflowHistoryCache cache = new WorkflowHistoryCache(null, 0, entryBytes + 1, clock::get); + + cache.put("a", events(4)); + clock.incrementAndGet(); + cache.put("b", events(4)); // two entries exceed the byte budget, evicts the LRU entry ("a") + + assertNull(cache.get("a")); + assertNotNull(cache.get("b")); + assertTrue(cache.totalBytes() <= entryBytes + 1); + } + + @Test + void singleOversizedEntryIsKept() { + WorkflowHistoryCache cache = new WorkflowHistoryCache(null, 0, 1); + cache.put("big", events(5)); + assertNotNull(cache.get("big")); + } + + @Test + void byteAccountingTracksReplaceAndRemove() { + WorkflowHistoryCache cache = new WorkflowHistoryCache(null, 0, 0); + + cache.put("a", events(3)); + cache.put("b", events(2)); + assertEquals(bytesOf(3) + bytesOf(2), cache.totalBytes()); + + cache.put("a", events(6)); // replace adjusts the running total to the new size + assertEquals(bytesOf(6) + bytesOf(2), cache.totalBytes()); + + cache.remove("a"); + assertEquals(bytesOf(2), cache.totalBytes()); + + cache.reset(); + assertEquals(0, cache.totalBytes()); + } + + @Test + void ttlSweepIsSliding() { + AtomicLong clock = new AtomicLong(0); + WorkflowHistoryCache cache = new WorkflowHistoryCache(Duration.ofSeconds(60), 0, 0, clock::get); + + cache.put("idle", events(2)); + cache.put("active", events(2)); + + clock.set(Duration.ofSeconds(120).toNanos()); // past the TTL... + assertNotNull(cache.get("active")); // ...but a turn refreshes "active" + + cache.sweepExpired(); + assertNull(cache.get("idle")); + assertNotNull(cache.get("active")); + } + + @Test + void nonPositiveConfigUsesDefaults() { + // ttl/maxInstances fall back to their (large) defaults; maxBytes becomes unlimited. None of + // these should evict the three modest entries below. + WorkflowHistoryCache cache = new WorkflowHistoryCache(Duration.ZERO, -1, -5); + + cache.put("a", events(1)); + cache.put("b", events(1)); + cache.put("c", events(1)); + + assertEquals(3, cache.size()); + } +} diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/runner/OrchestratorRunnerHistoryTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/runner/OrchestratorRunnerHistoryTest.java new file mode 100644 index 0000000000..6407ac71ee --- /dev/null +++ b/durabletask-client/src/test/java/io/dapr/durabletask/runner/OrchestratorRunnerHistoryTest.java @@ -0,0 +1,128 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.durabletask.runner; + +import io.dapr.durabletask.TaskOrchestratorResult; +import io.dapr.durabletask.WorkflowHistoryCache; +import io.dapr.durabletask.implementation.protobuf.HistoryEvents; +import io.dapr.durabletask.implementation.protobuf.OrchestratorActions; +import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** + * Deterministic tests for the worker's history resolution and cache update, driving + * {@link OrchestratorRunner}'s package-visible helpers directly. The full-send and cache-hit paths + * make no RPC, so a null sidecar client suffices; the cache-miss fallback (which does call + * GetInstanceHistory) is covered by the worker-level in-process test. + */ +class OrchestratorRunnerHistoryTest { + + private static List events(int count) { + List list = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + list.add(HistoryEvents.HistoryEvent.newBuilder().setEventId(i + 1).build()); + } + return list; + } + + private static OrchestratorRunner runner(OrchestratorService.WorkflowRequest request, WorkflowHistoryCache cache) { + OrchestratorService.WorkItem workItem = OrchestratorService.WorkItem.newBuilder() + .setWorkflowRequest(request) + .build(); + return new OrchestratorRunner(workItem, null, null, null, cache); + } + + private static TaskOrchestratorResult resultWith(OrchestratorActions.WorkflowAction action) { + return new TaskOrchestratorResult(List.of(action), "", null, null); + } + + @Test + void fullSendReturnsRequestPastEvents() { + OrchestratorService.WorkflowRequest request = OrchestratorService.WorkflowRequest.newBuilder() + .setInstanceId("a") + .addAllPastEvents(events(4)) + .build(); + OrchestratorRunner runner = runner(request, new WorkflowHistoryCache(null, 0, 0)); + + assertEquals(4, runner.resolvePastEvents("a").size()); + } + + @Test + void cacheHitReconstructsPrefixPlusDelta() { + WorkflowHistoryCache cache = new WorkflowHistoryCache(null, 0, 0); + cache.put("a", events(5)); + OrchestratorService.WorkflowRequest request = OrchestratorService.WorkflowRequest.newBuilder() + .setInstanceId("a") + .addAllPastEvents(events(3)) + .setCachedHistory(OrchestratorService.CachedHistory.newBuilder().setEventCount(5)) + .build(); + OrchestratorRunner runner = runner(request, cache); + + assertEquals(8, runner.resolvePastEvents("a").size()); // 5 cached prefix + 3 delta + } + + @Test + void disabledCacheReturnsRequestPastEvents() { + OrchestratorService.WorkflowRequest request = OrchestratorService.WorkflowRequest.newBuilder() + .setInstanceId("a") + .addAllPastEvents(events(4)) + .setCachedHistory(OrchestratorService.CachedHistory.newBuilder().setEventCount(2)) + .build(); + OrchestratorRunner runner = runner(request, null); // stateful history disabled + + assertEquals(4, runner.resolvePastEvents("a").size()); + } + + @Test + void updateCachePutsWhenRunningThenEvictsOnComplete() { + WorkflowHistoryCache cache = new WorkflowHistoryCache(null, 0, 0); + OrchestratorService.WorkflowRequest request = OrchestratorService.WorkflowRequest.newBuilder() + .setInstanceId("a") + .build(); + OrchestratorRunner runner = runner(request, cache); + + OrchestratorActions.WorkflowAction running = OrchestratorActions.WorkflowAction.newBuilder() + .setScheduleTask(OrchestratorActions.ScheduleTaskAction.newBuilder()) + .build(); + runner.updateHistoryCache("a", events(6), resultWith(running)); + assertNotNull(cache.get("a")); + + OrchestratorActions.WorkflowAction completed = OrchestratorActions.WorkflowAction.newBuilder() + .setCompleteWorkflow(OrchestratorActions.CompleteWorkflowAction.newBuilder()) + .build(); + runner.updateHistoryCache("a", events(6), resultWith(completed)); + assertNull(cache.get("a")); + } + + @Test + void updateCacheSkippedWhenDisabled() { + OrchestratorService.WorkflowRequest request = OrchestratorService.WorkflowRequest.newBuilder() + .setInstanceId("a") + .build(); + OrchestratorRunner runner = runner(request, null); // no cache + + OrchestratorActions.WorkflowAction running = OrchestratorActions.WorkflowAction.newBuilder() + .setScheduleTask(OrchestratorActions.ScheduleTaskAction.newBuilder()) + .build(); + // Must not throw when the cache is disabled (null). + runner.updateHistoryCache("a", events(6), resultWith(running)); + } +}