Skip to content
58 changes: 37 additions & 21 deletions src/core/assistant-message/NativeToolCallParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ export class NativeToolCallParser {
private static rawChunkTracker = new Map<
number,
{
id: string
id?: string
name: string
// Tracks whether a name field has been observed at all, distinct from a
// truthy name. This is a defensive guard: should a provider ever send an
// empty name, the start-gate must not rely on name truthiness.
nameSeen: boolean
hasStarted: boolean
deltaBuffer: string[]
}
Expand Down Expand Up @@ -105,51 +109,55 @@ export class NativeToolCallParser {
const events: ToolCallStreamEvent[] = []
const { index, id, name, arguments: args } = chunk

// Create the tracker on first sight of this index, independent of whether
// an id has arrived yet. Keying the lifecycle by index (not id) ensures any
// `arguments` that stream before the id is known are buffered rather than dropped.
let tracked = this.rawChunkTracker.get(index)

// Initialize new tool call tracking when we receive an id
if (id && !tracked) {
if (!tracked) {
tracked = {
id,
name: name || "",
nameSeen: name !== undefined,
hasStarted: false,
deltaBuffer: [],
}
this.rawChunkTracker.set(index, tracked)
}

if (!tracked) {
return events
// Record id and name as they arrive (they may come in separate chunks).
if (id) {
tracked.id = id
}

// Update name if present in chunk and not yet set
if (name) {
if (name !== undefined) {
tracked.name = name
Comment thread
awschmeder marked this conversation as resolved.
tracked.nameSeen = true
}

// Emit start event when we have the name
if (!tracked.hasStarted && tracked.name) {
// Emit start event only once both id and name are known. Using a local
// non-null id keeps emitted events typed as id: string.
if (!tracked.hasStarted && tracked.id && tracked.nameSeen) {
const startedId = tracked.id
events.push({
type: "tool_call_start",
id: tracked.id,
id: startedId,
name: tracked.name,
})
tracked.hasStarted = true

// Flush buffered deltas
// Flush buffered deltas accumulated during the pre-start window.
for (const bufferedDelta of tracked.deltaBuffer) {
events.push({
type: "tool_call_delta",
id: tracked.id,
id: startedId,
delta: bufferedDelta,
})
}
tracked.deltaBuffer = []
}

// Emit delta event for argument chunks
// Emit delta event for argument chunks, buffering until start is emitted.
if (args) {
if (tracked.hasStarted) {
if (tracked.hasStarted && tracked.id) {
events.push({
type: "tool_call_delta",
id: tracked.id,
Expand All @@ -172,11 +180,19 @@ export class NativeToolCallParser {

if (finishReason === "tool_calls" && this.rawChunkTracker.size > 0) {
for (const [, tracked] of this.rawChunkTracker.entries()) {
events.push({
type: "tool_call_end",
id: tracked.id,
})
// Only emit an end for trackers that actually started. A tracker that
// never received an id/name (malformed stream) must not emit a phantom
// end; since start requires an id, hasStarted implies tracked.id is set.
if (tracked.hasStarted && tracked.id) {
Comment thread
awschmeder marked this conversation as resolved.
events.push({
type: "tool_call_end",
id: tracked.id,
})
}
}
// Clear so a subsequent finalizeRawChunks() is a safe no-op and cannot
// double-fire end events for the same trackers.
this.rawChunkTracker.clear()
}

return events
Expand All @@ -191,7 +207,7 @@ export class NativeToolCallParser {

if (this.rawChunkTracker.size > 0) {
for (const [, tracked] of this.rawChunkTracker.entries()) {
if (tracked.hasStarted) {
if (tracked.hasStarted && tracked.id) {
events.push({
type: "tool_call_end",
id: tracked.id,
Expand Down
251 changes: 250 additions & 1 deletion src/core/assistant-message/__tests__/NativeToolCallParser.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { NativeToolCallParser } from "../NativeToolCallParser"
import { NativeToolCallParser, type ToolCallStreamEvent } from "../NativeToolCallParser"

describe("NativeToolCallParser", () => {
beforeEach(() => {
Expand Down Expand Up @@ -343,4 +343,253 @@ describe("NativeToolCallParser", () => {
})
})
})

describe("processRawChunk streaming reassembly", () => {
// Replays a raw chunk sequence the way Task.ts sequences it: every chunk goes
// through processRawChunk, each tool_call_start opens a streaming tool call via
// startStreamingToolCall, each tool_call_delta is fed to processStreamingChunk,
// and finalizeRawChunks() supplies the closing events once the chunks run out.
// Yields every event in emission order plus the finalized tool use per started id.
const drive = (rawChunks: Array<{ index: number; id?: string; name?: string; arguments?: string }>) => {
	const events: ToolCallStreamEvent[] = []

	// Records the event, then forwards starts and deltas to the streaming parser.
	const dispatch = (streamEvent: ToolCallStreamEvent) => {
		events.push(streamEvent)
		switch (streamEvent.type) {
			case "tool_call_start":
				NativeToolCallParser.startStreamingToolCall(streamEvent.id, streamEvent.name)
				break
			case "tool_call_delta":
				NativeToolCallParser.processStreamingChunk(streamEvent.id, streamEvent.delta)
				break
		}
	}

	// Dispatch each chunk's events before the next chunk is processed, so raw
	// parsing and streaming stay interleaved.
	rawChunks.forEach((rawChunk) => {
		NativeToolCallParser.processRawChunk(rawChunk).forEach((streamEvent) => dispatch(streamEvent))
	})

	// Stream close: finalizeRawChunks() is where the tool_call_end events come from.
	NativeToolCallParser.finalizeRawChunks().forEach((streamEvent) => dispatch(streamEvent))

	// Finalize one streaming tool call per start event, in emission order.
	const finalized = new Map<string, ReturnType<typeof NativeToolCallParser.finalizeStreamingToolCall>>()
	for (const streamEvent of events) {
		if (streamEvent.type === "tool_call_start") {
			finalized.set(streamEvent.id, NativeToolCallParser.finalizeStreamingToolCall(streamEvent.id))
		}
	}

	return { events, finalized }
}

it("preserves leading argument bytes that arrive before the id", () => {
	// The head of the payload streams with no id; id+name follow, then the tail.
	const payload = JSON.stringify({ path: "src/leading.ts", mode: "slice" })
	const splitAt = 10
	const head = payload.slice(0, splitAt)
	const tail = payload.slice(splitAt)

	const { events, finalized } = drive([
		{ index: 0, arguments: head },
		{ index: 0, id: "call_late_id", name: "read_file" },
		{ index: 0, arguments: tail },
	])

	// A single start event, and it carries the id that arrived late.
	const startEvents = events.filter((event) => event.type === "tool_call_start")
	expect(startEvents).toHaveLength(1)
	expect(startEvents[0].id).toBe("call_late_id")

	// The reassembled arguments must hold the whole payload, head included.
	const toolUse = finalized.get("call_late_id")
	expect(toolUse).not.toBeNull()
	expect(toolUse?.type).toBe("tool_use")
	if (toolUse?.type === "tool_use") {
		const parsedArgs = toolUse.nativeArgs as { path: string; mode?: string }
		expect(parsedArgs.path).toBe("src/leading.ts")
		expect(parsedArgs.mode).toBe("slice")
	}
})

it("handles id and name arriving in separate chunks (issue #218)", () => {
	// Three chunks, one field each: id first, then name, then the full arguments.
	const payload = JSON.stringify({ path: "src/split.ts" })

	const { events, finalized } = drive([
		{ index: 0, id: "call_split" },
		{ index: 0, name: "read_file" },
		{ index: 0, arguments: payload },
	])

	// Exactly one start event, tagged with the id from the first chunk.
	const startEvents = events.filter((event) => event.type === "tool_call_start")
	expect(startEvents).toHaveLength(1)
	expect(startEvents[0].id).toBe("call_split")

	const toolUse = finalized.get("call_split")
	expect(toolUse?.type).toBe("tool_use")
	if (toolUse?.type === "tool_use") {
		const parsedArgs = toolUse.nativeArgs as { path: string }
		expect(parsedArgs.path).toBe("src/split.ts")
	}
})

it("handles name arriving before id with buffered args in between (reverse ordering)", () => {
	const payload = JSON.stringify({ path: "src/reverse.ts" })
	const cut = 9

	const { events, finalized } = drive([
		{ index: 0, name: "read_file" },
		{ index: 0, arguments: payload.slice(0, cut) },
		{ index: 0, id: "call_reverse" },
		{ index: 0, arguments: payload.slice(cut) },
	])

	// The name alone must not trigger a start; the single start carries the late id.
	const startEvents = events.filter((event) => event.type === "tool_call_start")
	expect(startEvents).toHaveLength(1)
	expect(startEvents[0].id).toBe("call_reverse")

	// Ordering: the start has to come before the first delta, buffered ones included.
	const eventTypes = events.map((event) => event.type)
	const startPosition = eventTypes.indexOf("tool_call_start")
	const firstDeltaPosition = eventTypes.indexOf("tool_call_delta")
	expect(startPosition).toBeLessThan(firstDeltaPosition)

	const toolUse = finalized.get("call_reverse")
	expect(toolUse).not.toBeNull()
	expect(toolUse?.type).toBe("tool_use")
	if (toolUse?.type === "tool_use") {
		const parsedArgs = toolUse.nativeArgs as { path: string }
		expect(parsedArgs.path).toBe("src/reverse.ts")
	}
})

it("keeps two parallel tool calls on distinct indices isolated", () => {
	// Two tool calls stream interleaved on indices 0 and 1. Each one sends its
	// leading argument bytes before its id/name chunk and the rest afterwards.
	const argsA = JSON.stringify({ path: "src/a.ts" })
	const argsB = JSON.stringify({ path: "src/b.ts" })

	const { events, finalized } = drive([
		{ index: 0, arguments: argsA.slice(0, 8) },
		{ index: 1, arguments: argsB.slice(0, 8) },
		{ index: 0, id: "call_a", name: "read_file" },
		{ index: 1, id: "call_b", name: "read_file" },
		{ index: 0, arguments: argsA.slice(8) },
		{ index: 1, arguments: argsB.slice(8) },
	])

	// One start per index, each carrying its own id, in the order the ids arrived.
	const starts = events.filter((e) => e.type === "tool_call_start")
	expect(starts).toHaveLength(2)
	expect(starts.map((e) => e.id)).toEqual(["call_a", "call_b"])

	// finalized.get() yields undefined for an id that never started, and
	// not.toBeNull() accepts undefined. Assert the type explicitly so a missing
	// or failed tool call cannot slip past the guarded payload checks below.
	const resultA = finalized.get("call_a")
	const resultB = finalized.get("call_b")
	expect(resultA?.type).toBe("tool_use")
	expect(resultB?.type).toBe("tool_use")

	// The guards remain only to narrow the type for the nativeArgs access.
	if (resultA?.type === "tool_use") {
		expect((resultA.nativeArgs as { path: string }).path).toBe("src/a.ts")
	}
	if (resultB?.type === "tool_use") {
		expect((resultB.nativeArgs as { path: string }).path).toBe("src/b.ts")
	}
})

it("emits the same event sequence for the single-chunk-with-id flow (regression guard)", () => {
	// Everything (id, name and the full arguments) arrives in one chunk.
	const payload = JSON.stringify({ path: "src/single.ts" })
	const onlyChunk = { index: 0, id: "call_single", name: "read_file", arguments: payload }

	const { events, finalized } = drive([onlyChunk])

	// The sequence is start, delta, end, and no event carries a different id.
	const eventTypes = Array.from(events, (event) => event.type)
	expect(eventTypes).toEqual(["tool_call_start", "tool_call_delta", "tool_call_end"])
	const hasForeignId = events.some((event) => event.id !== "call_single")
	expect(!hasForeignId).toBe(true)

	const toolUse = finalized.get("call_single")
	expect(toolUse).not.toBeNull()
	expect(toolUse?.type).toBe("tool_use")
	if (toolUse?.type === "tool_use") {
		const parsedArgs = toolUse.nativeArgs as { path: string }
		expect(parsedArgs.path).toBe("src/single.ts")
	}
})

it("does not emit a phantom tool_call_end for a tracker that never received an id", () => {
	// A lone arguments chunk: no id and no name ever follow.
	const orphanChunk = { index: 0, arguments: '{"path":"orphan.ts"}' }
	const { events } = drive([orphanChunk])

	// Neither a start nor an end may be emitted for it.
	const startEvents = events.filter((event) => event.type === "tool_call_start")
	const endEvents = events.filter((event) => event.type === "tool_call_end")
	expect(startEvents).toHaveLength(0)
	expect(endEvents).toHaveLength(0)
})

it("finalizeRawChunks() emits end events and guards against missing id", () => {
	const events: Array<{ type: string; id?: string }> = []

	// Bring one tool call to the started state, its arguments split over two chunks.
	const rawChunks = [
		{ index: 0, id: "call_finalize", name: "read_file" },
		{ index: 0, arguments: '{"path":"file.ts"' },
		{ index: 0, arguments: ',"mode":"slice"}' },
	]

	// Dispatch each chunk's events before processing the next chunk.
	for (const rawChunk of rawChunks) {
		NativeToolCallParser.processRawChunk(rawChunk).forEach((streamEvent) => {
			events.push(streamEvent)
			switch (streamEvent.type) {
				case "tool_call_start":
					NativeToolCallParser.startStreamingToolCall(streamEvent.id, streamEvent.name)
					break
				case "tool_call_delta":
					NativeToolCallParser.processStreamingChunk(streamEvent.id, streamEvent.delta)
					break
			}
		})
	}

	// Closing the raw chunk stream is what produces the end event.
	events.push(...NativeToolCallParser.finalizeRawChunks())

	const endEvents = events.filter((event) => event.type === "tool_call_end")
	expect(endEvents).toHaveLength(1)
	expect(endEvents[0].id).toBe("call_finalize")

	// The streamed deltas must reassemble into the complete arguments.
	const toolUse = NativeToolCallParser.finalizeStreamingToolCall("call_finalize")
	expect(toolUse?.type).toBe("tool_use")
	if (toolUse?.type === "tool_use") {
		const parsedArgs = toolUse.nativeArgs as { path: string }
		expect(parsedArgs.path).toBe("file.ts")
	}
})

it("finalizeRawChunks() does not emit end for tracker without id", () => {
	// Feed a single arguments-only chunk; its id never arrives.
	NativeToolCallParser.processRawChunk({ index: 0, arguments: '{"incomplete":true}' })

	// With no id ever set, finalizing must produce no end event.
	const endEvents = NativeToolCallParser.finalizeRawChunks().filter((event) => event.type === "tool_call_end")
	expect(endEvents).toHaveLength(0)

	NativeToolCallParser.clearRawChunkState()
})

it("does not double-fire end events across processFinishReason and finalizeRawChunks", () => {
	// Start one tool call through the raw chunk path.
	const rawChunks = [
		{ index: 0, id: "call_dup", name: "read_file" },
		{ index: 0, arguments: '{"path":"file.ts"}' },
	]
	rawChunks.forEach((rawChunk) => {
		NativeToolCallParser.processRawChunk(rawChunk)
	})

	// Task.ts emits ends via processFinishReason and then always calls
	// finalizeRawChunks; between them the tracker may be ended only once.
	const endEvents = [
		...NativeToolCallParser.processFinishReason("tool_calls"),
		...NativeToolCallParser.finalizeRawChunks(),
	].filter((event) => event.type === "tool_call_end")

	expect(endEvents).toHaveLength(1)
	expect(endEvents[0].id).toBe("call_dup")

	NativeToolCallParser.clearRawChunkState()
})
})
})
Loading
Loading