Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"build": "tsup",
"typecheck": "tsc --noEmit",
"lint": "eslint src test",
"test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts test/idempotency.test.ts test/schema.test.ts test/otel.test.ts",
"test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts test/idempotency.test.ts test/schema.test.ts test/otel.test.ts test/redrive.test.ts",
"coverage": "c8 --check-coverage --lines 90 --functions 90 --branches 85 --reporter=text npm test",
"prepublishOnly": "npm run build"
},
Expand Down
9 changes: 9 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,12 @@ export type { Handler, Store } from "./idempotency.js";

export * as schema from "./schema.js";
export type { SchemaProvider, SchemaNode } from "./schema.js";

export { redrive, resetForRedrive } from "./redrive.js";
export type {
RedriveIO,
RedriveItem,
RedriveMessage,
RedriveOptions,
RedriveResult,
} from "./redrive.js";
170 changes: 170 additions & 0 deletions src/redrive.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/**
* DLQ redrive tooling — safe replay off the dead-letter queue (ADR-0026).
*
* The Node mirror of the Go reference `babelqueue-go/redrive.go`. Because the Node core is
* codec-only (no runtime / no transport), the orchestration takes a small {@link RedriveIO}
* the caller implements over their transport — the same shape the optional `otel.publish`
* helper used. {@link resetForRedrive} is the pure, transport-free core of it.
*
* A redriven message is **reset for reprocessing**: its `dead_letter` block is removed and
* `attempts` reset to 0, while `job`, `trace_id`, `data` and `meta` are preserved verbatim, so
* the replay is still fully traceable (same `trace_id`). The wire envelope is untouched (GR-1).
*
* Replay safety here is `dryRun` + `select` + redrive-to-`toQueue` (a sandbox). The
* **Replay-Bypass** guard — a `bq-replay-bypass` transport header surfaced to handlers so a
* replay can skip external side-effects — is a documented phase-two follow-up that touches the
* runtime + every transport, like ADR-0025's `traceparent` follow-up.
*/

import { EnvelopeCodec, type Envelope } from "./codec.js";

/** A message reserved from a queue, plus a way to acknowledge (remove) it. */
export interface RedriveMessage {
body: string;
ack(): Promise<void>;
}

/** The minimal transport surface {@link redrive} needs: reserve the next message, and publish. */
export interface RedriveIO {
/** Reserve the next message from queue, or `null` when it is empty. */
pop(queue: string): Promise<RedriveMessage | null>;
/** Append an already-encoded body to queue. */
publish(queue: string, body: string): Promise<void>;
}

/** Options for {@link redrive}. */
export interface RedriveOptions {
/** Override where messages are re-published; default is each message's `dead_letter.original_queue`. Set a sandbox queue to replay safely. */
toQueue?: string;
/** Cap how many messages are pulled from the DLQ (0 / omitted = all currently available). */
max?: number;
/** Inspect without redriving: every message is read, reported, and returned to the DLQ unchanged. */
dryRun?: boolean;
/** Pick which messages to redrive (e.g. by reason or URN). Unselected messages are returned unchanged. */
select?: (envelope: Envelope) => boolean;
}

/** What happened to one message during a {@link redrive} run. */
export interface RedriveItem {
messageId: string;
traceId: string;
urn: string;
reason: string;
from: string;
/** Target queue (the plan, even on a dry run; "" when skipped or undecodable). */
to: string;
/** True only when actually re-published to `to`. */
redriven: boolean;
}

/** Summary of a {@link redrive} run. */
export interface RedriveResult {
redriven: number;
skipped: number;
items: RedriveItem[];
}

/**
* Returns a copy of `envelope` reset for reprocessing: no `dead_letter` block and `attempts`
* at 0, with `job`, `trace_id`, `data` and `meta` preserved verbatim. Pure — the input is not
* mutated.
*/
export function resetForRedrive(envelope: Envelope): Envelope {
return {
job: envelope.job,
trace_id: envelope.trace_id,
data: envelope.data,
meta: envelope.meta,
attempts: 0,
};
}

function sourceQueueOf(envelope: Envelope): string {
return envelope.dead_letter?.original_queue || envelope.meta.queue;
}

