diff --git a/src/core/assistant-message/NativeToolCallParser.ts b/src/core/assistant-message/NativeToolCallParser.ts index 9639ae1baa..a223d95edd 100644 --- a/src/core/assistant-message/NativeToolCallParser.ts +++ b/src/core/assistant-message/NativeToolCallParser.ts @@ -66,8 +66,12 @@ export class NativeToolCallParser { private static rawChunkTracker = new Map< number, { - id: string + id?: string name: string + // Tracks whether a name field has been observed at all, distinct from a + // truthy name. This is a defensive guard: should a provider ever send an + // empty name, the start-gate must not rely on name truthiness. + nameSeen: boolean hasStarted: boolean deltaBuffer: string[] } @@ -105,51 +109,55 @@ export class NativeToolCallParser { const events: ToolCallStreamEvent[] = [] const { index, id, name, arguments: args } = chunk + // Create the tracker on first sight of this index, independent of whether + // an id has arrived yet. Keying the lifecycle by index (not id) ensures any + // `arguments` that stream before the id is known are buffered rather than dropped. let tracked = this.rawChunkTracker.get(index) - - // Initialize new tool call tracking when we receive an id - if (id && !tracked) { + if (!tracked) { tracked = { id, name: name || "", + nameSeen: name !== undefined, hasStarted: false, deltaBuffer: [], } this.rawChunkTracker.set(index, tracked) } - if (!tracked) { - return events + // Record id and name as they arrive (they may come in separate chunks). + if (id) { + tracked.id = id } - - // Update name if present in chunk and not yet set - if (name) { + if (name !== undefined) { tracked.name = name + tracked.nameSeen = true } - // Emit start event when we have the name - if (!tracked.hasStarted && tracked.name) { + // Emit start event only once both id and name are known. Using a local + // non-null id keeps emitted events typed as id: string. + if (!tracked.hasStarted && tracked.id && tracked.nameSeen) { + const startedId = tracked.id events.push({ type: "tool_call_start", - id: tracked.id, + id: startedId, name: tracked.name, }) tracked.hasStarted = true - // Flush buffered deltas + // Flush buffered deltas accumulated during the pre-start window. for (const bufferedDelta of tracked.deltaBuffer) { events.push({ type: "tool_call_delta", - id: tracked.id, + id: startedId, delta: bufferedDelta, }) } tracked.deltaBuffer = [] } - // Emit delta event for argument chunks + // Emit delta event for argument chunks, buffering until start is emitted. if (args) { - if (tracked.hasStarted) { + if (tracked.hasStarted && tracked.id) { events.push({ type: "tool_call_delta", id: tracked.id, @@ -172,11 +180,19 @@ export class NativeToolCallParser { if (finishReason === "tool_calls" && this.rawChunkTracker.size > 0) { for (const [, tracked] of this.rawChunkTracker.entries()) { - events.push({ - type: "tool_call_end", - id: tracked.id, - }) + // Only emit an end for trackers that actually started. A tracker that + // never received an id/name (malformed stream) must not emit a phantom + // end; since start requires an id, hasStarted implies tracked.id is set. + if (tracked.hasStarted && tracked.id) { + events.push({ + type: "tool_call_end", + id: tracked.id, + }) + } } + // Clear so a subsequent finalizeRawChunks() is a safe no-op and cannot + // double-fire end events for the same trackers. + this.rawChunkTracker.clear() } return events @@ -191,7 +207,7 @@ export class NativeToolCallParser { if (this.rawChunkTracker.size > 0) { for (const [, tracked] of this.rawChunkTracker.entries()) { - if (tracked.hasStarted) { + if (tracked.hasStarted && tracked.id) { events.push({ type: "tool_call_end", id: tracked.id, diff --git a/src/core/assistant-message/__tests__/NativeToolCallParser.spec.ts b/src/core/assistant-message/__tests__/NativeToolCallParser.spec.ts index 2c15e12069..47dd616420 100644 --- a/src/core/assistant-message/__tests__/NativeToolCallParser.spec.ts +++ b/src/core/assistant-message/__tests__/NativeToolCallParser.spec.ts @@ -1,4 +1,4 @@ -import { NativeToolCallParser } from "../NativeToolCallParser" +import { NativeToolCallParser, type ToolCallStreamEvent } from "../NativeToolCallParser" describe("NativeToolCallParser", () => { beforeEach(() => { @@ -343,4 +343,253 @@ describe("NativeToolCallParser", () => { }) }) }) + + describe("processRawChunk streaming reassembly", () => { + // Mirror the sequencing Task.ts performs: feed each raw chunk through + // processRawChunk, drive startStreamingToolCall on tool_call_start, feed + // tool_call_delta into processStreamingChunk, and emit ends at stream close + // via finalizeRawChunks() (the same call Task.ts makes after the stream ends). + // Returns the ordered event types/ids plus the finalized tool uses by id. + const drive = (rawChunks: Array<{ index: number; id?: string; name?: string; arguments?: string }>) => { + const events: ToolCallStreamEvent[] = [] + + const handleEvent = (event: ToolCallStreamEvent) => { + events.push(event) + if (event.type === "tool_call_start") { + NativeToolCallParser.startStreamingToolCall(event.id, event.name) + } else if (event.type === "tool_call_delta") { + NativeToolCallParser.processStreamingChunk(event.id, event.delta) + } + } + + for (const chunk of rawChunks) { + for (const event of NativeToolCallParser.processRawChunk(chunk)) { + handleEvent(event) + } + } + + // Task.ts finalizes any tool calls still open at stream end via + // finalizeRawChunks(), which emits the tool_call_end events. + for (const event of NativeToolCallParser.finalizeRawChunks()) { + handleEvent(event) + } + + const finalized = new Map>() + const startIds = events.filter((e) => e.type === "tool_call_start").map((e) => e.id) + for (const id of startIds) { + finalized.set(id, NativeToolCallParser.finalizeStreamingToolCall(id)) + } + + return { events, finalized } + } + + it("preserves leading argument bytes that arrive before the id", () => { + // First chunk carries arguments but NO id; id+name arrive later, then more args. + const fullArgs = JSON.stringify({ path: "src/leading.ts", mode: "slice" }) + const firstHalf = fullArgs.slice(0, 10) + const secondHalf = fullArgs.slice(10) + + const { events, finalized } = drive([ + { index: 0, arguments: firstHalf }, + { index: 0, id: "call_late_id", name: "read_file" }, + { index: 0, arguments: secondHalf }, + ]) + + // Exactly one start, in the right order, with the late id. + const starts = events.filter((e) => e.type === "tool_call_start") + expect(starts).toHaveLength(1) + expect(starts[0].id).toBe("call_late_id") + + // The finalized arguments must contain the complete, uncorrupted payload. + const result = finalized.get("call_late_id") + expect(result).not.toBeNull() + expect(result?.type).toBe("tool_use") + if (result?.type === "tool_use") { + const nativeArgs = result.nativeArgs as { path: string; mode?: string } + expect(nativeArgs.path).toBe("src/leading.ts") + expect(nativeArgs.mode).toBe("slice") + } + }) + + it("handles id and name arriving in separate chunks (issue #218)", () => { + const fullArgs = JSON.stringify({ path: "src/split.ts" }) + + const { events, finalized } = drive([ + { index: 0, id: "call_split" }, + { index: 0, name: "read_file" }, + { index: 0, arguments: fullArgs }, + ]) + + const starts = events.filter((e) => e.type === "tool_call_start") + expect(starts).toHaveLength(1) + expect(starts[0].id).toBe("call_split") + + const result = finalized.get("call_split") + expect(result?.type).toBe("tool_use") + if (result?.type === "tool_use") { + const nativeArgs = result.nativeArgs as { path: string } + expect(nativeArgs.path).toBe("src/split.ts") + } + }) + + it("handles name arriving before id with buffered args in between (reverse ordering)", () => { + const fullArgs = JSON.stringify({ path: "src/reverse.ts" }) + const firstHalf = fullArgs.slice(0, 9) + const secondHalf = fullArgs.slice(9) + + const { events, finalized } = drive([ + { index: 0, name: "read_file" }, + { index: 0, arguments: firstHalf }, + { index: 0, id: "call_reverse" }, + { index: 0, arguments: secondHalf }, + ]) + + // Start must not fire until the id arrives, so exactly one start with the late id. + const starts = events.filter((e) => e.type === "tool_call_start") + expect(starts).toHaveLength(1) + expect(starts[0].id).toBe("call_reverse") + + // The buffered delta must be flushed only after the start event. + const startIndex = events.findIndex((e) => e.type === "tool_call_start") + const firstDeltaIndex = events.findIndex((e) => e.type === "tool_call_delta") + expect(startIndex).toBeLessThan(firstDeltaIndex) + + const result = finalized.get("call_reverse") + expect(result).not.toBeNull() + expect(result?.type).toBe("tool_use") + if (result?.type === "tool_use") { + expect((result.nativeArgs as { path: string }).path).toBe("src/reverse.ts") + } + }) + + it("keeps two parallel tool calls on distinct indices isolated", () => { + const argsA = JSON.stringify({ path: "src/a.ts" }) + const argsB = JSON.stringify({ path: "src/b.ts" }) + + const { events, finalized } = drive([ + { index: 0, arguments: argsA.slice(0, 8) }, + { index: 1, arguments: argsB.slice(0, 8) }, + { index: 0, id: "call_a", name: "read_file" }, + { index: 1, id: "call_b", name: "read_file" }, + { index: 0, arguments: argsA.slice(8) }, + { index: 1, arguments: argsB.slice(8) }, + ]) + + const starts = events.filter((e) => e.type === "tool_call_start") + expect(starts).toHaveLength(2) + + const resultA = finalized.get("call_a") + const resultB = finalized.get("call_b") + expect(resultA).not.toBeNull() + expect(resultB).not.toBeNull() + if (resultA?.type === "tool_use") { + expect((resultA.nativeArgs as { path: string }).path).toBe("src/a.ts") + } + if (resultB?.type === "tool_use") { + expect((resultB.nativeArgs as { path: string }).path).toBe("src/b.ts") + } + }) + + it("emits the same event sequence for the single-chunk-with-id flow (regression guard)", () => { + const fullArgs = JSON.stringify({ path: "src/single.ts" }) + + const { events, finalized } = drive([ + { index: 0, id: "call_single", name: "read_file", arguments: fullArgs }, + ]) + + expect(events.map((e) => e.type)).toEqual(["tool_call_start", "tool_call_delta", "tool_call_end"]) + expect(events.every((e) => e.id === "call_single")).toBe(true) + + const result = finalized.get("call_single") + expect(result).not.toBeNull() + expect(result?.type).toBe("tool_use") + if (result?.type === "tool_use") { + expect((result.nativeArgs as { path: string }).path).toBe("src/single.ts") + } + }) + + it("does not emit a phantom tool_call_end for a tracker that never received an id", () => { + const { events } = drive([{ index: 0, arguments: '{"path":"orphan.ts"}' }]) + + expect(events.filter((e) => e.type === "tool_call_start")).toHaveLength(0) + expect(events.filter((e) => e.type === "tool_call_end")).toHaveLength(0) + }) + + it("finalizeRawChunks() emits end events and guards against missing id", () => { + // Simulate a started tool call: process chunks to populate state + const chunks = [ + { index: 0, id: "call_finalize", name: "read_file" }, + { index: 0, arguments: '{"path":"file.ts"' }, + { index: 0, arguments: ',"mode":"slice"}' }, + ] + + const events: Array<{ type: string; id?: string }> = [] + for (const chunk of chunks) { + for (const event of NativeToolCallParser.processRawChunk(chunk)) { + events.push(event) + if (event.type === "tool_call_start") { + NativeToolCallParser.startStreamingToolCall(event.id, event.name) + } else if (event.type === "tool_call_delta") { + NativeToolCallParser.processStreamingChunk(event.id, event.delta) + } + } + } + + // Now finalize the raw chunks to emit the end event + const finalizeEvents = NativeToolCallParser.finalizeRawChunks() + for (const event of finalizeEvents) { + events.push(event) + } + + // Verify the end event was produced by finalizeRawChunks + const ends = events.filter((e) => e.type === "tool_call_end") + expect(ends).toHaveLength(1) + expect(ends[0].id).toBe("call_finalize") + + // Finalize the tool call to ensure it contains the complete arguments + const result = NativeToolCallParser.finalizeStreamingToolCall("call_finalize") + expect(result?.type).toBe("tool_use") + if (result?.type === "tool_use") { + expect((result.nativeArgs as { path: string }).path).toBe("file.ts") + } + }) + + it("finalizeRawChunks() does not emit end for tracker without id", () => { + // Start a tracker with arguments but no id, then finalize + const chunks = [{ index: 0, arguments: '{"incomplete":true}' }] + + for (const chunk of chunks) { + NativeToolCallParser.processRawChunk(chunk) + } + + // Finalize should not emit an end event if id was never set + const finalizeEvents = NativeToolCallParser.finalizeRawChunks() + const ends = finalizeEvents.filter((e) => e.type === "tool_call_end") + expect(ends).toHaveLength(0) + + NativeToolCallParser.clearRawChunkState() + }) + + it("does not double-fire end events across processFinishReason and finalizeRawChunks", () => { + // Drive a started tool call through the raw chunk path. + const chunks = [ + { index: 0, id: "call_dup", name: "read_file" }, + { index: 0, arguments: '{"path":"file.ts"}' }, + ] + for (const chunk of chunks) { + NativeToolCallParser.processRawChunk(chunk) + } + + // Task.ts emits ends via processFinishReason, then calls finalizeRawChunks + // unconditionally. Both must not emit an end for the same tracker. + const finishEvents = NativeToolCallParser.processFinishReason("tool_calls") + const finalizeEvents = NativeToolCallParser.finalizeRawChunks() + + const allEnds = [...finishEvents, ...finalizeEvents].filter((e) => e.type === "tool_call_end") + expect(allEnds).toHaveLength(1) + expect(allEnds[0].id).toBe("call_dup") + + NativeToolCallParser.clearRawChunkState() + }) + }) }) diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts index 50d4674fd0..447d94acbe 100644 --- a/src/core/task/Task.ts +++ b/src/core/task/Task.ts @@ -389,6 +389,43 @@ export class Task extends EventEmitter implements TaskLike { // Native tool call streaming state (track which index each tool is at) private streamingToolCallIndices: Map = new Map() + /** + * Finalize a streaming native tool call by id and present it. + * + * Shared by every site that observes a tool_call_end: the per-chunk event + * loop, the stream-level tool_call_end case, and the end-of-stream + * finalizeRawChunks() pass. Calling it again for an already-finalized id is a + * safe no-op because finalizeStreamingToolCall() and the index map entry are + * both cleared on first finalize. + */ + private finalizeStreamingToolCallById(id: string): void { + const finalToolUse = NativeToolCallParser.finalizeStreamingToolCall(id) + const toolUseIndex = this.streamingToolCallIndices.get(id) + + if (finalToolUse) { + ;(finalToolUse as any).id = id + if (toolUseIndex !== undefined) { + this.assistantMessageContent[toolUseIndex] = finalToolUse + } + this.streamingToolCallIndices.delete(id) + this.userMessageContentReady = false + presentAssistantMessage(this) + } else if (toolUseIndex !== undefined) { + // finalizeStreamingToolCall returned null (malformed JSON or missing args). + // Mark the tool as non-partial so it is presented as complete; execution + // will be short-circuited in presentAssistantMessage with a structured + // tool_result and validation will surface any missing required params. + const existingToolUse = this.assistantMessageContent[toolUseIndex] + if (existingToolUse && existingToolUse.type === "tool_use") { + existingToolUse.partial = false + ;(existingToolUse as any).id = id + } + this.streamingToolCallIndices.delete(id) + this.userMessageContentReady = false + presentAssistantMessage(this) + } + } + // Cached model info for current streaming session (set at start of each API request) // This prevents excessive getModel() calls during tool execution cachedStreamingModel?: { id: string; info: ModelInfo } @@ -2817,54 +2854,21 @@ export class Task extends EventEmitter implements TaskLike { } } } else if (event.type === "tool_call_end") { - // Finalize the streaming tool call - const finalToolUse = NativeToolCallParser.finalizeStreamingToolCall(event.id) - - // Get the index for this tool call - const toolUseIndex = this.streamingToolCallIndices.get(event.id) - - if (finalToolUse) { - // Store the tool call ID - ;(finalToolUse as any).id = event.id - - // Get the index and replace partial with final - if (toolUseIndex !== undefined) { - this.assistantMessageContent[toolUseIndex] = finalToolUse - } - - // Clean up tracking - this.streamingToolCallIndices.delete(event.id) - - // Mark that we have new content to process - this.userMessageContentReady = false - - // Present the finalized tool call - presentAssistantMessage(this) - } else if (toolUseIndex !== undefined) { - // finalizeStreamingToolCall returned null (malformed JSON or missing args) - // Mark the tool as non-partial so it's presented as complete, but execution - // will be short-circuited in presentAssistantMessage with a structured tool_result. - const existingToolUse = this.assistantMessageContent[toolUseIndex] - if (existingToolUse && existingToolUse.type === "tool_use") { - existingToolUse.partial = false - // Ensure it has the ID for native protocol - ;(existingToolUse as any).id = event.id - } - - // Clean up tracking - this.streamingToolCallIndices.delete(event.id) - - // Mark that we have new content to process - this.userMessageContentReady = false - - // Present the tool call - validation will handle missing params - presentAssistantMessage(this) - } + this.finalizeStreamingToolCallById(event.id) } } break } + case "tool_call_end": { + // Providers emit a tool_call_end chunk when finish_reason is + // "tool_calls" (either directly or via processFinishReason). + // Finalize the streaming tool call now so it is presented during + // streaming rather than waiting for finalizeRawChunks() at stream end. + this.finalizeStreamingToolCallById(chunk.id) + break + } + case "tool_call": { // Legacy: Handle complete tool calls (for backward compatibility) // Convert native tool call to ToolUse format @@ -3215,49 +3219,7 @@ export class Task extends EventEmitter implements TaskLike { const finalizeEvents = NativeToolCallParser.finalizeRawChunks() for (const event of finalizeEvents) { if (event.type === "tool_call_end") { - // Finalize the streaming tool call - const finalToolUse = NativeToolCallParser.finalizeStreamingToolCall(event.id) - - // Get the index for this tool call - const toolUseIndex = this.streamingToolCallIndices.get(event.id) - - if (finalToolUse) { - // Store the tool call ID - ;(finalToolUse as any).id = event.id - - // Get the index and replace partial with final - if (toolUseIndex !== undefined) { - this.assistantMessageContent[toolUseIndex] = finalToolUse - } - - // Clean up tracking - this.streamingToolCallIndices.delete(event.id) - - // Mark that we have new content to process - this.userMessageContentReady = false - - // Present the finalized tool call - presentAssistantMessage(this) - } else if (toolUseIndex !== undefined) { - // finalizeStreamingToolCall returned null (malformed JSON or missing args) - // We still need to mark the tool as non-partial so it gets executed - // The tool's validation will catch any missing required parameters - const existingToolUse = this.assistantMessageContent[toolUseIndex] - if (existingToolUse && existingToolUse.type === "tool_use") { - existingToolUse.partial = false - // Ensure it has the ID for native protocol - ;(existingToolUse as any).id = event.id - } - - // Clean up tracking - this.streamingToolCallIndices.delete(event.id) - - // Mark that we have new content to process - this.userMessageContentReady = false - - // Present the tool call - validation will handle missing params - presentAssistantMessage(this) - } + this.finalizeStreamingToolCallById(event.id) } } diff --git a/src/core/task/__tests__/finalizeStreamingToolCallById.spec.ts b/src/core/task/__tests__/finalizeStreamingToolCallById.spec.ts new file mode 100644 index 0000000000..da1872ab15 --- /dev/null +++ b/src/core/task/__tests__/finalizeStreamingToolCallById.spec.ts @@ -0,0 +1,118 @@ +// npx vitest run core/task/__tests__/finalizeStreamingToolCallById.spec.ts + +import { Task } from "../Task" +import { presentAssistantMessage } from "../../assistant-message" +import { NativeToolCallParser } from "../../assistant-message/NativeToolCallParser" +import type { ToolUse } from "../../../shared/tools" + +// presentAssistantMessage is invoked by finalizeStreamingToolCallById to flush the +// finalized tool use; mocking it isolates the helper from the full presentation pipeline. +vi.mock("../../assistant-message", async (importOriginal) => { + const actual = (await importOriginal()) as Record + return { + ...actual, + presentAssistantMessage: vi.fn(), + } +}) + +const mockedPresent = vi.mocked(presentAssistantMessage) + +/** + * Invoke the private finalizeStreamingToolCallById against a minimal `this` stub. + * + * Instantiating a full Task requires a provider, context, and async setup that are + * irrelevant to this helper. The method only touches assistantMessageContent, + * streamingToolCallIndices, and userMessageContentReady, so a stub carrying those + * fields exercises the real source lines without the constructor. + */ +function callFinalize( + stub: { + assistantMessageContent: any[] + streamingToolCallIndices: Map + userMessageContentReady: boolean + }, + id: string, +): void { + ;(Task.prototype as any).finalizeStreamingToolCallById.call(stub, id) +} + +describe("Task.finalizeStreamingToolCallById", () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it("replaces the partial block with the finalized tool use and presents it", () => { + const finalToolUse = { type: "tool_use", name: "read_file", partial: false } as unknown as ToolUse + const finalizeSpy = vi + .spyOn(NativeToolCallParser, "finalizeStreamingToolCall") + .mockReturnValue(finalToolUse) + + const stub = { + assistantMessageContent: [{ type: "tool_use", id: "call_abc", name: "read_file", partial: true }], + streamingToolCallIndices: new Map([["call_abc", 0]]), + userMessageContentReady: true, + } + + callFinalize(stub, "call_abc") + + expect(finalizeSpy).toHaveBeenCalledWith("call_abc") + expect(stub.assistantMessageContent[0]).toBe(finalToolUse) + expect((stub.assistantMessageContent[0] as any).id).toBe("call_abc") + expect(stub.streamingToolCallIndices.has("call_abc")).toBe(false) + expect(stub.userMessageContentReady).toBe(false) + expect(mockedPresent).toHaveBeenCalledTimes(1) + }) + + it("marks the existing block non-partial when finalize returns null (malformed JSON)", () => { + vi.spyOn(NativeToolCallParser, "finalizeStreamingToolCall").mockReturnValue(null) + + const existingBlock = { type: "tool_use", id: "call_bad", name: "write_to_file", partial: true } + const stub = { + assistantMessageContent: [existingBlock], + streamingToolCallIndices: new Map([["call_bad", 0]]), + userMessageContentReady: true, + } + + callFinalize(stub, "call_bad") + + expect(existingBlock.partial).toBe(false) + expect((existingBlock as any).id).toBe("call_bad") + expect(stub.streamingToolCallIndices.has("call_bad")).toBe(false) + expect(stub.userMessageContentReady).toBe(false) + expect(mockedPresent).toHaveBeenCalledTimes(1) + }) + + it("is a no-op when the id is not tracked", () => { + vi.spyOn(NativeToolCallParser, "finalizeStreamingToolCall").mockReturnValue(null) + + const stub = { + assistantMessageContent: [] as any[], + streamingToolCallIndices: new Map(), + userMessageContentReady: true, + } + + callFinalize(stub, "call_unknown") + + expect(stub.assistantMessageContent).toHaveLength(0) + expect(stub.userMessageContentReady).toBe(true) + expect(mockedPresent).not.toHaveBeenCalled() + }) + + it("is idempotent: a second call for the same id does nothing", () => { + const finalToolUse = { type: "tool_use", name: "read_file", partial: false } as unknown as ToolUse + vi.spyOn(NativeToolCallParser, "finalizeStreamingToolCall") + .mockReturnValueOnce(finalToolUse) + .mockReturnValue(null) + + const stub = { + assistantMessageContent: [{ type: "tool_use", id: "call_once", name: "read_file", partial: true }], + streamingToolCallIndices: new Map([["call_once", 0]]), + userMessageContentReady: true, + } + + callFinalize(stub, "call_once") + callFinalize(stub, "call_once") // id no longer tracked -> no-op + + expect(mockedPresent).toHaveBeenCalledTimes(1) + }) +})