diff --git a/package.json b/package.json index 9bfb261..da76ed6 100644 --- a/package.json +++ b/package.json @@ -49,7 +49,7 @@ "build": "tsup", "typecheck": "tsc --noEmit", "lint": "eslint src test", - "test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts", + "test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts test/idempotency.test.ts test/schema.test.ts", "coverage": "c8 --check-coverage --lines 90 --functions 90 --branches 85 --reporter=text npm test", "prepublishOnly": "npm run build" }, diff --git a/src/errors.ts b/src/errors.ts index 834b90c..98c44c1 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -13,3 +13,18 @@ export class UnknownUrnError extends BabelQueueError { this.name = "UnknownUrnError"; } } + +/** + * Raised when a message's `data` does not match the JSON Schema registered for its URN + * (ADR-0024). The consumer-side {@link schema.wrap} throws it so the adapter redelivers + * (and eventually dead-letters) a poison message. + */ +export class InvalidPayloadError extends BabelQueueError { + constructor( + readonly urn: string, + readonly violation: string, + ) { + super(`Message data for "${urn}" does not match its URN schema: ${violation}.`); + this.name = "InvalidPayloadError"; + } +} diff --git a/src/idempotency.ts b/src/idempotency.ts new file mode 100644 index 0000000..ad002f3 --- /dev/null +++ b/src/idempotency.ts @@ -0,0 +1,81 @@ +/** + * Optional idempotency helper (ADR-0022): dedupe a consume handler on `meta.id`. + * + * The Node mirror of the PHP `BabelQueue\Idempotency` and Go `idempotency` helpers. + * The core is codec-only (no dispatcher), so this wraps a user-provided handler that + * an adapter (NestJS, BullMQ, ...) drives: + * + * ```ts + * import { Wrap, InMemoryStore, type Handler } from "@babelqueue/core"; + * + * const store = new InMemoryStore(); + * const handler = Wrap(store, async (env) => { ... }); + * ``` + * + * A previously-seen id returns early (the adapter acks it); a throwing/rejecting + * handler leaves the id unmarked so a redelivery runs it again; a message with no + * usable `meta.id` runs unchanged. "Seen-set" post-success dedupe — not exactly-once, + * not in-flight concurrency locking (a transactional mode is a future direction). + */ +import type { Envelope } from "./codec.js"; + +/** A consume handler: receives a decoded envelope, may be sync or async. */ +export type Handler = (env: Envelope) => void | Promise; + +/** + * A pluggable record of message ids already processed, keyed on `meta.id`. Methods may + * be sync or async so a production store can be Redis- or DB-backed; the reference + * {@link InMemoryStore} is synchronous. + */ +export interface Store { + seen(messageId: string): boolean | Promise; + remember(messageId: string): void | Promise; + forget(messageId: string): void | Promise; +} + +/** + * Process-local {@link Store} backed by a Set. For tests / single-process consumers; + * not shared across workers and not persistent — use a Redis- or DB-backed store for + * production fleets. + */ +export class InMemoryStore implements Store { + private readonly entries = new Set(); + + seen(messageId: string): boolean { + return this.entries.has(messageId); + } + + remember(messageId: string): void { + this.entries.add(messageId); + } + + forget(messageId: string): void { + this.entries.delete(messageId); + } +} + +/** + * Wraps `handler` so a message whose `meta.id` was already processed successfully is + * skipped. A thrown/rejected handler leaves the id unmarked, so a redelivery runs it + * again (retry / dead-letter still apply); a message with no usable id runs unchanged. + */ +export function Wrap(store: Store, handler: Handler): Handler { + return async (env: Envelope): Promise => { + const id = env.meta.id; + + // No usable id → cannot dedupe; run the handler unchanged. + if (!id) { + await handler(env); + return; + } + + // Already processed on an earlier delivery: return so the adapter acks it. + if (await store.seen(id)) { + return; + } + + // First success wins; a throw here leaves the id unmarked → retry/DLQ apply. + await handler(env); + await store.remember(id); + }; +} diff --git a/src/index.ts b/src/index.ts index e677d80..a11a8dc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -31,4 +31,10 @@ export * as deadLetter from "./deadLetter.js"; export { UnknownUrnStrategy } from "./routing.js"; -export { BabelQueueError, UnknownUrnError } from "./errors.js"; +export { BabelQueueError, UnknownUrnError, InvalidPayloadError } from "./errors.js"; + +export { Wrap, InMemoryStore } from "./idempotency.js"; +export type { Handler, Store } from "./idempotency.js"; + +export * as schema from "./schema.js"; +export type { SchemaProvider, SchemaNode } from "./schema.js"; diff --git a/src/schema.ts b/src/schema.ts new file mode 100644 index 0000000..99a13a3 --- /dev/null +++ b/src/schema.ts @@ -0,0 +1,238 @@ +/** + * Optional per-URN payload schema validation (ADR-0024). + * + * The Node mirror of the Go `schema` package and PHP `BabelQueue\Schema`. A + * {@link SchemaProvider} supplies a JSON Schema for a message URN — typically built from a + * babelqueue-registry `registry.json` — and the message's `data` is validated against it. + * It is opt-in: a URN with no registered schema is never validated. + * + * ```ts + * import { schema } from "@babelqueue/core"; + * + * const provider = schema.MapProvider.fromJson({ "urn:babel:orders:created": ORDERS_JSON }); + * schema.validate(provider, "urn:babel:orders:created", { order_id: 7 }); // throws on mismatch + * const handler = schema.wrap(provider, async (env) => { ... }); // consumer safety net + * ``` + * + * The core stays dependency-free and I/O-free, so it carries no file-based provider: a Node + * app or adapter reads its `registry.json` (with `node:fs`, etc.) and passes the schemas to + * {@link MapProvider.fromJson}. The validator is a small subset of JSON Schema (draft-07) + * whose verdicts match the Go, PHP and Python validators and babelqueue-registry's `compat` + * linter: `type`, `required`, `properties`, `additionalProperties`, `items`, `enum`, + * `const`, `minLength`, `minimum`. Unknown keywords are ignored. + */ +import type { Envelope } from "./codec.js"; +import { InvalidPayloadError } from "./errors.js"; + +/** A parsed JSON Schema node. */ +export type SchemaNode = Record; + +/** A consume handler: receives a decoded envelope, may be sync or async. */ +export type SchemaHandler = (env: Envelope) => void | Promise; + +/** + * A source of per-URN `data` schemas, keyed on the message URN. `schemaFor` may be sync or + * async so a production provider can be service- or cache-backed; the reference + * {@link MapProvider} is synchronous. + */ +export interface SchemaProvider { + schemaFor(urn: string): SchemaNode | undefined | Promise; +} + +/** In-memory {@link SchemaProvider}, for tests and for embedding schemas in code. */ +export class MapProvider implements SchemaProvider { + private readonly schemas: Map; + + constructor(schemas: Record) { + this.schemas = new Map(Object.entries(schemas)); + } + + /** Build a provider from URN -> raw JSON Schema strings, parsing each. */ + static fromJson(raw: Record): MapProvider { + const schemas: Record = {}; + for (const [urn, body] of Object.entries(raw)) { + const decoded: unknown = JSON.parse(body); + if (typeof decoded !== "object" || decoded === null || Array.isArray(decoded)) { + throw new Error(`schema: invalid JSON schema for "${urn}"`); + } + schemas[urn] = decoded as SchemaNode; + } + return new MapProvider(schemas); + } + + schemaFor(urn: string): SchemaNode | undefined { + return this.schemas.get(urn); + } +} + +/** + * The first `data` violation for `(urn, data)`, or null when it is valid or when no schema is + * registered for the URN (opt-in). For producer-side branching. + */ +export async function check( + provider: SchemaProvider, + urn: string, + data: Record, +): Promise { + const schemaNode = await provider.schemaFor(urn); + if (!schemaNode) { + return null; + } + return validateSchema(schemaNode, data); +} + +/** + * Validate `(urn, data)` against its registered schema, throwing {@link InvalidPayloadError} + * otherwise. The producer-side guard; call it before publishing. + */ +export async function validate( + provider: SchemaProvider, + urn: string, + data: Record, +): Promise { + const violation = await check(provider, urn, data); + if (violation !== null) { + throw new InvalidPayloadError(urn, violation); + } +} + +/** + * Wrap a consume handler so each message's `data` is validated against its URN's schema + * before the handler runs (consumer-side safety net). Invalid data throws + * {@link InvalidPayloadError}, so the adapter redelivers (and eventually dead-letters) the + * poison message; a URN with no schema runs the handler unchanged. Prefer {@link check} + * producer-side to keep invalid data out of the queue entirely. + */ +export function wrap(provider: SchemaProvider, handler: SchemaHandler): SchemaHandler { + return async (env: Envelope): Promise => { + await validate(provider, env.job, env.data); + await handler(env); + }; +} + +/** The first violation of `value` against a (subset) JSON Schema node, or null. */ +export function validateSchema(schema: SchemaNode, value: unknown, path = ""): string | null { + if ("const" in schema && !equal(value, schema.const)) { + return violation(path, "wrong_const"); + } + const enumValues = schema.enum; + if (Array.isArray(enumValues) && !enumValues.some((item) => equal(value, item))) { + return violation(path, "not_in_enum"); + } + + const type = typeof schema.type === "string" ? schema.type : ""; + switch (type) { + case "object": + return checkObject(schema, value, path); + case "array": + return checkArray(schema, value, path); + case "string": { + if (typeof value !== "string") { + return violation(path, "not_a_string"); + } + const minLength = schema.minLength; + if (typeof minLength === "number" && value.length < minLength) { + return violation(path, "below_min_length"); + } + return null; + } + case "integer": + if (!isInteger(value)) { + return violation(path, "not_an_integer"); + } + return checkMinimum(schema, value, path); + case "number": + if (typeof value !== "number") { + return violation(path, "not_a_number"); + } + return checkMinimum(schema, value, path); + case "boolean": + return typeof value === "boolean" ? null : violation(path, "not_a_boolean"); + case "null": + return value === null ? null : violation(path, "not_null"); + default: + return null; + } +} + +function checkObject(schema: SchemaNode, value: unknown, path: string): string | null { + if (typeof value !== "object" || value === null || Array.isArray(value)) { + return violation(path, "not_an_object"); + } + const obj = value as Record; + + const required = schema.required; + if (Array.isArray(required)) { + for (const key of required) { + if (typeof key === "string" && !(key in obj)) { + return violation(join(path, key), "missing_required"); + } + } + } + + const properties = + typeof schema.properties === "object" && schema.properties !== null + ? (schema.properties as Record) + : {}; + const additionalAllowed = schema.additionalProperties !== false; + + for (const [name, item] of Object.entries(obj)) { + const propSchema = properties[name]; + if (typeof propSchema === "object" && propSchema !== null) { + const found = validateSchema(propSchema as SchemaNode, item, join(path, name)); + if (found !== null) { + return found; + } + continue; + } + if (!additionalAllowed) { + return violation(join(path, name), "additional_not_allowed"); + } + } + + return null; +} + +function checkArray(schema: SchemaNode, value: unknown, path: string): string | null { + if (!Array.isArray(value)) { + return violation(path, "not_an_array"); + } + const items = schema.items; + if (typeof items !== "object" || items === null) { + return null; + } + for (let i = 0; i < value.length; i++) { + const found = validateSchema(items as SchemaNode, value[i], `${path}[${i}]`); + if (found !== null) { + return found; + } + } + return null; +} + +function checkMinimum(schema: SchemaNode, value: number, path: string): string | null { + const minimum = schema.minimum; + if (typeof minimum === "number" && value < minimum) { + return violation(path, "below_minimum"); + } + return null; +} + +// JSON numbers are all `number` in JS; an integer is a whole number (and never a boolean). +function isInteger(value: unknown): value is number { + return typeof value === "number" && Number.isInteger(value); +} + +// Structural equality for enum/const checks: JSON.stringify distinguishes a string "1" from +// a number 1, matching the strict comparisons in the other SDK validators. +function equal(a: unknown, b: unknown): boolean { + return JSON.stringify(a) === JSON.stringify(b); +} + +function violation(path: string, reason: string): string { + return `${path === "" ? "" : path}: ${reason}`; +} + +function join(path: string, key: string): string { + return path === "" ? key : `${path}.${key}`; +} diff --git a/test/conformance/manifest.json b/test/conformance/manifest.json index 78e5c3a..5b2fee4 100644 --- a/test/conformance/manifest.json +++ b/test/conformance/manifest.json @@ -226,5 +226,28 @@ "x-attempts": 0 } } + }, + "payload_schema": { + "description": "Per-URN data schema validation (ADR-0024). Each case validates `data` against `schema`; every SDK's optional payload validator (Go schema, PHP BabelQueue\\Schema, Python babelqueue.schema) MUST agree on `valid`. The wire envelope stays frozen — this governs the data block only, and is opt-in (consumers/producers without a registered schema skip it).", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["order_id"], + "properties": { + "order_id": { "type": "integer", "minimum": 1 }, + "amount": { "type": "number", "minimum": 0 }, + "currency": { "enum": ["USD", "EUR", "TRY"] } + }, + "additionalProperties": false + }, + "cases": [ + { "name": "valid-minimal", "valid": true, "data": { "order_id": 1042 } }, + { "name": "valid-full", "valid": true, "data": { "order_id": 1042, "amount": 99.9, "currency": "USD" } }, + { "name": "invalid-missing-required", "valid": false, "data": { "amount": 10 } }, + { "name": "invalid-wrong-type", "valid": false, "data": { "order_id": "x" } }, + { "name": "invalid-additional-property", "valid": false, "data": { "order_id": 1, "extra": true } }, + { "name": "invalid-enum", "valid": false, "data": { "order_id": 1, "currency": "GBP" } }, + { "name": "invalid-below-minimum", "valid": false, "data": { "order_id": 0 } } + ] } } diff --git a/test/idempotency.test.ts b/test/idempotency.test.ts new file mode 100644 index 0000000..84300e0 --- /dev/null +++ b/test/idempotency.test.ts @@ -0,0 +1,87 @@ +import assert from "node:assert/strict"; +import { test } from "node:test"; + +import { EnvelopeCodec, InMemoryStore, Wrap, type Envelope } from "../src/index.js"; + +function envWithId(id: string): Envelope { + const env = EnvelopeCodec.make("urn:babel:orders:created", { order_id: 7 }); + env.meta.id = id; + return env; +} + +test("runs the handler on first delivery and remembers it", async () => { + const store = new InMemoryStore(); + let calls = 0; + const handler = Wrap(store, () => { + calls += 1; + }); + + await handler(envWithId("m1")); + + assert.equal(calls, 1); + assert.equal(store.seen("m1"), true); +}); + +test("skips the handler on a redelivery of the same id", async () => { + const store = new InMemoryStore(); + let calls = 0; + const handler = Wrap(store, () => { + calls += 1; + }); + + await handler(envWithId("m1")); + await handler(envWithId("m1")); // redelivery → skipped + + assert.equal(calls, 1); +}); + +test("runs the handler again for a different id", async () => { + const store = new InMemoryStore(); + let calls = 0; + const handler = Wrap(store, () => { + calls += 1; + }); + + await handler(envWithId("m1")); + await handler(envWithId("m2")); + + assert.equal(calls, 2); +}); + +test("does not remember an id when the handler throws", async () => { + const store = new InMemoryStore(); + let calls = 0; + const handler = Wrap(store, () => { + calls += 1; + throw new Error("boom"); + }); + + await assert.rejects(() => Promise.resolve(handler(envWithId("m1"))), /boom/); + assert.equal(store.seen("m1"), false); + + // A redelivery runs the handler again — retry works. + await assert.rejects(() => Promise.resolve(handler(envWithId("m1"))), /boom/); + assert.equal(calls, 2); +}); + +test("runs the handler when the message has no usable id", async () => { + const store = new InMemoryStore(); + let calls = 0; + const handler = Wrap(store, () => { + calls += 1; + }); + + await handler(envWithId("")); // empty id → cannot dedupe → runs + await handler(envWithId("")); // still runs + + assert.equal(calls, 2); +}); + +test("forget removes a remembered id", () => { + const store = new InMemoryStore(); + store.remember("m1"); + assert.equal(store.seen("m1"), true); + + store.forget("m1"); + assert.equal(store.seen("m1"), false); +}); diff --git a/test/schema.test.ts b/test/schema.test.ts new file mode 100644 index 0000000..3f76ff1 --- /dev/null +++ b/test/schema.test.ts @@ -0,0 +1,112 @@ +import assert from "node:assert/strict"; +import { readFileSync } from "node:fs"; +import { fileURLToPath } from "node:url"; +import { test } from "node:test"; + +import { InvalidPayloadError } from "../src/index.js"; +import type { Envelope } from "../src/index.js"; +import { check, MapProvider, validate, validateSchema, wrap } from "../src/schema.js"; +import type { SchemaNode, SchemaProvider } from "../src/schema.js"; + +const ORDERS = + '{"type":"object","required":["order_id"],' + + '"properties":{"order_id":{"type":"integer"}},"additionalProperties":false}'; + +const provider = (): SchemaProvider => MapProvider.fromJson({ "urn:babel:orders:created": ORDERS }); + +function envelope(urn: string, data: Record): Envelope { + return { + job: urn, + trace_id: "trace-1", + data, + meta: { id: "m1", queue: "orders", lang: "node", schema_version: 1, created_at: 0 }, + attempts: 0, + }; +} + +test("validateSchema enforces object/required/types/additionalProperties", () => { + const s = JSON.parse(ORDERS) as SchemaNode; + assert.equal(validateSchema(s, { order_id: 7 }), null); + assert.notEqual(validateSchema(s, {}), null); + assert.notEqual(validateSchema(s, { order_id: "x" }), null); + assert.notEqual(validateSchema(s, { order_id: 7, extra: 1 }), null); +}); + +test("validateSchema scalar parity (bool != integer, enum, minimum, array items)", () => { + const cases: Array<[string, unknown, boolean]> = [ + ['{"type":"boolean"}', true, true], + ['{"type":"boolean"}', "x", false], + ['{"type":"null"}', null, true], + ['{"type":"null"}', 1, false], + ['{"type":"number","minimum":0.5}', 0.6, true], + ['{"type":"number","minimum":0.5}', 0.4, false], + ['{"type":"integer"}', 1, true], + ['{"type":"integer"}', 1.5, false], + ['{"type":"integer"}', true, false], + ['{"enum":["a","b"]}', "b", true], + ['{"enum":["a","b"]}', "c", false], + ['{"type":"array","items":{"type":"string"}}', ["a"], true], + ['{"type":"array","items":{"type":"string"}}', ["a", 1], false], + ]; + for (const [src, value, valid] of cases) { + const s = JSON.parse(src) as SchemaNode; + assert.equal(validateSchema(s, value) === null, valid, `${src} / ${JSON.stringify(value)}`); + } +}); + +test("check: valid, invalid, and unregistered (opt-in)", async () => { + const p = provider(); + assert.equal(await check(p, "urn:babel:orders:created", { order_id: 1 }), null); + assert.equal(await check(p, "urn:babel:unknown", { x: 1 }), null); + assert.notEqual(await check(p, "urn:babel:orders:created", {}), null); +}); + +test("validate throws InvalidPayloadError on invalid data", async () => { + await assert.rejects( + () => validate(provider(), "urn:babel:orders:created", { order_id: "x" }), + InvalidPayloadError, + ); +}); + +test("wrap runs on valid, throws + skips on invalid, runs for an unregistered urn", async () => { + const p = provider(); + let calls = 0; + const handler = wrap(p, async () => { + calls += 1; + }); + + await handler(envelope("urn:babel:orders:created", { order_id: 1 })); + assert.equal(calls, 1); + + await assert.rejects( + async () => { + await handler(envelope("urn:babel:orders:created", {})); + }, + InvalidPayloadError, + ); + assert.equal(calls, 1); + + await handler(envelope("urn:babel:unknown", { anything: true })); + assert.equal(calls, 2); +}); + +test("payload conformance: agrees with the shared cross-SDK cases", () => { + const suite = new URL("./conformance/", import.meta.url); + const manifest = JSON.parse( + readFileSync(fileURLToPath(new URL("manifest.json", suite)), "utf8"), + ) as { + payload_schema?: { + schema: SchemaNode; + cases: Array<{ name: string; valid: boolean; data: Record }>; + }; + }; + const section = manifest.payload_schema; + if (!section) { + throw new Error("manifest has no payload_schema section"); + } + assert.ok(section.cases.length > 0); + for (const c of section.cases) { + const isValid: boolean = validateSchema(section.schema, c.data) === null; + assert.equal(isValid, c.valid, `case ${c.name}`); + } +});