Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"build": "tsup",
"typecheck": "tsc --noEmit",
"lint": "eslint src test",
"test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts",
"test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts test/idempotency.test.ts test/schema.test.ts",
"coverage": "c8 --check-coverage --lines 90 --functions 90 --branches 85 --reporter=text npm test",
"prepublishOnly": "npm run build"
},
Expand Down
15 changes: 15 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,18 @@ export class UnknownUrnError extends BabelQueueError {
this.name = "UnknownUrnError";
}
}

/**
 * Error for a message whose `data` fails the JSON Schema registered for its URN
 * (ADR-0024). The consumer-side {@link schema.wrap} throws it so the adapter redelivers
 * (and eventually dead-letters) a poison message.
 *
 * `urn` is the message URN that was validated and `violation` is the first violation
 * that was reported; both are also embedded in the error message.
 */
export class InvalidPayloadError extends BabelQueueError {
  readonly urn: string;
  readonly violation: string;

  constructor(urn: string, violation: string) {
    super(`Message data for "${urn}" does not match its URN schema: ${violation}.`);
    this.urn = urn;
    this.violation = violation;
    this.name = "InvalidPayloadError";
  }
}
81 changes: 81 additions & 0 deletions src/idempotency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Optional idempotency helper (ADR-0022): dedupe a consume handler on `meta.id`.
*
* The Node mirror of the PHP `BabelQueue\Idempotency` and Go `idempotency` helpers.
* The core is codec-only (no dispatcher), so this wraps a user-provided handler that
* an adapter (NestJS, BullMQ, ...) drives:
*
* ```ts
* import { Wrap, InMemoryStore, type Handler } from "@babelqueue/core";
*
* const store = new InMemoryStore();
* const handler = Wrap(store, async (env) => { ... });
* ```
*
* A previously-seen id returns early (the adapter acks it); a throwing/rejecting
* handler leaves the id unmarked so a redelivery runs it again; a message with no
* usable `meta.id` runs unchanged. "Seen-set" post-success dedupe — not exactly-once,
* not in-flight concurrency locking (a transactional mode is a future direction).
*/
import type { Envelope } from "./codec.js";

/**
 * A consume handler: receives a decoded envelope and may be sync or async.
 * {@link Wrap} both accepts and returns a function of this shape.
 */
export type Handler = (env: Envelope) => void | Promise<void>;

/**
 * A pluggable record of message ids already processed, keyed on `meta.id`. Methods may
 * be sync or async so a production store can be Redis- or DB-backed; the reference
 * {@link InMemoryStore} is synchronous.
 */
export interface Store {
  /** Whether `messageId` has already been recorded as processed. */
  seen(messageId: string): boolean | Promise<boolean>;
  /** Record `messageId` as processed; {@link Wrap} calls this after the handler succeeds. */
  remember(messageId: string): void | Promise<void>;
  /** Drop `messageId` from the record. {@link Wrap} itself never calls this. */
  forget(messageId: string): void | Promise<void>;
}

/**
 * A {@link Store} that keeps processed ids in a process-local Set. Intended for tests
 * and single-process consumers: nothing is shared across workers and nothing survives a
 * restart, so production fleets should use a Redis- or DB-backed store instead.
 */
export class InMemoryStore implements Store {
  /** Ids recorded via {@link InMemoryStore.remember} and not yet forgotten. */
  private readonly processedIds: Set<string> = new Set();

  /** True when `messageId` is currently recorded. */
  seen(messageId: string): boolean {
    const known = this.processedIds.has(messageId);
    return known;
  }

  /** Record `messageId`; recording the same id twice is a no-op. */
  remember(messageId: string): void {
    this.processedIds.add(messageId);
  }

  /** Remove `messageId`; removing an unknown id is a no-op. */
  forget(messageId: string): void {
    this.processedIds.delete(messageId);
  }
}

/**
 * Decorates `handler` with seen-set deduplication keyed on `meta.id`.
 *
 * - An id the store already reports as seen: the handler is skipped and the returned
 *   promise resolves, so the adapter acks the message.
 * - A new id: the handler runs, and only after it resolves is the id remembered. If it
 *   throws or rejects, the id stays unmarked and a redelivery runs it again (retry /
 *   dead-letter still apply).
 * - A falsy id (missing or empty): the handler runs unchanged and nothing is recorded.
 */
export function Wrap(store: Store, handler: Handler): Handler {
  const deduped = async (env: Envelope): Promise<void> => {
    const messageId = env.meta.id;

    // Only consult the store when there is an id to dedupe on.
    if (messageId && (await store.seen(messageId))) {
      return;
    }

    // A throw here propagates before the id is recorded.
    await handler(env);

    if (messageId) {
      await store.remember(messageId);
    }
  };
  return deduped;
}
8 changes: 7 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,10 @@ export * as deadLetter from "./deadLetter.js";

