Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/main/java/com/babelqueue/idempotency/Handler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.babelqueue.idempotency;

import com.babelqueue.Envelope;

/**
* A consume handler: processes one decoded {@link Envelope}. It may throw — a thrown
* handler leaves the message unacknowledged so the runtime redelivers it (the core is
* codec-only, so an adapter drives the actual consume loop).
*/
@FunctionalInterface
public interface Handler {

/**
* Handles one message.
*
* @param envelope the decoded envelope
* @throws Exception to signal failure (redelivery / dead-letter per the adapter)
*/
void handle(Envelope envelope) throws Exception;
}
55 changes: 55 additions & 0 deletions src/main/java/com/babelqueue/idempotency/Idempotent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.babelqueue.idempotency;

import com.babelqueue.Envelope;
import com.babelqueue.Meta;

/**
* Wraps a {@link Handler} so a message whose {@code meta.id} was already processed
* successfully is skipped instead of run again (ADR-0022) — the Java mirror of the PHP
* {@code Idempotent::wrap}, Go {@code idempotency.Wrap}, Python {@code wrap}, and Node
* {@code Wrap} helpers.
*
* <pre>{@code
* Store store = new InMemoryStore();
* Handler handler = Idempotent.wrap(store, env -> process(env));
* }</pre>
*
* <p>A previously-seen id returns early (so an adapter acks it and the broker stops
* redelivering); a throwing handler leaves the id unmarked so a redelivery runs it again
* (retry / dead-letter still apply); a message with no usable {@code meta.id} runs
* unchanged.
*/
public final class Idempotent {

private Idempotent() {
}

/**
* Returns {@code handler} wrapped with dedupe on {@code meta.id} against {@code store}.
*
* @param store the dedupe record
* @param handler the handler to guard
* @return the wrapped handler
*/
public static Handler wrap(Store store, Handler handler) {
return envelope -> {
Meta meta = envelope.meta();
String id = (meta == null) ? null : meta.id();

// No usable id → cannot dedupe; run the handler unchanged.
if (id == null || id.isEmpty()) {
handler.handle(envelope);
return;
}

// Already processed on an earlier delivery: return so the adapter acks it.
if (store.seen(id)) {
return;
}

// First success wins; a throw here leaves the id unmarked → retry/DLQ apply.
handler.handle(envelope);
store.remember(id);
};
}
}
29 changes: 29 additions & 0 deletions src/main/java/com/babelqueue/idempotency/InMemoryStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.babelqueue.idempotency;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Process-local, thread-safe {@link Store} backed by a concurrent set. For tests and
* single-process consumers; it is not shared across workers and not persistent — use a
* Redis- or database-backed store for production fleets.
*/
public final class InMemoryStore implements Store {

private final Set<String> ids = ConcurrentHashMap.newKeySet();

@Override
public boolean seen(String messageId) {
return ids.contains(messageId);
}

@Override
public void remember(String messageId) {
ids.add(messageId);
}

@Override
public void forget(String messageId) {
ids.remove(messageId);
}
}
25 changes: 25 additions & 0 deletions src/main/java/com/babelqueue/idempotency/Store.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.babelqueue.idempotency;

