Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/__tests__/helpers/provider-stub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export function makeProviderStub<T extends object>(stub: T): T {
const proto = ClineProvider.prototype as any
s.delegationTransitionLocks ??= new Map()
s.cancelledDelegationChildIds ??= new Set()
s.log ??= vi.fn()
s.runDelegationTransition = proto.runDelegationTransition.bind(s)
return s
}
1 change: 1 addition & 0 deletions src/__tests__/history-resume-delegation.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ vi.mock("../core/task-persistence", () => ({
readApiMessages: vi.fn().mockResolvedValue([]),
saveApiMessages: vi.fn().mockResolvedValue(undefined),
saveTaskMessages: vi.fn().mockResolvedValue(undefined),
assertValidTransition: vi.fn(),
}))

import { ClineProvider } from "../core/webview/ClineProvider"
Expand Down
3 changes: 2 additions & 1 deletion src/__tests__/nested-delegation-resume.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ vi.mock("vscode", () => {
vi.mock("../core/task-persistence/taskMessages", () => ({
readTaskMessages: vi.fn().mockResolvedValue([]),
}))
vi.mock("../core/task-persistence", () => ({
vi.mock("../core/task-persistence", async (importOriginal) => ({
...(await importOriginal<typeof import("../core/task-persistence")>()),
readApiMessages: vi.fn().mockResolvedValue([]),
saveApiMessages: vi.fn().mockResolvedValue(undefined),
saveTaskMessages: vi.fn().mockResolvedValue(undefined),
Expand Down
138 changes: 131 additions & 7 deletions src/core/task-persistence/TaskHistoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,28 @@ import { GlobalFileNames } from "../../shared/globalFileNames"
import { safeWriteJson } from "../../utils/safeWriteJson"
import { getStorageBasePath } from "../../utils/storage"

/** Valid status values for a task's HistoryItem. */
export type HistoryItemStatus = NonNullable<HistoryItem["status"]>

const VALID_TRANSITIONS: Record<HistoryItemStatus, HistoryItemStatus[]> = {
active: ["delegated", "completed"],
delegated: ["active"],
completed: [],
}

/**
* Asserts that a task status transition is valid, throwing if not.
*
* @throws {Error} When the transition is not allowed by the state machine.
*/
export function assertValidTransition(from: HistoryItemStatus | undefined, to: HistoryItemStatus): void {
const fromStatus: HistoryItemStatus = from ?? "active"
const validTargets = VALID_TRANSITIONS[fromStatus]
if (!validTargets.includes(to)) {
throw new Error(`Invalid task status transition: ${fromStatus} → ${to}`)
}
}

/**
* Index file format for fast startup reads.
*/
Expand Down Expand Up @@ -88,10 +110,13 @@ export class TaskHistoryStore {
// 2. Reconcile cache against actual task directories on disk
await this.reconcile()

// 3. Start fs.watch for cross-instance reactivity
// 3. Repair delegation inconsistencies left by a previous crash
await this.reconcileDelegationState()

// 4. Start fs.watch for cross-instance reactivity
this.startWatcher()

// 4. Start periodic reconciliation as a defensive fallback
// 5. Start periodic reconciliation as a defensive fallback
this.startPeriodicReconciliation()
} finally {
// Mark initialization as complete so callers awaiting `initialized` can proceed
Expand Down Expand Up @@ -158,16 +183,29 @@ export class TaskHistoryStore {
* updates the in-memory Map, and schedules a debounced index write.
*/
async upsert(item: HistoryItem): Promise<HistoryItem[]> {
return this.withLock(() => this._upsertUnlocked(item))
return this.withLock(() => this.upsertCore(item))
}

/**
* Upsert body executed without acquiring the lock.
* Must only be called from within a `withLock` callback.
* Core upsert logic — must only be called from within `withLock`.
*
* Enforces state-machine transition rules when `item.status` changes.
* Pass `skipTransitionCheck: true` only for administrative repairs (reconciliation,
* migration) that need to write corrected state outside the normal task lifecycle.
*/
private async _upsertUnlocked(item: HistoryItem): Promise<HistoryItem[]> {
private async upsertCore(
item: HistoryItem,
options: { skipTransitionCheck?: boolean } = {},
): Promise<HistoryItem[]> {
const existing = this.cache.get(item.id)

// Enforce transition validity at the write boundary so that any caller
// (including fire-and-forget saves) cannot silently stomp a terminal status.
// Skip when there is no existing record — first insert has no prior state to transition from.
if (!options.skipTransitionCheck && existing && item.status !== undefined && item.status !== existing.status) {
assertValidTransition(existing.status, item.status)
}

// Merge: preserve existing metadata unless explicitly overwritten
const merged = existing ? { ...existing, ...item } : item

Expand Down Expand Up @@ -295,6 +333,92 @@ export class TaskHistoryStore {
})
}

/**
* Repair delegation inconsistencies left by a crash mid-transition.
*
* Called once from `initialize()` after `reconcile()`. Runs inside `withLock` to
* prevent interleaving with watcher-triggered reconcile() calls. Iterates until
* convergence so that one-level chained delegations visible at startup are resolved.
*
* Must NOT be called from within `withLock` — `withLock` is non-reentrant (promise
* chain); calling `upsert` (which acquires the lock) from inside would deadlock.
* `upsertCore` is called directly here instead, bypassing transition validation via
* `skipTransitionCheck: true` because these writes are administrative repairs, not
* runtime state-machine transitions.
*
* Cases repaired per pass:
* - Parent `delegated` with no `awaitingChildId` → parent → `active` (invalid state)
* - Parent `delegated`, child not found → parent → `active` (orphaned delegation)
* - Parent `delegated`, child `completed` → parent → `active` (interrupted handoff)
*
* A parent awaiting an `active` child is left as-is — the child is resumable.
*/
private async reconcileDelegationState(): Promise<void> {
return this.withLock(async () => {
let repairsInThisPass: number
do {
repairsInThisPass = 0
// Rebuild the lookup map each pass so repairs from the previous pass
// are visible when evaluating chained delegations.
const byId = new Map(Array.from(this.cache.values()).map((i) => [i.id, i]))

for (const [, item] of byId) {
if (item.status !== "delegated") {
continue
}

if (!item.awaitingChildId) {
await this.upsertCore(
{ ...item, status: "active", awaitingChildId: undefined, delegatedToId: undefined },
{ skipTransitionCheck: true },
)
console.warn(
`[TaskHistoryStore] Reconciled invalid delegation: task ${item.id} → active (no awaitingChildId)`,
)
repairsInThisPass++
continue
}

const child = byId.get(item.awaitingChildId)

if (!child) {
await this.upsertCore(
{
...item,
status: "active",
awaitingChildId: undefined,
delegatedToId: undefined,
},
{ skipTransitionCheck: true },
)
console.warn(
`[TaskHistoryStore] Reconciled orphaned delegation: task ${item.id} → active (child ${item.awaitingChildId} not found)`,
)
repairsInThisPass++
} else if (child.status === "completed") {
await this.upsertCore(
{
...item,
status: "active",
awaitingChildId: undefined,
delegatedToId: undefined,
completedByChildId: child.id,
completionResultSummary:
child.completionResultSummary ?? "Task completed (recovered after interruption)",
},
{ skipTransitionCheck: true },
)
console.warn(
`[TaskHistoryStore] Reconciled interrupted handoff: task ${item.id} → active (child ${item.awaitingChildId} already completed)`,
)
repairsInThisPass++
}
// child.status === "active" or "delegated" → leave as-is this pass
}
} while (repairsInThisPass > 0)
})
}

// ────────────────────────────── Cache invalidation ──────────────────────────────

/**
Expand Down Expand Up @@ -561,7 +685,7 @@ export class TaskHistoryStore {
`[TaskHistoryStore] atomicReadAndUpdate: updater changed task id from ${taskId} to ${updated.id}`,
)
}
return this._upsertUnlocked(updated)
return this.upsertCore(updated)
})
}

Expand Down
Loading
Loading