export { UnknownUrnStrategy } from "./routing.js";

export { BabelQueueError, UnknownUrnError } from "./errors.js";
export { BabelQueueError, UnknownUrnError, InvalidPayloadError } from "./errors.js";

export { Wrap, InMemoryStore } from "./idempotency.js";
export type { Handler, Store } from "./idempotency.js";

export * as schema from "./schema.js";
export type { SchemaProvider, SchemaNode } from "./schema.js";
238 changes: 238 additions & 0 deletions src/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/**
* Optional per-URN payload schema validation (ADR-0024).
*
* The Node mirror of the Go `schema` package and PHP `BabelQueue\Schema`. A
* {@link SchemaProvider} supplies a JSON Schema for a message URN — typically built from a
* babelqueue-registry `registry.json` — and the message's `data` is validated against it.
* It is opt-in: a URN with no registered schema is never validated.
*
* ```ts
* import { schema } from "@babelqueue/core";
*
* const provider = schema.MapProvider.fromJson({ "urn:babel:orders:created": ORDERS_JSON });
* await schema.validate(provider, "urn:babel:orders:created", { order_id: 7 }); // rejects with InvalidPayloadError on mismatch
* const handler = schema.wrap(provider, async (env) => { ... }); // consumer safety net
* ```
*
* The core stays dependency-free and I/O-free, so it carries no file-based provider: a Node
* app or adapter reads its `registry.json` (with `node:fs`, etc.) and passes the schemas to
* {@link MapProvider.fromJson}. The validator is a small subset of JSON Schema (draft-07)
* whose verdicts match the Go, PHP and Python validators and babelqueue-registry's `compat`
* linter: `type`, `required`, `properties`, `additionalProperties`, `items`, `enum`,
* `const`, `minLength`, `minimum`. Unknown keywords are ignored.
*/
import type { Envelope } from "./codec.js";
import { InvalidPayloadError } from "./errors.js";

/**
 * A parsed JSON Schema node: a plain object whose keyword values are left as `unknown`
 * and narrowed by the validator at the point of use.
 */
export type SchemaNode = Record<string, unknown>;

/**
 * A consume handler: receives a decoded envelope and may be sync or async.
 * {@link wrap} both accepts and returns a function of this shape.
 */
export type SchemaHandler = (env: Envelope) => void | Promise<void>;

/**
 * A source of per-URN `data` schemas, keyed on the message URN. `schemaFor` may be sync or
 * async so a production provider can be service- or cache-backed; the reference
 * {@link MapProvider} is synchronous.
 */
export interface SchemaProvider {
  /**
   * The schema registered for `urn`, or `undefined` when there is none; callers treat
   * `undefined` as "skip validation" (validation is opt-in per URN).
   */
  schemaFor(urn: string): SchemaNode | undefined | Promise<SchemaNode | undefined>;
}

/** In-memory {@link SchemaProvider}, for tests and for embedding schemas in code. */
export class MapProvider implements SchemaProvider {
  private readonly schemas: Map<string, SchemaNode>;

  /**
   * @param schemas URN -> already-parsed schema node. Only the object's own enumerable
   *   entries are registered.
   */
  constructor(schemas: Record<string, SchemaNode>) {
    this.schemas = new Map(Object.entries(schemas));
  }

  /**
   * Build a provider from URN -> raw JSON Schema strings, parsing each.
   *
   * @throws SyntaxError when a body is not valid JSON; the message names the URN.
   * @throws Error when a body parses to something other than a JSON object.
   */
  static fromJson(raw: Record<string, string>): MapProvider {
    const parsed: Array<[string, SchemaNode]> = [];
    for (const [urn, body] of Object.entries(raw)) {
      let decoded: unknown;
      try {
        decoded = JSON.parse(body);
      } catch (err: unknown) {
        // Re-throw as the same error type, adding which URN's schema was malformed.
        const reason = err instanceof Error ? err.message : String(err);
        throw new SyntaxError(`schema: invalid JSON schema for "${urn}": ${reason}`);
      }
      if (typeof decoded !== "object" || decoded === null || Array.isArray(decoded)) {
        throw new Error(`schema: invalid JSON schema for "${urn}"`);
      }
      parsed.push([urn, decoded as SchemaNode]);
    }
    // Object.fromEntries defines own properties, so a key such as "__proto__" is stored
    // as data instead of replacing the accumulator's prototype and being dropped.
    return new MapProvider(Object.fromEntries(parsed));
  }

  /** The schema registered for `urn`, or `undefined` when none was registered. */
  schemaFor(urn: string): SchemaNode | undefined {
    return this.schemas.get(urn);
  }
}

/**
 * Non-throwing validation of `(urn, data)`, for producer-side branching.
 *
 * @returns the first violation found in `data`, or null when `data` is valid or when the
 *   provider has no schema for `urn` (validation is opt-in).
 */