/**
* Moves dead-lettered messages off the `dlq` queue and re-publishes each — via {@link resetForRedrive} —
* to its `dead_letter.original_queue` or `opts.toQueue`.
*
* Messages are drained from the DLQ first and then processed, so restored messages (skipped,
* dry-run, or undecodable) are never re-encountered in the same run. A message is acknowledged
* only after its re-publish succeeds; an undecodable body is restored, not dropped. On a publish
* failure the message is restored to the DLQ and the error is re-thrown.
*/
export async function redrive(
io: RedriveIO,
dlq: string,
opts: RedriveOptions = {},
): Promise<RedriveResult> {
const max = opts.max ?? 0;

interface Pending {
message: RedriveMessage;
envelope: Envelope | null;
}
const batch: Pending[] = [];
while (max === 0 || batch.length < max) {
const message = await io.pop(dlq);
if (!message) {
break;
}
const decoded = EnvelopeCodec.decode(message.body);
batch.push({ message, envelope: EnvelopeCodec.accepts(decoded) ? decoded : null });
}

const result: RedriveResult = { redriven: 0, skipped: 0, items: [] };

for (const { message, envelope } of batch) {
if (!envelope) {
await io.publish(dlq, message.body); // restore the undecodable body; never drop it
await message.ack();
result.skipped++;
result.items.push({ messageId: "", traceId: "", urn: "", reason: "", from: dlq, to: "", redriven: false });
continue;
}

const item: RedriveItem = {
messageId: envelope.meta.id,
traceId: envelope.trace_id,
urn: EnvelopeCodec.urn(envelope),
reason: envelope.dead_letter?.reason ?? "",
from: dlq,
to: "",
redriven: false,
};

if (opts.select && !opts.select(envelope)) {
await io.publish(dlq, message.body); // not selected: restore unchanged
await message.ack();
result.skipped++;
result.items.push(item);
continue;
}

const target = opts.toQueue ?? sourceQueueOf(envelope);
item.to = target;

if (opts.dryRun) {
await io.publish(dlq, message.body); // report the plan; restore unchanged
await message.ack();
result.skipped++;
result.items.push(item);
continue;
}

try {
await io.publish(target, EnvelopeCodec.encode(resetForRedrive(envelope)));
} catch (err) {
await io.publish(dlq, message.body); // restore on a publish failure
await message.ack();
throw err;
}
await message.ack();
item.redriven = true;
result.redriven++;
result.items.push(item);
}

return result;
}
159 changes: 159 additions & 0 deletions test/redrive.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import assert from "node:assert/strict";
import { test } from "node:test";

import { EnvelopeCodec, type Envelope } from "../src/codec.js";
import {
redrive,
resetForRedrive,
type RedriveIO,
type RedriveMessage,
} from "../src/redrive.js";

// memoryIO is a tiny in-memory RedriveIO over a Map of queues, with a queue() accessor for assertions.
function memoryIO(): RedriveIO & { queue(name: string): string[]; failOn?: string } {
const queues = new Map<string, string[]>();
const q = (name: string): string[] => {
let arr = queues.get(name);
if (!arr) {
arr = [];
queues.set(name, arr);
}
return arr;
};
const io: RedriveIO & { queue(name: string): string[]; failOn?: string } = {
queue: q,
async pop(queue: string): Promise<RedriveMessage | null> {
const arr = q(queue);
if (arr.length === 0) {
return null;
}
const body = arr.shift() as string;
return {
body,
async ack(): Promise<void> {},
};
},
async publish(queue: string, body: string): Promise<void> {
if (io.failOn && queue === io.failOn) {
throw new Error(`publish refused for ${queue}`);
}
q(queue).push(body);
},
};
return io;
}

function deadLetteredBody(urn: string, originalQueue: string, data: Record<string, unknown> = {}): string {
const base = EnvelopeCodec.make(urn, data, { queue: originalQueue });
const dl: Envelope = {
...base,
attempts: 3,
dead_letter: {
reason: "failed",
error: "boom",
exception: "Error",
failed_at: 1,
original_queue: originalQueue,
attempts: 3,
lang: "node",
},
};
return EnvelopeCodec.encode(dl);
}

