diff --git a/src/main/java/com/babelqueue/idempotency/Handler.java b/src/main/java/com/babelqueue/idempotency/Handler.java
new file mode 100644
index 0000000..2004911
--- /dev/null
+++ b/src/main/java/com/babelqueue/idempotency/Handler.java
@@ -0,0 +1,20 @@
+package com.babelqueue.idempotency;
+
+import com.babelqueue.Envelope;
+
+/**
+ * A consume handler: processes one decoded {@link Envelope}. It may throw — a thrown
+ * handler leaves the message unacknowledged so the runtime redelivers it (the core is
+ * codec-only, so an adapter drives the actual consume loop).
+ */
+@FunctionalInterface
+public interface Handler {
+
+ /**
+ * Handles one message.
+ *
+ * @param envelope the decoded envelope
+ * @throws Exception to signal failure (redelivery / dead-letter per the adapter)
+ */
+ void handle(Envelope envelope) throws Exception;
+}
diff --git a/src/main/java/com/babelqueue/idempotency/Idempotent.java b/src/main/java/com/babelqueue/idempotency/Idempotent.java
new file mode 100644
index 0000000..3717be0
--- /dev/null
+++ b/src/main/java/com/babelqueue/idempotency/Idempotent.java
@@ -0,0 +1,55 @@
+package com.babelqueue.idempotency;
+
+import com.babelqueue.Envelope;
+import com.babelqueue.Meta;
+
+/**
+ * Wraps a {@link Handler} so a message whose {@code meta.id} was already processed
+ * successfully is skipped instead of run again (ADR-0022) — the Java mirror of the PHP
+ * {@code Idempotent::wrap}, Go {@code idempotency.Wrap}, Python {@code wrap}, and Node
+ * {@code Wrap} helpers.
+ *
+ *
{@code
+ * Store store = new InMemoryStore();
+ * Handler handler = Idempotent.wrap(store, env -> process(env));
+ * }
+ *
+ *
A previously-seen id returns early (so an adapter acks it and the broker stops
+ * redelivering); a throwing handler leaves the id unmarked so a redelivery runs it again
+ * (retry / dead-letter still apply); a message with no usable {@code meta.id} runs
+ * unchanged.
+ */
+public final class Idempotent {
+
+ private Idempotent() {
+ }
+
+ /**
+ * Returns {@code handler} wrapped with dedupe on {@code meta.id} against {@code store}.
+ *
+ * @param store the dedupe record
+ * @param handler the handler to guard
+ * @return the wrapped handler
+ */
+ public static Handler wrap(Store store, Handler handler) {
+ return envelope -> {
+ Meta meta = envelope.meta();
+ String id = (meta == null) ? null : meta.id();
+
+ // No usable id → cannot dedupe; run the handler unchanged.
+ if (id == null || id.isEmpty()) {
+ handler.handle(envelope);
+ return;
+ }
+
+ // Already processed on an earlier delivery: return so the adapter acks it.
+ if (store.seen(id)) {
+ return;
+ }
+
+ // First success wins; a throw here leaves the id unmarked → retry/DLQ apply.
+ handler.handle(envelope);
+ store.remember(id);
+ };
+ }
+}
diff --git a/src/main/java/com/babelqueue/idempotency/InMemoryStore.java b/src/main/java/com/babelqueue/idempotency/InMemoryStore.java
new file mode 100644
index 0000000..5829fec
--- /dev/null
+++ b/src/main/java/com/babelqueue/idempotency/InMemoryStore.java
@@ -0,0 +1,29 @@
+package com.babelqueue.idempotency;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Process-local, thread-safe {@link Store} backed by a concurrent set. For tests and
+ * single-process consumers; it is not shared across workers and not persistent — use a
+ * Redis- or database-backed store for production fleets.
+ */
+public final class InMemoryStore implements Store {
+
+ private final Set ids = ConcurrentHashMap.newKeySet();
+
+ @Override
+ public boolean seen(String messageId) {
+ return ids.contains(messageId);
+ }
+
+ @Override
+ public void remember(String messageId) {
+ ids.add(messageId);
+ }
+
+ @Override
+ public void forget(String messageId) {
+ ids.remove(messageId);
+ }
+}
diff --git a/src/main/java/com/babelqueue/idempotency/Store.java b/src/main/java/com/babelqueue/idempotency/Store.java
new file mode 100644
index 0000000..b5134cf
--- /dev/null
+++ b/src/main/java/com/babelqueue/idempotency/Store.java
@@ -0,0 +1,25 @@
+package com.babelqueue.idempotency;
+
+/**
+ * A pluggable record of message ids that have already been processed, keyed on the
+ * envelope's {@code meta.id}. The reference {@link InMemoryStore} is for tests and
+ * single-process consumers; production backends (Redis, a database table) implement the
+ * same three methods.
+ *
+ *
The contract is "seen-set" dedupe: it answers "was this id processed?", not
+ * "what did it return" — queue handlers have no response to replay. It provides
+ * post-success dedupe under at-least-once delivery with idempotent handlers, not
+ * exactly-once and not in-flight concurrency locking. A transactional / outbox mode is a
+ * documented future direction (ADR-0022).
+ */
+public interface Store {
+
+ /** Whether this message id has already been processed (remembered). */
+ boolean seen(String messageId);
+
+ /** Records this message id as processed. */
+ void remember(String messageId);
+
+ /** Drops an id from the store (manual eviction; a backend may also expire ids). */
+ void forget(String messageId);
+}
diff --git a/src/main/java/com/babelqueue/idempotency/package-info.java b/src/main/java/com/babelqueue/idempotency/package-info.java
new file mode 100644
index 0000000..a954e45
--- /dev/null
+++ b/src/main/java/com/babelqueue/idempotency/package-info.java
@@ -0,0 +1,11 @@
+/**
+ * Optional idempotency helper (ADR-0022): dedupe a consume handler on {@code meta.id}.
+ *
+ *
{@link com.babelqueue.idempotency.Idempotent#wrap} wraps a
+ * {@link com.babelqueue.idempotency.Handler} so a message already processed is skipped;
+ * {@link com.babelqueue.idempotency.Store} is the pluggable dedupe record, with
+ * {@link com.babelqueue.idempotency.InMemoryStore} as the reference implementation. The
+ * wire envelope is unchanged ({@code schema_version: 1}) — this is a pure consumer-side
+ * concern. See {@code .ssot/contracts/error-handling.md} §1.
+ */
+package com.babelqueue.idempotency;
diff --git a/src/main/java/com/babelqueue/schema/InvalidPayloadException.java b/src/main/java/com/babelqueue/schema/InvalidPayloadException.java
new file mode 100644
index 0000000..f383975
--- /dev/null
+++ b/src/main/java/com/babelqueue/schema/InvalidPayloadException.java
@@ -0,0 +1,35 @@
+package com.babelqueue.schema;
+
+import com.babelqueue.BabelQueueException;
+
+/**
+ * Raised when a message's {@code data} does not match the JSON Schema registered for its URN
+ * (ADR-0024). The consumer-side {@link SchemaValidation#wrap} throws it so the adapter
+ * redelivers (and eventually dead-letters) a poison message; the recommended primary use is
+ * producer-side ({@link SchemaValidation#validate}) so invalid data never enters the queue.
+ */
+public class InvalidPayloadException extends BabelQueueException {
+
+ private final transient String urn;
+ private final transient String violation;
+
+ /**
+ * @param urn the message URN whose schema was violated
+ * @param violation the first {@code ": "} mismatch
+ */
+ public InvalidPayloadException(String urn, String violation) {
+ super("Message data for \"" + urn + "\" does not match its URN schema: " + violation + ".");
+ this.urn = urn;
+ this.violation = violation;
+ }
+
+ /** @return the message URN whose schema was violated */
+ public String urn() {
+ return urn;
+ }
+
+ /** @return the first {@code ": "} mismatch */
+ public String violation() {
+ return violation;
+ }
+}
diff --git a/src/main/java/com/babelqueue/schema/MapProvider.java b/src/main/java/com/babelqueue/schema/MapProvider.java
new file mode 100644
index 0000000..efe1248
--- /dev/null
+++ b/src/main/java/com/babelqueue/schema/MapProvider.java
@@ -0,0 +1,25 @@
+package com.babelqueue.schema;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * In-memory {@link SchemaProvider}, for tests and for embedding schemas in code. Construct it
+ * with each URN's already-decoded JSON Schema (a {@code Map}).
+ */
+public final class MapProvider implements SchemaProvider {
+
+ private final Map> schemas;
+
+ /**
+ * @param schemas urn -> decoded JSON Schema
+ */
+ public MapProvider(Map> schemas) {
+ this.schemas = new HashMap<>(schemas);
+ }
+
+ @Override
+ public Map schemaFor(String urn) {
+ return schemas.get(urn);
+ }
+}
diff --git a/src/main/java/com/babelqueue/schema/PayloadValidator.java b/src/main/java/com/babelqueue/schema/PayloadValidator.java
new file mode 100644
index 0000000..3c25d86
--- /dev/null
+++ b/src/main/java/com/babelqueue/schema/PayloadValidator.java
@@ -0,0 +1,153 @@
+package com.babelqueue.schema;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Validates a message's {@code data} block against a per-URN JSON Schema (ADR-0024). A
+ * hand-rolled subset of Draft-07 (zero dependencies, GR-7) whose verdicts match the Go, PHP,
+ * Python and Node validators and babelqueue-registry's {@code compat} linter. Supported
+ * keywords: {@code type}, {@code required}, {@code properties}, {@code additionalProperties},
+ * {@code items}, {@code enum}, {@code const}, {@code minLength}, {@code minimum}; unknown
+ * keywords are ignored. It works on the decoded {@code Map}/{@code List}/scalar structures
+ * the codec produces (numbers are {@link Long}/{@link BigInteger}/{@link Double}).
+ */
+public final class PayloadValidator {
+
+ private PayloadValidator() {
+ }
+
+ /**
+ * The first violation of {@code value} against {@code schema} as
+ * {@code ": "}, or {@code null} when it conforms.
+ *
+ * @param schema the decoded JSON Schema for the data block
+ * @param value the decoded value to validate
+ * @return the first violation, or null when valid
+ */
+ public static String validate(Map schema, Object value) {
+ return validateNode(schema, value, "");
+ }
+
+ private static String validateNode(Map, ?> schema, Object value, String path) {
+ if (schema.containsKey("const") && !Objects.equals(value, schema.get("const"))) {
+ return violation(path, "wrong_const");
+ }
+ if (schema.get("enum") instanceof List> enumValues && !contains(enumValues, value)) {
+ return violation(path, "not_in_enum");
+ }
+
+ String type = (schema.get("type") instanceof String s) ? s : "";
+ return switch (type) {
+ case "object" -> checkObject(schema, value, path);
+ case "array" -> checkArray(schema, value, path);
+ case "string" -> checkString(schema, value, path);
+ case "integer" -> isInteger(value)
+ ? checkMinimum(schema, value, path)
+ : violation(path, "not_an_integer");
+ case "number" -> (value instanceof Number)
+ ? checkMinimum(schema, value, path)
+ : violation(path, "not_a_number");
+ case "boolean" -> (value instanceof Boolean) ? null : violation(path, "not_a_boolean");
+ case "null" -> (value == null) ? null : violation(path, "not_null");
+ default -> null;
+ };
+ }
+
+ private static String checkObject(Map, ?> schema, Object value, String path) {
+ if (!(value instanceof Map, ?> obj)) {
+ return violation(path, "not_an_object");
+ }
+ if (schema.get("required") instanceof List> required) {
+ for (Object key : required) {
+ if (key instanceof String k && !obj.containsKey(k)) {
+ return violation(join(path, k), "missing_required");
+ }
+ }
+ }
+ Map, ?> properties = (schema.get("properties") instanceof Map, ?> p) ? p : Map.of();
+ boolean additionalAllowed = !Boolean.FALSE.equals(schema.get("additionalProperties"));
+
+ for (Map.Entry, ?> entry : obj.entrySet()) {
+ String name = String.valueOf(entry.getKey());
+ if (properties.get(name) instanceof Map, ?> propSchema) {
+ String found = validateNode(propSchema, entry.getValue(), join(path, name));
+ if (found != null) {
+ return found;
+ }
+ } else if (!additionalAllowed) {
+ return violation(join(path, name), "additional_not_allowed");
+ }
+ }
+ return null;
+ }
+
+ private static String checkArray(Map, ?> schema, Object value, String path) {
+ if (!(value instanceof List> list)) {
+ return violation(path, "not_an_array");
+ }
+ if (!(schema.get("items") instanceof Map, ?> items)) {
+ return null;
+ }
+ for (int i = 0; i < list.size(); i++) {
+ String found = validateNode(items, list.get(i), path + "[" + i + "]");
+ if (found != null) {
+ return found;
+ }
+ }
+ return null;
+ }
+
+ private static String checkString(Map, ?> schema, Object value, String path) {
+ if (!(value instanceof String str)) {
+ return violation(path, "not_a_string");
+ }
+ if (schema.get("minLength") instanceof Number min && str.length() < min.intValue()) {
+ return violation(path, "below_min_length");
+ }
+ return null;
+ }
+
+ private static String checkMinimum(Map, ?> schema, Object value, String path) {
+ if (schema.get("minimum") instanceof Number min
+ && value instanceof Number num
+ && num.doubleValue() < min.doubleValue()) {
+ return violation(path, "below_minimum");
+ }
+ return null;
+ }
+
+ // JSON numbers decode to Long/BigInteger (integers) or Double (fractions); a whole-valued
+ // Double still counts as an integer, matching the other SDKs. Booleans are not Numbers.
+ private static boolean isInteger(Object value) {
+ if (value instanceof Long || value instanceof Integer || value instanceof BigInteger) {
+ return true;
+ }
+ if (value instanceof Double d) {
+ return !d.isInfinite() && Double.compare(d, Math.floor(d)) == 0;
+ }
+ if (value instanceof Float f) {
+ return !f.isInfinite() && Float.compare(f, (float) Math.floor(f)) == 0;
+ }
+ return false;
+ }
+
+ private static boolean contains(List> values, Object value) {
+ for (Object item : values) {
+ if (Objects.equals(value, item)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static String violation(String path, String reason) {
+ return (path.isEmpty() ? "" : path) + ": " + reason;
+ }
+
+ private static String join(String path, String key) {
+ return path.isEmpty() ? key : path + "." + key;
+ }
+}
diff --git a/src/main/java/com/babelqueue/schema/SchemaProvider.java b/src/main/java/com/babelqueue/schema/SchemaProvider.java
new file mode 100644
index 0000000..4791ea9
--- /dev/null
+++ b/src/main/java/com/babelqueue/schema/SchemaProvider.java
@@ -0,0 +1,25 @@
+package com.babelqueue.schema;
+
+import java.util.Map;
+
+/**
+ * A source of per-URN {@code data} schemas, keyed on the message URN (ADR-0024). Given a
+ * URN, it returns the decoded JSON Schema for that message's {@code data} block, or
+ * {@code null} when no schema is registered — in which case the caller skips validation
+ * (the feature is opt-in).
+ *
+ *
The reference {@link MapProvider} is in-memory. A Java app reads its babelqueue-registry
+ * {@code registry.json} with its own JSON tooling and passes the parsed schemas to a
+ * {@link MapProvider}; the zero-dependency core does not ship a file-based provider. This is
+ * the Java mirror of the Go {@code schema.Provider} interface.
+ */
+public interface SchemaProvider {
+
+ /**
+ * The decoded JSON Schema registered for {@code urn}, or {@code null} when none is.
+ *
+ * @param urn the message URN
+ * @return the schema map, or null
+ */
+ Map schemaFor(String urn);
+}
diff --git a/src/main/java/com/babelqueue/schema/SchemaValidation.java b/src/main/java/com/babelqueue/schema/SchemaValidation.java
new file mode 100644
index 0000000..5b3bafe
--- /dev/null
+++ b/src/main/java/com/babelqueue/schema/SchemaValidation.java
@@ -0,0 +1,75 @@
+package com.babelqueue.schema;
+
+import com.babelqueue.idempotency.Handler;
+import java.util.Map;
+
+/**
+ * Optional per-URN {@code data} schema validation for a babelqueue producer or consumer
+ * (ADR-0024). A {@link SchemaProvider} supplies a JSON Schema for a message URN — typically
+ * built from a babelqueue-registry {@code registry.json} — and the message's {@code data} is
+ * validated against it. It is opt-in: a URN with no registered schema is never validated.
+ *
+ *
+ *
Producer-side (recommended): call {@link #validate} before publishing so
+ * invalid data never enters the queue, or {@link #check} to branch without throwing.
+ *
Consumer-side (safety net): wrap a handler with {@link #wrap}. Invalid data
+ * throws {@link InvalidPayloadException}, so the adapter redelivers (and eventually
+ * dead-letters) the poison message; a URN with no schema runs the handler unchanged.
+ * It reuses the shared {@link Handler} so it composes with {@code Idempotent.wrap}.
+ *
+ *
+ *
The Java mirror of the Go {@code schema.Check}/{@code schema.Wrap} helpers.
+ */
+public final class SchemaValidation {
+
+ private SchemaValidation() {
+ }
+
+ /**
+ * The first {@code data} violation for {@code (urn, data)}, or {@code null} when it is
+ * valid or when no schema is registered for the URN (opt-in). For producer-side branching.
+ *
+ * @param provider the schema source
+ * @param urn the message URN
+ * @param data the message data
+ * @return the first violation, or null
+ */
+ public static String check(SchemaProvider provider, String urn, Map data) {
+ Map schema = provider.schemaFor(urn);
+ if (schema == null) {
+ return null;
+ }
+ return PayloadValidator.validate(schema, data);
+ }
+
+ /**
+ * Validate {@code (urn, data)} against its registered schema, throwing otherwise. The
+ * producer-side guard; call it before publishing.
+ *
+ * @param provider the schema source
+ * @param urn the message URN
+ * @param data the message data
+ * @throws InvalidPayloadException when the data does not match the URN's schema
+ */
+ public static void validate(SchemaProvider provider, String urn, Map data) {
+ String violation = check(provider, urn, data);
+ if (violation != null) {
+ throw new InvalidPayloadException(urn, violation);
+ }
+ }
+
+ /**
+ * Returns {@code handler} wrapped to validate each message's {@code data} against its
+ * URN's schema before the handler runs (consumer-side safety net).
+ *
+ * @param provider the schema source
+ * @param handler the handler to guard
+ * @return the wrapped handler
+ */
+ public static Handler wrap(SchemaProvider provider, Handler handler) {
+ return envelope -> {
+ validate(provider, envelope.job(), envelope.data());
+ handler.handle(envelope);
+ };
+ }
+}
diff --git a/src/main/java/com/babelqueue/schema/package-info.java b/src/main/java/com/babelqueue/schema/package-info.java
new file mode 100644
index 0000000..b7b99f1
--- /dev/null
+++ b/src/main/java/com/babelqueue/schema/package-info.java
@@ -0,0 +1,10 @@
+/**
+ * Optional per-URN {@code data} schema validation (ADR-0024): a dependency-free subset
+ * Draft-07 validator ({@link com.babelqueue.schema.PayloadValidator}), a
+ * {@link com.babelqueue.schema.SchemaProvider} (with the in-memory
+ * {@link com.babelqueue.schema.MapProvider}), and the
+ * {@link com.babelqueue.schema.SchemaValidation} producer/consumer helpers. Opt-in, with the
+ * wire envelope frozen — the Java mirror of the Go {@code schema} package and PHP
+ * {@code BabelQueue\Schema}.
+ */
+package com.babelqueue.schema;
diff --git a/src/test/java/com/babelqueue/PayloadSchemaConformanceTest.java b/src/test/java/com/babelqueue/PayloadSchemaConformanceTest.java
new file mode 100644
index 0000000..597e464
--- /dev/null
+++ b/src/test/java/com/babelqueue/PayloadSchemaConformanceTest.java
@@ -0,0 +1,53 @@
+package com.babelqueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.babelqueue.schema.PayloadValidator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Runs the shared cross-SDK payload-schema cases (ADR-0024) from the vendored conformance
+ * suite: this validator must agree with the Go, PHP, Python and Node ones on each case's
+ * {@code valid} flag.
+ */
+class PayloadSchemaConformanceTest {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void payloadCasesMatchAcrossSdks() {
+ Map manifest =
+ (Map) Json.parse(readResource("/conformance/manifest.json"));
+ Object sectionObj = manifest.get("payload_schema");
+ assertNotNull(sectionObj, "manifest has a payload_schema section");
+
+ Map section = (Map) sectionObj;
+ Map schema = (Map) section.get("schema");
+ List