export async function check(
  provider: SchemaProvider,
  urn: string,
  data: Record<string, unknown>,
): Promise<string | null> {
  const registered = await provider.schemaFor(urn);
  return registered ? validateSchema(registered, data) : null;
}

/**
 * Validate `(urn, data)` against the schema registered for `urn`. The producer-side
 * guard; call it before publishing.
 *
 * Resolves when `data` is valid or no schema is registered for `urn`.
 *
 * @throws InvalidPayloadError (as a rejection) carrying the first violation otherwise.
 */
export async function validate(
  provider: SchemaProvider,
  urn: string,
  data: Record<string, unknown>,
): Promise<void> {
  const problem = await check(provider, urn, data);
  if (problem === null) {
    return;
  }
  throw new InvalidPayloadError(urn, problem);
}

/**
 * Consumer-side safety net: returns a handler that validates each envelope's `data`
 * against the schema for its URN (`env.job`) and only then calls `handler`.
 *
 * Invalid data rejects with {@link InvalidPayloadError} before `handler` runs, so the
 * adapter redelivers (and eventually dead-letters) the poison message; a URN with no
 * schema runs the handler unchanged. Prefer {@link check} producer-side to keep invalid
 * data out of the queue entirely.
 */
export function wrap(provider: SchemaProvider, handler: SchemaHandler): SchemaHandler {
  const guarded: SchemaHandler = async (env: Envelope): Promise<void> => {
    const { job, data } = env;
    await validate(provider, job, data);
    await handler(env);
  };
  return guarded;
}

/**
 * The first violation of `value` against a (subset) JSON Schema node, or null.
 *
 * Keywords are checked in this order: `const`, `enum`, then the checks selected by a
 * string `type` (`object`, `array`, `string`, `integer`, `number`, `boolean`, `null`).
 * A node with no string `type`, or an unrecognised one, passes once `const`/`enum` pass.
 *
 * @param schema the schema node to validate against
 * @param value the instance (or sub-instance) being validated
 * @param path location of `value` inside the root instance, used as the violation
 *   prefix; the empty string denotes the root and is rendered as `<root>`
 * @returns `"<path>: <reason>"` for the first violation, or null when `value` conforms
 */
export function validateSchema(schema: SchemaNode, value: unknown, path = ""): string | null {
  if ("const" in schema && !equal(value, schema.const)) {
    return violation(path, "wrong_const");
  }
  const enumValues = schema.enum;
  if (Array.isArray(enumValues) && !enumValues.some((item) => equal(value, item))) {
    return violation(path, "not_in_enum");
  }

  const type = typeof schema.type === "string" ? schema.type : "";
  switch (type) {
    case "object":
      return checkObject(schema, value, path);
    case "array":
      return checkArray(schema, value, path);
    case "string": {
      if (typeof value !== "string") {
        return violation(path, "not_a_string");
      }
      const minLength = schema.minLength;
      // JSON Schema measures string length in Unicode code points, not UTF-16 code
      // units, so a single astral character (e.g. an emoji) counts as one.
      if (typeof minLength === "number" && Array.from(value).length < minLength) {
        return violation(path, "below_min_length");
      }
      return null;
    }
    case "integer":
      if (!isInteger(value)) {
        return violation(path, "not_an_integer");
      }
      return checkMinimum(schema, value, path);
    case "number":
      if (typeof value !== "number") {
        return violation(path, "not_a_number");
      }
      return checkMinimum(schema, value, path);
    case "boolean":
      return typeof value === "boolean" ? null : violation(path, "not_a_boolean");
    case "null":
      return value === null ? null : violation(path, "not_null");
    default:
      return null;
  }
}

/**
 * `type: "object"` checks: the value must be a non-array object, every `required` key
 * must be an own property, each property with a schema under `properties` is validated
 * recursively, and any other property is rejected when `additionalProperties` is false.
 */
function checkObject(schema: SchemaNode, value: unknown, path: string): string | null {
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    return violation(path, "not_an_object");
  }
  const obj = value as Record<string, unknown>;

  const required = schema.required;
  if (Array.isArray(required)) {
    for (const key of required) {
      // Own-property test: the `in` operator would also accept inherited names such as
      // "toString", letting a required key of that name pass on any object.
      if (typeof key === "string" && !hasOwn(obj, key)) {
        return violation(join(path, key), "missing_required");
      }
    }
  }

  const properties =
    typeof schema.properties === "object" && schema.properties !== null
      ? (schema.properties as Record<string, unknown>)
      : {};
  const additionalAllowed = schema.additionalProperties !== false;

  for (const [name, item] of Object.entries(obj)) {
    // Only schemas actually declared under `properties` count. A plain lookup would
    // resolve "__proto__" to Object.prototype and treat that data key as declared.
    const propSchema = hasOwn(properties, name) ? properties[name] : undefined;
    if (typeof propSchema === "object" && propSchema !== null) {
      const found = validateSchema(propSchema as SchemaNode, item, join(path, name));
      if (found !== null) {
        return found;
      }
      continue;
    }
    if (!additionalAllowed) {
      return violation(join(path, name), "additional_not_allowed");
    }
  }

  return null;
}