test("resetForRedrive strips dead_letter and resets attempts, without mutating the input", () => {
const base = EnvelopeCodec.make("urn:babel:orders:created", { order_id: 1 }, { queue: "orders" });
const input: Envelope = { ...base, attempts: 3, dead_letter: { reason: "failed", error: null, exception: null, failed_at: 1, original_queue: "orders", attempts: 3, lang: "node" } };

const out = resetForRedrive(input);
assert.equal(out.dead_letter, undefined);
assert.equal(out.attempts, 0);
assert.equal(out.trace_id, input.trace_id);
assert.equal(out.job, input.job);
// input untouched
assert.ok(input.dead_letter);
assert.equal(input.attempts, 3);
});

test("redrive sends a message back to its source queue, reset and traceable", async () => {
const io = memoryIO();
io.queue("orders.dlq").push(deadLetteredBody("urn:babel:orders:created", "orders", { order_id: 1 }));
const traceId = EnvelopeCodec.decode(io.queue("orders.dlq")[0]).trace_id;

const res = await redrive(io, "orders.dlq");
assert.deepEqual([res.redriven, res.skipped], [1, 0]);

assert.equal(io.queue("orders.dlq").length, 0, "DLQ should be drained");
assert.equal(io.queue("orders").length, 1);
const back = EnvelopeCodec.decode(io.queue("orders")[0]);
assert.equal(back.dead_letter, undefined);
assert.equal(back.attempts, 0);
assert.equal(back.trace_id, traceId);
});

test("redrive routes to a sandbox queue without touching the source", async () => {
const io = memoryIO();
io.queue("orders.dlq").push(deadLetteredBody("urn:babel:orders:created", "orders"));

const res = await redrive(io, "orders.dlq", { toQueue: "sandbox" });
assert.equal(res.redriven, 1);
assert.equal(io.queue("orders").length, 0);
assert.equal(io.queue("sandbox").length, 1);
});

test("dryRun reports the plan and leaves the DLQ unchanged", async () => {
const io = memoryIO();
io.queue("orders.dlq").push(deadLetteredBody("urn:babel:orders:created", "orders"));

const res = await redrive(io, "orders.dlq", { dryRun: true });
assert.deepEqual([res.redriven, res.skipped], [0, 1]);
assert.equal(res.items[0].to, "orders");
assert.equal(res.items[0].redriven, false);
assert.equal(io.queue("orders").length, 0, "source untouched");
assert.equal(io.queue("orders.dlq").length, 1, "DLQ unchanged");
assert.ok(EnvelopeCodec.decode(io.queue("orders.dlq")[0]).dead_letter, "dead_letter intact");
});

test("select redrives only the matching messages, restoring the rest", async () => {
const io = memoryIO();
io.queue("dlq").push(deadLetteredBody("urn:babel:orders:created", "orders"));
io.queue("dlq").push(deadLetteredBody("urn:babel:emails:welcome", "emails"));

const res = await redrive(io, "dlq", {
select: (e) => EnvelopeCodec.urn(e) === "urn:babel:orders:created",
});
assert.deepEqual([res.redriven, res.skipped], [1, 1]);
assert.equal(io.queue("orders").length, 1);
assert.equal(io.queue("emails").length, 0);
assert.equal(io.queue("dlq").length, 1, "unselected restored to the DLQ");
});

test("max caps how many messages are pulled", async () => {
const io = memoryIO();
for (let i = 0; i < 3; i++) {
io.queue("dlq").push(deadLetteredBody("urn:babel:orders:created", "orders"));
}
const res = await redrive(io, "dlq", { max: 2 });
assert.equal(res.redriven, 2);
assert.equal(io.queue("dlq").length, 1);
});

test("a publish failure restores the message to the DLQ and re-throws", async () => {
const io = memoryIO();
io.queue("dlq").push(deadLetteredBody("urn:babel:orders:created", "orders"));
io.failOn = "orders";

await assert.rejects(redrive(io, "dlq"), /publish refused/);
assert.equal(io.queue("dlq").length, 1, "message restored to the DLQ");
assert.equal(io.queue("orders").length, 0);
});

test("an undecodable body is restored, not lost", async () => {
const io = memoryIO();
io.queue("dlq").push("not-json{{{");

const res = await redrive(io, "dlq");
assert.deepEqual([res.redriven, res.skipped], [0, 1]);
assert.equal(io.queue("dlq").length, 1);
assert.equal(io.queue("dlq")[0], "not-json{{{");
});
Loading