From d3a3227d26de5fece7287183177c24a957e61a59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammet=20=C5=9Eafak?= Date: Fri, 19 Jun 2026 08:23:54 +0300 Subject: [PATCH] =?UTF-8?q?feat(redrive):=20DLQ=20redrive=20tooling=20?= =?UTF-8?q?=E2=80=94=20safe=20replay=20(ADR-0026)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Node mirror of babelqueue-go/redrive.go. Since the core is codec-only, redrive(io, dlq, opts) takes a small RedriveIO (pop/publish) over the caller's transport, and resetForRedrive(env) is the pure core: strip dead_letter, attempts->0, preserve job/trace_id/data/meta. Options toQueue (sandbox), max, dryRun, select. Drains-then-processes; restores skipped/dry-run/undecodable bodies; restores + re-throws on a publish failure. A normal root export (no dependency). Envelope frozen (GR-1); Replay-Bypass header is a documented phase two. --- package.json | 2 +- src/index.ts | 9 +++ src/redrive.ts | 170 +++++++++++++++++++++++++++++++++++++++++++ test/redrive.test.ts | 159 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 339 insertions(+), 1 deletion(-) create mode 100644 src/redrive.ts create mode 100644 test/redrive.test.ts diff --git a/package.json b/package.json index d2479aa..f0cffc8 100644 --- a/package.json +++ b/package.json @@ -59,7 +59,7 @@ "build": "tsup", "typecheck": "tsc --noEmit", "lint": "eslint src test", - "test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts test/idempotency.test.ts test/schema.test.ts test/otel.test.ts", + "test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts test/idempotency.test.ts test/schema.test.ts test/otel.test.ts test/redrive.test.ts", "coverage": "c8 --check-coverage --lines 90 --functions 90 --branches 85 --reporter=text npm test", "prepublishOnly": "npm run build" }, diff --git a/src/index.ts b/src/index.ts index a11a8dc..ae1ebaf 100644 --- a/src/index.ts +++ b/src/index.ts @@ -38,3 +38,12 @@ export type { Handler, Store } from "./idempotency.js"; export * as schema from "./schema.js"; export type { SchemaProvider, SchemaNode } from "./schema.js"; + +export { redrive, resetForRedrive } from "./redrive.js"; +export type { + RedriveIO, + RedriveItem, + RedriveMessage, + RedriveOptions, + RedriveResult, +} from "./redrive.js"; diff --git a/src/redrive.ts b/src/redrive.ts new file mode 100644 index 0000000..d040e38 --- /dev/null +++ b/src/redrive.ts @@ -0,0 +1,170 @@ +/** + * DLQ redrive tooling — safe replay off the dead-letter queue (ADR-0026). + * + * The Node mirror of the Go reference `babelqueue-go/redrive.go`. Because the Node core is + * codec-only (no runtime / no transport), the orchestration takes a small {@link RedriveIO} + * the caller implements over their transport — the same shape the optional `otel.publish` + * helper used. {@link resetForRedrive} is the pure, transport-free core of it. + * + * A redriven message is **reset for reprocessing**: its `dead_letter` block is removed and + * `attempts` reset to 0, while `job`, `trace_id`, `data` and `meta` are preserved verbatim, so + * the replay is still fully traceable (same `trace_id`). The wire envelope is untouched (GR-1). + * + * Replay safety here is `dryRun` + `select` + redrive-to-`toQueue` (a sandbox). The + * **Replay-Bypass** guard — a `bq-replay-bypass` transport header surfaced to handlers so a + * replay can skip external side-effects — is a documented phase-two follow-up that touches the + * runtime + every transport, like ADR-0025's `traceparent` follow-up. + */ + +import { EnvelopeCodec, type Envelope } from "./codec.js"; + +/** A message reserved from a queue, plus a way to acknowledge (remove) it. */ +export interface RedriveMessage { + body: string; + ack(): Promise; +} + +/** The minimal transport surface {@link redrive} needs: reserve the next message, and publish. */ +export interface RedriveIO { + /** Reserve the next message from queue, or `null` when it is empty. */ + pop(queue: string): Promise; + /** Append an already-encoded body to queue. */ + publish(queue: string, body: string): Promise; +} + +/** Options for {@link redrive}. */ +export interface RedriveOptions { + /** Override where messages are re-published; default is each message's `dead_letter.original_queue`. Set a sandbox queue to replay safely. */ + toQueue?: string; + /** Cap how many messages are pulled from the DLQ (0 / omitted = all currently available). */ + max?: number; + /** Inspect without redriving: every message is read, reported, and returned to the DLQ unchanged. */ + dryRun?: boolean; + /** Pick which messages to redrive (e.g. by reason or URN). Unselected messages are returned unchanged. */ + select?: (envelope: Envelope) => boolean; +} + +/** What happened to one message during a {@link redrive} run. */ +export interface RedriveItem { + messageId: string; + traceId: string; + urn: string; + reason: string; + from: string; + /** Target queue (the plan, even on a dry run; "" when skipped or undecodable). */ + to: string; + /** True only when actually re-published to `to`. */ + redriven: boolean; +} + +/** Summary of a {@link redrive} run. */ +export interface RedriveResult { + redriven: number; + skipped: number; + items: RedriveItem[]; +} + +/** + * Returns a copy of `envelope` reset for reprocessing: no `dead_letter` block and `attempts` + * at 0, with `job`, `trace_id`, `data` and `meta` preserved verbatim. Pure — the input is not + * mutated. + */ +export function resetForRedrive(envelope: Envelope): Envelope { + return { + job: envelope.job, + trace_id: envelope.trace_id, + data: envelope.data, + meta: envelope.meta, + attempts: 0, + }; +} + +function sourceQueueOf(envelope: Envelope): string { + return envelope.dead_letter?.original_queue || envelope.meta.queue; +} + +/** + * Moves dead-lettered messages off the `dlq` queue and re-publishes each — via {@link resetForRedrive} — + * to its `dead_letter.original_queue` or `opts.toQueue`. + * + * Messages are drained from the DLQ first and then processed, so restored messages (skipped, + * dry-run, or undecodable) are never re-encountered in the same run. A message is acknowledged + * only after its re-publish succeeds; an undecodable body is restored, not dropped. On a publish + * failure the message is restored to the DLQ and the error is re-thrown. + */ +export async function redrive( + io: RedriveIO, + dlq: string, + opts: RedriveOptions = {}, +): Promise { + const max = opts.max ?? 0; + + interface Pending { + message: RedriveMessage; + envelope: Envelope | null; + } + const batch: Pending[] = []; + while (max === 0 || batch.length < max) { + const message = await io.pop(dlq); + if (!message) { + break; + } + const decoded = EnvelopeCodec.decode(message.body); + batch.push({ message, envelope: EnvelopeCodec.accepts(decoded) ? decoded : null }); + } + + const result: RedriveResult = { redriven: 0, skipped: 0, items: [] }; + + for (const { message, envelope } of batch) { + if (!envelope) { + await io.publish(dlq, message.body); // restore the undecodable body; never drop it + await message.ack(); + result.skipped++; + result.items.push({ messageId: "", traceId: "", urn: "", reason: "", from: dlq, to: "", redriven: false }); + continue; + } + + const item: RedriveItem = { + messageId: envelope.meta.id, + traceId: envelope.trace_id, + urn: EnvelopeCodec.urn(envelope), + reason: envelope.dead_letter?.reason ?? "", + from: dlq, + to: "", + redriven: false, + }; + + if (opts.select && !opts.select(envelope)) { + await io.publish(dlq, message.body); // not selected: restore unchanged + await message.ack(); + result.skipped++; + result.items.push(item); + continue; + } + + const target = opts.toQueue ?? sourceQueueOf(envelope); + item.to = target; + + if (opts.dryRun) { + await io.publish(dlq, message.body); // report the plan; restore unchanged + await message.ack(); + result.skipped++; + result.items.push(item); + continue; + } + + try { + await io.publish(target, EnvelopeCodec.encode(resetForRedrive(envelope))); + } catch (err) { + await io.publish(dlq, message.body); // restore on a publish failure + await message.ack(); + throw err; + } + await message.ack(); + item.redriven = true; + result.redriven++; + result.items.push(item); + } + + return result; +} diff --git a/test/redrive.test.ts b/test/redrive.test.ts new file mode 100644 index 0000000..75c8ce1 --- /dev/null +++ b/test/redrive.test.ts @@ -0,0 +1,159 @@ +import assert from "node:assert/strict"; +import { test } from "node:test"; + +import { EnvelopeCodec, type Envelope } from "../src/codec.js"; +import { + redrive, + resetForRedrive, + type RedriveIO, + type RedriveMessage, +} from "../src/redrive.js"; + +// memoryIO is a tiny in-memory RedriveIO over a Map of queues, with a queue() accessor for assertions. +function memoryIO(): RedriveIO & { queue(name: string): string[]; failOn?: string } { + const queues = new Map(); + const q = (name: string): string[] => { + let arr = queues.get(name); + if (!arr) { + arr = []; + queues.set(name, arr); + } + return arr; + }; + const io: RedriveIO & { queue(name: string): string[]; failOn?: string } = { + queue: q, + async pop(queue: string): Promise { + const arr = q(queue); + if (arr.length === 0) { + return null; + } + const body = arr.shift() as string; + return { + body, + async ack(): Promise {}, + }; + }, + async publish(queue: string, body: string): Promise { + if (io.failOn && queue === io.failOn) { + throw new Error(`publish refused for ${queue}`); + } + q(queue).push(body); + }, + }; + return io; +} + +function deadLetteredBody(urn: string, originalQueue: string, data: Record = {}): string { + const base = EnvelopeCodec.make(urn, data, { queue: originalQueue }); + const dl: Envelope = { + ...base, + attempts: 3, + dead_letter: { + reason: "failed", + error: "boom", + exception: "Error", + failed_at: 1, + original_queue: originalQueue, + attempts: 3, + lang: "node", + }, + }; + return EnvelopeCodec.encode(dl); +} + +test("resetForRedrive strips dead_letter and resets attempts, without mutating the input", () => { + const base = EnvelopeCodec.make("urn:babel:orders:created", { order_id: 1 }, { queue: "orders" }); + const input: Envelope = { ...base, attempts: 3, dead_letter: { reason: "failed", error: null, exception: null, failed_at: 1, original_queue: "orders", attempts: 3, lang: "node" } }; + + const out = resetForRedrive(input); + assert.equal(out.dead_letter, undefined); + assert.equal(out.attempts, 0); + assert.equal(out.trace_id, input.trace_id); + assert.equal(out.job, input.job); + // input untouched + assert.ok(input.dead_letter); + assert.equal(input.attempts, 3); +}); + +test("redrive sends a message back to its source queue, reset and traceable", async () => { + const io = memoryIO(); + io.queue("orders.dlq").push(deadLetteredBody("urn:babel:orders:created", "orders", { order_id: 1 })); + const traceId = EnvelopeCodec.decode(io.queue("orders.dlq")[0]).trace_id; + + const res = await redrive(io, "orders.dlq"); + assert.deepEqual([res.redriven, res.skipped], [1, 0]); + + assert.equal(io.queue("orders.dlq").length, 0, "DLQ should be drained"); + assert.equal(io.queue("orders").length, 1); + const back = EnvelopeCodec.decode(io.queue("orders")[0]); + assert.equal(back.dead_letter, undefined); + assert.equal(back.attempts, 0); + assert.equal(back.trace_id, traceId); +}); + +test("redrive routes to a sandbox queue without touching the source", async () => { + const io = memoryIO(); + io.queue("orders.dlq").push(deadLetteredBody("urn:babel:orders:created", "orders")); + + const res = await redrive(io, "orders.dlq", { toQueue: "sandbox" }); + assert.equal(res.redriven, 1); + assert.equal(io.queue("orders").length, 0); + assert.equal(io.queue("sandbox").length, 1); +}); + +test("dryRun reports the plan and leaves the DLQ unchanged", async () => { + const io = memoryIO(); + io.queue("orders.dlq").push(deadLetteredBody("urn:babel:orders:created", "orders")); + + const res = await redrive(io, "orders.dlq", { dryRun: true }); + assert.deepEqual([res.redriven, res.skipped], [0, 1]); + assert.equal(res.items[0].to, "orders"); + assert.equal(res.items[0].redriven, false); + assert.equal(io.queue("orders").length, 0, "source untouched"); + assert.equal(io.queue("orders.dlq").length, 1, "DLQ unchanged"); + assert.ok(EnvelopeCodec.decode(io.queue("orders.dlq")[0]).dead_letter, "dead_letter intact"); +}); + +test("select redrives only the matching messages, restoring the rest", async () => { + const io = memoryIO(); + io.queue("dlq").push(deadLetteredBody("urn:babel:orders:created", "orders")); + io.queue("dlq").push(deadLetteredBody("urn:babel:emails:welcome", "emails")); + + const res = await redrive(io, "dlq", { + select: (e) => EnvelopeCodec.urn(e) === "urn:babel:orders:created", + }); + assert.deepEqual([res.redriven, res.skipped], [1, 1]); + assert.equal(io.queue("orders").length, 1); + assert.equal(io.queue("emails").length, 0); + assert.equal(io.queue("dlq").length, 1, "unselected restored to the DLQ"); +}); + +test("max caps how many messages are pulled", async () => { + const io = memoryIO(); + for (let i = 0; i < 3; i++) { + io.queue("dlq").push(deadLetteredBody("urn:babel:orders:created", "orders")); + } + const res = await redrive(io, "dlq", { max: 2 }); + assert.equal(res.redriven, 2); + assert.equal(io.queue("dlq").length, 1); +}); + +test("a publish failure restores the message to the DLQ and re-throws", async () => { + const io = memoryIO(); + io.queue("dlq").push(deadLetteredBody("urn:babel:orders:created", "orders")); + io.failOn = "orders"; + + await assert.rejects(redrive(io, "dlq"), /publish refused/); + assert.equal(io.queue("dlq").length, 1, "message restored to the DLQ"); + assert.equal(io.queue("orders").length, 0); +}); + +test("an undecodable body is restored, not lost", async () => { + const io = memoryIO(); + io.queue("dlq").push("not-json{{{"); + + const res = await redrive(io, "dlq"); + assert.deepEqual([res.redriven, res.skipped], [0, 1]); + assert.equal(io.queue("dlq").length, 1); + assert.equal(io.queue("dlq")[0], "not-json{{{"); +});