/**
 * `type: "array"` checks: the value must be an array and, when `items` is an object
 * schema, every element must conform to it. Element paths are rendered as `path[i]`.
 */
function checkArray(schema: SchemaNode, value: unknown, path: string): string | null {
  if (!Array.isArray(value)) {
    return violation(path, "not_an_array");
  }
  const items = schema.items;
  if (typeof items !== "object" || items === null) {
    return null;
  }
  for (let i = 0; i < value.length; i++) {
    const found = validateSchema(items as SchemaNode, value[i], `${path}[${i}]`);
    if (found !== null) {
      return found;
    }
  }
  return null;
}

/** Inclusive `minimum` check, applied only when `minimum` is a number. */
function checkMinimum(schema: SchemaNode, value: number, path: string): string | null {
  const minimum = schema.minimum;
  if (typeof minimum === "number" && value < minimum) {
    return violation(path, "below_minimum");
  }
  return null;
}

// JSON numbers are all `number` in JS; an integer is a whole number (and never a boolean).
function isInteger(value: unknown): value is number {
  return typeof value === "number" && Number.isInteger(value);
}

/**
 * Structural equality for enum/const checks. Primitives compare strictly, so the string
 * "1" never equals the number 1. Arrays compare element-wise in order. Objects compare
 * by own keys regardless of key order, which a comparison of `JSON.stringify` output
 * would get wrong for `{a, b}` versus `{b, a}`.
 */
function equal(a: unknown, b: unknown): boolean {
  if (a === b) {
    return true;
  }
  if (typeof a !== "object" || typeof b !== "object" || a === null || b === null) {
    return false;
  }
  if (Array.isArray(a) || Array.isArray(b)) {
    if (!Array.isArray(a) || !Array.isArray(b) || a.length !== b.length) {
      return false;
    }
    return a.every((item, i) => equal(item, b[i]));
  }
  const left = a as Record<string, unknown>;
  const right = b as Record<string, unknown>;
  const leftKeys = definedKeys(left);
  if (leftKeys.length !== definedKeys(right).length) {
    return false;
  }
  return leftKeys.every((key) => hasOwn(right, key) && equal(left[key], right[key]));
}

// Own keys whose value is not `undefined`: JSON serialization drops such properties, so
// they must not make two otherwise-equal objects differ.
function definedKeys(obj: Record<string, unknown>): string[] {
  return Object.keys(obj).filter((key) => obj[key] !== undefined);
}

// Own-property test that ignores the prototype chain.
function hasOwn(target: object, key: string): boolean {
  return Object.prototype.hasOwnProperty.call(target, key);
}

// Render a violation as "<path>: <reason>", using "<root>" for the empty path.
function violation(path: string, reason: string): string {
  return `${path === "" ? "<root>" : path}: ${reason}`;
}

// Append an object key to a dotted path; the root path contributes no prefix.
function join(path: string, key: string): string {
  return path === "" ? key : `${path}.${key}`;
}
23 changes: 23 additions & 0 deletions test/conformance/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -226,5 +226,28 @@
"x-attempts": 0
}
}
},
"payload_schema": {
"description": "Per-URN data schema validation (ADR-0024). Each case validates `data` against `schema`; every SDK's optional payload validator (Go schema, PHP BabelQueue\\Schema, Python babelqueue.schema) MUST agree on `valid`. The wire envelope stays frozen — this governs the data block only, and is opt-in (consumers/producers without a registered schema skip it).",
"schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["order_id"],
"properties": {
"order_id": { "type": "integer", "minimum": 1 },
"amount": { "type": "number", "minimum": 0 },
"currency": { "enum": ["USD", "EUR", "TRY"] }
},
"additionalProperties": false
},
"cases": [
{ "name": "valid-minimal", "valid": true, "data": { "order_id": 1042 } },
{ "name": "valid-full", "valid": true, "data": { "order_id": 1042, "amount": 99.9, "currency": "USD" } },
{ "name": "invalid-missing-required", "valid": false, "data": { "amount": 10 } },
{ "name": "invalid-wrong-type", "valid": false, "data": { "order_id": "x" } },
{ "name": "invalid-additional-property", "valid": false, "data": { "order_id": 1, "extra": true } },
{ "name": "invalid-enum", "valid": false, "data": { "order_id": 1, "currency": "GBP" } },
{ "name": "invalid-below-minimum", "valid": false, "data": { "order_id": 0 } }
]
}
}
Loading