From 873fc3c4c82a469af6707ac6c38104b4a27e27cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammet=20=C5=9Eafak?= Date: Thu, 18 Jun 2026 13:44:04 +0300 Subject: [PATCH] feat: optional idempotency (ADR-0022) and per-URN schema validation (ADR-0024) Opt-in, dependency-free helpers; wire envelope stays frozen. Includes the vendored payload_schema cross-SDK conformance cases. --- .../com/babelqueue/idempotency/Handler.java | 20 +++ .../babelqueue/idempotency/Idempotent.java | 55 +++++++ .../babelqueue/idempotency/InMemoryStore.java | 29 ++++ .../com/babelqueue/idempotency/Store.java | 25 +++ .../babelqueue/idempotency/package-info.java | 11 ++ .../schema/InvalidPayloadException.java | 35 ++++ .../com/babelqueue/schema/MapProvider.java | 25 +++ .../babelqueue/schema/PayloadValidator.java | 153 ++++++++++++++++++ .../com/babelqueue/schema/SchemaProvider.java | 25 +++ .../babelqueue/schema/SchemaValidation.java | 75 +++++++++ .../com/babelqueue/schema/package-info.java | 10 ++ .../PayloadSchemaConformanceTest.java | 53 ++++++ .../com/babelqueue/SchemaValidationTest.java | 105 ++++++++++++ .../idempotency/IdempotencyTest.java | 100 ++++++++++++ src/test/resources/conformance/manifest.json | 23 +++ 15 files changed, 744 insertions(+) create mode 100644 src/main/java/com/babelqueue/idempotency/Handler.java create mode 100644 src/main/java/com/babelqueue/idempotency/Idempotent.java create mode 100644 src/main/java/com/babelqueue/idempotency/InMemoryStore.java create mode 100644 src/main/java/com/babelqueue/idempotency/Store.java create mode 100644 src/main/java/com/babelqueue/idempotency/package-info.java create mode 100644 src/main/java/com/babelqueue/schema/InvalidPayloadException.java create mode 100644 src/main/java/com/babelqueue/schema/MapProvider.java create mode 100644 src/main/java/com/babelqueue/schema/PayloadValidator.java create mode 100644 src/main/java/com/babelqueue/schema/SchemaProvider.java create mode 100644 src/main/java/com/babelqueue/schema/SchemaValidation.java create mode 100644 src/main/java/com/babelqueue/schema/package-info.java create mode 100644 src/test/java/com/babelqueue/PayloadSchemaConformanceTest.java create mode 100644 src/test/java/com/babelqueue/SchemaValidationTest.java create mode 100644 src/test/java/com/babelqueue/idempotency/IdempotencyTest.java diff --git a/src/main/java/com/babelqueue/idempotency/Handler.java b/src/main/java/com/babelqueue/idempotency/Handler.java new file mode 100644 index 0000000..2004911 --- /dev/null +++ b/src/main/java/com/babelqueue/idempotency/Handler.java @@ -0,0 +1,20 @@ +package com.babelqueue.idempotency; + +import com.babelqueue.Envelope; + +/** + * A consume handler: processes one decoded {@link Envelope}. It may throw — a thrown + * handler leaves the message unacknowledged so the runtime redelivers it (the core is + * codec-only, so an adapter drives the actual consume loop). + */ +@FunctionalInterface +public interface Handler { + + /** + * Handles one message. + * + * @param envelope the decoded envelope + * @throws Exception to signal failure (redelivery / dead-letter per the adapter) + */ + void handle(Envelope envelope) throws Exception; +} diff --git a/src/main/java/com/babelqueue/idempotency/Idempotent.java b/src/main/java/com/babelqueue/idempotency/Idempotent.java new file mode 100644 index 0000000..3717be0 --- /dev/null +++ b/src/main/java/com/babelqueue/idempotency/Idempotent.java @@ -0,0 +1,55 @@ +package com.babelqueue.idempotency; + +import com.babelqueue.Envelope; +import com.babelqueue.Meta; + +/** + * Wraps a {@link Handler} so a message whose {@code meta.id} was already processed + * successfully is skipped instead of run again (ADR-0022) — the Java mirror of the PHP + * {@code Idempotent::wrap}, Go {@code idempotency.Wrap}, Python {@code wrap}, and Node + * {@code Wrap} helpers. + * + *
{@code
+ * Store store = new InMemoryStore();
+ * Handler handler = Idempotent.wrap(store, env -> process(env));
+ * }
+ * + *

A previously-seen id returns early (so an adapter acks it and the broker stops + * redelivering); a throwing handler leaves the id unmarked so a redelivery runs it again + * (retry / dead-letter still apply); a message with no usable {@code meta.id} runs + * unchanged. + */ +public final class Idempotent { + + private Idempotent() { + } + + /** + * Returns {@code handler} wrapped with dedupe on {@code meta.id} against {@code store}. + * + * @param store the dedupe record + * @param handler the handler to guard + * @return the wrapped handler + */ + public static Handler wrap(Store store, Handler handler) { + return envelope -> { + Meta meta = envelope.meta(); + String id = (meta == null) ? null : meta.id(); + + // No usable id → cannot dedupe; run the handler unchanged. + if (id == null || id.isEmpty()) { + handler.handle(envelope); + return; + } + + // Already processed on an earlier delivery: return so the adapter acks it. + if (store.seen(id)) { + return; + } + + // First success wins; a throw here leaves the id unmarked → retry/DLQ apply. + handler.handle(envelope); + store.remember(id); + }; + } +} diff --git a/src/main/java/com/babelqueue/idempotency/InMemoryStore.java b/src/main/java/com/babelqueue/idempotency/InMemoryStore.java new file mode 100644 index 0000000..5829fec --- /dev/null +++ b/src/main/java/com/babelqueue/idempotency/InMemoryStore.java @@ -0,0 +1,29 @@ +package com.babelqueue.idempotency; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Process-local, thread-safe {@link Store} backed by a concurrent set. For tests and + * single-process consumers; it is not shared across workers and not persistent — use a + * Redis- or database-backed store for production fleets. + */ +public final class InMemoryStore implements Store { + + private final Set ids = ConcurrentHashMap.newKeySet(); + + @Override + public boolean seen(String messageId) { + return ids.contains(messageId); + } + + @Override + public void remember(String messageId) { + ids.add(messageId); + } + + @Override + public void forget(String messageId) { + ids.remove(messageId); + } +} diff --git a/src/main/java/com/babelqueue/idempotency/Store.java b/src/main/java/com/babelqueue/idempotency/Store.java new file mode 100644 index 0000000..b5134cf --- /dev/null +++ b/src/main/java/com/babelqueue/idempotency/Store.java @@ -0,0 +1,25 @@ +package com.babelqueue.idempotency; + +/** + * A pluggable record of message ids that have already been processed, keyed on the + * envelope's {@code meta.id}. The reference {@link InMemoryStore} is for tests and + * single-process consumers; production backends (Redis, a database table) implement the + * same three methods. + * + *

The contract is "seen-set" dedupe: it answers "was this id processed?", not + * "what did it return" — queue handlers have no response to replay. It provides + * post-success dedupe under at-least-once delivery with idempotent handlers, not + * exactly-once and not in-flight concurrency locking. A transactional / outbox mode is a + * documented future direction (ADR-0022). + */ +public interface Store { + + /** Whether this message id has already been processed (remembered). */ + boolean seen(String messageId); + + /** Records this message id as processed. */ + void remember(String messageId); + + /** Drops an id from the store (manual eviction; a backend may also expire ids). */ + void forget(String messageId); +} diff --git a/src/main/java/com/babelqueue/idempotency/package-info.java b/src/main/java/com/babelqueue/idempotency/package-info.java new file mode 100644 index 0000000..a954e45 --- /dev/null +++ b/src/main/java/com/babelqueue/idempotency/package-info.java @@ -0,0 +1,11 @@ +/** + * Optional idempotency helper (ADR-0022): dedupe a consume handler on {@code meta.id}. + * + *

{@link com.babelqueue.idempotency.Idempotent#wrap} wraps a + * {@link com.babelqueue.idempotency.Handler} so a message already processed is skipped; + * {@link com.babelqueue.idempotency.Store} is the pluggable dedupe record, with + * {@link com.babelqueue.idempotency.InMemoryStore} as the reference implementation. The + * wire envelope is unchanged ({@code schema_version: 1}) — this is a pure consumer-side + * concern. See {@code .ssot/contracts/error-handling.md} §1. + */ +package com.babelqueue.idempotency; diff --git a/src/main/java/com/babelqueue/schema/InvalidPayloadException.java b/src/main/java/com/babelqueue/schema/InvalidPayloadException.java new file mode 100644 index 0000000..f383975 --- /dev/null +++ b/src/main/java/com/babelqueue/schema/InvalidPayloadException.java @@ -0,0 +1,35 @@ +package com.babelqueue.schema; + +import com.babelqueue.BabelQueueException; + +/** + * Raised when a message's {@code data} does not match the JSON Schema registered for its URN + * (ADR-0024). The consumer-side {@link SchemaValidation#wrap} throws it so the adapter + * redelivers (and eventually dead-letters) a poison message; the recommended primary use is + * producer-side ({@link SchemaValidation#validate}) so invalid data never enters the queue. + */ +public class InvalidPayloadException extends BabelQueueException { + + private final transient String urn; + private final transient String violation; + + /** + * @param urn the message URN whose schema was violated + * @param violation the first {@code ": "} mismatch + */ + public InvalidPayloadException(String urn, String violation) { + super("Message data for \"" + urn + "\" does not match its URN schema: " + violation + "."); + this.urn = urn; + this.violation = violation; + } + + /** @return the message URN whose schema was violated */ + public String urn() { + return urn; + } + + /** @return the first {@code ": "} mismatch */ + public String violation() { + return violation; + } +} diff --git a/src/main/java/com/babelqueue/schema/MapProvider.java b/src/main/java/com/babelqueue/schema/MapProvider.java new file mode 100644 index 0000000..efe1248 --- /dev/null +++ b/src/main/java/com/babelqueue/schema/MapProvider.java @@ -0,0 +1,25 @@ +package com.babelqueue.schema; + +import java.util.HashMap; +import java.util.Map; + +/** + * In-memory {@link SchemaProvider}, for tests and for embedding schemas in code. Construct it + * with each URN's already-decoded JSON Schema (a {@code Map}). + */ +public final class MapProvider implements SchemaProvider { + + private final Map> schemas; + + /** + * @param schemas urn -> decoded JSON Schema + */ + public MapProvider(Map> schemas) { + this.schemas = new HashMap<>(schemas); + } + + @Override + public Map schemaFor(String urn) { + return schemas.get(urn); + } +} diff --git a/src/main/java/com/babelqueue/schema/PayloadValidator.java b/src/main/java/com/babelqueue/schema/PayloadValidator.java new file mode 100644 index 0000000..3c25d86 --- /dev/null +++ b/src/main/java/com/babelqueue/schema/PayloadValidator.java @@ -0,0 +1,153 @@ +package com.babelqueue.schema; + +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Validates a message's {@code data} block against a per-URN JSON Schema (ADR-0024). A + * hand-rolled subset of Draft-07 (zero dependencies, GR-7) whose verdicts match the Go, PHP, + * Python and Node validators and babelqueue-registry's {@code compat} linter. Supported + * keywords: {@code type}, {@code required}, {@code properties}, {@code additionalProperties}, + * {@code items}, {@code enum}, {@code const}, {@code minLength}, {@code minimum}; unknown + * keywords are ignored. It works on the decoded {@code Map}/{@code List}/scalar structures + * the codec produces (numbers are {@link Long}/{@link BigInteger}/{@link Double}). + */ +public final class PayloadValidator { + + private PayloadValidator() { + } + + /** + * The first violation of {@code value} against {@code schema} as + * {@code ": "}, or {@code null} when it conforms. + * + * @param schema the decoded JSON Schema for the data block + * @param value the decoded value to validate + * @return the first violation, or null when valid + */ + public static String validate(Map schema, Object value) { + return validateNode(schema, value, ""); + } + + private static String validateNode(Map schema, Object value, String path) { + if (schema.containsKey("const") && !Objects.equals(value, schema.get("const"))) { + return violation(path, "wrong_const"); + } + if (schema.get("enum") instanceof List enumValues && !contains(enumValues, value)) { + return violation(path, "not_in_enum"); + } + + String type = (schema.get("type") instanceof String s) ? s : ""; + return switch (type) { + case "object" -> checkObject(schema, value, path); + case "array" -> checkArray(schema, value, path); + case "string" -> checkString(schema, value, path); + case "integer" -> isInteger(value) + ? checkMinimum(schema, value, path) + : violation(path, "not_an_integer"); + case "number" -> (value instanceof Number) + ? checkMinimum(schema, value, path) + : violation(path, "not_a_number"); + case "boolean" -> (value instanceof Boolean) ? null : violation(path, "not_a_boolean"); + case "null" -> (value == null) ? null : violation(path, "not_null"); + default -> null; + }; + } + + private static String checkObject(Map schema, Object value, String path) { + if (!(value instanceof Map obj)) { + return violation(path, "not_an_object"); + } + if (schema.get("required") instanceof List required) { + for (Object key : required) { + if (key instanceof String k && !obj.containsKey(k)) { + return violation(join(path, k), "missing_required"); + } + } + } + Map properties = (schema.get("properties") instanceof Map p) ? p : Map.of(); + boolean additionalAllowed = !Boolean.FALSE.equals(schema.get("additionalProperties")); + + for (Map.Entry entry : obj.entrySet()) { + String name = String.valueOf(entry.getKey()); + if (properties.get(name) instanceof Map propSchema) { + String found = validateNode(propSchema, entry.getValue(), join(path, name)); + if (found != null) { + return found; + } + } else if (!additionalAllowed) { + return violation(join(path, name), "additional_not_allowed"); + } + } + return null; + } + + private static String checkArray(Map schema, Object value, String path) { + if (!(value instanceof List list)) { + return violation(path, "not_an_array"); + } + if (!(schema.get("items") instanceof Map items)) { + return null; + } + for (int i = 0; i < list.size(); i++) { + String found = validateNode(items, list.get(i), path + "[" + i + "]"); + if (found != null) { + return found; + } + } + return null; + } + + private static String checkString(Map schema, Object value, String path) { + if (!(value instanceof String str)) { + return violation(path, "not_a_string"); + } + if (schema.get("minLength") instanceof Number min && str.length() < min.intValue()) { + return violation(path, "below_min_length"); + } + return null; + } + + private static String checkMinimum(Map schema, Object value, String path) { + if (schema.get("minimum") instanceof Number min + && value instanceof Number num + && num.doubleValue() < min.doubleValue()) { + return violation(path, "below_minimum"); + } + return null; + } + + // JSON numbers decode to Long/BigInteger (integers) or Double (fractions); a whole-valued + // Double still counts as an integer, matching the other SDKs. Booleans are not Numbers. + private static boolean isInteger(Object value) { + if (value instanceof Long || value instanceof Integer || value instanceof BigInteger) { + return true; + } + if (value instanceof Double d) { + return !d.isInfinite() && Double.compare(d, Math.floor(d)) == 0; + } + if (value instanceof Float f) { + return !f.isInfinite() && Float.compare(f, (float) Math.floor(f)) == 0; + } + return false; + } + + private static boolean contains(List values, Object value) { + for (Object item : values) { + if (Objects.equals(value, item)) { + return true; + } + } + return false; + } + + private static String violation(String path, String reason) { + return (path.isEmpty() ? "" : path) + ": " + reason; + } + + private static String join(String path, String key) { + return path.isEmpty() ? key : path + "." + key; + } +} diff --git a/src/main/java/com/babelqueue/schema/SchemaProvider.java b/src/main/java/com/babelqueue/schema/SchemaProvider.java new file mode 100644 index 0000000..4791ea9 --- /dev/null +++ b/src/main/java/com/babelqueue/schema/SchemaProvider.java @@ -0,0 +1,25 @@ +package com.babelqueue.schema; + +import java.util.Map; + +/** + * A source of per-URN {@code data} schemas, keyed on the message URN (ADR-0024). Given a + * URN, it returns the decoded JSON Schema for that message's {@code data} block, or + * {@code null} when no schema is registered — in which case the caller skips validation + * (the feature is opt-in). + * + *

The reference {@link MapProvider} is in-memory. A Java app reads its babelqueue-registry + * {@code registry.json} with its own JSON tooling and passes the parsed schemas to a + * {@link MapProvider}; the zero-dependency core does not ship a file-based provider. This is + * the Java mirror of the Go {@code schema.Provider} interface. + */ +public interface SchemaProvider { + + /** + * The decoded JSON Schema registered for {@code urn}, or {@code null} when none is. + * + * @param urn the message URN + * @return the schema map, or null + */ + Map schemaFor(String urn); +} diff --git a/src/main/java/com/babelqueue/schema/SchemaValidation.java b/src/main/java/com/babelqueue/schema/SchemaValidation.java new file mode 100644 index 0000000..5b3bafe --- /dev/null +++ b/src/main/java/com/babelqueue/schema/SchemaValidation.java @@ -0,0 +1,75 @@ +package com.babelqueue.schema; + +import com.babelqueue.idempotency.Handler; +import java.util.Map; + +/** + * Optional per-URN {@code data} schema validation for a babelqueue producer or consumer + * (ADR-0024). A {@link SchemaProvider} supplies a JSON Schema for a message URN — typically + * built from a babelqueue-registry {@code registry.json} — and the message's {@code data} is + * validated against it. It is opt-in: a URN with no registered schema is never validated. + * + *

+ * + *

The Java mirror of the Go {@code schema.Check}/{@code schema.Wrap} helpers. + */ +public final class SchemaValidation { + + private SchemaValidation() { + } + + /** + * The first {@code data} violation for {@code (urn, data)}, or {@code null} when it is + * valid or when no schema is registered for the URN (opt-in). For producer-side branching. + * + * @param provider the schema source + * @param urn the message URN + * @param data the message data + * @return the first violation, or null + */ + public static String check(SchemaProvider provider, String urn, Map data) { + Map schema = provider.schemaFor(urn); + if (schema == null) { + return null; + } + return PayloadValidator.validate(schema, data); + } + + /** + * Validate {@code (urn, data)} against its registered schema, throwing otherwise. The + * producer-side guard; call it before publishing. + * + * @param provider the schema source + * @param urn the message URN + * @param data the message data + * @throws InvalidPayloadException when the data does not match the URN's schema + */ + public static void validate(SchemaProvider provider, String urn, Map data) { + String violation = check(provider, urn, data); + if (violation != null) { + throw new InvalidPayloadException(urn, violation); + } + } + + /** + * Returns {@code handler} wrapped to validate each message's {@code data} against its + * URN's schema before the handler runs (consumer-side safety net). + * + * @param provider the schema source + * @param handler the handler to guard + * @return the wrapped handler + */ + public static Handler wrap(SchemaProvider provider, Handler handler) { + return envelope -> { + validate(provider, envelope.job(), envelope.data()); + handler.handle(envelope); + }; + } +} diff --git a/src/main/java/com/babelqueue/schema/package-info.java b/src/main/java/com/babelqueue/schema/package-info.java new file mode 100644 index 0000000..b7b99f1 --- /dev/null +++ b/src/main/java/com/babelqueue/schema/package-info.java @@ -0,0 +1,10 @@ +/** + * Optional per-URN {@code data} schema validation (ADR-0024): a dependency-free subset + * Draft-07 validator ({@link com.babelqueue.schema.PayloadValidator}), a + * {@link com.babelqueue.schema.SchemaProvider} (with the in-memory + * {@link com.babelqueue.schema.MapProvider}), and the + * {@link com.babelqueue.schema.SchemaValidation} producer/consumer helpers. Opt-in, with the + * wire envelope frozen — the Java mirror of the Go {@code schema} package and PHP + * {@code BabelQueue\Schema}. + */ +package com.babelqueue.schema; diff --git a/src/test/java/com/babelqueue/PayloadSchemaConformanceTest.java b/src/test/java/com/babelqueue/PayloadSchemaConformanceTest.java new file mode 100644 index 0000000..597e464 --- /dev/null +++ b/src/test/java/com/babelqueue/PayloadSchemaConformanceTest.java @@ -0,0 +1,53 @@ +package com.babelqueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.babelqueue.schema.PayloadValidator; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +/** + * Runs the shared cross-SDK payload-schema cases (ADR-0024) from the vendored conformance + * suite: this validator must agree with the Go, PHP, Python and Node ones on each case's + * {@code valid} flag. + */ +class PayloadSchemaConformanceTest { + + @Test + @SuppressWarnings("unchecked") + void payloadCasesMatchAcrossSdks() { + Map manifest = + (Map) Json.parse(readResource("/conformance/manifest.json")); + Object sectionObj = manifest.get("payload_schema"); + assertNotNull(sectionObj, "manifest has a payload_schema section"); + + Map section = (Map) sectionObj; + Map schema = (Map) section.get("schema"); + List cases = (List) section.get("cases"); + assertFalse(cases.isEmpty()); + + for (Object caseObj : cases) { + Map testCase = (Map) caseObj; + Map data = (Map) testCase.get("data"); + boolean expected = Boolean.TRUE.equals(testCase.get("valid")); + boolean isValid = PayloadValidator.validate(schema, data) == null; + assertEquals(expected, isValid, "case " + testCase.get("name")); + } + } + + private static String readResource(String path) { + try (InputStream in = PayloadSchemaConformanceTest.class.getResourceAsStream(path)) { + assertNotNull(in, "missing resource " + path); + return new String(in.readAllBytes(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/src/test/java/com/babelqueue/SchemaValidationTest.java b/src/test/java/com/babelqueue/SchemaValidationTest.java new file mode 100644 index 0000000..88c5e02 --- /dev/null +++ b/src/test/java/com/babelqueue/SchemaValidationTest.java @@ -0,0 +1,105 @@ +package com.babelqueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.babelqueue.idempotency.Handler; +import com.babelqueue.schema.InvalidPayloadException; +import com.babelqueue.schema.MapProvider; +import com.babelqueue.schema.PayloadValidator; +import com.babelqueue.schema.SchemaProvider; +import com.babelqueue.schema.SchemaValidation; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; + +class SchemaValidationTest { + + private static final String ORDERS = + "{\"type\":\"object\",\"required\":[\"order_id\"]," + + "\"properties\":{\"order_id\":{\"type\":\"integer\"}},\"additionalProperties\":false}"; + + @SuppressWarnings("unchecked") + private static Map json(String raw) { + return (Map) Json.parse(raw); + } + + private static SchemaProvider provider() { + return new MapProvider(Map.of("urn:babel:orders:created", json(ORDERS))); + } + + @Test + void validatorEnforcesObjectRequiredTypesAndAdditional() { + Map s = json(ORDERS); + assertNull(PayloadValidator.validate(s, json("{\"order_id\":7}"))); + assertNotNull(PayloadValidator.validate(s, json("{}"))); + assertNotNull(PayloadValidator.validate(s, json("{\"order_id\":\"x\"}"))); + assertNotNull(PayloadValidator.validate(s, json("{\"order_id\":7,\"extra\":1}"))); + } + + @Test + void validatorScalarParity() { + assertNull(PayloadValidator.validate(json("{\"type\":\"boolean\"}"), Boolean.TRUE)); + assertNotNull(PayloadValidator.validate(json("{\"type\":\"boolean\"}"), "x")); + assertNull(PayloadValidator.validate(json("{\"type\":\"null\"}"), null)); + assertNotNull(PayloadValidator.validate(json("{\"type\":\"null\"}"), 1L)); + assertNull(PayloadValidator.validate(json("{\"type\":\"number\",\"minimum\":0.5}"), 0.6)); + assertNotNull(PayloadValidator.validate(json("{\"type\":\"number\",\"minimum\":0.5}"), 0.4)); + assertNotNull(PayloadValidator.validate(json("{\"type\":\"number\"}"), "x")); + assertNull(PayloadValidator.validate(json("{\"type\":\"integer\"}"), 1L)); + assertNotNull(PayloadValidator.validate(json("{\"type\":\"integer\"}"), 1.5)); + assertNotNull(PayloadValidator.validate(json("{\"type\":\"integer\"}"), Boolean.TRUE)); + } + + @Test + void validatorConstStringEnumAndArray() { + assertNull(PayloadValidator.validate(json("{\"const\":\"v1\"}"), "v1")); + assertNotNull(PayloadValidator.validate(json("{\"const\":\"v1\"}"), "v2")); + assertNull(PayloadValidator.validate(json("{\"type\":\"string\",\"minLength\":1}"), "a")); + assertNotNull(PayloadValidator.validate(json("{\"type\":\"string\",\"minLength\":1}"), "")); + assertNotNull(PayloadValidator.validate(json("{\"type\":\"string\"}"), 5L)); + assertNull(PayloadValidator.validate(json("{\"enum\":[\"a\",\"b\"]}"), "b")); + assertNotNull(PayloadValidator.validate(json("{\"enum\":[\"a\",\"b\"]}"), "c")); + assertNotNull(PayloadValidator.validate(json("{\"type\":\"object\"}"), "x")); + Map arr = json("{\"type\":\"array\",\"items\":{\"type\":\"string\"}}"); + assertNull(PayloadValidator.validate(arr, List.of("a", "b"))); + assertNotNull(PayloadValidator.validate(arr, List.of("a", 1L))); + assertNotNull(PayloadValidator.validate(json("{\"type\":\"array\"}"), "x")); + } + + @Test + void checkValidInvalidAndUnregistered() { + SchemaProvider p = provider(); + assertNull(SchemaValidation.check(p, "urn:babel:orders:created", json("{\"order_id\":1}"))); + assertNull(SchemaValidation.check(p, "urn:babel:unknown", json("{\"x\":1}"))); + assertNotNull(SchemaValidation.check(p, "urn:babel:orders:created", json("{}"))); + } + + @Test + void validateThrowsWithDetailsOnInvalid() { + InvalidPayloadException ex = assertThrows(InvalidPayloadException.class, () -> + SchemaValidation.validate(provider(), "urn:babel:orders:created", json("{\"order_id\":\"x\"}"))); + assertEquals("urn:babel:orders:created", ex.urn()); + assertNotNull(ex.violation()); + } + + @Test + void wrapRunsOnValidThrowsOnInvalidRunsForUnregistered() throws Exception { + SchemaProvider p = provider(); + AtomicInteger calls = new AtomicInteger(); + Handler handler = SchemaValidation.wrap(p, env -> calls.incrementAndGet()); + + handler.handle(EnvelopeCodec.make("urn:babel:orders:created", json("{\"order_id\":1}"))); + assertEquals(1, calls.get()); + + assertThrows(InvalidPayloadException.class, () -> + handler.handle(EnvelopeCodec.make("urn:babel:orders:created", json("{}")))); + assertEquals(1, calls.get()); + + handler.handle(EnvelopeCodec.make("urn:babel:unknown", json("{\"anything\":true}"))); + assertEquals(2, calls.get()); + } +} diff --git a/src/test/java/com/babelqueue/idempotency/IdempotencyTest.java b/src/test/java/com/babelqueue/idempotency/IdempotencyTest.java new file mode 100644 index 0000000..2479c83 --- /dev/null +++ b/src/test/java/com/babelqueue/idempotency/IdempotencyTest.java @@ -0,0 +1,100 @@ +package com.babelqueue.idempotency; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.babelqueue.Envelope; +import com.babelqueue.Meta; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; + +class IdempotencyTest { + + private Envelope msg(String id) { + return new Envelope( + "urn:babel:orders:created", + "trace-1", + Map.of("order_id", 7L), + new Meta(id, "orders", "java", 1, 1L), + 0, + null); + } + + @Test + void runsAndRemembersOnFirstDelivery() throws Exception { + Store store = new InMemoryStore(); + AtomicInteger calls = new AtomicInteger(); + Handler handler = Idempotent.wrap(store, env -> calls.incrementAndGet()); + + handler.handle(msg("m1")); + + assertEquals(1, calls.get()); + assertTrue(store.seen("m1")); + } + + @Test + void skipsRedeliveryOfSameId() throws Exception { + Store store = new InMemoryStore(); + AtomicInteger calls = new AtomicInteger(); + Handler handler = Idempotent.wrap(store, env -> calls.incrementAndGet()); + + handler.handle(msg("m1")); + handler.handle(msg("m1")); // redelivery → skipped + + assertEquals(1, calls.get()); + } + + @Test + void runsAgainForADifferentId() throws Exception { + Store store = new InMemoryStore(); + AtomicInteger calls = new AtomicInteger(); + Handler handler = Idempotent.wrap(store, env -> calls.incrementAndGet()); + + handler.handle(msg("m1")); + handler.handle(msg("m2")); + + assertEquals(2, calls.get()); + } + + @Test + void doesNotRememberWhenHandlerThrows() { + Store store = new InMemoryStore(); + AtomicInteger calls = new AtomicInteger(); + Handler handler = Idempotent.wrap(store, env -> { + calls.incrementAndGet(); + throw new IllegalStateException("boom"); + }); + + assertThrows(IllegalStateException.class, () -> handler.handle(msg("m1"))); + assertFalse(store.seen("m1")); + + // A redelivery runs the handler again — retry works. + assertThrows(IllegalStateException.class, () -> handler.handle(msg("m1"))); + assertEquals(2, calls.get()); + } + + @Test + void runsWhenNoUsableId() throws Exception { + Store store = new InMemoryStore(); + AtomicInteger calls = new AtomicInteger(); + Handler handler = Idempotent.wrap(store, env -> calls.incrementAndGet()); + + handler.handle(msg("")); // empty id → cannot dedupe → runs + handler.handle(msg(null)); // no id at all → runs + + assertEquals(2, calls.get()); + } + + @Test + void forgetRemovesARememberedId() { + InMemoryStore store = new InMemoryStore(); + store.remember("m1"); + assertTrue(store.seen("m1")); + + store.forget("m1"); + assertFalse(store.seen("m1")); + } +} diff --git a/src/test/resources/conformance/manifest.json b/src/test/resources/conformance/manifest.json index 78e5c3a..5b2fee4 100644 --- a/src/test/resources/conformance/manifest.json +++ b/src/test/resources/conformance/manifest.json @@ -226,5 +226,28 @@ "x-attempts": 0 } } + }, + "payload_schema": { + "description": "Per-URN data schema validation (ADR-0024). Each case validates `data` against `schema`; every SDK's optional payload validator (Go schema, PHP BabelQueue\\Schema, Python babelqueue.schema) MUST agree on `valid`. The wire envelope stays frozen — this governs the data block only, and is opt-in (consumers/producers without a registered schema skip it).", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["order_id"], + "properties": { + "order_id": { "type": "integer", "minimum": 1 }, + "amount": { "type": "number", "minimum": 0 }, + "currency": { "enum": ["USD", "EUR", "TRY"] } + }, + "additionalProperties": false + }, + "cases": [ + { "name": "valid-minimal", "valid": true, "data": { "order_id": 1042 } }, + { "name": "valid-full", "valid": true, "data": { "order_id": 1042, "amount": 99.9, "currency": "USD" } }, + { "name": "invalid-missing-required", "valid": false, "data": { "amount": 10 } }, + { "name": "invalid-wrong-type", "valid": false, "data": { "order_id": "x" } }, + { "name": "invalid-additional-property", "valid": false, "data": { "order_id": 1, "extra": true } }, + { "name": "invalid-enum", "valid": false, "data": { "order_id": 1, "currency": "GBP" } }, + { "name": "invalid-below-minimum", "valid": false, "data": { "order_id": 0 } } + ] } }