/**
* A pluggable record of message ids that have already been processed, keyed on the
* envelope's {@code meta.id}. The reference {@link InMemoryStore} is for tests and
* single-process consumers; production backends (Redis, a database table) implement the
* same three methods.
*
* <p>The contract is "seen-set" dedupe: it answers <em>"was this id processed?"</em>, not
* <em>"what did it return"</em> — queue handlers have no response to replay. It provides
* post-success dedupe under at-least-once delivery with idempotent handlers, not
* exactly-once and not in-flight concurrency locking. A transactional / outbox mode is a
* documented future direction (ADR-0022).
*/
public interface Store {

/** Whether this message id has already been processed (remembered). */
boolean seen(String messageId);

/** Records this message id as processed. */
void remember(String messageId);

/** Drops an id from the store (manual eviction; a backend may also expire ids). */
void forget(String messageId);
}
11 changes: 11 additions & 0 deletions src/main/java/com/babelqueue/idempotency/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Optional idempotency helper (ADR-0022): dedupe a consume handler on {@code meta.id}.
*
* <p>{@link com.babelqueue.idempotency.Idempotent#wrap} wraps a
* {@link com.babelqueue.idempotency.Handler} so a message already processed is skipped;
* {@link com.babelqueue.idempotency.Store} is the pluggable dedupe record, with
* {@link com.babelqueue.idempotency.InMemoryStore} as the reference implementation. The
* wire envelope is unchanged ({@code schema_version: 1}) — this is a pure consumer-side
* concern. See {@code .ssot/contracts/error-handling.md} §1.
*/
package com.babelqueue.idempotency;
35 changes: 35 additions & 0 deletions src/main/java/com/babelqueue/schema/InvalidPayloadException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.babelqueue.schema;

import com.babelqueue.BabelQueueException;

/**
* Raised when a message's {@code data} does not match the JSON Schema registered for its URN
* (ADR-0024). The consumer-side {@link SchemaValidation#wrap} throws it so the adapter
* redelivers (and eventually dead-letters) a poison message; the recommended primary use is
* producer-side ({@link SchemaValidation#validate}) so invalid data never enters the queue.
*/
public class InvalidPayloadException extends BabelQueueException {

private final transient String urn;
private final transient String violation;

/**
* @param urn the message URN whose schema was violated
* @param violation the first {@code "<json-pointer>: <reason>"} mismatch
*/
public InvalidPayloadException(String urn, String violation) {
super("Message data for \"" + urn + "\" does not match its URN schema: " + violation + ".");
this.urn = urn;
this.violation = violation;
}

/** @return the message URN whose schema was violated */
public String urn() {
return urn;
}

/** @return the first {@code "<json-pointer>: <reason>"} mismatch */
public String violation() {
return violation;
}
}
25 changes: 25 additions & 0 deletions src/main/java/com/babelqueue/schema/MapProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.babelqueue.schema;

import java.util.HashMap;
import java.util.Map;

/**
* In-memory {@link SchemaProvider}, for tests and for embedding schemas in code. Construct it
* with each URN's already-decoded JSON Schema (a {@code Map}).
*/
public final class MapProvider implements SchemaProvider {

private final Map<String, Map<String, Object>> schemas;

/**
* @param schemas urn -&gt; decoded JSON Schema
*/
public MapProvider(Map<String, Map<String, Object>> schemas) {
this.schemas = new HashMap<>(schemas);
}

@Override
public Map<String, Object> schemaFor(String urn) {
return schemas.get(urn);
}
}
153 changes: 153 additions & 0 deletions src/main/java/com/babelqueue/schema/PayloadValidator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package com.babelqueue.schema;

import java.math.BigInteger;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Validates a message's {@code data} block against a per-URN JSON Schema (ADR-0024). A
* hand-rolled subset of Draft-07 (zero dependencies, GR-7) whose verdicts match the Go, PHP,
* Python and Node validators and babelqueue-registry's {@code compat} linter. Supported
* keywords: {@code type}, {@code required}, {@code properties}, {@code additionalProperties},
* {@code items}, {@code enum}, {@code const}, {@code minLength}, {@code minimum}; unknown
* keywords are ignored. It works on the decoded {@code Map}/{@code List}/scalar structures
* the codec produces (numbers are {@link Long}/{@link BigInteger}/{@link Double}).
*/
public final class PayloadValidator {

private PayloadValidator() {
}

/**
* The first violation of {@code value} against {@code schema} as
* {@code "<json-pointer>: <reason>"}, or {@code null} when it conforms.
*
* @param schema the decoded JSON Schema for the data block
* @param value the decoded value to validate
* @return the first violation, or null when valid
*/
public static String validate(Map<String, Object> schema, Object value) {
return validateNode(schema, value, "");
}

private static String validateNode(Map<?, ?> schema, Object value, String path) {
if (schema.containsKey("const") && !Objects.equals(value, schema.get("const"))) {
return violation(path, "wrong_const");
}
if (schema.get("enum") instanceof List<?> enumValues && !contains(enumValues, value)) {
return violation(path, "not_in_enum");
}

String type = (schema.get("type") instanceof String s) ? s : "";
return switch (type) {
case "object" -> checkObject(schema, value, path);
case "array" -> checkArray(schema, value, path);
case "string" -> checkString(schema, value, path);
case "integer" -> isInteger(value)
? checkMinimum(schema, value, path)
: violation(path, "not_an_integer");
case "number" -> (value instanceof Number)
? checkMinimum(schema, value, path)
: violation(path, "not_a_number");
case "boolean" -> (value instanceof Boolean) ? null : violation(path, "not_a_boolean");
case "null" -> (value == null) ? null : violation(path, "not_null");
default -> null;
};
}

private static String checkObject(Map<?, ?> schema, Object value, String path) {
if (!(value instanceof Map<?, ?> obj)) {
return violation(path, "not_an_object");
}
if (schema.get("required") instanceof List<?> required) {
for (Object key : required) {
if (key instanceof String k && !obj.containsKey(k)) {
return violation(join(path, k), "missing_required");
}
}
}
Map<?, ?> properties = (schema.get("properties") instanceof Map<?, ?> p) ? p : Map.of();
boolean additionalAllowed = !Boolean.FALSE.equals(schema.get("additionalProperties"));

for (Map.Entry<?, ?> entry : obj.entrySet()) {
String name = String.valueOf(entry.getKey());
if (properties.get(name) instanceof Map<?, ?> propSchema) {
String found = validateNode(propSchema, entry.getValue(), join(path, name));
if (found != null) {
return found;
}
} else if (!additionalAllowed) {
return violation(join(path, name), "additional_not_allowed");
}
}
return null;
}

private static String checkArray(Map<?, ?> schema, Object value, String path) {
if (!(value instanceof List<?> list)) {
return violation(path, "not_an_array");
}
if (!(schema.get("items") instanceof Map<?, ?> items)) {
return null;
}
for (int i = 0; i < list.size(); i++) {
String found = validateNode(items, list.get(i), path + "[" + i + "]");
if (found != null) {
return found;
}
}
return null;
}

private static String checkString(Map<?, ?> schema, Object value, String path) {
if (!(value instanceof String str)) {
return violation(path, "not_a_string");
}
if (schema.get("minLength") instanceof Number min && str.length() < min.intValue()) {
return violation(path, "below_min_length");
}
return null;
}

private static String checkMinimum(Map<?, ?> schema, Object value, String path) {
if (schema.get("minimum") instanceof Number min
&& value instanceof Number num
&& num.doubleValue() < min.doubleValue()) {
return violation(path, "below_minimum");
}
return null;
}

// JSON numbers decode to Long/BigInteger (integers) or Double (fractions); a whole-valued
// Double still counts as an integer, matching the other SDKs. Booleans are not Numbers.
private static boolean isInteger(Object value) {
if (value instanceof Long || value instanceof Integer || value instanceof BigInteger) {
return true;
}
if (value instanceof Double d) {
return !d.isInfinite() && Double.compare(d, Math.floor(d)) == 0;
}
if (value instanceof Float f) {
return !f.isInfinite() && Float.compare(f, (float) Math.floor(f)) == 0;
}
return false;
}

private static boolean contains(List<?> values, Object value) {
for (Object item : values) {
if (Objects.equals(value, item)) {
return true;
}
}
return false;
}

private static String violation(String path, String reason) {
return (path.isEmpty() ? "<root>" : path) + ": " + reason;
}

private static String join(String path, String key) {
return path.isEmpty() ? key : path + "." + key;
}
}
25 changes: 25 additions & 0 deletions src/main/java/com/babelqueue/schema/SchemaProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.babelqueue.schema;

import java.util.Map;

/**
* A source of per-URN {@code data} schemas, keyed on the message URN (ADR-0024). Given a
* URN, it returns the decoded JSON Schema for that message's {@code data} block, or
* {@code null} when no schema is registered — in which case the caller skips validation
* (the feature is opt-in).
*
* <p>The reference {@link MapProvider} is in-memory. A Java app reads its babelqueue-registry
* {@code registry.json} with its own JSON tooling and passes the parsed schemas to a
* {@link MapProvider}; the zero-dependency core does not ship a file-based provider. This is
* the Java mirror of the Go {@code schema.Provider} interface.
*/
public interface SchemaProvider {

/**
* The decoded JSON Schema registered for {@code urn}, or {@code null} when none is.
*
* @param urn the message URN
* @return the schema map, or null
*/
Map<String, Object> schemaFor(String urn);
}
Loading